This guide explains how to add a new endpoint to the application, ensuring it adheres to the Service-Controller-Repository architecture and follows practices such as pagination, ordering, and wrapping responses in the GenericResponseDTO.
Create a new controller file in the controllers/ directory or modify an existing one. Use APIRouter to define the endpoint and ensure the response is wrapped in GenericResponseDTO.
Create a service file in the services/ directory or modify an existing one. The service should interact with the repository and map database models to DTOs.
Add a repository file in the repositories/ directory or modify an existing one. Ensure it handles pagination and ordering using offset and limit.
Use the SQLAlchemy 2.0 select() style — not the legacy db.query() API.
Example:
# filepath: repositories/new_entity_repository.py
from sqlalchemy.orm import Session
from sqlalchemy import select, func
from models.new_entity_model import NewEntity
from typing import Optional, List, Tuple
from utils import page_size
class NewEntityRepository:
def get_all(
self,
db: Session,
field1: Optional[str] = None,
page: int = 1,
order_by: Optional[str] = None,
) -> Tuple[List[NewEntity], int]:
stmt = select(NewEntity)
if field1:
stmt = stmt.where(NewEntity.field1 == field1)
total_count = db.scalar(select(func.count()).select_from(stmt.subquery()))
if order_by and hasattr(NewEntity, order_by):
stmt = stmt.order_by(getattr(NewEntity, order_by))
offset = (page - 1) * page_size
results = db.scalars(stmt.offset(offset).limit(page_size)).all()
return results, total_countIf the model has a relationship that needs to be loaded eagerly alongside the main query, use contains_eager with of_type():
from sqlalchemy.orm import contains_eager, aliased
RelatedModel = aliased(NewEntity.related_relation.property.mapper.class_)
stmt = (
select(NewEntity)
.join(NewEntity.related_relation.of_type(RelatedModel))
.options(contains_eager(NewEntity.related_relation.of_type(RelatedModel)))
)
# then add .where() clauses and call db.scalars(...).unique().all()Add a new model in the models/ directory or modify an existing one. If you need indexing or hypertable functionality, include the __indexes__ and __hypertable__ attributes.
Defines custom indexes for the table. Example:
__indexes__ = [
{
'name': 'new_entity_field1_idx',
'columns': ['field1']
},
]Defines TimescaleDB hypertable metadata. Example:
__hypertable__ = {
'time_column': 'timestamp_field', # Time column for hypertable
'chunk_time_interval': '1 day', # Chunk interval for partitioning
'compress': True, # Enable compression
'compress_segmentby': 'field1', # Segment by column for compression
'compress_orderby': 'timestamp_field', # Order by column for compression
'compress_policy': True, # Enable compression policy
'compress_after': '7 days' # Compress data older than 7 days
}Example model:
# filepath: models/new_entity_model.py
from sqlalchemy import Column, String, TIMESTAMP
from config.database import Base
class NewEntity(Base):
__tablename__ = "new_entity"
__indexes__ = [
{
'name': 'new_entity_field1_idx',
'columns': ['field1']
},
]
__hypertable__ = {
'time_column': 'timestamp_field',
'chunk_time_interval': '1 day',
'compress': True,
'compress_segmentby': 'field1',
'compress_orderby': 'timestamp_field',
'compress_policy': True,
'compress_after': '7 days'
}
field1 = Column(String, primary_key=True)
field2 = Column(String, nullable=False)
timestamp_field = Column(TIMESTAMP, nullable=False)Add a DTO in the dtos/ directory to define the structure of the response.
Example:
# filepath: dtos/new_entity_dto.py
from pydantic import BaseModel, ConfigDict
class NewEntityDTO(BaseModel):
field1: str
field2: str
model_config = ConfigDict(from_attributes=True)- Pagination and Ordering: Ensure the repository uses
offsetandlimitfor pagination and supports ordering by columns. - GenericResponseDTO: Wrap all responses in
GenericResponseDTOto maintain consistency. - Indexes: Use the
__indexes__attribute in models to define indexes. - Hypertables: Use the
__hypertable__attribute in models for TimescaleDB-specific features. The hypertables will only be generated for newely generated tables. Fields:time_column: Column used for time-based partitioning.chunk_time_interval: Interval for partitioning data.compress: Enable compression.compress_segmentby: Column for segmenting compressed data.compress_orderby: Column for ordering compressed data.compress_policy: Enable automatic compression policy.compress_after: Time after which data is compressed.