Skip to content

Commit 420ee78

Browse files
feat(v2): add bulk_create to CreateServiceMixin
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4d0b5b6 commit 420ee78

6 files changed

Lines changed: 167 additions & 3 deletions

File tree

docs/v2/services.md

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ If `actor_id` is provided and the model does not have the field, a
8585

8686
Each operation has a raw and serialized variant:
8787

88-
- `create_raw`, `update_raw`, `upsert_raw` -> return SQLAlchemy model
89-
- `create`, `update`, `upsert` -> always serialize to schema
88+
- `create_raw`, `bulk_create_raw`, `update_raw`, `upsert_raw` -> return SQLAlchemy model
89+
- `create`, `bulk_create`, `update`, `upsert` -> always serialize to schema
9090

9191
## Pagination
9292

@@ -117,6 +117,27 @@ user_schema = await service.create(session, payload)
117117
user_model = await service.create_raw(session, payload)
118118
```
119119

120+
### Bulk create
121+
122+
```python
123+
payloads = [
124+
UserCreateSchema(email="a@b.com", name="Alice"),
125+
UserCreateSchema(email="c@d.com", name="Charlie"),
126+
]
127+
128+
# Serialized — returns list[ListSchema]
129+
users = await service.bulk_create(session, payloads)
130+
131+
# Raw — returns list[ModelType]
132+
models = await service.bulk_create_raw(session, payloads)
133+
134+
# With actor tracking
135+
users = await service.bulk_create(session, payloads, actor_id=current_user_id)
136+
```
137+
138+
> M2M relations are not supported in bulk create. Use single `create` calls
139+
> when you need M2M sync.
140+
120141
### Actor-aware update (updated_by)
121142

122143
```python

src/notora/v2/repositories/base.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,13 @@ def create(
149149
options: Iterable[OptionSpec[ModelType]] | None = None,
150150
) -> TypedReturnsRows[tuple[ModelType]]: ...
151151

