Skip to content
This repository was archived by the owner on Mar 18, 2026. It is now read-only.

Commit a95f5f4

Browse files
Merge branch 'main' into dependabot/uv/main/dev-dependencies-ba2befb238
2 parents 251b00b + 4a05899 commit a95f5f4

9 files changed

Lines changed: 169 additions & 47 deletions

File tree

app/api/tools/dao.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,23 +118,29 @@ async def _validate_airdrop_recipients(recipients: List[str]) -> Dict[str, bool]
118118
if not recipients:
119119
return {}
120120

121-
# Query wallets table for all recipients at once
122-
wallet_filter = WalletFilterN(
123-
mainnet_addresses=recipients, testnet_addresses=recipients
124-
)
121+
# Determine which network to check based on config
122+
from app.config import config
123+
124+
use_mainnet = config.network.network == "mainnet"
125+
126+
# Query wallets table for all recipients at once, filtering by the appropriate network
127+
if use_mainnet:
128+
wallet_filter = WalletFilterN(mainnet_addresses=recipients)
129+
else:
130+
wallet_filter = WalletFilterN(testnet_addresses=recipients)
131+
125132
wallets = backend.list_wallets_n(filters=wallet_filter)
126133

127-
# Create sets of valid addresses for efficient lookup
128-
valid_mainnet_addresses = {w.mainnet_address for w in wallets if w.mainnet_address}
129-
valid_testnet_addresses = {w.testnet_address for w in wallets if w.testnet_address}
134+
# Create set of valid addresses for efficient lookup
135+
if use_mainnet:
136+
valid_addresses = {w.mainnet_address for w in wallets if w.mainnet_address}
137+
else:
138+
valid_addresses = {w.testnet_address for w in wallets if w.testnet_address}
130139

131140
# Check each recipient
132141
validation_results = {}
133142
for recipient in recipients:
134-
is_valid = (
135-
recipient in valid_mainnet_addresses or recipient in valid_testnet_addresses
136-
)
137-
validation_results[recipient] = is_valid
143+
validation_results[recipient] = recipient in valid_addresses
138144

139145
return validation_results
140146

app/backend/supabase.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2265,12 +2265,11 @@ def get_lottery_result_by_proposal(
22652265
self.client.table("lottery_results")
22662266
.select("*")
22672267
.eq("proposal_id", str(proposal_id))
2268-
.single()
22692268
.execute()
22702269
)
22712270
if not response.data:
22722271
return None
2273-
return LotteryResult(**response.data)
2272+
return LotteryResult(**response.data[0])
22742273

22752274
def list_lottery_results(
22762275
self, filters: Optional["LotteryResultFilter"] = None

app/lib/logger.py

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import logging
23
import os
34
from typing import Optional
@@ -12,6 +13,51 @@
1213
}
1314

1415

