@@ -23,6 +23,9 @@ class AcpPluginOptions:
2323 evaluator_cluster : Optional [str ] = None
2424 graduated : Optional [bool ] = True
2525 job_expiry_duration_mins : Optional [int ] = None
26+ keep_completed_jobs : Optional [int ] = None
27+ keep_cancelled_jobs : Optional [int ] = None
28+ keep_produced_inventory : Optional [int ] = None
2629
2730class AcpPlugin :
2831 def __init__ (self , options : AcpPluginOptions ):
@@ -55,71 +58,109 @@ def __init__(self, options: AcpPluginOptions):
5558 self .produced_inventory : List [IInventory ] = []
5659 self .acp_base_url = self .acp_client .acp_api_url
5760 self .job_expiry_duration_mins = options .job_expiry_duration_mins if options .job_expiry_duration_mins is not None else 1440
61+ self .keep_completed_jobs = options .keep_completed_jobs if options .keep_completed_jobs is not None else 1
62+ self .keep_cancelled_jobs = options .keep_cancelled_jobs if options .keep_cancelled_jobs is not None else 0
63+ self .keep_produced_inventory = options .keep_produced_inventory if options .keep_produced_inventory is not None else 1
64+
5865
5966 def add_produce_item (self , item : IInventory ) -> None :
6067 self .produced_inventory .append (item )
61-
62- def memo_to_dict (self , m ):
63- return {
64- "id" : m .id ,
65- "type" : m .type .name ,
66- "content" : m .content ,
67- "next_phase" : m .next_phase .name
68- }
69-
70- def _to_state_acp_job (self , job : ACPJob ) -> Dict :
71- memos = []
72- for memo in job .memos :
73- memos .append (self .memo_to_dict (memo ))
74-
75-
76- return {
77- "jobId" : job .id ,
78- "clientName" : job .client_agent .name if job .client_agent else "" ,
79- "providerName" : job .provider_agent .name if job .provider_agent else "" ,
80- "desc" : job .service_requirement or "" ,
81- "price" : str (job .price ),
82- "providerAddress" : job .provider_address ,
83- "phase" : ACP_JOB_PHASE_MAP .get (job .phase ),
84- "memo" : list (reversed (memos )),
85- "tweetHistory" : [
86- {
87- "type" : tweet .get ("type" ),
88- "tweet_id" : tweet .get ("tweetId" ),
89- "content" : tweet .get ("content" ),
90- "created_at" : tweet .get ("createdAt" )
91- }
92- for tweet in reversed (job .context .get ('tweets' , []) if job .context else [])
93- ],
94- }
9568
9669 def get_acp_state (self ) -> Dict :
70+ agent_addr = self .acp_client .agent_address .lower ()
71+
72+ def serialize_job (job : ACPJob , active : bool ) -> Dict :
73+ return {
74+ "jobId" : job .id ,
75+ "clientName" : job .client_agent .name if job .client_agent else "" ,
76+ "providerName" : job .provider_agent .name if job .provider_agent else "" ,
77+ "desc" : job .service_requirement or "" ,
78+ "price" : str (job .price ),
79+ "providerAddress" : job .provider_address ,
80+ "phase" : ACP_JOB_PHASE_MAP .get (job .phase ),
81+ # Include memos only if active
82+ "memo" : [
83+ {
84+ "id" : m .id ,
85+ "type" : m .type .value ,
86+ "content" : m .content ,
87+ "next_phase" : m .next_phase .value ,
88+ }
89+ for m in reversed (job .memos )
90+ ] if active and job .memos else [],
91+ # Include tweetHistory only if active
92+ "tweetHistory" : [
93+ {
94+ "type" : t .get ("type" ),
95+ "tweet_id" : t .get ("tweetId" ),
96+ "content" : t .get ("content" ),
97+ "created_at" : t .get ("createdAt" ),
98+ }
99+ for t in reversed (job .context .get ("tweets" , []))
100+ ] if active and job .context else [],
101+ }
102+
103+ # Fetch job states
97104 active_jobs = self .acp_client .get_active_jobs ()
98- completed_jobs = self .acp_client .get_completed_jobs ()
99- cancelled_jobs = self .acp_client .get_cancelled_jobs ()
100105
101- agent_addr = self .acp_client .agent_address .lower ()
106+ # Fetch completed jobs if not explicitly disabled
107+ if self .keep_completed_jobs == 0 :
108+ completed_jobs = []
109+ else :
110+ completed_jobs = self .acp_client .get_completed_jobs ()
102111
103- active_buyer_jobs = []
104- active_seller_jobs = []
105-
106- for job in active_jobs :
107- processed = self ._to_state_acp_job (job )
108- client_addr = job .client_address .lower ()
109- provider_addr = job .provider_address .lower ()
110-
111- if client_addr == agent_addr :
112- active_buyer_jobs .append (processed )
113- if provider_addr == agent_addr :
114- active_seller_jobs .append (processed )
112+ # Fetch cancelled jobs if not explicitly disabled
113+ if self .keep_cancelled_jobs == 0 :
114+ cancelled_jobs = []
115+ else :
116+ cancelled_jobs = self .acp_client .get_cancelled_jobs ()
117+
118+ active_buyer_jobs = [
119+ serialize_job (job , active = True )
120+ for job in active_jobs
121+ if job .client_address .lower () == agent_addr
122+ ]
123+
124+ active_seller_jobs = [
125+ serialize_job (job , active = True )
126+ for job in active_jobs
127+ if job .provider_address .lower () == agent_addr
128+ ]
129+
130+ # Limit completed and cancelled jobs
131+ completed = [
132+ serialize_job (job , active = False )
133+ for job in (
134+ completed_jobs [:self .keep_completed_jobs ]
135+ if self .keep_completed_jobs is not None
136+ else completed_jobs
137+ )
138+ ]
139+
140+ cancelled = [
141+ serialize_job (job , active = False )
142+ for job in (
143+ cancelled_jobs [:self .keep_cancelled_jobs ]
144+ if self .keep_cancelled_jobs is not None
145+ else cancelled_jobs
146+ )
147+ ]
148+
149+ # Produced inventory logic
150+ produced = []
151+ if self .produced_inventory and self .keep_produced_inventory > 0 :
152+ produced = [
153+ item .model_dump () for item in (
154+ self .produced_inventory [:self .keep_produced_inventory ]
155+ if self .keep_produced_inventory is not None
156+ else self .produced_inventory
157+ )
158+ ]
115159
116- completed = [self ._to_state_acp_job (job ) for job in completed_jobs ]
117- cancelled = [self ._to_state_acp_job (job ) for job in cancelled_jobs ]
118-
119160 return {
120161 "inventory" : {
121162 "acquired" : [],
122- "produced" : [ item . model_dump () for item in self . produced_inventory ] if self . produced_inventory else [] ,
163+ "produced" : produced ,
123164 },
124165 "jobs" : {
125166 "active" : {
@@ -128,7 +169,7 @@ def get_acp_state(self) -> Dict:
128169 },
129170 "completed" : completed ,
130171 "cancelled" : cancelled ,
131- }
172+ },
132173 }
133174
134175 def get_worker (self , data : Optional [Dict ] = None ) -> WorkerConfig :
0 commit comments