Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions dtable_events/activities/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,23 @@ def __init__(self, app):
Thread.__init__(self)
self._finished = Event()
self._db_session_class = init_db_session_class()
self._redis_client = RedisClient()
self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5,
health_check_interval=30, retry_on_timeout=True)
self.app = app
self._pubsub_channel_name = 'table-events'
self._pubsub_no_message_timeout = 5 * 60

def run(self):
logger.info('Starting handle table activities...')
subscriber = self._redis_client.get_subscriber('table-events')

subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name)
last_pubsub_message_time = time.time()
while not self._finished.is_set():
try:
message = subscriber.get_message()
if message is not None:
if message.get('type') != 'message':
continue
last_pubsub_message_time = time.time()
event = json.loads(message['data'])
if event['op_type'] not in self.SUPPORT_OPERATION_TYPES:
continue
Expand All @@ -56,7 +62,13 @@ def run(self):
finally:
session.close()
else:
if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout:
subscriber = self._redis_client.refresh_subscriber(
subscriber, self._pubsub_channel_name, 'no message timeout')
last_pubsub_message_time = time.time()
continue
time.sleep(0.5)
except Exception as e:
logger.error('Failed get message from redis: %s' % e)
subscriber = self._redis_client.get_subscriber('table-events')
logger.error('redis pubsub receive error: %s', e)
subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e))
last_pubsub_message_time = time.time()
25 changes: 19 additions & 6 deletions dtable_events/api_calls/api_calls_counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
logger = logging.getLogger(__name__)


class APICallsCounter:
class APICallsCounter(object):
def __init__(self):
self._finished = Event()
self._db_session_class = init_db_session_class()
self._redis_client = RedisClient()
self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5,
health_check_interval=30, retry_on_timeout=True)
self.keep_months = 3 # including this month
self._pubsub_channel_name = 'stats_api_calls'
self._pubsub_no_message_timeout = 5 * 60

def count_api_gateway(self, info, db_session):
try:
Expand Down Expand Up @@ -126,12 +129,16 @@ def count_api_calls(self, info, db_session):

def count(self):
logger.info('Starting count api calls...')
subscriber = self._redis_client.get_subscriber('stats_api_calls')
subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name)
last_pubsub_message_time = time.time()

while not self._finished.is_set():
try:
message = subscriber.get_message()
if message is not None:
if message.get('type') != 'message':
continue
last_pubsub_message_time = time.time()
msg = json.loads(message['data'])
session = self._db_session_class()
try:
Expand All @@ -141,10 +148,16 @@ def count(self):
finally:
session.close()
else:
if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout:
subscriber = self._redis_client.refresh_subscriber(
subscriber, self._pubsub_channel_name, 'no message timeout')
last_pubsub_message_time = time.time()
continue
time.sleep(0.5)
except Exception as e:
logger.error('Failed get message from redis: %s' % e)
subscriber = self._redis_client.get_subscriber('stats_api_calls')
logger.error('redis pubsub receive error: %s', e)
subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e))
last_pubsub_message_time = time.time()

def clean(self):
logger.info('Starting schedule clean api calls...')
Expand Down Expand Up @@ -185,7 +198,7 @@ def timed_job():
session.close()

try:
self._redis_client.connection.publish('exceed_api_quota', json.dumps({'changed': True}))
self._redis_client.publish('exceed_api_quota', json.dumps({'changed': True}))
except Exception as e:
logger.exception('publish exceed_api_quota error: %s', e)

Expand Down
76 changes: 55 additions & 21 deletions dtable_events/app/event_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,45 @@

class RedisClient(object):

def __init__(self, socket_connect_timeout=30, socket_timeout=None):
self._host = '127.0.0.1'
self._port = 6379
self._password = None
self._parse_config()
def __init__(self, socket_connect_timeout=30, socket_timeout=None,
health_check_interval=None, retry_on_timeout=None):
self._host = REDIS_HOST
self._port = REDIS_PORT
self._password = REDIS_PASSWORD

self._connection_kwargs = {
'host': self._host,
'port': self._port,
'password': self._password,
'socket_timeout': socket_timeout,
'socket_connect_timeout': socket_connect_timeout,
'decode_responses': True,
}
if health_check_interval is not None:
self._connection_kwargs['health_check_interval'] = health_check_interval
if retry_on_timeout is not None:
self._connection_kwargs['retry_on_timeout'] = retry_on_timeout

