-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathevaluator.py
More file actions
87 lines (68 loc) · 2.94 KB
/
evaluator.py
File metadata and controls
87 lines (68 loc) · 2.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import logging
import time
from typing import List
from dotenv import load_dotenv
from virtuals_acp.client import VirtualsACP
from virtuals_acp.contract_clients.contract_client_v2 import ACPContractClientV2
from virtuals_acp.env import EnvSettings
from virtuals_acp.job import ACPJob
from virtuals_acp.models import ACPJobPhase
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("EvaluatorAgent")
load_dotenv(override=True)
# --- Configuration for the job polling interval ---
POLL_INTERVAL_SECONDS = 20
# --------------------------------------------------
ACCEPT_EVALUATION = True
def evaluator():
env = EnvSettings()
acp_client = VirtualsACP(
acp_contract_clients=ACPContractClientV2(
wallet_private_key=env.WHITELISTED_WALLET_PRIVATE_KEY,
agent_wallet_address=env.EVALUATOR_AGENT_WALLET_ADDRESS,
entity_id=env.EVALUATOR_ENTITY_ID,
),
)
logger.info(f"Evaluator ACP Initialized. Agent: {acp_client.wallet_address}")
while True:
logger.info(
f"\nPolling for jobs assigned to {acp_client.wallet_address} requiring evaluation."
)
active_jobs_list: List[ACPJob] = acp_client.get_active_jobs()
if not active_jobs_list:
logger.info("No active jobs found in this poll.")
time.sleep(POLL_INTERVAL_SECONDS)
continue
for job in active_jobs_list:
try:
# Ensure this job is for the current evaluator
if job.evaluator_address != acp_client.wallet_address:
continue
if job.phase == ACPJobPhase.EVALUATION:
logger.info(f"Found Job {job.id} in EVALUATION phase.")
logger.info(
f"Job {job.id}: Evaluating deliverable: {job.get_deliverable()} with requirement: {job.requirement}"
)
job.evaluate(
accept=ACCEPT_EVALUATION,
reason="Deliverable looks great, approved!" if ACCEPT_EVALUATION else "Deliverable not accepted.",
)
logger.info(f"Job {job.id}: Evaluated with {ACCEPT_EVALUATION}.")
elif job.phase in [ACPJobPhase.REQUEST, ACPJobPhase.NEGOTIATION]:
logger.info(
f"Job {job.id} is in {job.phase.name} phase. Waiting for job to be delivered."
)
continue
elif job.phase in [ACPJobPhase.COMPLETED, ACPJobPhase.REJECTED]:
logger.info(
f"Job {job.id} is already in {job.phase.name}. No action."
)
except Exception as e:
logger.error(f"Error processing job {job.id}: {e}")
time.sleep(POLL_INTERVAL_SECONDS)
if __name__ == "__main__":
evaluator()