-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathbuyer.py
More file actions
97 lines (80 loc) · 3.26 KB
/
buyer.py
File metadata and controls
97 lines (80 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import logging
import threading
from typing import Optional
from dotenv import load_dotenv
from virtuals_acp.client import VirtualsACP
from virtuals_acp.configs.configs import BASE_MAINNET_ACP_X402_CONFIG_V2
from virtuals_acp.contract_clients.contract_client_v2 import ACPContractClientV2
from virtuals_acp.env import EnvSettings
from virtuals_acp.job import ACPJob
from virtuals_acp.memo import ACPMemo
from virtuals_acp.models import (
ACPAgentSort,
ACPJobPhase,
ACPGraduationStatus,
ACPOnlineStatus
)
# Root logging setup: INFO and above, each record prefixed with
# timestamp, level and logger name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)

# Logger used by every message emitted from this script.
logger = logging.getLogger("BuyerAgent")

# Read variables from a .env file; override=True lets them replace values
# already present in the process environment.
load_dotenv(override=True)
def buyer():
    """Run the example buyer flow end to end.

    Steps, in order:
      1. Read settings through ``EnvSettings`` and build a ``VirtualsACP``
         client wired to the ``on_new_task`` callback defined below.
      2. Browse agents by keyword, then pick the first agent returned and
         its first job offering.
      3. Initiate a job on that offering with a placeholder requirement.
      4. Block the calling thread indefinitely (presumably so the client
         can keep delivering job updates to the callback -- confirm
         against the SDK).

    Raises:
        IndexError: If browsing returns no agents, or the chosen agent has
            no job offerings. The type matches what indexing an empty list
            would raise, but the message says what actually went wrong.
    """
    env = EnvSettings()

    def on_new_task(job: ACPJob, memo_to_sign: Optional[ACPMemo] = None):
        """React to a job update according to the job's current phase.

        Args:
            job: The job the update refers to.
            memo_to_sign: Memo awaiting this agent's signature, if any.
        """
        if (
            job.phase == ACPJobPhase.NEGOTIATION
            and memo_to_sign is not None
            and memo_to_sign.next_phase == ACPJobPhase.TRANSACTION
        ):
            # A memo moving the job into TRANSACTION is pending: pay for
            # the job and accept the requirement.
            logger.info(f"Paying for job {job.id}")
            job.pay_and_accept_requirement()
            logger.info(f"Job {job.id} paid")
        elif (
            job.phase == ACPJobPhase.TRANSACTION
            and memo_to_sign is not None
            and memo_to_sign.next_phase == ACPJobPhase.REJECTED
        ):
            # A rejection memo is pending during TRANSACTION: sign it to
            # acknowledge the rejection.
            logger.info(f"Signing job {job.id} rejection memo, rejection reason: {memo_to_sign.content}")
            memo_to_sign.sign(True, "Accepts job rejection")
            logger.info(f"Job {job.id} rejection memo signed")
        elif job.phase == ACPJobPhase.COMPLETED:
            logger.info(f"Job {job.id} completed, received deliverable: {job.get_deliverable()}")
        elif job.phase == ACPJobPhase.REJECTED:
            logger.info(f"Job {job.id} rejected by seller")

    acp_client = VirtualsACP(
        acp_contract_clients=ACPContractClientV2(
            wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
            agent_wallet_address=env.BUYER_AGENT_WALLET_ADDRESS,
            entity_id=env.BUYER_ENTITY_ID,
            # Route payment through x402; per the original note, leaving
            # config undefined falls back to direct transfer.
            config=BASE_MAINNET_ACP_X402_CONFIG_V2,
        ),
        on_new_task=on_new_task,
    )

    # Browse available agents based on a keyword. Kept in a local so the
    # error below can report exactly what was searched for.
    agent_keyword = "<your-filter-agent-keyword>"
    relevant_agents = acp_client.browse_agents(
        keyword=agent_keyword,
        sort_by=[ACPAgentSort.SUCCESSFUL_JOB_COUNT],
        top_k=5,
        graduation_status=ACPGraduationStatus.ALL,
        online_status=ACPOnlineStatus.ALL,
        show_hidden_offerings=True,
    )
    logger.info(f"Relevant agents: {relevant_agents}")

    # Fail with an actionable message instead of a bare
    # "list index out of range" when the search comes back empty.
    if not relevant_agents:
        raise IndexError(
            f"No agents found for keyword {agent_keyword!r}; "
            "adjust the keyword or filters passed to browse_agents"
        )

    # Pick one of the agents based on your criteria (in this example we
    # just pick the first one).
    chosen_agent = relevant_agents[0]

    if not chosen_agent.job_offerings:
        raise IndexError(
            "The chosen agent has no job offerings; pick a different agent"
        )

    # Pick one of the service offerings based on your criteria (in this
    # example we just pick the first one).
    chosen_job_offering = chosen_agent.job_offerings[0]

    job_id = chosen_job_offering.initiate_job(
        service_requirement={
            "<your-schema-key-1>": "<your-schema-value-1>",
            "<your-schema-key-2>": "<your-schema-value-2>",
        },
    )
    logger.info(f"Job {job_id} initiated")

    logger.info("Listening for next steps...")
    # Wait on an Event that is never set: this blocks the calling thread
    # forever, so the function does not return after initiating the job.
    threading.Event().wait()
# Start the buyer flow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    buyer()