Skip to content

Commit e7a909c

Browse files
authored
Merge pull request #180 from game-by-virtuals/refactor/example-threading-lock
Refactor/example threading lock
2 parents 891664f + 450628b commit e7a909c

3 files changed

Lines changed: 150 additions & 24 deletions

File tree

plugins/acp/examples/reactive/buyer.py

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import threading
2+
13
from typing import Tuple
24
from game_sdk.game.agent import Agent, WorkerConfig
35
from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus
@@ -53,32 +55,81 @@ def on_evaluate(job: ACPJob):
5355
# },
5456
# }
5557

56-
def buyer():
57-
# upon phase change, the buyer agent will respond to the transaction
58+
def buyer(use_thread_lock: bool = True):
59+
if env.WHITELISTED_WALLET_PRIVATE_KEY is None:
60+
return
61+
62+
if env.BUYER_ENTITY_ID is None:
63+
return
64+
65+
# Thread-safe job queue setup
66+
job_queue = []
67+
job_queue_lock = threading.Lock()
68+
job_event = threading.Event()
69+
70+
# Thread-safe append with optional lock
71+
def safe_append_job(job):
72+
if use_thread_lock:
73+
print(f"[safe_append_job] Acquiring lock to append job {job.id}")
74+
with job_queue_lock:
75+
print(f"[safe_append_job] Lock acquired, appending job {job.id} to queue")
76+
job_queue.append(job)
77+
else:
78+
job_queue.append(job)
79+
80+
# Thread-safe pop with optional lock
81+
def safe_pop_job():
82+
if use_thread_lock:
83+
print(f"[safe_pop_job] Acquiring lock to pop job")
84+
with job_queue_lock:
85+
if job_queue:
86+
job = job_queue.pop(0)
87+
print(f"[safe_pop_job] Lock acquired, popped job {job.id}")
88+
return job
89+
else:
90+
print("[safe_pop_job] Queue is empty after acquiring lock")
91+
else:
92+
if job_queue:
93+
job = job_queue.pop(0)
94+
print(f"[safe_pop_job] Popped job {job.id} without lock")
95+
return job
96+
else:
97+
print("[safe_pop_job] Queue is empty (no lock)")
98+
return None
99+
100+
# Background thread worker: process jobs one by one
101+
def job_worker():
102+
while True:
103+
job_event.wait() # Wait for job
104+
105+
job = safe_pop_job()
106+
while job:
107+
process_job(job)
108+
job = safe_pop_job()
109+
110+
job_event.clear() # Go back to wait
111+
112+
# Event-triggered job task receiver
58113
def on_new_task(job: ACPJob):
114+
print(f"[on_new_task] Received job {job.id} (phase: {job.phase})")
115+
safe_append_job(job)
116+
job_event.set()
117+
118+
def process_job(job: ACPJob):
59119
out = ""
60120
print(job.phase, "job.phase")
61121
if job.phase == ACPJobPhase.NEGOTIATION:
62122
for memo in job.memos:
63123
print(memo.next_phase, "memo.next_phase")
64124
if memo.next_phase == ACPJobPhase.TRANSACTION:
65125
out += f"Buyer agent is reacting to job:\n{job}\n\n"
66-
67126
buyer_agent.get_worker("acp_worker").run(
68127
f"Respond to the following transaction: {job}",
69128
)
70-
71129
out += "Buyer agent has responded to the job\n"
130+
72131
print(Panel(out, title="🔁 Reaction", box=box.ROUNDED, title_align="left", border_style="red"))
73132