152+
def bulk_create(
153+
self,
154+
payload: Sequence[dict[str, Any]],
155+
*,
156+
options: Iterable[OptionSpec[ModelType]] | None = None,
157+
) -> TypedReturnsRows[tuple[ModelType]]: ...
158+
152159
def create_or_skip(
153160
self,
154161
payload: dict[str, Any],

src/notora/v2/services/mixins/create.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,41 @@ async def create_raw(
4646
await self.sync_m2m_relations(session, self._extract_pk(entity), relation_payload)
4747
return entity
4848

49+
async def bulk_create_raw(
50+
self,
51+
session: AsyncSession,
52+
data: Sequence[PydanticModel | dict[str, Any]],
53+
*,
54+
actor_id: Any | None = None,
55+
options: Iterable[OptionSpec[ModelType]] | None = None,
56+
) -> list[ModelType]:
57+
payloads = [
58+
self._apply_updated_by(
59+
self._dump_payload(item, exclude_unset=False),
60+
actor_id,
61+
)
62+
for item in data
63+
]
64+
query = self.repo.bulk_create(payloads, options=options)
65+
return await self.execute_for_many(session, query)
66+
67+
async def bulk_create(
68+
self,
69+
session: AsyncSession,
70+
data: Sequence[PydanticModel | dict[str, Any]],
71+
*,
72+
actor_id: Any | None = None,
73+
options: Iterable[OptionSpec[ModelType]] | None = None,
74+
schema: type[ListSchema] | None = None,
75+
) -> list[ListSchema]:
76+
entities = await self.bulk_create_raw(
77+
session,
78+
data,
79+
actor_id=actor_id,
80+
options=options,
81+
)
82+
return self.serialize_many(entities, schema=schema)
83+
4984
async def create(
5085
self,
5186
session: AsyncSession,

src/notora/v2/services/mixins/create.pyi

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,34 @@ class CreateServiceMixin[
5858
options: Iterable[OptionSpec[ModelType]] | None = None,
5959
schema: type[SchemaT],
6060
) -> SchemaT: ...
61+
async def bulk_create_raw(
62+
self,
63+
session: AsyncSession,
64+
data: Sequence[PydanticModel | dict[str, Any]],
65+
*,
66+
actor_id: Any | None = None,
67+
options: Iterable[OptionSpec[ModelType]] | None = None,
68+
) -> list[ModelType]: ...
69+
@overload
70+
async def bulk_create(
71+
self,
72+
session: AsyncSession,
73+
data: Sequence[PydanticModel | dict[str, Any]],
74+
*,
75+
actor_id: Any | None = None,
76+
options: Iterable[OptionSpec[ModelType]] | None = None,
77+
schema: None = ...,
78+
) -> list[ListSchema]: ...
79+
@overload
80+
async def bulk_create[SchemaT: BaseResponseSchema](
81+
self,
82+
session: AsyncSession,
83+
data: Sequence[PydanticModel | dict[str, Any]],
84+
*,
85+
actor_id: Any | None = None,
86+
options: Iterable[OptionSpec[ModelType]] | None = None,
87+
schema: type[SchemaT],
88+
) -> list[SchemaT]: ...
6189

6290
class CreateOrSkipServiceMixin[
6391
PKType,

src/notora/v2/services/mixins/executor.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,14 @@ async def execute_optional(
7979
result = await self.execute(session, statement)
8080
return cast(ModelType | None, result.unique().scalar_one_or_none())
8181

82+
async def execute_for_many(
83+
self,
84+
session: AsyncSession,
85+
statement: TypedReturnsRows[tuple[ModelType]],
86+
) -> list[ModelType]:
87+
result = await self.execute(session, statement)
88+
return list(cast('ScalarResult[ModelType]', result.unique().scalars()).all())
89+
8290
def _translate_integrity_error(self, err: exc.IntegrityError) -> Exception:
8391
if match := self._fk_constraint_pattern.match(err.args[0]):
8492
fk_name = match.group('fk_name')

tests/v2/test_integration/test_service_crud.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import pytest
44
from sqlalchemy.ext.asyncio import AsyncSession
55

6-
from notora.v2.exceptions.common import NotFoundError
6+
from notora.v2.exceptions.common import AlreadyExistsError, NotFoundError
77
from notora.v2.repositories.params import PaginationParams, QueryParams
88
from notora.v2.schemas.base import PaginatedResponseSchema
99
from tests.v2.test_integration.mocks.model import V2User
@@ -182,3 +182,68 @@ async def test_service_retrieve_create_or_skip_upsert_and_delete(
182182

183183
with pytest.raises(NotFoundError):
184184
await user_service.retrieve(db_session, created.id)
185+
186+
187+
async def test_service_bulk_create(
188+
db_session: AsyncSession,
189+
user_service: V2UserService,
190+
) -> None:
191+
payloads = [
192+
_create_user_payload('bulk1@ex.com', 'Bulk1'),
193+
_create_user_payload('bulk2@ex.com', 'Bulk2'),
194+
_create_user_payload('bulk3@ex.com', 'Bulk3'),
195+
]
196+
created = await user_service.bulk_create(db_session, payloads)
197+
await db_session.commit()
198+
199+
assert len(created) == len(payloads)
200+
assert all(isinstance(item, V2UserResponseSchema) for item in created)
201+
assert {item.email for item in created} == {'bulk1@ex.com', 'bulk2@ex.com', 'bulk3@ex.com'}
202+
203+
204+
async def test_service_bulk_create_raw(
205+
db_session: AsyncSession,
206+
user_service: V2UserService,
207+
) -> None:
208+
payloads = [
209+
_create_user_payload('raw1@ex.com', 'Raw1'),
210+
_create_user_payload('raw2@ex.com', 'Raw2'),
211+
]
212+
created = await user_service.bulk_create_raw(db_session, payloads)
213+
await db_session.commit()
214+
215+
assert len(created) == len(payloads)
216+
assert all(isinstance(item, V2User) for item in created)
217+
218+
219+
async def test_service_bulk_create_with_actor_id(
220+
db_session: AsyncSession,
221+
user_service: V2UserService,
222+
) -> None:
223+
actor_id = uuid4()
224+
payloads = [
225+
_create_user_payload('actor1@ex.com', 'Actor1'),
226+
_create_user_payload('actor2@ex.com', 'Actor2'),
227+
]
228+
created = await user_service.bulk_create(db_session, payloads, actor_id=actor_id)
229+
await db_session.commit()
230+
231+
assert len(created) == len(payloads)
232+
assert all(item.updated_by == actor_id for item in created)
233+
234+
235+
async def test_service_bulk_create_duplicate_raises(
236+
db_session: AsyncSession,
237+
user_service: V2UserService,
238+
) -> None:
239+
payload = _create_user_payload('dup@ex.com', 'Original')
240+
await user_service.create(db_session, payload)
241+
await db_session.commit()
242+
243+
with pytest.raises(AlreadyExistsError):
244+
await user_service.bulk_create(
245+
db_session,
246+
[
247+
_create_user_payload('dup@ex.com', 'Duplicate'),
248+
],
249+
)

0 commit comments

Comments
 (0)