Skip to content
This repository was archived by the owner on Mar 18, 2026. It is now read-only.

Commit eaeb771

Browse files
Merge pull request #441 from aibtcdev/fix-chain-state-monitoring
fix chain state monitoring
2 parents 3d05d3f + efd353d commit eaeb771

4 files changed

Lines changed: 28 additions & 62 deletions

File tree

app/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class SchedulerConfig:
142142
== "true"
143143
)
144144
agent_wallet_balance_monitor_interval_seconds: int = int(
145-
os.getenv("AIBTC_AGENT_WALLET_BALANCE_MONITOR_INTERVAL_SECONDS", "120")
145+
os.getenv("AIBTC_AGENT_WALLET_BALANCE_MONITOR_INTERVAL_SECONDS", "300")
146146
)
147147

148148
# chain_state_monitor job

app/services/infrastructure/job_management/tasks/agent_wallet_balance_monitor.py

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ def __post_init__(self):
4545
@job(
4646
job_type="agent_wallet_balance_monitor",
4747
name="Agent Wallet Balance Monitor",
48-
description="Monitors agent wallet STX balances and auto-funds low balance wallets every 2 minutes",
49-
interval_seconds=120, # 2 minutes
48+
description="Monitors agent wallet STX balances and auto-funds low balance wallets every 5 minutes",
49+
interval_seconds=300, # 5 minutes
5050
priority=JobPriority.HIGH,
5151
max_retries=3,
5252
retry_delay_seconds=60,
@@ -94,24 +94,7 @@ async def _validate_config(self, context: JobContext) -> bool:
9494

9595
async def _validate_resources(self, context: JobContext) -> bool:
9696
"""Validate resource availability for blockchain monitoring."""
97-
try:
98-
# Test HiroApi initialization and connectivity
99-
hiro_api = HiroApi()
100-
api_info = await hiro_api.aget_info()
101-
if not api_info:
102-
logger.error(
103-
"Cannot connect to Hiro API",
104-
extra={"task": "wallet_balance_monitor", "service": "hiro_api"},
105-
)
106-
return False
107-
108-
return True
109-
except Exception as e:
110-
logger.error(
111-
"Resource validation failed",
112-
extra={"task": "wallet_balance_monitor", "error": str(e)},
113-
)
114-
return False
97+
return True
11598

11699
async def _validate_task_specific(self, context: JobContext) -> bool:
117100
"""Validate task-specific conditions."""

app/services/infrastructure/job_management/tasks/chain_state_monitor.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ class ChainStateMonitorResult(RunnerResult):
3838
network: str = None
3939
is_stale: bool = False
4040
last_updated: Optional[datetime] = None
41-
elapsed_minutes: float = 0
4241
blocks_behind: int = 0
4342
blocks_processed: Optional[List[int]] = None
4443

@@ -684,17 +683,9 @@ async def _execute_impl(self, context: JobContext) -> List[ChainStateMonitorResu
684683
)
685684
return results
686685

687-
# Calculate how old the chain state is
688-
now = datetime.now()
686+
# Get the last updated time for logging purposes only
689687
last_updated = latest_chain_state.updated_at
690688

691-
# Convert last_updated to naive datetime if it has timezone info
692-
if last_updated.tzinfo is not None:
693-
last_updated = last_updated.replace(tzinfo=None)
694-
695-
time_difference = now - last_updated
696-
minutes_difference = time_difference.total_seconds() / 60
697-
698689
# Get current chain height from API
699690
try:
700691
logger.debug(
@@ -748,8 +739,8 @@ async def _execute_impl(self, context: JobContext) -> List[ChainStateMonitorResu
748739

749740
blocks_behind = current_api_block_height - db_block_height
750741

751-
# Consider stale if more than 10 blocks behind
752-
stale_threshold_blocks = 10
742+
# Consider stale if more than 30 blocks behind
743+
stale_threshold_blocks = 30
753744
is_stale = blocks_behind > stale_threshold_blocks
754745

755746
logger.info(
@@ -762,10 +753,10 @@ async def _execute_impl(self, context: JobContext) -> List[ChainStateMonitorResu
762753
},
763754
)
764755

765-
# Process missing blocks if we're behind
756+
# Process missing blocks if we're behind and stale
766757
if blocks_behind > 0 and is_stale:
767758
logger.warning(
768-
"Chain state is behind and exceeds threshold",
759+
"Chain state is behind and exceeds threshold, processing missing blocks",
769760
extra={
770761
"task": "chain_state_monitor",
771762
"blocks_behind": blocks_behind,
@@ -872,6 +863,20 @@ async def _execute_impl(self, context: JobContext) -> List[ChainStateMonitorResu
872863
burn_block_height,
873864
)
874865

866+
logger.info(
867+
"Generated chainhook message for block processing",
868+
extra={
869+
"task": "chain_state_monitor",
870+
"block_height": height,
871+
"block_hash": block_hash,
872+
"burn_block_height": burn_block_height,
873+
"transaction_count": transactions.total,
874+
"chainhook_uuid": chainhook_data.get(
875+
"chainhook", {}
876+
).get("uuid"),
877+
},
878+
)
879+
875880
# Process through chainhook service
876881
result = await self.chainhook_service.process(
877882
chainhook_data
@@ -906,7 +911,6 @@ async def _execute_impl(self, context: JobContext) -> List[ChainStateMonitorResu
906911
network=network,
907912
is_stale=is_stale,
908913
last_updated=last_updated,
909-
elapsed_minutes=minutes_difference,
910914
blocks_behind=blocks_behind,
911915
blocks_processed=blocks_processed,
912916
)
@@ -932,7 +936,6 @@ async def _execute_impl(self, context: JobContext) -> List[ChainStateMonitorResu
932936
network=network,
933937
is_stale=is_stale,
934938
last_updated=last_updated,
935-
elapsed_minutes=minutes_difference,
936939
blocks_behind=blocks_behind,
937940
)
938941
)
@@ -945,22 +948,19 @@ async def _execute_impl(self, context: JobContext) -> List[ChainStateMonitorResu
945948
extra={"task": "chain_state_monitor", "error": str(e)},
946949
exc_info=True,
947950
)
948-
# Fall back to legacy time-based staleness check if API call fails
951+
# Cannot determine staleness without API access
949952
logger.warning(
950-
"Falling back to time-based staleness check",
953+
"Cannot determine chain state without API access",
951954
extra={"task": "chain_state_monitor"},
952955
)
953-
stale_threshold_minutes = 5
954-
is_stale = minutes_difference > stale_threshold_minutes
955956

956957
results.append(
957958
ChainStateMonitorResult(
958959
success=False,
959-
message=f"Error checking chain height, using time-based check instead: {str(e)}",
960+
message=f"Error checking chain height: {str(e)}",
960961
network=network,
961-
is_stale=is_stale,
962+
is_stale=True, # Assume stale if we can't check
962963
last_updated=last_updated,
963-
elapsed_minutes=minutes_difference,
964964
)
965965
)
966966
return results

app/services/infrastructure/job_management/tasks/dao_token_holders_monitor.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -83,24 +83,7 @@ async def _validate_config(self, context: JobContext) -> bool:
8383

8484
async def _validate_resources(self, context: JobContext) -> bool:
8585
"""Validate resource availability for blockchain monitoring."""
86-
try:
87-
# Test HiroApi initialization and connectivity
88-
hiro_api = HiroApi()
89-
api_info = await hiro_api.aget_info()
90-
if not api_info:
91-
logger.error(
92-
"Cannot connect to Hiro API",
93-
extra={"task": "dao_token_holders_monitor"},
94-
)
95-
return False
96-
97-
return True
98-
except Exception as e:
99-
logger.error(
100-
"Resource validation failed",
101-
extra={"task": "dao_token_holders_monitor", "error": str(e)},
102-
)
103-
return False
86+
return True
10487

10588
async def _validate_task_specific(self, context: JobContext) -> bool:
10689
"""Validate task-specific conditions."""

0 commit comments

Comments
 (0)