74-
75-
if env.WHITELISTED_WALLET_PRIVATE_KEY is None:
76-
return
77-
78-
if env.BUYER_ENTITY_ID is None:
79-
return
80-
81-
82133
acp_plugin = AcpPlugin(
83134
options=AcpPluginOptions(
84135
api_key=env.GAME_API_KEY,
@@ -185,15 +236,24 @@ def post_tweet(content: str, reasoning: str) -> Tuple[FunctionResultStatus, str,
185236
buyer_agent.compile()
186237
agent.compile()
187238

239+
# Start background job thread
240+
threading.Thread(target=job_worker, daemon=True).start()
241+
188242
while True:
189243
print("🟢"*40)
190244
init_state = AcpState.model_validate(agent.agent_state)
191245
print(Panel(f"{init_state}", title="Agent State", box=box.ROUNDED, title_align="left"))
192-
agent.step()
246+
247+
print("[agent.step] Attempting to acquire lock for agent.step()")
248+
with job_queue_lock:
249+
print("[agent.step] Lock acquired, executing agent.step()")
250+
agent.step()
251+
print("[agent.step] Released lock after agent.step()")
252+
193253
end_state = AcpState.model_validate(agent.agent_state)
194254
print(Panel(f"{end_state}", title="End Agent State", box=box.ROUNDED, title_align="left"))
195255
print("🔴"*40)
196256
input("\nPress any key to continue...\n")
197257

198258
if __name__ == "__main__":
199-
buyer()
259+
buyer(use_thread_lock=True)

plugins/acp/examples/reactive/seller.py

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
# from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
2020

2121
load_dotenv(override=True)
22+
2223
env = PluginEnvSettings()
2324

2425

@@ -46,11 +47,79 @@
4647
# },
4748
# }
4849

49-
def seller():
50+
def seller(use_thread_lock: bool = True):
51+
if env.WHITELISTED_WALLET_PRIVATE_KEY is None:
52+
return
53+
54+
if env.SELLER_ENTITY_ID is None:
55+
return
56+
57+
# Thread-safe job queue setup
58+
job_queue = []
59+
job_queue_lock = threading.Lock()
60+
job_event = threading.Event()
61+
62+
# Thread-safe append wrapper
63+
def safe_append_job(job):
64+
if use_thread_lock:
65+
print("[append] Attempting to acquire job_queue_lock")
66+
with job_queue_lock:
67+
print("[append] Lock acquired. Appending job to queue:", job.id)
68+
job_queue.append(job)
69+
print(f"[append] Queue size is now {len(job_queue)}")
70+
else:
71+
job_queue.append(job)
72+
print(f"[append] Appended job (no lock). Queue size is now {len(job_queue)}")
73+
74+
# Thread-safe pop wrapper
75+
def safe_pop_job():
76+
if use_thread_lock:
77+
print("[pop] Attempting to acquire job_queue_lock")
78+
with job_queue_lock:
79+
print("[pop] Lock acquired.")
80+
if job_queue:
81+
job = job_queue.pop(0)
82+
print(f"[pop] Job popped: {job.id}")
83+
return job
84+
else:
85+
print("[pop] Queue is empty.")
86+
else:
87+
if job_queue:
88+
job = job_queue.pop(0)
89+
print(f"[pop] Job popped (no lock): {job.id}")
90+
return job
91+
else:
92+
print("[pop] Queue is empty (no lock).")
93+
return None
94+
95+
# Background thread worker: process jobs one by one
96+
def job_worker():
97+
print("[worker] Job worker started, waiting for jobs.")
98+
while True:
99+
job_event.wait()
100+
print("[worker] job_event triggered.")
101+
102+
job = safe_pop_job()
103+
while job:
104+
print(f"[worker] Processing job {job.id}")
105+
process_job(job)
106+
job = safe_pop_job()
107+
108+
job_event.clear()
109+
print("[worker] All jobs processed. Waiting again.")
110+
111+
# Event-triggered job task receiver
50112
def on_new_task(job: ACPJob):
113+
print(f"[on_new_task] New job received: {job.id}")
114+
safe_append_job(job)
115+
job_event.set()
116+
print("[on_new_task] job_event set.")
117+
118+
def process_job(job: ACPJob):
51119
out = ""
52120
out += f"Reacting to job:\n{job}\n\n"
53121
prompt = ""
122+
54123
if job.phase == ACPJobPhase.REQUEST:
55124
for memo in job.memos:
56125
if memo.next_phase == ACPJobPhase.NEGOTIATION:
@@ -81,12 +150,6 @@ def on_new_task(job: ACPJob):
81150
out += "✅ Seller has responded to job.\n"
82151

83152
print(Panel(out, title="🔁 Reaction", box=box.ROUNDED, title_align="left", border_style="red"))
84-
85-
if env.WHITELISTED_WALLET_PRIVATE_KEY is None:
86-
return
87-
88-
if env.SELLER_ENTITY_ID is None:
89-
return
90153

91154
acp_plugin = AcpPlugin(
92155
options=AcpPluginOptions(
@@ -187,9 +250,12 @@ def generate_meme(description: str, job_id: int, reasoning: str) -> Tuple[Functi
187250
init_state = AcpState.model_validate(agent.agent_state)
188251
print(Panel(f"{init_state}", title="Agent State", box=box.ROUNDED, title_align="left"))
189252
print("🔴"*40)
190-
print("\nListening\n")
253+
254+
# Start background thread
255+
threading.Thread(target=job_worker, daemon=True).start()
256+
print("\nListening...\n")
191257
threading.Event().wait()
192258

193259

194260
if __name__ == "__main__":
195-
seller()
261+
seller(use_thread_lock=True)

plugins/acp/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ game-sdk = ">=0.1.5"
1212
python-dotenv = "^1.1.0"
1313
dacite = "^1.9.2"
1414
rich = ">=13.9.4,<15.0.0"
15-
virtuals-acp = "^0.1.16"
15+
virtuals-acp = "^0.1.17"
1616

1717
[build-system]
1818
requires = ["poetry-core"]

0 commit comments

Comments
 (0)