diff --git a/dtable_events/activities/handlers.py b/dtable_events/activities/handlers.py index 5d7511bd..3c0c9f22 100644 --- a/dtable_events/activities/handlers.py +++ b/dtable_events/activities/handlers.py @@ -30,17 +30,23 @@ def __init__(self, app): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self.app = app + self._pubsub_channel_name = 'table-events' + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle table activities...') - subscriber = self._redis_client.get_subscriber('table-events') - + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() event = json.loads(message['data']) if event['op_type'] not in self.SUPPORT_OPERATION_TYPES: continue @@ -56,7 +62,13 @@ def run(self): finally: session.close() else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('table-events') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() diff --git a/dtable_events/api_calls/api_calls_counter.py b/dtable_events/api_calls/api_calls_counter.py index c66950c9..119c17b9 100644 --- a/dtable_events/api_calls/api_calls_counter.py +++ b/dtable_events/api_calls/api_calls_counter.py @@ -19,12 +19,15 @@ logger = logging.getLogger(__name__) -class APICallsCounter: +class APICallsCounter(object): def __init__(self): self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self.keep_months = 3 # including this month + self._pubsub_channel_name = 'stats_api_calls' + self._pubsub_no_message_timeout = 5 * 60 def count_api_gateway(self, info, db_session): try: @@ -126,12 +129,16 @@ def count_api_calls(self, info, db_session): def count(self): logger.info('Starting count api calls...') - subscriber = self._redis_client.get_subscriber('stats_api_calls') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() msg = json.loads(message['data']) session = self._db_session_class() try: @@ -141,10 +148,16 @@ def count(self): finally: session.close() else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('stats_api_calls') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() def clean(self): logger.info('Starting schedule clean api calls...') @@ -185,7 +198,7 @@ def timed_job(): session.close() try: - self._redis_client.connection.publish('exceed_api_quota', json.dumps({'changed': True})) + self._redis_client.publish('exceed_api_quota', json.dumps({'changed': True})) except Exception as e: logger.exception('publish exceed_api_quota error: %s', e) diff --git a/dtable_events/app/event_redis.py b/dtable_events/app/event_redis.py index 5dbe73d7..7757b7bb 100644 --- a/dtable_events/app/event_redis.py +++ b/dtable_events/app/event_redis.py @@ -14,33 +14,45 @@ class RedisClient(object): - def __init__(self, socket_connect_timeout=30, socket_timeout=None): - self._host = '127.0.0.1' - self._port = 6379 - self._password = None - self._parse_config() + def __init__(self, socket_connect_timeout=30, socket_timeout=None, + health_check_interval=None, retry_on_timeout=None): + self._host = REDIS_HOST + self._port = REDIS_PORT + self._password = REDIS_PASSWORD + + self._connection_kwargs = { + 'host': self._host, + 'port': self._port, + 'password': self._password, + 'socket_timeout': socket_timeout, + 'socket_connect_timeout': socket_connect_timeout, + 'decode_responses': True, + } + if health_check_interval is not None: + self._connection_kwargs['health_check_interval'] = health_check_interval + if retry_on_timeout is not None: + self._connection_kwargs['retry_on_timeout'] = retry_on_timeout """ By default, each Redis instance created will in turn create its own connection pool. Every caller using redis client will has it's own pool with config caller passed. """ - self.connection = redis.Redis( - host=self._host, port=self._port, password=self._password, - socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout, - decode_responses=True - ) - - def _parse_config(self): + self._redis = redis.Redis(**self._connection_kwargs) - self._host = REDIS_HOST - self._port = REDIS_PORT - self._password = REDIS_PASSWORD + def reconnect(self): + try: + self._redis.connection_pool.disconnect() + except Exception: + pass + self._redis = redis.Redis(**self._connection_kwargs) + return self._redis def get_subscriber(self, channel_name): while True: try: - subscriber = self.connection.pubsub(ignore_subscribe_messages=True) + subscriber = self._redis.pubsub(ignore_subscribe_messages=True) subscriber.subscribe(channel_name) + logger.info('redis pubsub success, success subscribe %s', channel_name) except redis.AuthenticationError as e: logger.critical('connect to redis auth error: %s', e) raise e @@ -50,20 +62,42 @@ def get_subscriber(self, channel_name): else: return subscriber + def close_subscriber(self, subscriber): + if not subscriber: + return + try: + subscriber.close() + except Exception as e: + logger.debug('close redis subscriber failed: %s', e) + + def refresh_subscriber(self, subscriber, pubsub_channel_name, reason='unknown'): + logger.info('reconnect redis pubsub channel=%s reason=%s', pubsub_channel_name, reason) + self.close_subscriber(subscriber) + try: + self.reconnect() + except Exception as e: + logger.error('redis reconnect failed channel=%s error=%s', pubsub_channel_name, e) + return self.get_subscriber(pubsub_channel_name) + def get(self, key): - return self.connection.get(key) + return self._redis.get(key) def set(self, key, value, timeout=None): if not timeout: - return self.connection.set(key, value) + return self._redis.set(key, value) else: - return self.connection.setex(key, timeout, value) + return self._redis.setex(key, timeout, value) def delete(self, key): - return self.connection.delete(key) + return self._redis.delete(key) def publish(self, channel_name, message): - return self.connection.publish(channel_name, message) + try: + return self._redis.publish(channel_name, message) + except Exception as e: + logger.warning('redis publish failed on %s: %s', channel_name, e) + self.reconnect() + return self._redis.publish(channel_name, message) class RedisCache(object): diff --git a/dtable_events/app/stats_sender.py b/dtable_events/app/stats_sender.py index 6c98582c..a7ebb786 100644 --- a/dtable_events/app/stats_sender.py +++ b/dtable_events/app/stats_sender.py @@ -16,7 +16,7 @@ def __init__(self): def send(self, channel: str, info: dict): try: - self._redis_client.connection.publish(channel, json.dumps(info)) + self._redis_client.publish(channel, json.dumps(info)) except Exception as e: logger.warning('send info to channel: %s error: %s', channel, e) diff --git a/dtable_events/automations/automations_pipeline.py b/dtable_events/automations/automations_pipeline.py index a7159a2c..f5b8f059 100644 --- a/dtable_events/automations/automations_pipeline.py +++ b/dtable_events/automations/automations_pipeline.py @@ -65,24 +65,24 @@ def get_percent(self, owner, org_id, workers): return self.counters.get(limit_key, 0) / (self.window_secs * workers) -class AutomationsPipeline: +class AutomationsPipeline(object): def __init__(self): self.workers = 5 self.automations_queue: Queue[AutomationRule] = Queue() self.results_queue: Queue[AutomationResult] = Queue() + self._pubsub_no_message_timeout = 5 * 60 self._db_session_class = init_db_session_class() - self._redis_client = RedisClient(socket_timeout=10) + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=10, + health_check_interval=30, retry_on_timeout=True) self.per_update_channel = 'automation-rule-triggered' self.rate_limiter = RateLimiter() self.automations_stats_manager = AutomationsStatsManager() - self.log_none_message_timeout = 10 * 60 - # metrics self.realtime_trigger_count = 0 self.scheduled_trigger_count = 0 @@ -143,15 +143,17 @@ def get_automation_rule(self, db_session, event_data): def receive(self): auto_rule_logger.info(f"Start to receive automation event from redis, window seconds {self.rate_limiter.window_secs} limit percent {self.rate_limiter.percent}") subscriber = self._redis_client.get_subscriber(self.per_update_channel) - last_message_time = datetime.now() + last_pubsub_message_time = time.time() while True: try: message = subscriber.get_message() self.realtime_automation_heartbeat = time.time() if message is not None: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() event = json.loads(message['data']) auto_rule_logger.info(f"subscribe event {event}") - last_message_time = datetime.now() db_session = self._db_session_class() try: @@ -192,14 +194,17 @@ def receive(self): finally: db_session.close() else: - if (datetime.now() - last_message_time).seconds >= self.log_none_message_timeout: - auto_rule_logger.info(f'No message for {self.log_none_message_timeout}s...') - last_message_time = datetime.now() + if time.time() - last_pubsub_message_time >= self._pubsub_no_message_timeout: + auto_rule_logger.info('no automation message for %ss', self._pubsub_no_message_timeout) + subscriber = self._redis_client.refresh_subscriber( + subscriber, self.per_update_channel, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - auto_rule_logger.exception('Failed get automation rules message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('automation-rule-triggered') - last_message_time = datetime.now() + auto_rule_logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self.per_update_channel, str(e)) + last_pubsub_message_time = time.time() def worker(self): while True: diff --git a/dtable_events/automations/automations_stats_manager.py b/dtable_events/automations/automations_stats_manager.py index 97712852..53a5f79e 100644 --- a/dtable_events/automations/automations_stats_manager.py +++ b/dtable_events/automations/automations_stats_manager.py @@ -9,6 +9,7 @@ from dtable_events.utils import get_dtable_admins from dtable_events.app.config import SEATABLE_MYSQL_DB_CCNET_DB_NAME, DTABLE_WEB_SERVICE_URL, ORG_MEMBER_QUOTA_DEFAULT +from dtable_events.app.event_redis import redis_cache from dtable_events.automations.actions import AutomationResult from dtable_events.utils.dtable_web_api import DTableWebAPI @@ -17,15 +18,19 @@ class AutomationsStatsManager: def __init__(self): self.dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL) - self.roles = None + self.roles_cache_key = 'DTABLE_WEB_ROLES' + self.roles_cache_timeout = 600 self.ccnet_db_name = SEATABLE_MYSQL_DB_CCNET_DB_NAME def get_roles(self): - if self.roles: - return self.roles - self.roles = self.dtable_web_api.internal_roles() - return self.roles + roles_json = redis_cache.get(self.roles_cache_key) + if not roles_json: + roles = self.dtable_web_api.internal_roles() + roles_json = json.dumps(roles) + redis_cache.set(self.roles_cache_key, roles_json, timeout=self.roles_cache_timeout) + return roles + return json.loads(roles_json) def get_user_quota(self, db_session, username): sql = "SELECT username, monthly_automation_limit_per_user FROM user_quota WHERE username=:username" diff --git a/dtable_events/notification_rules/handler.py b/dtable_events/notification_rules/handler.py index c0a529ed..74b065e3 100644 --- a/dtable_events/notification_rules/handler.py +++ b/dtable_events/notification_rules/handler.py @@ -15,16 +15,22 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = 'notification-rule-triggered' + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle notification rules...') - subscriber = self._redis_client.get_subscriber('notification-rule-triggered') - + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() event = json.loads(message['data']) session = self._db_session_class() try: @@ -34,7 +40,13 @@ def run(self): finally: session.close() else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get notification rules message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('notification-rule-triggered') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() diff --git a/dtable_events/statistics/counter.py b/dtable_events/statistics/counter.py index 890cb796..b874bd82 100644 --- a/dtable_events/statistics/counter.py +++ b/dtable_events/statistics/counter.py @@ -16,16 +16,23 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = 'user-activity-statistic' + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting count user activity...') - subscriber = self._redis_client.get_subscriber('user-activity-statistic') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() msg = json.loads(message['data']) session = self._db_session_class() try: @@ -35,7 +42,13 @@ def run(self): finally: session.close() else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('user-activity-statistic') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() diff --git a/dtable_events/tasks/ai_stats_worker.py b/dtable_events/tasks/ai_stats_worker.py index c7a6df71..b6561ba7 100644 --- a/dtable_events/tasks/ai_stats_worker.py +++ b/dtable_events/tasks/ai_stats_worker.py @@ -18,15 +18,17 @@ logger = logging.getLogger(__name__) -class AIStatsWorker: +class AIStatsWorker(object): def __init__(self): self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self.stats_lock = Lock() - self.channel = 'log_ai_model_usage' + self._pubsub_channel_name = 'log_ai_model_usage' self.keep_months = 3 self.owner_info_cache_timeout = 24 * 60 * 60 + self._pubsub_no_message_timeout = 5 * 60 self._parse_config() self.reset_stats() @@ -94,12 +96,16 @@ def save_to_memory(self, usage_info, session): def receive(self): logger.info('Starts to receive ai calls...') - subscriber = self._redis_client.get_subscriber(self.channel) + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while True: try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() try: usage_info = json.loads(message['data']) except: @@ -115,10 +121,16 @@ def receive(self): finally: session.close() else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber(self.channel) + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() def get_assistant_cache_key(self, assistant_uuid): return f'assistant:{assistant_uuid}:owner' diff --git a/dtable_events/tasks/dtable_real_time_rows_counter.py b/dtable_events/tasks/dtable_real_time_rows_counter.py index 9ee61d73..2296072c 100644 --- a/dtable_events/tasks/dtable_real_time_rows_counter.py +++ b/dtable_events/tasks/dtable_real_time_rows_counter.py @@ -118,16 +118,23 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = 'count-rows' + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle table rows count...') - subscriber = self._redis_client.get_subscriber('count-rows') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() dtable_uuids = json.loads(message['data']) session = self._db_session_class() try: @@ -137,7 +144,13 @@ def run(self): finally: session.close() else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('count-rows') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() diff --git a/dtable_events/tasks/metrics.py b/dtable_events/tasks/metrics.py index f20fa780..99d3c3cd 100644 --- a/dtable_events/tasks/metrics.py +++ b/dtable_events/tasks/metrics.py @@ -22,16 +22,21 @@ class MetricReceiver(Thread): def __init__(self): Thread.__init__(self) self._finished = Event() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = METRIC_CHANNEL_NAME + self._pubsub_no_message_timeout = 5 * 60 + def run(self): - if not self._redis_client.connection: - logging.warning('Redis connection is not established.') - return - subscriber = self._redis_client.get_subscriber(METRIC_CHANNEL_NAME) + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while not self._finished.is_set(): try: message = subscriber.get_message() if message: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() metric_data = json.loads(message['data']) try: component_name = metric_data.get('component_name') @@ -46,10 +51,16 @@ def run(self): except Exception as e: logging.error('Error when handling metric data: %s' % e) else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - logging.error('Failed handle metric %s' % e) - subscriber = self._redis_client.get_subscriber(METRIC_CHANNEL_NAME) + logging.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() class MetricSaver(Thread): diff --git a/dtable_events/tasks/universal_app_auto_buckup.py b/dtable_events/tasks/universal_app_auto_buckup.py index 313fd995..a1ad578d 100644 --- a/dtable_events/tasks/universal_app_auto_buckup.py +++ b/dtable_events/tasks/universal_app_auto_buckup.py @@ -21,8 +21,11 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self._lock = Lock() + self._pubsub_channel_name = 'universal-app-auto-backup' + self._pubsub_no_message_timeout = 5 * 60 def create_snapshot(self, session, app_id, app_version, app_config): """ @@ -137,11 +140,15 @@ def should_auto_backup(self, session, app_id, app_version): def run(self): logger.info('Starting universal app auto backup thread') - subscriber = self._redis_client.get_subscriber('universal-app-auto-backup') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() msg = json.loads(message['data']) user_name = msg.get('username', '') app_id = msg.get('app_id', '') @@ -162,7 +169,13 @@ def run(self): finally: session.close() else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get message from redis: %s' % e) - subscriber = self._redis_client.get_subscriber('universal-app-auto-backup') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() diff --git a/dtable_events/webhook/webhook.py b/dtable_events/webhook/webhook.py index 4bc2f1e1..735c2a2b 100644 --- a/dtable_events/webhook/webhook.py +++ b/dtable_events/webhook/webhook.py @@ -1,5 +1,6 @@ import json import logging +import time from datetime import datetime from threading import Thread from queue import Queue @@ -28,9 +29,11 @@ class Webhooker(object): """ def __init__(self): self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() - self._subscriber = self._redis_client.get_subscriber('table-events') + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) self.job_queue = Queue() + self._pubsub_channel_name = 'table-events' + self._pubsub_no_message_timeout = 5 * 60 def start(self): logger.info('Starting handle webhook jobs...') @@ -40,11 +43,15 @@ def start(self): def add_jobs(self): """all events from redis are kind of update so far""" + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while True: try: - for message in self._subscriber.listen(): + message = subscriber.get_message() + if message is not None: if message['type'] != 'message': continue + last_pubsub_message_time = time.time() try: data = json.loads(message['data']) except Exception as e: @@ -66,9 +73,17 @@ def add_jobs(self): logger.error('add jobs error: %s' % e) finally: session.close() + else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue + time.sleep(0.5) except Exception as e: - logger.error('webhook sub from redis error: %s', e) - self._subscriber = self._redis_client.get_subscriber('table-events') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time() def invalidate_webhook(self, webhook_id, db_session): sql = "UPDATE webhooks SET is_valid=0 WHERE id=:webhook_id" diff --git a/dtable_events/workflow/workflow_actions.py b/dtable_events/workflow/workflow_actions.py index 199295d7..0f7ea024 100644 --- a/dtable_events/workflow/workflow_actions.py +++ b/dtable_events/workflow/workflow_actions.py @@ -167,16 +167,23 @@ def __init__(self): Thread.__init__(self) self._finished = Event() self._db_session_class = init_db_session_class() - self._redis_client = RedisClient() + self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5, + health_check_interval=30, retry_on_timeout=True) + self._pubsub_channel_name = 'workflow-actions' + self._pubsub_no_message_timeout = 5 * 60 def run(self): logger.info('Starting handle workflow actions...') - subscriber = self._redis_client.get_subscriber('workflow-actions') + subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name) + last_pubsub_message_time = time.time() while not self._finished.is_set(): try: message = subscriber.get_message() if message is not None: + if message.get('type') != 'message': + continue + last_pubsub_message_time = time.time() sub_data = json.loads(message['data']) session = self._db_session_class() task_id = sub_data['task_id'] @@ -189,7 +196,13 @@ def run(self): finally: session.close() else: + if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout: + subscriber = self._redis_client.refresh_subscriber( + subscriber, self._pubsub_channel_name, 'no message timeout') + last_pubsub_message_time = time.time() + continue time.sleep(0.5) except Exception as e: - logger.error('Failed get workflow-actions message: %s', e) - subscriber = self._redis_client.get_subscriber('workflow-actions') + logger.error('redis pubsub receive error: %s', e) + subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e)) + last_pubsub_message_time = time.time()