Skip to content
This repository was archived by the owner on Mar 18, 2026. It is now read-only.

Commit 182b774

Browse files
Merge branch 'main' into check-net
2 parents 0fdce91 + f884b82 commit 182b774

4 files changed

Lines changed: 68 additions & 20 deletions

File tree

app/services/infrastructure/job_management/executor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,9 @@ async def _worker(self, worker_name: str) -> None:
253253
# Get next job from priority queue
254254
execution = await self.priority_queue.get_next_job()
255255
if not execution:
256-
await asyncio.sleep(0.1) # Brief pause if no jobs
256+
await asyncio.sleep(
257+
0.5
258+
) # Increased pause if no jobs to reduce CPU usage
257259
continue
258260

259261
# Check if we can acquire a slot for this job type

app/services/infrastructure/job_management/job_manager.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dataclasses import dataclass
55
from datetime import datetime
66
from typing import Any, Dict, List, Optional
7+
from app.backend.factory import backend
78

89
from apscheduler.schedulers.asyncio import AsyncIOScheduler
910

@@ -131,12 +132,36 @@ async def _execute_job_via_executor(self, job_type: str) -> None:
131132
# Convert job_type string to JobType enum
132133
job_type_enum = JobType.get_or_create(job_type)
133134

135+
logger.debug(f"🔍 Checking if {job_type} has work to do...")
136+
134137
# Get job metadata to check if it should run
135138
metadata = JobRegistry.get_metadata(job_type_enum)
136139
if not metadata:
137140
logger.error(f"No metadata found for job type: {job_type}")
138141
return
139142

143+
# For jobs that process messages, check if there are pending messages first
144+
# This prevents unnecessary job executions when there's no work
145+
if job_type in ["tweet", "discord", "stx_transfer"]:
146+
from app.backend.models import QueueMessageFilter
147+
148+
pending_messages = backend.list_queue_messages(
149+
filters=QueueMessageFilter(
150+
type=QueueMessageType.get_or_create(job_type),
151+
is_processed=False,
152+
)
153+
)
154+
155+
if not pending_messages:
156+
logger.debug(
157+
f"⏭️ Skipping {job_type} execution - no pending messages"
158+
)
159+
return
160+
161+
logger.debug(
162+
f"📝 Found {len(pending_messages)} pending {job_type} messages"
163+
)
164+
140165
# Create a synthetic queue message for scheduled execution
141166
# This allows the job to go through the proper executor pipeline with concurrency control
142167
synthetic_message = QueueMessage(
@@ -160,8 +185,8 @@ async def _execute_job_via_executor(self, job_type: str) -> None:
160185
synthetic_message, metadata.priority
161186
)
162187