16+
class JSONFormatter(logging.Formatter):
17+
"""JSON formatter that outputs single-line structured logs."""
18+
19+
def format(self, record):
20+
log_entry = {
21+
"timestamp": self.formatTime(record, self.datefmt),
22+
"level": record.levelname,
23+
"logger": record.name,
24+
"message": record.getMessage(),
25+
}
26+
27+
# Add exception info if present
28+
if record.exc_info:
29+
log_entry["exception"] = self.formatException(record.exc_info)
30+
31+
# Add extra fields if present
32+
for key, value in record.__dict__.items():
33+
if key not in [
34+
"name",
35+
"msg",
36+
"args",
37+
"levelname",
38+
"levelno",
39+
"pathname",
40+
"filename",
41+
"module",
42+
"exc_info",
43+
"exc_text",
44+
"stack_info",
45+
"lineno",
46+
"funcName",
47+
"created",
48+
"msecs",
49+
"relativeCreated",
50+
"thread",
51+
"threadName",
52+
"processName",
53+
"process",
54+
"message",
55+
]:
56+
log_entry[key] = value
57+
58+
return json.dumps(log_entry, ensure_ascii=False)
59+
60+
1561
def configure_logger(name: Optional[str] = None) -> logging.Logger:
1662
"""
1763
Configure and return a logger instance with consistent formatting and level.
@@ -34,37 +80,37 @@ def configure_logger(name: Optional[str] = None) -> logging.Logger:
3480
if not logger.handlers:
3581
console_handler = logging.StreamHandler()
3682
console_handler.setLevel(log_level)
37-
formatter = logging.Formatter("%(levelname)s: %(message)s")
83+
formatter = JSONFormatter()
3884
console_handler.setFormatter(formatter)
3985
logger.addHandler(console_handler)
4086

4187
return logger
4288

4389

4490
def setup_uvicorn_logging():
45-
"""Configure uvicorn and other loggers to use normal formatting."""
91+
"""Configure uvicorn and other loggers to use JSON formatting."""
4692
# Disable uvicorn access logging since we handle it with middleware
4793
logging.getLogger("uvicorn.access").disabled = True
4894

49-
# Create a standard formatter
50-
standard_formatter = logging.Formatter("%(levelname)s: %(message)s")
95+
# Create a JSON formatter
96+
json_formatter = JSONFormatter()
5197

5298
# Get all existing loggers and configure them
5399
for logger_name in ["uvicorn", "uvicorn.error", "fastapi"]:
54100
logger = logging.getLogger(logger_name)
55101
for handler in logger.handlers:
56-
handler.setFormatter(standard_formatter)
102+
handler.setFormatter(json_formatter)
57103

58104
# Configure root logger
59105
root_logger = logging.getLogger()
60106
for handler in root_logger.handlers:
61-
handler.setFormatter(standard_formatter)
107+
handler.setFormatter(json_formatter)
62108

63109
# Set up a hook to catch any new handlers that get added later
64110
original_add_handler = logging.Logger.addHandler
65111

66112
def patched_add_handler(self, hdlr):
67-
hdlr.setFormatter(standard_formatter)
113+
hdlr.setFormatter(json_formatter)
68114
return original_add_handler(self, hdlr)
69115

70116
logging.Logger.addHandler = patched_add_handler

app/middleware/logging.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,14 @@ async def dispatch(self, request: Request, call_next):
3737
"process_time_ms": round(process_time * 1000, 2),
3838
}
3939

40-
# Log the request/response
40+
# Log the request/response with structured data
4141
logger.info(
42-
f"{request_info['method']} {request_info['path']} - {response_info['status_code']} - {response_info['process_time_ms']}ms"
42+
f"HTTP {request_info['method']} {request_info['path']} - {response_info['status_code']}",
43+
extra={
44+
"request": request_info,
45+
"response": response_info,
46+
"event_type": "http_request",
47+
},
4348
)
4449

4550
return response

app/services/infrastructure/job_management/executor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,9 @@ async def _worker(self, worker_name: str) -> None:
253253
# Get next job from priority queue
254254
execution = await self.priority_queue.get_next_job()
255255
if not execution:
256-
await asyncio.sleep(0.1) # Brief pause if no jobs
256+
await asyncio.sleep(
257+
0.5
258+
) # Increased pause if no jobs to reduce CPU usage
257259
continue
258260

259261
# Check if we can acquire a slot for this job type

app/services/infrastructure/job_management/job_manager.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dataclasses import dataclass
55
from datetime import datetime
66
from typing import Any, Dict, List, Optional
7+
from app.backend.factory import backend
78

89
from apscheduler.schedulers.asyncio import AsyncIOScheduler
910

@@ -122,6 +123,7 @@ def _get_job_interval(self, job_type, metadata: JobMetadata) -> int:
122123

123124
async def _execute_job_via_executor(self, job_type: str) -> None:
124125
"""Execute a job through the enhanced executor system with proper concurrency control."""
126+
logger.info(f"🚀 Scheduled execution triggered for job type: {job_type}")
125127
try:
126128
from app.backend.models import QueueMessage, QueueMessageType
127129

@@ -130,13 +132,38 @@ async def _execute_job_via_executor(self, job_type: str) -> None:
130132

131133
# Convert job_type string to JobType enum
132134
job_type_enum = JobType.get_or_create(job_type)
135+
logger.debug(f"Converted job type '{job_type}' to enum: {job_type_enum}")
136+
137+
logger.debug(f"🔍 Checking if {job_type} has work to do...")
133138

134139
# Get job metadata to check if it should run
135140
metadata = JobRegistry.get_metadata(job_type_enum)
136141
if not metadata:
137142
logger.error(f"No metadata found for job type: {job_type}")
138143
return
139144

145+
# For jobs that process messages, check if there are pending messages first
146+
# This prevents unnecessary job executions when there's no work
147+
if job_type in ["tweet", "discord", "stx_transfer"]:
148+
from app.backend.models import QueueMessageFilter
149+
150+
pending_messages = backend.list_queue_messages(
151+
filters=QueueMessageFilter(
152+
type=QueueMessageType.get_or_create(job_type),
153+
is_processed=False,
154+
)
155+
)
156+
157+
if not pending_messages:
158+
logger.debug(
159+
f"⏭️ Skipping {job_type} execution - no pending messages"
160+
)
161+
return
162+
163+
logger.debug(
164+
f"📝 Found {len(pending_messages)} pending {job_type} messages"
165+
)
166+
140167
# Create a synthetic queue message for scheduled execution
141168
# This allows the job to go through the proper executor pipeline with concurrency control
142169
synthetic_message = QueueMessage(
@@ -147,21 +174,22 @@ async def _execute_job_via_executor(self, job_type: str) -> None:
147174
"triggered_at": str(datetime.now()),
148175
},
149176
dao_id=None,
150-
tweet_id=None,
151-
conversation_id=None,
177+
wallet_id=None,
152178
is_processed=False,
153179
result=None,
154180
created_at=datetime.now(),
155-
updated_at=datetime.now(),
156181
)
157182

158183
# Enqueue the synthetic message with the job's priority
184+
logger.debug(
185+
f"Enqueuing job {job_type} to executor with priority {metadata.priority}"
186+
)
159187
job_id = await self._executor.priority_queue.enqueue(
160188
synthetic_message, metadata.priority
161189
)
162190

163-
logger.debug(
164-
f"Enqueued scheduled job {job_type} with ID {job_id} (priority: {metadata.priority})"
191+
logger.info(
192+
f"Enqueued scheduled job '{job_type}' with ID {job_id} (priority: {metadata.priority})"
165193
)
166194

167195
except Exception as e:

app/services/infrastructure/job_management/tasks/tweet_task.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ class TweetProcessingResult(RunnerResult):
4343
job_type="tweet",
4444
name="Tweet Processor",
4545
description="Processes and sends tweets for DAOs with automatic retry and error handling",
46-
interval_seconds=5,
47-
priority=JobPriority.HIGH,
46+
interval_seconds=30, # Reduced frequency from 5s to 30s
47+
priority=JobPriority.NORMAL, # Changed from HIGH to NORMAL to not dominate queue
4848
max_retries=3,
4949
retry_delay_seconds=60,
5050
timeout_seconds=300,
51-
max_concurrent=1,
51+
max_concurrent=2, # Increased from 1 to 2 to allow parallel processing
5252
requires_twitter=True,
5353
batch_size=5,
5454
enable_dead_letter_queue=True,
@@ -200,7 +200,7 @@ async def _validate_prerequisites(self, context: JobContext) -> bool:
200200
async def _validate_task_specific(self, context: JobContext) -> bool:
201201
"""Validate task-specific conditions."""
202202
if not self._pending_messages:
203-
logger.debug("No pending tweet messages found")
203+
logger.debug("No pending tweet messages found - skipping execution")
204204
return False
205205

206206
# Validate each message before processing

app/services/infrastructure/startup_service.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,34 @@ async def start_enhanced_job_system(self) -> None:
9191
raise RuntimeError("Job system initialization failed")
9292

9393
# Schedule jobs with the scheduler
94+
logger.info("Attempting to schedule jobs with the job manager...")
9495
any_jobs_scheduled = self.job_manager.schedule_jobs(self.scheduler)
9596
if any_jobs_scheduled:
9697
# Start the scheduler
98+
logger.info("Starting APScheduler with scheduled jobs...")
9799
self.scheduler.start()
98100
logger.info("Job scheduler started successfully")
101+
102+
# Log scheduler status
103+
jobs = self.scheduler.get_jobs()
104+
logger.info(
105+
f"APScheduler is running with {len(jobs)} active job schedules:"
106+
)
107+
for job in jobs:
108+
logger.info(f" - Job ID: {job.id}, next run: {job.next_run_time}")
99109
else:
100-
logger.warning("No jobs were scheduled")
110+
logger.warning("No jobs were scheduled - scheduler will not be started")
101111

102112
# Start the job executor
113+
logger.info("Starting job executor for background job processing...")
103114
await self.job_manager.start_executor()
104-
logger.info("Enhanced job manager executor started successfully")
115+
executor_stats = self.job_manager.get_executor_stats()
116+
logger.info(
117+
f"Enhanced job executor started with {executor_stats.get('worker_count', 0)} workers"
118+
)
105119

106120
# Start system metrics collection
121+
logger.info("Starting system metrics monitoring...")
107122
await system_metrics.start_monitoring()
108123
logger.info("System metrics monitoring started")
109124

0 commit comments

Comments (0)