4343from .models import UserAgent , UserSession , Item , ItemAccess
4444
4545import logging
46+ import time
4647
4748
4849User = get_user_model ()
4950
5051
51- @celery_app .task (bind = True , name = _ ('Compute access ' ), timelimit = - 1 )
52+ @celery_app .task (bind = True , name = _ ('Parse logs ' ), timelimit = - 1 )
5253def task_parse_logs (self , collections = [], from_date = None , until_date = None , days_to_go_back = None , user_id = None , username = None ):
5354 """
5455 Parses log files associated with a given collection.
@@ -124,7 +125,15 @@ def task_parse_log(self, log_file_hash, user_id=None, username=None):
124125 return
125126
126127 log_parser , url_translator_manager = _setup_parsing_environment (log_file , robots_list , mmdb )
127- _process_lines (log_parser , url_translator_manager , log_file )
128+ success = _process_lines (log_parser , url_translator_manager , log_file )
129+
130+ if not success :
131+ logging .error (f'Failed to parse log file { log_file .path } .' )
132+ return
133+
134+ log_file .status = choices .LOG_FILE_STATUS_PROCESSED
135+ log_file .save ()
136+ logging .info (f'Log file { log_file .path } has been successfully parsed.' )
128137
129138
130139def _initialize_log_file (log_file_hash ):
@@ -212,7 +221,7 @@ def _load_metrics_objs_cache(log_file):
212221 log_file (LogFile): The log file being processed.
213222
214223 Returns:
215- dict: A cache containing items, user agents, user sessions, and item accesses .
224+ dict: A cache containing items and user agents .
216225 """
217226 logging .info (f'Loading metrics objects cache for { log_file .collection } ' )
218227 cache = {
@@ -234,26 +243,6 @@ def _load_metrics_objs_cache(log_file):
234243 cache ['user_agents' ][key ] = ua
235244 logging .info (f'Loaded { len (cache ["user_agents" ])} user agents' )
236245
237- date_str = log_file .validation .get ('probably_date' )
238- user_sessions_qs = UserSession .objects .filter (datetime__date = date_str ).select_related ('user_agent' )
239- for us in user_sessions_qs :
240- key = (us .datetime , us .user_agent_id , us .user_ip )
241- cache ['user_sessions' ][key ] = us
242- logging .info (f'Loaded { len (cache ["user_sessions" ])} user sessions for { date_str } ' )
243-
244- item_accesses_qs = ItemAccess .objects .filter (item__collection = log_file .collection )
245- for ia in item_accesses_qs :
246- key = (
247- ia .item_id ,
248- ia .user_session_id ,
249- ia .media_format ,
250- ia .media_language ,
251- ia .country_code ,
252- ia .content_type ,
253- )
254- cache ['item_accesses' ][key ] = ia
255- logging .info (f'Loaded { len (cache ["item_accesses" ])} item accesses for { log_file .collection } ' )
256-
257246 return cache
258247
259248
@@ -304,9 +293,7 @@ def _process_lines(lp, utm, log_file):
304293 if not _process_line (line , utm , log_file , cache ):
305294 continue
306295
307- logging .info (f'File { log_file .path } has been parsed.' )
308- log_file .status = choices .LOG_FILE_STATUS_PROCESSED
309- log_file .save ()
296+ return True
310297
311298
312299def _process_line (line , utm , log_file , cache ):
@@ -365,20 +352,19 @@ def _process_line(line, utm, log_file, cache):
365352 )
366353 return False
367354
368- _register_item_access (item_access_data , line , jou_id , art_id , cache )
355+ try :
356+ _register_item_access (item_access_data , line , jou_id , art_id , cache )
357+ except Exception as e :
358+ _log_discarded_line (log_file , line , tracker_choices .LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR , str (e ))
359+ return False
360+
369361 return True
370362
371363
372364def _register_item_access (item_access_data , line , jou_id , art_id , cache ):
373365 """
374366 Registers an item access in the database, creating necessary objects if they do not exist.
375-
376- Args:
377- item_access_data (dict): Data related to the item access, including collection, journal, article, media format, etc.
378- line (dict): The log line being processed.
379- jou_id (int): The ID of the journal.
380- art_id (int): The ID of the article.
381- cache (dict): A cache containing pre-fetched objects to avoid redundant database queries.
367+ Handles potential deadlocks by retrying on database errors.
382368 """
383369 col_acron3 = item_access_data .get ('collection' )
384370 media_format = item_access_data .get ('media_format' )
@@ -390,64 +376,109 @@ def _register_item_access(item_access_data, line, jou_id, art_id, cache):
390376 local_datetime = line .get ('local_datetime' )
391377 country_code = line .get ('country_code' )
392378 ip_address = line .get ('ip_address' )
393-
379+
394380 truncated_datetime = truncate_datetime_to_hour (local_datetime )
395381 if timezone .is_naive (truncated_datetime ):
396382 truncated_datetime = timezone .make_aware (truncated_datetime )
397383 ms_key = extract_minute_second_key (local_datetime )
398384
399- item_key = (col_acron3 , jou_id , art_id )
400- if item_key not in cache ['items' ]:
401- collection_obj = Collection .objects .get (acron3 = col_acron3 )
402- journal_obj = Journal .objects .get (id = jou_id )
403- article_obj = Article .objects .get (id = art_id )
404- it , _it = Item .objects .get_or_create (
405- collection = collection_obj ,
406- journal = journal_obj ,
407- article = article_obj ,
408- )
409- cache ['items' ][item_key ] = it
410- else :
411- it = cache ['items' ][item_key ]
385+ it = _get_or_create_item (col_acron3 , jou_id , art_id , cache )
386+ ua = _get_or_create_user_agent (client_name , client_version , cache )
387+ us = _get_or_create_user_session (truncated_datetime , ua , ip_address , cache )
388+ ita = _get_or_create_item_access (it , us , media_format , media_language , country_code , content_type , ms_key , cache )
412389
413- user_agent_key = (client_name , client_version )
414- if user_agent_key not in cache ['user_agents' ]:
415- ua , _ua = UserAgent .objects .get_or_create (
416- name = client_name ,
417- version = client_version
418- )
419- cache ['user_agents' ][user_agent_key ] = ua
420- else :
421- ua = cache ['user_agents' ][user_agent_key ]
390+ ita .click_timestamps [ms_key ] = ita .click_timestamps .get (ms_key , 0 ) + 1
391+ ita .save ()
422392
423- us_key = (truncated_datetime , ua .id , ip_address )
424- if us_key not in cache ['user_sessions' ]:
425- us , _us = UserSession .objects .get_or_create (
426- datetime = truncated_datetime ,
427- user_agent = ua ,
428- user_ip = ip_address
429- )
430- cache ['user_sessions' ][us_key ] = us
431- else :
432- us = cache ['user_sessions' ][us_key ]
433393
394+ def _get_or_create_item (col_acron3 , jou_id , art_id , cache , max_retries = 3 ):
395+ item_key = (col_acron3 , jou_id , art_id )
396+ for attempt in range (max_retries ):
397+ try :
398+ if item_key not in cache ['items' ]:
399+ collection_obj = Collection .objects .get (acron3 = col_acron3 )
400+ journal_obj = Journal .objects .get (id = jou_id )
401+ article_obj = Article .objects .get (id = art_id )
402+ it , _ = Item .objects .get_or_create (
403+ collection = collection_obj ,
404+ journal = journal_obj ,
405+ article = article_obj ,
406+ )
407+ cache ['items' ][item_key ] = it
408+ else :
409+ it = cache ['items' ][item_key ]
410+ return it
411+ except Exception as e :
412+ if attempt == max_retries - 1 :
413+ raise
414+ time .sleep (0.1 )
415+ return None
416+
417+
418+ def _get_or_create_user_agent (client_name , client_version , cache , max_retries = 3 ):
419+ user_agent_key = (client_name , client_version )
420+ for attempt in range (max_retries ):
421+ try :
422+ if user_agent_key not in cache ['user_agents' ]:
423+ ua , _ = UserAgent .objects .get_or_create (
424+ name = client_name ,
425+ version = client_version
426+ )
427+ cache ['user_agents' ][user_agent_key ] = ua
428+ else :
429+ ua = cache ['user_agents' ][user_agent_key ]
430+ return ua
431+ except Exception as e :
432+ if attempt == max_retries - 1 :
433+ raise
434+ time .sleep (0.1 )
435+ return None
436+
437+
438+ def _get_or_create_user_session (truncated_datetime , ua , ip_address , cache , max_retries = 3 ):
439+ us_key = (truncated_datetime , ua .id , ip_address )
440+ for attempt in range (max_retries ):
441+ try :
442+ if us_key not in cache ['user_sessions' ]:
443+ us , _ = UserSession .objects .get_or_create (
444+ datetime = truncated_datetime ,
445+ user_agent = ua ,
446+ user_ip = ip_address
447+ )
448+ cache ['user_sessions' ][us_key ] = us
449+ else :
450+ us = cache ['user_sessions' ][us_key ]
451+ return us
452+ except Exception as e :
453+ if attempt == max_retries - 1 :
454+ raise
455+ time .sleep (0.1 )
456+ return None
457+
458+
459+ def _get_or_create_item_access (it , us , media_format , media_language , country_code , content_type , ms_key , cache , max_retries = 3 ):
434460 item_access_key = (it .id , us .id , media_format , media_language , country_code , content_type )
435- if item_access_key not in cache ['item_accesses' ]:
436- ita , _ita = ItemAccess .objects .get_or_create (
437- item = it ,
438- user_session = us ,
439- media_format = media_format ,
440- media_language = media_language ,
441- country_code = country_code ,
442- content_type = content_type ,
443- click_timestamps = {ms_key : 1 }
444- )
445- cache ['item_accesses' ][item_access_key ] = ita
446- else :
447- ita = cache ['item_accesses' ][item_access_key ]
448-
449- ita .click_timestamps [ms_key ] = ita .click_timestamps .get (ms_key , 0 ) + 1
450- ita .save ()
461+ for attempt in range (max_retries ):
462+ try :
463+ if item_access_key not in cache ['item_accesses' ]:
464+ ita , _ = ItemAccess .objects .get_or_create (
465+ item = it ,
466+ user_session = us ,
467+ media_format = media_format ,
468+ media_language = media_language ,
469+ country_code = country_code ,
470+ content_type = content_type ,
471+ defaults = {'click_timestamps' : {ms_key : 1 }}
472+ )
473+ cache ['item_accesses' ][item_access_key ] = ita
474+ else :
475+ ita = cache ['item_accesses' ][item_access_key ]
476+ return ita
477+ except Exception as e :
478+ if attempt == max_retries - 1 :
479+ raise
480+ time .sleep (0.1 )
481+ return None
451482
452483
453484def _log_discarded_line (log_file , line , error_type , message ):
@@ -586,9 +617,12 @@ def task_index_documents(self, collections=[], from_date=None, until_date=None,
586617 for collection in collections :
587618 logging .info (f'Computing metrics for collection { collection } from { from_date_str } to { until_date_str } ' )
588619
620+ clfdc_to_update = []
621+
589622 bulk_data = []
623+ metrics_result = compute_metrics_for_collection (collection , dates , replace , clfdc_to_update )
590624
591- for key , metric_data in compute_metrics_for_collection ( collection , dates , replace ) .items ():
625+ for key , metric_data in metrics_result .items ():
592626 bulk_data .append ({
593627 "_id" : key ,
594628 "_source" : metric_data ,
@@ -604,6 +638,7 @@ def task_index_documents(self, collections=[], from_date=None, until_date=None,
604638 bulk_data = []
605639 except Exception as e :
606640 logging .error (f"Failed to send bulk metrics to Elasticsearch: { e } " )
641+ clfdc_to_update = []
607642
608643 if bulk_data :
609644 try :
@@ -614,9 +649,14 @@ def task_index_documents(self, collections=[], from_date=None, until_date=None,
614649 )
615650 except Exception as e :
616651 logging .error (f"Failed to send remaining bulk metrics to Elasticsearch: { e } " )
652+ clfdc_to_update = []
653+
654+ for clfdc in clfdc_to_update :
655+ clfdc .is_usage_metric_computed = True
656+ clfdc .save ()
617657
618658
619- def compute_metrics_for_collection (collection , dates , replace = False ):
659+ def compute_metrics_for_collection (collection , dates , replace = False , clfdc_to_update = None ):
620660 """
621661 Computes usage metrics for a given collection over a range of dates.
622662
@@ -627,13 +667,17 @@ def compute_metrics_for_collection(collection, dates, replace=False):
627667 should be computed.
628668 replace (bool, optional): A flag indicating whether to replace
629669 existing metrics. Defaults to False.
670+ clfdc_to_update (list, optional): List to append clfdc objects that should be marked as computed after successful export.
630671
631672 Returns:
632673 dict: A dictionary containing computed metrics, keyed by a
633674 generated usage key.
634675 """
635676 data = {}
636677
678+ if clfdc_to_update is None :
679+ clfdc_to_update = []
680+
637681 for date in dates :
638682 date_str = get_date_str (date )
639683
@@ -646,8 +690,7 @@ def compute_metrics_for_collection(collection, dates, replace=False):
646690
647691 logging .info (f"Computing metrics for { date_str } " )
648692 _process_user_sessions (collection , date , date_str , data )
649- clfdc .is_usage_metric_computed = True
650- clfdc .save ()
693+ clfdc_to_update .append (clfdc )
651694
652695 return data
653696
@@ -771,6 +814,8 @@ def _process_user_sessions(collection, date, date_str, data):
771814 item_access .content_type ,
772815 )
773816
817+ return True
818+
774819
775820def _generate_usage_key (collection , journal , pid_v2 , pid_v3 , pid_generic , media_language , country_code , date_str ):
776821 """"
0 commit comments