11import json
22import time
3+ from collections import deque
34from datetime import datetime
45
56from celery import group
@@ -701,7 +702,7 @@ def __init__(
701702 self .results = []
702703 self .elapsed_seconds = 0
703704 self .resource_wise_time = {}
704- self .parts = [[]]
705+ self .parts = deque ([])
705706 self .result = None
706707 self ._json_result = None
707708 self .redis_service = RedisService ()
@@ -727,14 +728,12 @@ def make_parts(self):
727728 sources = self .resource_distribution .get ('Source' , None )
728729 collections = self .resource_distribution .get ('Collection' , None )
729730 if orgs :
730- self .parts = [orgs ]
731+ self .parts = deque ( [orgs ])
731732 if sources :
732733 self .parts .append (sources )
733734 if collections :
734735 self .parts .append (collections )
735736
736- self .parts = compact (self .parts )
737-
738737 self .parts .append ([])
739738
740739 for data in self .input_list :
@@ -753,8 +752,6 @@ def make_parts(self):
753752 self .parts [- 1 ].append (line )
754753 prev_line = line
755754
756- self .parts = compact (self .parts )
757-
758755 @staticmethod
759756 def chunker_list (seq , size ):
760757 return (seq [i ::size ] for i in range (size ))
@@ -814,14 +811,14 @@ def run(self):
814811 print ("****STARTED MAIN****" )
815812 print ("TASK ID: {}" .format (self .self_task_id ))
816813 print ("***************" )
817- for part_list in list (self .parts ):
814+ while len (self .parts ) > 0 :
815+ part_list = self .parts .popleft ()
818816 if part_list :
819817 part_type = get (part_list , '0.type' , '' ).lower ()
820818 if part_type :
821819 is_child = part_type in ['concept' , 'mapping' , 'reference' ]
822820 start_time = time .time ()
823821 self .queue_tasks (part_list , is_child )
824- self .parts .remove (part_list ) # memory optimization
825822 self .wait_till_tasks_alive ()
826823 if is_child :
827824 if part_type not in self .resource_wise_time :
0 commit comments