"""
By default, each Redis instance created will in turn create its own connection pool.
Every caller using redis client will has it's own pool with config caller passed.
"""
self.connection = redis.Redis(
host=self._host, port=self._port, password=self._password,
socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout,
decode_responses=True
)

def _parse_config(self):
self._redis = redis.Redis(**self._connection_kwargs)

self._host = REDIS_HOST
self._port = REDIS_PORT
self._password = REDIS_PASSWORD
def reconnect(self):
try:
self._redis.connection_pool.disconnect()
except Exception:
pass
self._redis = redis.Redis(**self._connection_kwargs)
return self._redis

def get_subscriber(self, channel_name):
while True:
try:
subscriber = self.connection.pubsub(ignore_subscribe_messages=True)
subscriber = self._redis.pubsub(ignore_subscribe_messages=True)
subscriber.subscribe(channel_name)
logger.info('redis pubsub success, success subscribe %s', channel_name)
except redis.AuthenticationError as e:
logger.critical('connect to redis auth error: %s', e)
raise e
Expand All @@ -50,20 +62,42 @@ def get_subscriber(self, channel_name):
else:
return subscriber

def close_subscriber(self, subscriber):
if not subscriber:
return
try:
subscriber.close()
except Exception as e:
logger.debug('close redis subscriber failed: %s', e)

def refresh_subscriber(self, subscriber, pubsub_channel_name, reason='unknown'):
logger.info('reconnect redis pubsub channel=%s reason=%s', pubsub_channel_name, reason)
self.close_subscriber(subscriber)
try:
self.reconnect()
except Exception as e:
logger.error('redis reconnect failed channel=%s error=%s', pubsub_channel_name, e)
return self.get_subscriber(pubsub_channel_name)

def get(self, key):
return self.connection.get(key)
return self._redis.get(key)

def set(self, key, value, timeout=None):
if not timeout:
return self.connection.set(key, value)
return self._redis.set(key, value)
else:
return self.connection.setex(key, timeout, value)
return self._redis.setex(key, timeout, value)

def delete(self, key):
return self.connection.delete(key)
return self._redis.delete(key)

def publish(self, channel_name, message):
return self.connection.publish(channel_name, message)
try:
return self._redis.publish(channel_name, message)
except Exception as e:
logger.warning('redis publish failed on %s: %s', channel_name, e)
self.reconnect()
return self._redis.publish(channel_name, message)


class RedisCache(object):
Expand Down
2 changes: 1 addition & 1 deletion dtable_events/app/stats_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self):

def send(self, channel: str, info: dict):
try:
self._redis_client.connection.publish(channel, json.dumps(info))
self._redis_client.publish(channel, json.dumps(info))
except Exception as e:
logger.warning('send info to channel: %s error: %s', channel, e)

Expand Down
29 changes: 17 additions & 12 deletions dtable_events/automations/automations_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,24 @@ def get_percent(self, owner, org_id, workers):
return self.counters.get(limit_key, 0) / (self.window_secs * workers)


class AutomationsPipeline:
class AutomationsPipeline(object):

def __init__(self):
self.workers = 5
self.automations_queue: Queue[AutomationRule] = Queue()
self.results_queue: Queue[AutomationResult] = Queue()
self._pubsub_no_message_timeout = 5 * 60

self._db_session_class = init_db_session_class()

self._redis_client = RedisClient(socket_timeout=10)
self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=10,
health_check_interval=30, retry_on_timeout=True)
self.per_update_channel = 'automation-rule-triggered'

self.rate_limiter = RateLimiter()

self.automations_stats_manager = AutomationsStatsManager()

self.log_none_message_timeout = 10 * 60

# metrics
self.realtime_trigger_count = 0
self.scheduled_trigger_count = 0
Expand Down Expand Up @@ -143,15 +143,17 @@ def get_automation_rule(self, db_session, event_data):
def receive(self):
auto_rule_logger.info(f"Start to receive automation event from redis, window seconds {self.rate_limiter.window_secs} limit percent {self.rate_limiter.percent}")
subscriber = self._redis_client.get_subscriber(self.per_update_channel)
last_message_time = datetime.now()
last_pubsub_message_time = time.time()
while True:
try:
message = subscriber.get_message()
self.realtime_automation_heartbeat = time.time()
if message is not None:
if message.get('type') != 'message':
continue
last_pubsub_message_time = time.time()
event = json.loads(message['data'])
auto_rule_logger.info(f"subscribe event {event}")
last_message_time = datetime.now()

