-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathjob_offering.py
More file actions
186 lines (156 loc) · 6.25 KB
/
job_offering.py
File metadata and controls
186 lines (156 loc) · 6.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import json
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, Optional, Union, TYPE_CHECKING, List
from jsonschema import ValidationError, validate
from pydantic import BaseModel, field_validator, ConfigDict
from web3 import Web3
from web3.constants import ADDRESS_ZERO
from virtuals_acp.configs.configs import (
BASE_SEPOLIA_CONFIG,
BASE_MAINNET_CONFIG,
BASE_SEPOLIA_ACP_X402_CONFIG,
BASE_MAINNET_ACP_X402_CONFIG
)
from virtuals_acp.constants import USDC_TOKEN_ADDRESS
from virtuals_acp.contract_clients.base_contract_client import BaseAcpContractClient
from virtuals_acp.fare import FareAmount
from virtuals_acp.models import ACPJobPhase, MemoType, OperationPayload, PriceType
if TYPE_CHECKING:
from virtuals_acp.client import VirtualsACP
class ACPJobOffering(BaseModel):
acp_client: "VirtualsACP"
contract_client: BaseAcpContractClient
provider_address: str
name: str
price: float
price_type: PriceType
required_funds: bool
sla_minutes: int
requirement: Optional[Union[Dict[str, Any], str]] = None
deliverable: Optional[Union[Dict[str, Any], str]] = None
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_validator("requirement", mode="before")
def parse_requirement_schema(cls, v):
if isinstance(v, str):
try:
return json.loads(json.dumps(v))
except json.JSONDecodeError:
return None
return v
def __str__(self):
return f"ACPJobOffering({self.model_dump(exclude={'acp_client'})})"
def __repr__(self) -> str:
return self.__str__()
def initiate_job(
self,
service_requirement: Union[Dict[str, Any], str],
evaluator_address: Optional[str] = None,
) -> int:
expired_at = datetime.now(timezone.utc) + timedelta(minutes=self.sla_minutes)
# Validate against requirement schema if present
if self.requirement:
try:
service_requirement = json.loads(json.dumps(service_requirement))
except json.JSONDecodeError:
raise ValueError(
f"Invalid JSON in service requirement. Required format: {json.dumps(self.requirement, indent=2)}"
)
if isinstance(self.requirement, dict):
try:
validate(instance=service_requirement, schema=self.requirement)
except ValidationError as e:
raise ValueError(f"Invalid service requirement: {str(e)}")
final_service_requirement: Dict[str, Any] = {
"name": self.name,
"requirement": service_requirement,
"priceValue": self.price,
"priceType": self.price_type,
}
eval_addr = (
Web3.to_checksum_address(evaluator_address)
if evaluator_address
else self.contract_client.agent_wallet_address
)
# Prepare fare amount based on this offering's price and contract's base fare
fare_amount = FareAmount(
self.price if self.price_type == PriceType.FIXED else 0,
self.contract_client.config.base_fare,
)
# Lookup existing account between client and provider
account = self.acp_client.get_by_client_and_provider(
self.contract_client.agent_wallet_address,
self.provider_address,
self.contract_client,
)
base_contract_addresses = {
BASE_SEPOLIA_CONFIG.contract_address.lower(),
BASE_SEPOLIA_ACP_X402_CONFIG.contract_address.lower(),
BASE_MAINNET_CONFIG.contract_address.lower(),
BASE_MAINNET_ACP_X402_CONFIG.contract_address.lower(),
}
use_simple_create = (
self.contract_client.config.contract_address.lower()
in base_contract_addresses
)
chain_id = self.contract_client.config.chain_id
usdc_token_address = USDC_TOKEN_ADDRESS[chain_id]
is_usdc_payment_token = usdc_token_address == fare_amount.fare.contract_address
# If the contract has x402_config and USDC is used, call create_job_with_x402
is_x402_job = bool(getattr(self.contract_client.config, "x402_config", None) and is_usdc_payment_token)
if use_simple_create or not account:
create_job_operation = self.contract_client.create_job(
self.provider_address,
eval_addr,
expired_at,
fare_amount.fare.contract_address,
fare_amount.amount,
"",
is_x402_job=is_x402_job,
)
else:
evaluator_address = (
Web3.to_checksum_address(evaluator_address)
if evaluator_address
else ADDRESS_ZERO
)
create_job_operation = self.contract_client.create_job_with_account(
account.id,
evaluator_address or self.contract_client.agent_wallet_address,
fare_amount.amount,
fare_amount.fare.contract_address,
expired_at,
is_x402_job=is_x402_job,
)
response = self.contract_client.handle_operation([create_job_operation])
job_id = self.contract_client.get_job_id(
response,
self.contract_client.agent_wallet_address,
self.provider_address,
)
operations: List[OperationPayload] = []
operation = self.contract_client.set_budget_with_payment_token(
job_id,
fare_amount.amount,
fare_amount.fare.contract_address,
)
if operation:
operations.append(operation)
operations.append(
self.contract_client.create_memo(
job_id,
json.dumps(final_service_requirement),
MemoType.MESSAGE,
True,
ACPJobPhase.NEGOTIATION,
)
)
self.contract_client.handle_operation(operations)
return job_id
class ACPResourceOffering(BaseModel):
acp_client: "VirtualsACP"
name: str
description: str
url: str
parameters: Optional[Dict[str, Any]]
id: int
model_config = ConfigDict(arbitrary_types_allowed=True)