Skip to content
This repository was archived by the owner on Mar 18, 2026. It is now read-only.

Commit f884b82

Browse files
Merge pull request #415 from aibtcdev/change-up-tweet
job manager
2 parents 1dd4b46 + 264811a commit f884b82

4 files changed

Lines changed: 35 additions & 8 deletions

File tree

agent-tools-ts

app/services/infrastructure/job_management/executor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,9 @@ async def _worker(self, worker_name: str) -> None:
253253
# Get next job from priority queue
254254
execution = await self.priority_queue.get_next_job()
255255
if not execution:
256-
await asyncio.sleep(0.1) # Brief pause if no jobs
256+
await asyncio.sleep(
257+
0.5
258+
) # Increased pause if no jobs to reduce CPU usage
257259
continue
258260

259261
# Check if we can acquire a slot for this job type

app/services/infrastructure/job_management/job_manager.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dataclasses import dataclass
55
from datetime import datetime
66
from typing import Any, Dict, List, Optional
7+
from app.backend.factory import backend
78

89
from apscheduler.schedulers.asyncio import AsyncIOScheduler
910

@@ -131,12 +132,36 @@ async def _execute_job_via_executor(self, job_type: str) -> None:
131132
# Convert job_type string to JobType enum
132133
job_type_enum = JobType.get_or_create(job_type)
133134

135+
logger.debug(f"🔍 Checking if {job_type} has work to do...")
136+
134137
# Get job metadata to check if it should run
135138
metadata = JobRegistry.get_metadata(job_type_enum)
136139
if not metadata:
137140
logger.error(f"No metadata found for job type: {job_type}")
138141
return
139142

143+
# For jobs that process messages, check if there are pending messages first
144+
# This prevents unnecessary job executions when there's no work
145+
if job_type in ["tweet", "discord", "stx_transfer"]:
146+
from app.backend.models import QueueMessageFilter
147+
148+
pending_messages = backend.list_queue_messages(
149+
filters=QueueMessageFilter(
150+
type=QueueMessageType.get_or_create(job_type),
151+
is_processed=False,
152+
)
153+
)
154+
155+
if not pending_messages:
156+
logger.debug(
157+
f"⏭️ Skipping {job_type} execution - no pending messages"
158+
)
159+
return
160+
161+
logger.debug(
162+
f"📝 Found {len(pending_messages)} pending {job_type} messages"
163+
)
164+
140165
# Create a synthetic queue message for scheduled execution
141166
# This allows the job to go through the proper executor pipeline with concurrency control
142167
synthetic_message = QueueMessage(
@@ -160,8 +185,8 @@ async def _execute_job_via_executor(self, job_type: str) -> None:
160185
synthetic_message, metadata.priority
161186
)
162187

163-
logger.debug(
164-
f"Enqueued scheduled job {job_type} with ID {job_id} (priority: {metadata.priority})"
188+
logger.info(
189+
f"✅ Queued {job_type} job {job_id} for execution (priority: {metadata.priority})"
165190
)
166191

167192
except Exception as e:

app/services/infrastructure/job_management/tasks/tweet_task.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ class TweetProcessingResult(RunnerResult):
4343
job_type="tweet",
4444
name="Tweet Processor",
4545
description="Processes and sends tweets for DAOs with automatic retry and error handling",
46-
interval_seconds=5,
47-
priority=JobPriority.HIGH,
46+
interval_seconds=30, # Reduced frequency from 5s to 30s
47+
priority=JobPriority.NORMAL, # Changed from HIGH to NORMAL to not dominate queue
4848
max_retries=3,
4949
retry_delay_seconds=60,
5050
timeout_seconds=300,
51-
max_concurrent=1,
51+
max_concurrent=2, # Increased from 1 to 2 to allow parallel processing
5252
requires_twitter=True,
5353
batch_size=5,
5454
enable_dead_letter_queue=True,
@@ -200,7 +200,7 @@ async def _validate_prerequisites(self, context: JobContext) -> bool:
200200
async def _validate_task_specific(self, context: JobContext) -> bool:
201201
"""Validate task-specific conditions."""
202202
if not self._pending_messages:
203-
logger.debug("No pending tweet messages found")
203+
logger.debug("No pending tweet messages found - skipping execution")
204204
return False
205205

206206
# Validate each message before processing

0 commit comments

Comments
 (0)