163-
logger.debug(
164-
f"Enqueued scheduled job {job_type} with ID {job_id} (priority: {metadata.priority})"
188+
logger.info(
189+
f"✅ Queued {job_type} job {job_id} for execution (priority: {metadata.priority})"
165190
)
166191

167192
except Exception as e:

app/services/infrastructure/job_management/tasks/tweet_task.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ class TweetProcessingResult(RunnerResult):
4343
job_type="tweet",
4444
name="Tweet Processor",
4545
description="Processes and sends tweets for DAOs with automatic retry and error handling",
46-
interval_seconds=5,
47-
priority=JobPriority.HIGH,
46+
interval_seconds=30, # Reduced frequency from 5s to 30s
47+
priority=JobPriority.NORMAL, # Changed from HIGH to NORMAL to not dominate queue
4848
max_retries=3,
4949
retry_delay_seconds=60,
5050
timeout_seconds=300,
51-
max_concurrent=1,
51+
max_concurrent=2, # Increased from 1 to 2 to allow parallel processing
5252
requires_twitter=True,
5353
batch_size=5,
5454
enable_dead_letter_queue=True,
@@ -200,7 +200,7 @@ async def _validate_prerequisites(self, context: JobContext) -> bool:
200200
async def _validate_task_specific(self, context: JobContext) -> bool:
201201
"""Validate task-specific conditions."""
202202
if not self._pending_messages:
203-
logger.debug("No pending tweet messages found")
203+
logger.debug("No pending tweet messages found - skipping execution")
204204
return False
205205

206206
# Validate each message before processing

app/services/integrations/webhooks/chainhook/handlers/airdrop_stx_handler.py

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,52 +53,73 @@ def can_handle_transaction(self, transaction: TransactionWithReceipt) -> bool:
5353
tx_data_content = tx_data["tx_data"]
5454
tx_metadata = tx_data["tx_metadata"]
5555

56+
# Add debug logging
57+
tx_hash = transaction.transaction_identifier.hash
58+
self.logger.debug(f"AirdropSTXHandler checking transaction: {tx_hash}")
59+
5660
# Only handle ContractCall type transactions
5761
if not isinstance(tx_kind, dict):
58-
self.logger.debug(f"Skipping: tx_kind is not a dict: {type(tx_kind)}")
62+
self.logger.debug(
63+
f"[{tx_hash}] Rejecting: tx_kind is not a dict: {type(tx_kind)}"
64+
)
5965
return False
6066

6167
tx_kind_type = tx_kind.get("type")
68+
self.logger.debug(f"[{tx_hash}] tx_kind_type: {tx_kind_type}")
6269

6370
if not isinstance(tx_data_content, dict):
6471
self.logger.debug(
65-
f"Skipping: tx_data_content is not a dict: {type(tx_data_content)}"
72+
f"[{tx_hash}] Rejecting: tx_data_content is not a dict: {type(tx_data_content)}"
6673
)
6774
return False
6875

6976
# Check if the method name is exactly "send-many"
7077
tx_method = tx_data_content.get("method", "")
7178
is_send_many = tx_method == "send-many"
79+
self.logger.debug(
80+
f"[{tx_hash}] method: '{tx_method}', is_send_many: {is_send_many}"
81+
)
7282

7383
# Check if this is the specific send-many contract
7484
contract_identifier = tx_data_content.get("contract_identifier", "")
7585
is_target_contract = contract_identifier == self.TARGET_CONTRACT
86+
self.logger.debug(
87+
f"[{tx_hash}] contract: '{contract_identifier}', is_target_contract: {is_target_contract}"
88+
)
89+
self.logger.debug(f"[{tx_hash}] TARGET_CONTRACT: '{self.TARGET_CONTRACT}'")
7690

7791
# Access success from TransactionMetadata
7892
tx_success = tx_metadata.success
93+
self.logger.debug(
94+
f"[{tx_hash}] tx_success: {tx_success} (type: {type(tx_success)})"
95+
)
7996

8097
# Check if there are any STX transfers to agent accounts in the events
8198
events = tx_metadata.receipt.events if hasattr(tx_metadata, "receipt") else []
82-
has_agent_account_transfers = self._has_agent_account_stx_transfers(events)
99+
self.logger.debug(f"[{tx_hash}] events count: {len(events)}")
83100

84-
if (
85-
is_send_many
86-
and is_target_contract
87-
and tx_success
88-
and has_agent_account_transfers
89-
):
90-
self.logger.debug(
91-
f"Found send-many STX funding to agent accounts: {tx_method} on contract: {contract_identifier}"
92-
)
101+
has_agent_account_transfers = self._has_agent_account_stx_transfers(events)
102+
self.logger.debug(
103+
f"[{tx_hash}] has_agent_account_transfers: {has_agent_account_transfers}"
104+
)
93105

94-
return (
106+
final_result = (
95107
tx_kind_type == "ContractCall"
96108
and is_send_many
97109
and is_target_contract
98110
and tx_success is True
99111
and has_agent_account_transfers
100112
)
101113

114+
self.logger.debug(f"[{tx_hash}] Final result: {final_result}")
115+
116+
if final_result:
117+
self.logger.info(f"AirdropSTXHandler CLAIMING transaction: {tx_hash}")
118+
else:
119+
self.logger.info(f"AirdropSTXHandler REJECTING transaction: {tx_hash}")
120+
121+
return final_result
122+
102123
def _has_agent_account_stx_transfers(self, events: List[Event]) -> bool:
103124
"""Check if there are any STX transfers.
104125

0 commit comments

Comments
 (0)