This repository was archived by the owner on Mar 18, 2026. It is now read-only.

Commit ba58431

Merge pull request #443 from aibtcdev/duplicates
help ensure duplication doesn't happen
2 parents: 8e3862a + 7ba8416 · commit ba58431

2 files changed: 106 additions & 6 deletions

File tree

app/backend/supabase.py
(plus a new SQL migration file, shown second below; its path was not captured)

app/backend/supabase.py: 60 additions & 6 deletions
@@ -665,12 +665,66 @@ def attempt_upload(attempt: int) -> Optional[str]:
     def create_queue_message(
         self, new_queue_message: "QueueMessageCreate"
     ) -> "QueueMessage":
-        payload = new_queue_message.model_dump(exclude_unset=True, mode="json")
-        response = self.client.table("queue").insert(payload).execute()
-        data = response.data or []
-        if not data:
-            raise ValueError("No data returned from Supabase insert for queue message.")
-        return QueueMessage(**data[0])
+        """Create a new queue message with deduplication logic to prevent 5x message multiplication."""
+
+        # Check for existing unprocessed messages with same content to prevent duplicates
+        if new_queue_message.dao_id and new_queue_message.message:
+            try:
+                # Use Supabase query to find existing unprocessed messages
+                query = (
+                    self.client.table("queue")
+                    .select("*")
+                    .eq("type", new_queue_message.type)
+                    .eq("dao_id", str(new_queue_message.dao_id))
+                    .eq("is_processed", False)
+                )
+
+                # Add wallet_id filter if present
+                if new_queue_message.wallet_id:
+                    query = query.eq("wallet_id", str(new_queue_message.wallet_id))
+
+                response = query.execute()
+                existing_data = response.data or []
+
+                # Check for duplicate content in existing messages
+                new_message_str = (
+                    str(new_queue_message.message) if new_queue_message.message else ""
+                )
+                for existing_row in existing_data:
+                    existing_message_str = str(existing_row.get("message", ""))
+                    if existing_message_str == new_message_str:
+                        existing_message = QueueMessage(**existing_row)
+                        logger.debug(
+                            f"Duplicate queue message detected for DAO {new_queue_message.dao_id}, "
+                            f"type {new_queue_message.type}, returning existing message {existing_message.id}"
+                        )
+                        return existing_message
+
+            except Exception as e:
+                # If deduplication check fails, log warning but continue with creation
+                logger.warning(
+                    f"Deduplication check failed: {str(e)}, proceeding with message creation"
+                )
+
+        # No duplicate found or deduplication skipped, create new message using Supabase
+        try:
+            payload = new_queue_message.model_dump(exclude_unset=True, mode="json")
+            response = self.client.table("queue").insert(payload).execute()
+            data = response.data or []
+            if not data:
+                raise ValueError(
+                    "No data returned from Supabase insert for queue message."
+                )
+
+            created_message = QueueMessage(**data[0])
+            logger.debug(
+                f"Created new queue message {created_message.id} for DAO {new_queue_message.dao_id}, type {new_queue_message.type}"
+            )
+            return created_message
+
+        except Exception as e:
+            logger.error(f"Failed to create queue message in Supabase: {str(e)}")
+            raise
 
     def get_queue_message(self, queue_message_id: UUID) -> Optional["QueueMessage"]:
         response = (
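
A minimal, self-contained sketch of the dedup rule this method applies. FakeQueue is a stand-in for the Supabase "queue" table (an assumption for illustration only); the real method runs the Supabase query shown in the diff above, but the comparison logic is the same: stringified message content is matched against every unprocessed row with the same type and dao_id.

import hashlib  # not needed here; see the migration sketch below
from dataclasses import dataclass
from typing import Any
from uuid import UUID, uuid4

@dataclass
class Row:
    id: UUID
    type: str
    dao_id: str
    message: Any
    is_processed: bool = False

class FakeQueue:
    """In-memory stand-in for the queue table (illustration only)."""

    def __init__(self) -> None:
        self.rows: list[Row] = []

    def create(self, type_: str, dao_id: str, message: Any) -> Row:
        # Mirror of the diff's logic: compare str(message) against every
        # unprocessed row with the same type and dao_id.
        for row in self.rows:
            if (row.type == type_ and row.dao_id == dao_id
                    and not row.is_processed
                    and str(row.message) == str(message)):
                return row  # duplicate detected: return the existing row
        row = Row(id=uuid4(), type=type_, dao_id=dao_id, message=message)
        self.rows.append(row)
        return row

q = FakeQueue()
dao = str(uuid4())
a = q.create("tweet", dao, {"text": "hello"})
b = q.create("tweet", dao, {"text": "hello"})  # same content -> same row
assert a.id == b.id and len(q.rows) == 1

Note that this application-level check is best-effort: two concurrent writers can both pass it before either inserts. The SQL migration in the second file below adds a database-level unique index as the authoritative guard.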
New SQL migration file (path not shown): 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+-- Add deduplication constraint to queue table to prevent 5x message multiplication
+-- This migration adds a unique constraint to prevent duplicate unprocessed messages
+
+-- Add a computed column for message content hash to enable efficient deduplication
+ALTER TABLE public.queue
+ADD COLUMN IF NOT EXISTS message_hash TEXT
+GENERATED ALWAYS AS (md5(message::text)) STORED;
+
+-- Create index for the message hash for performance
+CREATE INDEX IF NOT EXISTS idx_queue_message_hash ON public.queue(message_hash);
+
+-- Create unique constraint to prevent duplicate unprocessed messages
+-- This constraint ensures that for any combination of (type, dao_id, wallet_id, message_hash),
+-- there can only be one unprocessed message at a time
+CREATE UNIQUE INDEX IF NOT EXISTS idx_queue_unique_unprocessed_message
+ON public.queue (type, dao_id, wallet_id, message_hash)
+WHERE is_processed = false;
+
+-- Add partial index for efficient queries on unprocessed messages
+CREATE INDEX IF NOT EXISTS idx_queue_unprocessed_by_type_dao
+ON public.queue (type, dao_id, created_at)
+WHERE is_processed = false;
+
+-- Add comments for documentation
+COMMENT ON COLUMN public.queue.message_hash IS 'MD5 hash of the message content for deduplication';
+COMMENT ON INDEX public.idx_queue_unique_unprocessed_message IS 'Prevents duplicate unprocessed messages with same type, DAO, wallet, and content';
+
+-- Create function to handle duplicate message insertions gracefully
+CREATE OR REPLACE FUNCTION public.handle_duplicate_queue_message()
+RETURNS TRIGGER AS $$
+BEGIN
+    -- If a duplicate is detected (this would be caught by the unique constraint),
+    -- we could handle it here, but the constraint itself will prevent the insert
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Optional: Add logging for duplicate attempts (uncomment if you want to track these)
+-- CREATE OR REPLACE FUNCTION public.log_duplicate_queue_attempt()
+-- RETURNS TRIGGER AS $$
+-- BEGIN
+--     INSERT INTO public.queue_duplicate_log (attempted_type, attempted_dao_id, attempted_message_hash, attempted_at)
+--     VALUES (NEW.type, NEW.dao_id, NEW.message_hash, NOW());
+--     RETURN NULL; -- Don't actually insert the duplicate
+-- END;
+-- $$ LANGUAGE plpgsql;
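
The generated column stores md5(message::text), so the hash is computed by Postgres from the text rendering of the message column. A sketch of reproducing that hash client-side for local testing follows; it assumes the column is JSON/JSONB, and since Postgres's ::text rendering (key order, spacing) may not byte-match json.dumps output, treat it as an approximation rather than a guaranteed equivalent.

import hashlib
import json

message = {"text": "hello"}
# Approximate Postgres's ::text rendering of a JSON value
rendered = json.dumps(message, separators=(", ", ": "))
message_hash = hashlib.md5(rendered.encode("utf-8")).hexdigest()
print(message_hash)  # compare against queue.message_hash for the same row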
