From 5822b8b6c0800be049484b5a65b4ea9d8c69bc7e Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Wed, 22 Apr 2026 16:12:19 +0800 Subject: [PATCH 1/7] fix all redis pubsub connection --- dtable_events/activities/handlers.py | 38 +++++++++++-- dtable_events/api_calls/api_calls_counter.py | 37 ++++++++++-- dtable_events/app/event_redis.py | 56 ++++++++++++++++--- .../automations/automations_pipeline.py | 45 +++++++++++---- dtable_events/notification_rules/handler.py | 36 ++++++++++-- dtable_events/statistics/counter.py | 35 ++++++++++-- dtable_events/tasks/ai_stats_worker.py | 40 +++++++++++-- .../tasks/dtable_real_time_rows_counter.py | 37 ++++++++++-- dtable_events/tasks/metrics.py | 36 ++++++++++-- .../tasks/universal_app_auto_buckup.py | 35 ++++++++++-- dtable_events/webhook/webhook.py | 41 ++++++++++++-- dtable_events/workflow/workflow_actions.py | 35 ++++++++++-- 12 files changed, 406 insertions(+), 65 deletions(-) diff --git a/dtable_events/activities/handlers.py b/dtable_events/activities/handlers.py index 5d7511bd..668a53f0 100644 --- a/dtable_events/activities/handlers.py +++ b/dtable_events/activities/handlers.py @@ -30,17 +30,25 @@ def __init__(self, app): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self.app = app + self._pubsub_channel_name = 'table-events' + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle table activities...') - subscriber = self._redis_client.get_subscriber('table-events') - + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() + now = time.time() if message is not None: + if message.get('type') != 'message': + continue event = json.loads(message['data']) if event['op_type'] not in self.SUPPORT_OPERATION_TYPES: continue @@ -55,8 +63,28 @@ def run(self): logger.error('Handle activities message failed: %s' % e) finally: session.close() + last_pubsub_message_time = now + last_pubsub_health_check_time = last_pubsub_message_time else: + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (now - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('table-events') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/api_calls/api_calls_counter.py b/dtable_events/api_calls/api_calls_counter.py index c66950c9..a6024e1e 100644 --- a/dtable_events/api_calls/api_calls_counter.py +++ b/dtable_events/api_calls/api_calls_counter.py @@ -19,12 +19,16 @@ logger = logging.getLogger(__name__) -class APICallsCounter: +class APICallsCounter(object): def __init__(self): self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self.keep_months = 3 # including this month + self._pubsub_channel_name = 'stats_api_calls' + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 def count_api_gateway(self, info, db_session): try: @@ -126,12 +130,16 @@ def count_api_calls(self, info, db_session): def count(self): logger.info('Starting count api calls...') - subscriber = self._redis_client.get_subscriber('stats_api_calls') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue msg = json.loads(message['data']) session = self._db_session_class() try: @@ -141,10 +149,29 @@ def count(self): finally: session.close() else: + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('stats_api_calls') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time def clean(self): logger.info('Starting schedule clean api calls...') diff --git a/dtable_events/app/event_redis.py b/dtable_events/app/event_redis.py index 5dbe73d7..36e98013 100644 --- a/dtable_events/app/event_redis.py +++ b/dtable_events/app/event_redis.py @@ -14,21 +14,41 @@ class RedisClient(object): - def __init__(self, socket_connect_timeout=30, socket_timeout=None): + def __init__(self, socket_connect_timeout=30, socket_timeout=None, + health_check_interval=None, retry_on_timeout=None): self._host = '127.0.0.1' self._port = 6379 self._password = None self._parse_config() + self._connection_kwargs = { + 'host': self._host, + 'port': self._port, + 'password': self._password, + 'socket_timeout': socket_timeout, + 'socket_connect_timeout': socket_connect_timeout, + 'decode_responses': True, + } + if health_check_interval is not None: + self._connection_kwargs['health_check_interval'] = health_check_interval + if retry_on_timeout is not None: + self._connection_kwargs['retry_on_timeout'] = retry_on_timeout """ By default, each Redis instance created will in turn create its own connection pool. Every caller using redis client will has it's own pool with config caller passed. """ - self.connection = redis.Redis( - host=self._host, port=self._port, password=self._password, - socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout, - decode_responses=True - ) + self.connection = self._build_connection() + + def _build_connection(self): + return redis.Redis(**self._connection_kwargs) + + def reconnect(self): + try: + self.connection.connection_pool.disconnect() + except Exception: + pass + self.connection = self._build_connection() + return self.connection def _parse_config(self): @@ -50,6 +70,23 @@ def get_subscriber(self, channel_name): else: return subscriber + def close_subscriber(self, subscriber): + if not subscriber: + return + try: + subscriber.close() + except Exception as e: + logger.debug('close redis subscriber failed: %s', e) + + def refresh_subscriber(self, subscriber, pubsub_channel_name, reason='unknown'): + logger.warning('reconnect redis pubsub channel=%s reason=%s', pubsub_channel_name, reason) + self.close_subscriber(subscriber) + try: + self.reconnect() + except Exception as e: + logger.warning('redis reconnect failed channel=%s error=%s', pubsub_channel_name, e) + return self.get_subscriber(pubsub_channel_name) + def get(self, key): return self.connection.get(key) @@ -63,7 +100,12 @@ def delete(self, key): return self.connection.delete(key) def publish(self, channel_name, message): - return self.connection.publish(channel_name, message) + try: + return self.connection.publish(channel_name, message) + except Exception as e: + logger.warning('redis publish failed on %s: %s', channel_name, e) + self.reconnect() + return self.connection.publish(channel_name, message) class RedisCache(object): diff --git a/dtable_events/automations/automations_pipeline.py b/dtable_events/automations/automations_pipeline.py index a7159a2c..4a5d6b2f 100644 --- a/dtable_events/automations/automations_pipeline.py +++ b/dtable_events/automations/automations_pipeline.py @@ -65,24 +65,25 @@ def get_percent(self, owner, org_id, workers): return self.counters.get(limit_key, 0) / (self.window_secs * workers) -class AutomationsPipeline: +class AutomationsPipeline(object): def __init__(self): self.workers = 5 self.automations_queue: Queue[AutomationRule] = Queue() self.results_queue: Queue[AutomationResult] = Queue() + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 self._db_session_class = init_db_session_class() - self._redis_client = RedisClient(socket_timeout=10) + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=10, + health_check_interval=30, retry_on_timeout=True) self.per_update_channel = 'automation-rule-triggered' self.rate_limiter = RateLimiter() self.automations_stats_manager = AutomationsStatsManager() - self.log_none_message_timeout = 10 * 60 - # metrics self.realtime_trigger_count = 0 self.scheduled_trigger_count = 0 @@ -143,15 +144,19 @@ def get_automation_rule(self, db_session, event_data): def receive(self): auto_rule_logger.info(f"Start to receive automation event from redis, window seconds {self.rate_limiter.window_secs} limit percent {self.rate_limiter.percent}") subscriber = self._redis_client.get_subscriber(self.per_update_channel) - last_message_time = datetime.now() + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while True: try: message = subscriber.get_message() self.realtime_automation_heartbeat = time.time() if message is not None: + if message.get('type') != 'message': + continue event = json.loads(message['data']) auto_rule_logger.info(f"subscribe event {event}") - last_message_time = datetime.now() + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time db_session = self._db_session_class() try: @@ -192,14 +197,30 @@ def receive(self): finally: db_session.close() else: - if (datetime.now() - last_message_time).seconds >= self.log_none_message_timeout: - auto_rule_logger.info(f'No message for {self.log_none_message_timeout}s...') - last_message_time = datetime.now() + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self.per_update_channel, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if now - last_pubsub_message_time >= self._pubsub_no_message_timeout: + auto_rule_logger.info('no automation message for %ss', self._pubsub_no_message_timeout) + subscriber = self._redis_client.refresh_subscriber( + subscriber, self.per_update_channel, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - auto_rule_logger.exception('Failed get automation rules message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('automation-rule-triggered') - last_message_time = datetime.now() + auto_rule_logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self.per_update_channel, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time def worker(self): while True: diff --git a/dtable_events/notification_rules/handler.py b/dtable_events/notification_rules/handler.py index c0a529ed..ac7759bc 100644 --- a/dtable_events/notification_rules/handler.py +++ b/dtable_events/notification_rules/handler.py @@ -15,16 +15,23 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = 'notification-rule-triggered' + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle notification rules...') - subscriber = self._redis_client.get_subscriber('notification-rule-triggered') - + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue event = json.loads(message['data']) session = self._db_session_class() try: @@ -34,7 +41,26 @@ def run(self): finally: session.close() else: + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get notification rules message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('notification-rule-triggered') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/statistics/counter.py b/dtable_events/statistics/counter.py index 890cb796..5bd63439 100644 --- a/dtable_events/statistics/counter.py +++ b/dtable_events/statistics/counter.py @@ -16,16 +16,24 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = 'user-activity-statistic' + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting count user activity...') - subscriber = self._redis_client.get_subscriber('user-activity-statistic') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue msg = json.loads(message['data']) session = self._db_session_class() try: @@ -35,7 +43,26 @@ def run(self): finally: session.close() else: + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('user-activity-statistic') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/tasks/ai_stats_worker.py b/dtable_events/tasks/ai_stats_worker.py index c7a6df71..c12fdc0a 100644 --- a/dtable_events/tasks/ai_stats_worker.py +++ b/dtable_events/tasks/ai_stats_worker.py @@ -18,15 +18,18 @@ logger = logging.getLogger(__name__) -class AIStatsWorker: +class AIStatsWorker(object): def __init__(self): self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self.stats_lock = Lock() - self.channel = 'log_ai_model_usage' + self._pubsub_channel_name = 'log_ai_model_usage' self.keep_months = 3 self.owner_info_cache_timeout = 24 * 60 * 60 + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 self._parse_config() self.reset_stats() @@ -94,17 +97,23 @@ def save_to_memory(self, usage_info, session): def receive(self): logger.info('Starts to receive ai calls...') - subscriber = self._redis_client.get_subscriber(self.channel) + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while True: try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue try: usage_info = json.loads(message['data']) except: logger.warning('log_ai_model_usage message invalid') continue + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time session = self._db_session_class() logger.debug('usage_info %s', usage_info) try: @@ -115,10 +124,29 @@ def receive(self): finally: session.close() else: + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber(self.channel) + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time def get_assistant_cache_key(self, assistant_uuid): return f'assistant:{assistant_uuid}:owner' diff --git a/dtable_events/tasks/dtable_real_time_rows_counter.py b/dtable_events/tasks/dtable_real_time_rows_counter.py index 9ee61d73..f7897e11 100644 --- a/dtable_events/tasks/dtable_real_time_rows_counter.py +++ b/dtable_events/tasks/dtable_real_time_rows_counter.py @@ -118,16 +118,24 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = 'count-rows' + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle table rows count...') - subscriber = self._redis_client.get_subscriber('count-rows') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue dtable_uuids = json.loads(message['data']) session = self._db_session_class() try: @@ -136,8 +144,29 @@ def run(self): logger.error('Handle table rows count: %s' % e) finally: session.close() + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time else: + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('count-rows') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/tasks/metrics.py b/dtable_events/tasks/metrics.py index f20fa780..623be9a5 100644 --- a/dtable_events/tasks/metrics.py +++ b/dtable_events/tasks/metrics.py @@ -22,16 +22,25 @@ class MetricReceiver(Thread): def __init__(self): Thread.__init__(self) self._finished = Event() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = METRIC_CHANNEL_NAME + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 + def run(self): if not self._redis_client.connection: logging.warning('Redis connection is not established.') return - subscriber = self._redis_client.get_subscriber(METRIC_CHANNEL_NAME) + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message: + if message.get('type') != 'message': + continue metric_data = json.loads(message['data']) try: component_name = metric_data.get('component_name') @@ -46,10 +55,29 @@ def run(self): except Exception as e: logging.error('Error when handling metric data: %s' % e) else: + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - logging.error('Failed handle metric %s' % e) - subscriber = self._redis_client.get_subscriber(METRIC_CHANNEL_NAME) + logging.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time class MetricSaver(Thread): diff --git a/dtable_events/tasks/universal_app_auto_buckup.py b/dtable_events/tasks/universal_app_auto_buckup.py index 313fd995..7c415e5a 100644 --- a/dtable_events/tasks/universal_app_auto_buckup.py +++ b/dtable_events/tasks/universal_app_auto_buckup.py @@ -21,8 +21,12 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self._lock = Lock() + self._pubsub_channel_name = 'universal-app-auto-backup' + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 def create_snapshot(self, session, app_id, app_version, app_config): """ @@ -137,11 +141,15 @@ def should_auto_backup(self, session, app_id, app_version): def run(self): logger.info('Starting universal app auto backup thread') - subscriber = self._redis_client.get_subscriber('universal-app-auto-backup') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue msg = json.loads(message['data']) user_name = msg.get('username', '') app_id = msg.get('app_id', '') @@ -162,7 +170,26 @@ def run(self): finally: session.close() else: + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('universal-app-auto-backup') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/webhook/webhook.py b/dtable_events/webhook/webhook.py index 4bc2f1e1..f582822a 100644 --- a/dtable_events/webhook/webhook.py +++ b/dtable_events/webhook/webhook.py @@ -1,5 +1,6 @@ import json import logging +import time from datetime import datetime from threading import Thread from queue import Queue @@ -28,9 +29,12 @@ class Webhooker(object): """ def __init__(self): self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() - self._subscriber = self._redis_client.get_subscriber('table-events') + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self.job_queue = Queue() + self._pubsub_channel_name = 'table-events' + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 def start(self): logger.info('Starting handle webhook jobs...') @@ -40,9 +44,13 @@ def start(self): def add_jobs(self): """all events from redis are kind of update so far""" + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while True: try: - for message in self._subscriber.listen(): + message = subscriber.get_message() + if message is not None: if message['type'] != 'message': continue try: @@ -66,9 +74,32 @@ def add_jobs(self): logger.error('add jobs error: %s' % e) finally: session.close() + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + else: + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + time.sleep(0.5) except Exception as e: - logger.error('webhook sub from redis error: %s', e) - self._subscriber = self._redis_client.get_subscriber('table-events') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time def invalidate_webhook(self, webhook_id, db_session): sql = "UPDATE webhooks SET is_valid=0 WHERE id=:webhook_id" diff --git a/dtable_events/workflow/workflow_actions.py b/dtable_events/workflow/workflow_actions.py index 199295d7..36dc1170 100644 --- a/dtable_events/workflow/workflow_actions.py +++ b/dtable_events/workflow/workflow_actions.py @@ -167,16 +167,24 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = 'workflow-actions' + self._pubsub_health_check_interval = 30 + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle workflow actions...') - subscriber = self._redis_client.get_subscriber('workflow-actions') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue sub_data = json.loads(message['data']) session = self._db_session_class() task_id = sub_data['task_id'] @@ -189,7 +197,26 @@ def run(self): finally: session.close() else: + now = time.time() + if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: + last_pubsub_health_check_time = now + try: + subscriber.ping() + except Exception as e: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get workflow-actions message: %s', e) - subscriber = self._redis_client.get_subscriber('workflow-actions') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() + last_pubsub_health_check_time = last_pubsub_message_time From cdcff73d75596ecff9393e557201250bd52d98c6 Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Thu, 23 Apr 2026 11:46:22 +0800 Subject: [PATCH 2/7] remove ping check in pubsub workers --- dtable_events/activities/handlers.py | 20 ++----------------- dtable_events/api_calls/api_calls_counter.py | 16 +-------------- .../automations/automations_pipeline.py | 20 ++----------------- dtable_events/notification_rules/handler.py | 16 +-------------- dtable_events/statistics/counter.py | 16 +-------------- dtable_events/tasks/ai_stats_worker.py | 18 +---------------- .../tasks/dtable_real_time_rows_counter.py | 18 +---------------- dtable_events/tasks/metrics.py | 16 +-------------- .../tasks/universal_app_auto_buckup.py | 16 +-------------- dtable_events/webhook/webhook.py | 18 +---------------- dtable_events/workflow/workflow_actions.py | 16 +-------------- 11 files changed, 13 insertions(+), 177 deletions(-) diff --git a/dtable_events/activities/handlers.py b/dtable_events/activities/handlers.py index 668a53f0..3c0c9f22 100644 --- a/dtable_events/activities/handlers.py +++ b/dtable_events/activities/handlers.py @@ -34,21 +34,19 @@ def __init__(self, app): health_check_interval=30, retry_on_timeout=True) self.app = app self._pubsub_channel_name = 'table-events' - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle table activities...') subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() - now = time.time() if message is not None: if message.get('type') != 'message': continue + last_pubsub_message_time = time.time() event = json.loads(message['data']) if event['op_type'] not in self.SUPPORT_OPERATION_TYPES: continue @@ -63,28 +61,14 @@ def run(self): logger.error('Handle activities message failed: %s' % e) finally: session.close() - last_pubsub_message_time = now - last_pubsub_health_check_time = last_pubsub_message_time else: - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue - if (now - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/api_calls/api_calls_counter.py b/dtable_events/api_calls/api_calls_counter.py index a6024e1e..9064283e 100644 --- a/dtable_events/api_calls/api_calls_counter.py +++ b/dtable_events/api_calls/api_calls_counter.py @@ -27,7 +27,6 @@ def __init__(self): health_check_interval=30, retry_on_timeout=True) self.keep_months = 3 # including this month self._pubsub_channel_name = 'stats_api_calls' - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 def count_api_gateway(self, info, db_session): @@ -132,7 +131,6 @@ def count(self): logger.info('Starting count api calls...') subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: @@ -140,6 +138,7 @@ def count(self): if message is not None: if message.get('type') != 'message': continue + last_pubsub_message_time = time.time() msg = json.loads(message['data']) session = self._db_session_class() try: @@ -149,29 +148,16 @@ def count(self): finally: session.close() else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time def clean(self): logger.info('Starting schedule clean api calls...') diff --git a/dtable_events/automations/automations_pipeline.py b/dtable_events/automations/automations_pipeline.py index 4a5d6b2f..f5b8f059 100644 --- a/dtable_events/automations/automations_pipeline.py +++ b/dtable_events/automations/automations_pipeline.py @@ -71,7 +71,6 @@ def __init__(self): self.workers = 5 self.automations_queue: Queue[AutomationRule] = Queue() self.results_queue: Queue[AutomationResult] = Queue() - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 self._db_session_class = init_db_session_class() @@ -145,7 +144,6 @@ def receive(self): auto_rule_logger.info(f"Start to receive automation event from redis, window seconds {self.rate_limiter.window_secs} limit percent {self.rate_limiter.percent}") subscriber = self._redis_client.get_subscriber(self.per_update_channel) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while True: try: message = subscriber.get_message() @@ -153,10 +151,9 @@ def receive(self): if message is not None: if message.get('type') != 'message': continue + last_pubsub_message_time = time.time() event = json.loads(message['data']) auto_rule_logger.info(f"subscribe event {event}") - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time db_session = self._db_session_class() try: @@ -197,30 +194,17 @@ def receive(self): finally: db_session.close() else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self.per_update_channel, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue - if now - last_pubsub_message_time >= self._pubsub_no_message_timeout: + if time.time() - last_pubsub_message_time >= self._pubsub_no_message_timeout: auto_rule_logger.info('no automation message for %ss', self._pubsub_no_message_timeout) subscriber = self._redis_client.refresh_subscriber( subscriber, self.per_update_channel, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: auto_rule_logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self.per_update_channel, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time def worker(self): while True: diff --git a/dtable_events/notification_rules/handler.py b/dtable_events/notification_rules/handler.py index ac7759bc..74b065e3 100644 --- a/dtable_events/notification_rules/handler.py +++ b/dtable_events/notification_rules/handler.py @@ -18,20 +18,19 @@ def __init__(self): self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, health_check_interval=30, retry_on_timeout=True) self._pubsub_channel_name = 'notification-rule-triggered' - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle notification rules...') subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: if message.get('type') != 'message': continue + last_pubsub_message_time = time.time() event = json.loads(message['data']) session = self._db_session_class() try: @@ -41,26 +40,13 @@ def run(self): finally: session.close() else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/statistics/counter.py b/dtable_events/statistics/counter.py index 5bd63439..826f41ba 100644 --- a/dtable_events/statistics/counter.py +++ b/dtable_events/statistics/counter.py @@ -19,14 +19,12 @@ def __init__(self): self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, health_check_interval=30, retry_on_timeout=True) self._pubsub_channel_name = 'user-activity-statistic' - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting count user activity...') subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: @@ -34,6 +32,7 @@ def run(self): if message is not None: if message.get('type') != 'message': continue + last_pubsub_message_time = last_pubsub_message_time() msg = json.loads(message['data']) session = self._db_session_class() try: @@ -43,26 +42,13 @@ def run(self): finally: session.close() else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/tasks/ai_stats_worker.py b/dtable_events/tasks/ai_stats_worker.py index c12fdc0a..b6561ba7 100644 --- a/dtable_events/tasks/ai_stats_worker.py +++ b/dtable_events/tasks/ai_stats_worker.py @@ -28,7 +28,6 @@ def __init__(self): self._pubsub_channel_name = 'log_ai_model_usage' self.keep_months = 3 self.owner_info_cache_timeout = 24 * 60 * 60 - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 self._parse_config() self.reset_stats() @@ -99,7 +98,6 @@ def receive(self): logger.info('Starts to receive ai calls...') subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while True: try: @@ -107,13 +105,12 @@ def receive(self): if message is not None: if message.get('type') != 'message': continue + last_pubsub_message_time = time.time() try: usage_info = json.loads(message['data']) except: logger.warning('log_ai_model_usage message invalid') continue - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time session = self._db_session_class() logger.debug('usage_info %s', usage_info) try: @@ -124,29 +121,16 @@ def receive(self): finally: session.close() else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time def get_assistant_cache_key(self, assistant_uuid): return f'assistant:{assistant_uuid}:owner' diff --git a/dtable_events/tasks/dtable_real_time_rows_counter.py b/dtable_events/tasks/dtable_real_time_rows_counter.py index f7897e11..2296072c 100644 --- a/dtable_events/tasks/dtable_real_time_rows_counter.py +++ b/dtable_events/tasks/dtable_real_time_rows_counter.py @@ -121,7 +121,6 @@ def __init__(self): self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, health_check_interval=30, retry_on_timeout=True) self._pubsub_channel_name = 'count-rows' - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 @@ -129,13 +128,13 @@ def run(self): logger.info('Starting handle table rows count...') subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: if message.get('type') != 'message': continue + last_pubsub_message_time = time.time() dtable_uuids = json.loads(message['data']) session = self._db_session_class() try: @@ -144,29 +143,14 @@ def run(self): logger.error('Handle table rows count: %s' % e) finally: session.close() - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/tasks/metrics.py b/dtable_events/tasks/metrics.py index 623be9a5..8b572bc3 100644 --- a/dtable_events/tasks/metrics.py +++ b/dtable_events/tasks/metrics.py @@ -25,7 +25,6 @@ def __init__(self): self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, health_check_interval=30, retry_on_timeout=True) self._pubsub_channel_name = METRIC_CHANNEL_NAME - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 def run(self): @@ -34,13 +33,13 @@ def run(self): return subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message: if message.get('type') != 'message': continue + last_pubsub_message_time = time.time() metric_data = json.loads(message['data']) try: component_name = metric_data.get('component_name') @@ -55,29 +54,16 @@ def run(self): except Exception as e: logging.error('Error when handling metric data: %s' % e) else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logging.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time class MetricSaver(Thread): diff --git a/dtable_events/tasks/universal_app_auto_buckup.py b/dtable_events/tasks/universal_app_auto_buckup.py index 7c415e5a..a1ad578d 100644 --- a/dtable_events/tasks/universal_app_auto_buckup.py +++ b/dtable_events/tasks/universal_app_auto_buckup.py @@ -25,7 +25,6 @@ def __init__(self): health_check_interval=30, retry_on_timeout=True) self._lock = Lock() self._pubsub_channel_name = 'universal-app-auto-backup' - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 def create_snapshot(self, session, app_id, app_version, app_config): @@ -143,13 +142,13 @@ def run(self): logger.info('Starting universal app auto backup thread') subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: if message.get('type') != 'message': continue + last_pubsub_message_time = time.time() msg = json.loads(message['data']) user_name = msg.get('username', '') app_id = msg.get('app_id', '') @@ -170,26 +169,13 @@ def run(self): finally: session.close() else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time diff --git a/dtable_events/webhook/webhook.py b/dtable_events/webhook/webhook.py index f582822a..735c2a2b 100644 --- a/dtable_events/webhook/webhook.py +++ b/dtable_events/webhook/webhook.py @@ -33,7 +33,6 @@ def __init__(self): health_check_interval=30, retry_on_timeout=True) self.job_queue = Queue() self._pubsub_channel_name = 'table-events' - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 def start(self): @@ -46,13 +45,13 @@ def add_jobs(self): """all events from redis are kind of update so far""" subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while True: try: message = subscriber.get_message() if message is not None: if message['type'] != 'message': continue + last_pubsub_message_time = time.time() try: data = json.loads(message['data']) except Exception as e: @@ -74,32 +73,17 @@ def add_jobs(self): logger.error('add jobs error: %s' % e) finally: session.close() - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time def invalidate_webhook(self, webhook_id, db_session): sql = "UPDATE webhooks SET is_valid=0 WHERE id=:webhook_id" diff --git a/dtable_events/workflow/workflow_actions.py b/dtable_events/workflow/workflow_actions.py index 36dc1170..0f7ea024 100644 --- a/dtable_events/workflow/workflow_actions.py +++ b/dtable_events/workflow/workflow_actions.py @@ -170,14 +170,12 @@ def __init__(self): self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, health_check_interval=30, retry_on_timeout=True) self._pubsub_channel_name = 'workflow-actions' - self._pubsub_health_check_interval = 30 self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle workflow actions...') subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time while not self._finished.is_set(): try: @@ -185,6 +183,7 @@ def run(self): if message is not None: if message.get('type') != 'message': continue + last_pubsub_message_time = time.time() sub_data = json.loads(message['data']) session = self._db_session_class() task_id = sub_data['task_id'] @@ -197,26 +196,13 @@ def run(self): finally: session.close() else: - now = time.time() - if now - last_pubsub_health_check_time >= self._pubsub_health_check_interval: - last_pubsub_health_check_time = now - try: - subscriber.ping() - except Exception as e: - subscriber = self._redis_client.refresh_subscriber( - subscriber, self._pubsub_channel_name, 'health check failed: %s' % e) - last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time - continue if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: subscriber = self._redis_client.refresh_subscriber( subscriber, self._pubsub_channel_name, 'no message timeout') last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time continue time.sleep(0.5) except Exception as e: logger.error('redis pubsub receive error: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) last_pubsub_message_time = time.time() - last_pubsub_health_check_time = last_pubsub_message_time From 5a0dbfe00c02975622d54b43d612067ea888f85c Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Thu, 23 Apr 2026 12:07:28 +0800 Subject: [PATCH 3/7] add success subscribe log --- dtable_events/app/event_redis.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dtable_events/app/event_redis.py b/dtable_events/app/event_redis.py index 36e98013..95051ef2 100644 --- a/dtable_events/app/event_redis.py +++ b/dtable_events/app/event_redis.py @@ -61,6 +61,7 @@ def get_subscriber(self, channel_name): try: subscriber = self.connection.pubsub(ignore_subscribe_messages=True) subscriber.subscribe(channel_name) + logger.info('redis pubsub success, success subscribe %s', channel_name) except redis.AuthenticationError as e: logger.critical('connect to redis auth error: %s', e) raise e From 04c58430349701e54130ea85cf634660d1a78620 Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Thu, 23 Apr 2026 13:49:02 +0800 Subject: [PATCH 4/7] update --- dtable_events/api_calls/api_calls_counter.py | 2 +- dtable_events/app/event_redis.py | 39 ++++++++------------ dtable_events/app/stats_sender.py | 2 +- dtable_events/tasks/metrics.py | 3 -- 4 files changed, 17 insertions(+), 29 deletions(-) diff --git a/dtable_events/api_calls/api_calls_counter.py b/dtable_events/api_calls/api_calls_counter.py index 9064283e..119c17b9 100644 --- a/dtable_events/api_calls/api_calls_counter.py +++ b/dtable_events/api_calls/api_calls_counter.py @@ -198,7 +198,7 @@ def timed_job(): session.close() try: - self._redis_client.connection.publish('exceed_api_quota', json.dumps({'changed': True})) + self._redis_client.publish('exceed_api_quota', json.dumps({'changed': True})) except Exception as e: logger.exception('publish exceed_api_quota error: %s', e) diff --git a/dtable_events/app/event_redis.py b/dtable_events/app/event_redis.py index 95051ef2..c66172c6 100644 --- a/dtable_events/app/event_redis.py +++ b/dtable_events/app/event_redis.py @@ -16,10 +16,10 @@ class RedisClient(object): def __init__(self, socket_connect_timeout=30, socket_timeout=None, health_check_interval=None, retry_on_timeout=None): - self._host = '127.0.0.1' - self._port = 6379 - self._password = None - self._parse_config() + self._host = REDIS_HOST + self._port = REDIS_PORT + self._password = REDIS_PASSWORD + self._connection_kwargs = { 'host': self._host, 'port': self._port, @@ -37,29 +37,20 @@ def __init__(self, socket_connect_timeout=30, socket_timeout=None, By default, each Redis instance created will in turn create its own connection pool. Every caller using redis client will has it's own pool with config caller passed. """ - self.connection = self._build_connection() - - def _build_connection(self): - return redis.Redis(**self._connection_kwargs) + self._redis = redis.Redis(**self._connection_kwargs) def reconnect(self): try: - self.connection.connection_pool.disconnect() + self._redis.connection_pool.disconnect() except Exception: pass - self.connection = self._build_connection() - return self.connection - - def _parse_config(self): - - self._host = REDIS_HOST - self._port = REDIS_PORT - self._password = REDIS_PASSWORD + self._redis = redis.Redis(**self._connection_kwargs) + return self._redis def get_subscriber(self, channel_name): while True: try: - subscriber = self.connection.pubsub(ignore_subscribe_messages=True) + subscriber = self._redis.pubsub(ignore_subscribe_messages=True) subscriber.subscribe(channel_name) logger.info('redis pubsub success, success subscribe %s', channel_name) except redis.AuthenticationError as e: @@ -89,24 +80,24 @@ def refresh_subscriber(self, subscriber, pubsub_channel_name, reason='unknown'): return self.get_subscriber(pubsub_channel_name) def get(self, key): - return self.connection.get(key) + return self._redis.get(key) def set(self, key, value, timeout=None): if not timeout: - return self.connection.set(key, value) + return self._redis.set(key, value) else: - return self.connection.setex(key, timeout, value) + return self._redis.setex(key, timeout, value) def delete(self, key): - return self.connection.delete(key) + return self._redis.delete(key) def publish(self, channel_name, message): try: - return self.connection.publish(channel_name, message) + return self._redis.publish(channel_name, message) except Exception as e: logger.warning('redis publish failed on %s: %s', channel_name, e) self.reconnect() - return self.connection.publish(channel_name, message) + return self._redis.publish(channel_name, message) class RedisCache(object): diff --git a/dtable_events/app/stats_sender.py b/dtable_events/app/stats_sender.py index 6c98582c..a7ebb786 100644 --- a/dtable_events/app/stats_sender.py +++ b/dtable_events/app/stats_sender.py @@ -16,7 +16,7 @@ def __init__(self): def send(self, channel: str, info: dict): try: - self._redis_client.connection.publish(channel, json.dumps(info)) + self._redis_client.publish(channel, json.dumps(info)) except Exception as e: logger.warning('send info to channel: %s error: %s', channel, e) diff --git a/dtable_events/tasks/metrics.py b/dtable_events/tasks/metrics.py index 8b572bc3..99d3c3cd 100644 --- a/dtable_events/tasks/metrics.py +++ b/dtable_events/tasks/metrics.py @@ -28,9 +28,6 @@ def __init__(self): self._pubsub_no_message_timeout = 5 * 60 def run(self): - if not self._redis_client.connection: - logging.warning('Redis connection is not established.') - return subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) last_pubsub_message_time = time.time() while not self._finished.is_set(): From ae0d0698bb0d800758e9bf2ac7d73718e0dd6da8 Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Thu, 23 Apr 2026 14:04:06 +0800 Subject: [PATCH 5/7] cache roles in redis timeout 600 --- .../automations/automations_stats_manager.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dtable_events/automations/automations_stats_manager.py b/dtable_events/automations/automations_stats_manager.py index 97712852..53a5f79e 100644 --- a/dtable_events/automations/automations_stats_manager.py +++ b/dtable_events/automations/automations_stats_manager.py @@ -9,6 +9,7 @@ from dtable_events.utils import get_dtable_admins from dtable_events.app.config import SEATABLE_MYSQL_DB_CCNET_DB_NAME, DTABLE_WEB_SERVICE_URL, ORG_MEMBER_QUOTA_DEFAULT +from dtable_events.app.event_redis import redis_cache from dtable_events.automations.actions import AutomationResult from dtable_events.utils.dtable_web_api import DTableWebAPI @@ -17,15 +18,19 @@ class AutomationsStatsManager: def __init__(self): self.dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL) - self.roles = None + self.roles_cache_key = 'DTABLE_WEB_ROLES' + self.roles_cache_timeout = 600 self.ccnet_db_name = SEATABLE_MYSQL_DB_CCNET_DB_NAME def get_roles(self): - if self.roles: - return self.roles - self.roles = self.dtable_web_api.internal_roles() - return self.roles + roles_json = redis_cache.get(self.roles_cache_key) + if not roles_json: + roles = self.dtable_web_api.internal_roles() + roles_json = json.dumps(roles) + redis_cache.set(self.roles_cache_key, roles_json, timeout=self.roles_cache_timeout) + return roles + return json.loads(roles_json) def get_user_quota(self, db_session, username): sql = "SELECT username, monthly_automation_limit_per_user FROM user_quota WHERE username=:username" From 113f7ce4d0eabdf619ecf2bf3f9e887ef87203a5 Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Thu, 23 Apr 2026 14:42:22 +0800 Subject: [PATCH 6/7] update --- dtable_events/statistics/counter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dtable_events/statistics/counter.py b/dtable_events/statistics/counter.py index 826f41ba..b874bd82 100644 --- a/dtable_events/statistics/counter.py +++ b/dtable_events/statistics/counter.py @@ -32,7 +32,7 @@ def run(self): if message is not None: if message.get('type') != 'message': continue - last_pubsub_message_time = last_pubsub_message_time() + last_pubsub_message_time = time.time() msg = json.loads(message['data']) session = self._db_session_class() try: From 788c7c67f9c308337c8f76967fc29f208d1f7360 Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Fri, 24 Apr 2026 14:15:22 +0800 Subject: [PATCH 7/7] change some log message level --- dtable_events/app/event_redis.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dtable_events/app/event_redis.py b/dtable_events/app/event_redis.py index c66172c6..7757b7bb 100644 --- a/dtable_events/app/event_redis.py +++ b/dtable_events/app/event_redis.py @@ -71,12 +71,12 @@ def close_subscriber(self, subscriber): logger.debug('close redis subscriber failed: %s', e) def refresh_subscriber(self, subscriber, pubsub_channel_name, reason='unknown'): - logger.warning('reconnect redis pubsub channel=%s reason=%s', pubsub_channel_name, reason) + logger.info('reconnect redis pubsub channel=%s reason=%s', pubsub_channel_name, reason) self.close_subscriber(subscriber) try: self.reconnect() except Exception as e: - logger.warning('redis reconnect failed channel=%s error=%s', pubsub_channel_name, e) + logger.error('redis reconnect failed channel=%s error=%s', pubsub_channel_name, e) return self.get_subscriber(pubsub_channel_name) def get(self, key):