db_session = self._db_session_class()
try:
Expand Down Expand Up @@ -192,14 +194,17 @@ def receive(self):
finally:
db_session.close()
else:
if (datetime.now() - last_message_time).seconds >= self.log_none_message_timeout:
auto_rule_logger.info(f'No message for {self.log_none_message_timeout}s...')
last_message_time = datetime.now()
if time.time() - last_pubsub_message_time >= self._pubsub_no_message_timeout:
auto_rule_logger.info('no automation message for %ss', self._pubsub_no_message_timeout)
subscriber = self._redis_client.refresh_subscriber(
subscriber, self.per_update_channel, 'no message timeout')
last_pubsub_message_time = time.time()
continue
time.sleep(0.5)
except Exception as e:
auto_rule_logger.exception('Failed get automation rules message from redis: %s' % e)
subscriber = self._redis_client.get_subscriber('automation-rule-triggered')
last_message_time = datetime.now()
auto_rule_logger.error('redis pubsub receive error: %s', e)
subscriber = self._redis_client.refresh_subscriber(subscriber, self.per_update_channel, str(e))
last_pubsub_message_time = time.time()

def worker(self):
while True:
Expand Down
15 changes: 10 additions & 5 deletions dtable_events/automations/automations_stats_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dtable_events.utils import get_dtable_admins

from dtable_events.app.config import SEATABLE_MYSQL_DB_CCNET_DB_NAME, DTABLE_WEB_SERVICE_URL, ORG_MEMBER_QUOTA_DEFAULT
from dtable_events.app.event_redis import redis_cache
from dtable_events.automations.actions import AutomationResult
from dtable_events.utils.dtable_web_api import DTableWebAPI

Expand All @@ -17,15 +18,19 @@ class AutomationsStatsManager:

def __init__(self):
self.dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL)
self.roles = None
self.roles_cache_key = 'DTABLE_WEB_ROLES'
self.roles_cache_timeout = 600

self.ccnet_db_name = SEATABLE_MYSQL_DB_CCNET_DB_NAME

def get_roles(self):
if self.roles:
return self.roles
self.roles = self.dtable_web_api.internal_roles()
return self.roles
roles_json = redis_cache.get(self.roles_cache_key)
if not roles_json:
roles = self.dtable_web_api.internal_roles()
roles_json = json.dumps(roles)
redis_cache.set(self.roles_cache_key, roles_json, timeout=self.roles_cache_timeout)
return roles
return json.loads(roles_json)

def get_user_quota(self, db_session, username):
sql = "SELECT username, monthly_automation_limit_per_user FROM user_quota WHERE username=:username"
Expand Down
22 changes: 17 additions & 5 deletions dtable_events/notification_rules/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,22 @@ def __init__(self):
Thread.__init__(self)
self._finished = Event()
self._db_session_class = init_db_session_class()
self._redis_client = RedisClient()
self._redis_client = RedisClient(socket_connect_timeout=5, socket_timeout=5,
health_check_interval=30, retry_on_timeout=True)
self._pubsub_channel_name = 'notification-rule-triggered'
self._pubsub_no_message_timeout = 5 * 60

def run(self):
logger.info('Starting handle notification rules...')
subscriber = self._redis_client.get_subscriber('notification-rule-triggered')

subscriber = self._redis_client.get_subscriber(self._pubsub_channel_name)
last_pubsub_message_time = time.time()
while not self._finished.is_set():
try:
message = subscriber.get_message()
if message is not None:
if message.get('type') != 'message':
continue
last_pubsub_message_time = time.time()
event = json.loads(message['data'])
session = self._db_session_class()
try:
Expand All @@ -34,7 +40,13 @@ def run(self):
finally:
session.close()
else:
if (time.time() - last_pubsub_message_time) >= self._pubsub_no_message_timeout:
subscriber = self._redis_client.refresh_subscriber(
subscriber, self._pubsub_channel_name, 'no message timeout')
last_pubsub_message_time = time.time()
continue
time.sleep(0.5)
except Exception as e:
logger.error('Failed get notification rules message from redis: %s' % e)
subscriber = self._redis_client.get_subscriber('notification-rule-triggered')
logger.error('redis pubsub receive error: %s', e)
subscriber = self._redis_client.refresh_subscriber(subscriber, self._pubsub_channel_name, str(e))
last_pubsub_message_time = time.time()
Loading
Loading