Skip to content

Commit e862971

Browse files
authored
Add enforce_row_independence parameter for agent maps (#97)
1 parent c3d3e68 commit e862971

5 files changed

Lines changed: 52 additions & 104 deletions

File tree

src/everyrow/generated/api/operations/single_agent_operations_single_agent_post.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ async def asyncio_detailed(
190190
)
191191

192192
response = await client.get_async_httpx_client().request(**kwargs)
193+
193194
return _build_response(client=client, response=response)
194195

195196

src/everyrow/generated/models/agent_map_operation.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ class AgentMapOperation:
4242
not set.
4343
include_research (bool | None | Unset): Include research notes in the response. Required when effort_level is
4444
not set.
45+
enforce_row_independence (bool | Unset): If True, each agent runs completely independently without being
46+
affected by other agents. Disables adaptive budget adjustment and straggler management, ensuring agents are not
47+
hurried or given iteration limits based on other agents' progress. Use this when consistent per-row behavior is
48+
more important than overall throughput. Default: False.
4549
"""
4650

4751
input_: AgentMapOperationInputType2 | list[AgentMapOperationInputType1Item] | UUID
@@ -53,6 +57,7 @@ class AgentMapOperation:
5357
join_with_input: bool | Unset = True
5458
iteration_budget: int | None | Unset = UNSET
5559
include_research: bool | None | Unset = UNSET
60+
enforce_row_independence: bool | Unset = False
5661
additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)
5762

5863
def to_dict(self) -> dict[str, Any]:
@@ -118,6 +123,8 @@ def to_dict(self) -> dict[str, Any]:
118123
else:
119124
include_research = self.include_research
120125

126+
enforce_row_independence = self.enforce_row_independence
127+
121128
field_dict: dict[str, Any] = {}
122129
field_dict.update(self.additional_properties)
123130
field_dict.update(
@@ -140,6 +147,8 @@ def to_dict(self) -> dict[str, Any]:
140147
field_dict["iteration_budget"] = iteration_budget
141148
if include_research is not UNSET:
142149
field_dict["include_research"] = include_research
150+
if enforce_row_independence is not UNSET:
151+
field_dict["enforce_row_independence"] = enforce_row_independence
143152

144153
return field_dict
145154

@@ -271,6 +280,8 @@ def _parse_include_research(data: object) -> bool | None | Unset:
271280

272281
include_research = _parse_include_research(d.pop("include_research", UNSET))
273282

283+
enforce_row_independence = d.pop("enforce_row_independence", UNSET)
284+
274285
agent_map_operation = cls(
275286
input_=input_,
276287
task=task,
@@ -281,6 +292,7 @@ def _parse_include_research(data: object) -> bool | None | Unset:
281292
join_with_input=join_with_input,
282293
iteration_budget=iteration_budget,
283294
include_research=include_research,
295+
enforce_row_independence=enforce_row_independence,
284296
)
285297

286298
agent_map_operation.additional_properties = d

src/everyrow/ops.py

Lines changed: 23 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,7 @@ async def create_scalar_artifact(input: BaseModel, session: Session) -> UUID:
111111
data=CreateArtifactRequestDataType1.from_dict(input.model_dump()),
112112
session_id=session.session_id,
113113
)
114-
response = await create_artifact_artifacts_post.asyncio(
115-
client=session.client, body=body
116-
)
114+
response = await create_artifact_artifacts_post.asyncio(client=session.client, body=body)
117115
response = handle_response(response)
118116
return response.artifact_id
119117

@@ -125,9 +123,7 @@ async def create_table_artifact(input: DataFrame, session: Session) -> UUID:
125123
data=[CreateArtifactRequestDataType0Item.from_dict(r) for r in records],
126124
session_id=session.session_id,
127125
)
128-
response = await create_artifact_artifacts_post.asyncio(
129-
client=session.client, body=body
130-
)
126+
response = await create_artifact_artifacts_post.asyncio(client=session.client, body=body)
131127
response = handle_response(response)
132128
return response.artifact_id
133129

@@ -231,35 +227,25 @@ async def single_agent_async[T: BaseModel](
231227
return_table: bool = False,
232228
) -> EveryrowTask[T]:
233229
"""Submit a single_agent task asynchronously."""
234-
input_data = _prepare_single_input(
235-
input, SingleAgentOperationInputType1Item, SingleAgentOperationInputType2
236-
)
230+
input_data = _prepare_single_input(input, SingleAgentOperationInputType1Item, SingleAgentOperationInputType2)
237231

238232
# Build the operation body with either preset or custom params
239233
body = SingleAgentOperation(
240234
input_=input_data, # type: ignore
241235
task=task,
242236
session_id=session.session_id,
243-
response_schema=SingleAgentOperationResponseSchemaType0.from_dict(
244-
response_model.model_json_schema()
245-
),
246-
effort_level=PublicEffortLevel(effort_level.value)
247-
if effort_level is not None
248-
else UNSET,
237+
response_schema=SingleAgentOperationResponseSchemaType0.from_dict(response_model.model_json_schema()),
238+
effort_level=PublicEffortLevel(effort_level.value) if effort_level is not None else UNSET,
249239
llm=LLMEnumPublic(llm.value) if llm is not None else UNSET,
250240
iteration_budget=iteration_budget if iteration_budget is not None else UNSET,
251241
include_research=include_research if include_research is not None else UNSET,
252242
return_list=return_table,
253243
)
254244

255-
response = await single_agent_operations_single_agent_post.asyncio(
256-
client=session.client, body=body
257-
)
245+
response = await single_agent_operations_single_agent_post.asyncio(client=session.client, body=body)
258246
response = handle_response(response)
259247

260-
cohort_task: EveryrowTask[T] = EveryrowTask(
261-
response_model=response_model, is_map=False, is_expand=return_table
262-
)
248+
cohort_task: EveryrowTask[T] = EveryrowTask(response_model=response_model, is_map=False, is_expand=return_table)
263249
cohort_task.set_submitted(response.task_id, response.session_id, session.client)
264250
return cohort_task
265251

@@ -275,6 +261,7 @@ async def agent_map(
275261
llm: LLM | None = None,
276262
iteration_budget: int | None = None,
277263
include_research: bool | None = None,
264+
enforce_row_independence: bool = False,
278265
response_model: type[BaseModel] = DefaultAgentResponse,
279266
) -> TableResult:
280267
"""Execute an AI agent task on each row of the input table.
@@ -305,6 +292,7 @@ async def agent_map(
305292
llm=llm,
306293
iteration_budget=iteration_budget,
307294
include_research=include_research,
295+
enforce_row_independence=enforce_row_independence,
308296
response_model=response_model,
309297
)
310298
result = await cohort_task.await_result()
@@ -319,6 +307,7 @@ async def agent_map(
319307
llm=llm,
320308
iteration_budget=iteration_budget,
321309
include_research=include_research,
310+
enforce_row_independence=enforce_row_independence,
322311
response_model=response_model,
323312
)
324313
result = await cohort_task.await_result()
@@ -335,6 +324,7 @@ async def agent_map_async(
335324
llm: LLM | None = None,
336325
iteration_budget: int | None = None,
337326
include_research: bool | None = None,
327+
enforce_row_independence: bool = False,
338328
response_model: type[BaseModel] = DefaultAgentResponse,
339329
) -> EveryrowTask[BaseModel]:
340330
"""Submit an agent_map task asynchronously."""
@@ -345,26 +335,19 @@ async def agent_map_async(
345335
input_=input_data, # type: ignore
346336
task=task,
347337
session_id=session.session_id,
348-
response_schema=AgentMapOperationResponseSchemaType0.from_dict(
349-
response_model.model_json_schema()
350-
),
351-
effort_level=PublicEffortLevel(effort_level.value)
352-
if effort_level is not None
353-
else UNSET,
338+
response_schema=AgentMapOperationResponseSchemaType0.from_dict(response_model.model_json_schema()),
339+
effort_level=PublicEffortLevel(effort_level.value) if effort_level is not None else UNSET,
354340
llm=LLMEnumPublic(llm.value) if llm is not None else UNSET,
355341
iteration_budget=iteration_budget if iteration_budget is not None else UNSET,
356342
include_research=include_research if include_research is not None else UNSET,
357343
join_with_input=True,
344+
enforce_row_independence=enforce_row_independence,
358345
)
359346

360-
response = await agent_map_operations_agent_map_post.asyncio(
361-
client=session.client, body=body
362-
)
347+
response = await agent_map_operations_agent_map_post.asyncio(client=session.client, body=body)
363348
response = handle_response(response)
364349

365-
cohort_task = EveryrowTask(
366-
response_model=response_model, is_map=True, is_expand=False
367-
)
350+
cohort_task = EveryrowTask(response_model=response_model, is_map=True, is_expand=False)
368351
cohort_task.set_submitted(response.task_id, response.session_id, session.client)
369352
return cohort_task
370353

@@ -403,9 +386,7 @@ async def screen[T: BaseModel](
403386
if isinstance(result, TableResult):
404387
return result
405388
raise EveryrowError("Screen task did not return a table result")
406-
cohort_task = await screen_async(
407-
task=task, session=session, input=input, response_model=response_model
408-
)
389+
cohort_task = await screen_async(task=task, session=session, input=input, response_model=response_model)
409390
result = await cohort_task.await_result()
410391
if isinstance(result, TableResult):
411392
return result
@@ -426,14 +407,10 @@ async def screen_async[T: BaseModel](
426407
input_=input_data, # type: ignore
427408
task=task,
428409
session_id=session.session_id,
429-
response_schema=ScreenOperationResponseSchemaType0.from_dict(
430-
actual_response_model.model_json_schema()
431-
),
410+
response_schema=ScreenOperationResponseSchemaType0.from_dict(actual_response_model.model_json_schema()),
432411
)
433412

434-
response = await screen_operations_screen_post.asyncio(
435-
client=session.client, body=body
436-
)
413+
response = await screen_operations_screen_post.asyncio(client=session.client, body=body)
437414
response = handle_response(response)
438415

439416
cohort_task: EveryrowTask[T] = EveryrowTask(
@@ -520,9 +497,7 @@ async def rank_async[T: BaseModel](
520497
# Validate that field_name exists in the model
521498
properties = response_schema.get("properties", {})
522499
if field_name not in properties:
523-
raise ValueError(
524-
f"Field {field_name} not in response model {response_model.__name__}"
525-
)
500+
raise ValueError(f"Field {field_name} not in response model {response_model.__name__}")
526501
else:
527502
# Build a minimal JSON schema with just the sort field
528503
json_type_map = {
@@ -533,9 +508,7 @@ async def rank_async[T: BaseModel](
533508
}
534509
response_schema = {
535510
"type": "object",
536-
"properties": {
537-
field_name: {"type": json_type_map.get(field_type, field_type)}
538-
},
511+
"properties": {field_name: {"type": json_type_map.get(field_type, field_type)}},
539512
"required": [field_name],
540513
}
541514

@@ -641,9 +614,7 @@ async def merge_async(
641614
session_id=session.session_id,
642615
)
643616

644-
response = await merge_operations_merge_post.asyncio(
645-
client=session.client, body=body
646-
)
617+
response = await merge_operations_merge_post.asyncio(client=session.client, body=body)
647618
response = handle_response(response)
648619

649620
cohort_task = EveryrowTask(response_model=BaseModel, is_map=True, is_expand=False)
@@ -707,9 +678,7 @@ async def dedupe_async(
707678
session_id=session.session_id,
708679
)
709680

710-
response = await dedupe_operations_dedupe_post.asyncio(
711-
client=session.client, body=body
712-
)
681+
response = await dedupe_operations_dedupe_post.asyncio(client=session.client, body=body)
713682
response = handle_response(response)
714683

715684
cohort_task = EveryrowTask(response_model=BaseModel, is_map=True, is_expand=False)

src/everyrow/session.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,7 @@ async def create_session(
6363
try:
6464
response = await create_session_endpoint_sessions_post.asyncio(
6565
client=client,
66-
body=CreateSession(
67-
name=name or f"everyrow-sdk-session-{datetime.now().isoformat()}"
68-
),
66+
body=CreateSession(name=name or f"everyrow-sdk-session-{datetime.now().isoformat()}"),
6967
)
7068
response = handle_response(response)
7169
session = Session(client=client, session_id=response.session_id)

0 commit comments

Comments
 (0)