A SQL-backed broker for FastStream. Documentation.
Implementing the transactional outbox pattern becomes as simple as the following.
Publish messages transactionally with your other database operations.
from sqlalchemy.ext.asyncio import create_async_engine
from faststream import AckPolicy, FastStream
from faststream.kafka import KafkaBroker
from faststream_sqlbroker.sqlbroker import SqlBroker
from faststream_sqlbroker.sqlbroker.retry import ExponentialBackoffRetryStrategy
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker_sqlbroker = SqlBroker(engine=engine)
broker_kafka = KafkaBroker("127.0.0.1:9092")
app = FastStream(broker_sqlbroker, on_startup=[broker_kafka.connect])
publisher_sqlbroker = broker_sqlbroker.publisher()
@app.after_startup # just an example
async def publish_examples():
async with engine.begin() as connection:
# ... your other database operations using `connection` ...
await publisher_sqlbroker.publish(
{"message": "Hello, SqlBroker!"},
queue="sqlbroker_queue",
connection=connection,
)And relay the messages from the database to another broker.
publisher_kafka = broker_kafka.publisher("kafka_topic")
@publisher_kafka
@broker_sqlbroker.subscriber(
queues=["sqlbroker_queue"],
max_workers=10,
retry_strategy=ExponentialBackoffRetryStrategy(
initial_delay_seconds=1,
multiplier=2,
max_delay_seconds=60 * 5,
max_total_delay_seconds=60 * 60 * 6,
max_attempts=None,
),
max_fetch_interval=1,
min_fetch_interval=0,
fetch_batch_size=10,
overfetch_factor=1.5,
flush_interval=3,
release_stuck_interval=5,
release_stuck_timeout=60 * 60,
max_deliveries=20,
ack_policy=AckPolicy.NACK_ON_ERROR,
)
async def handle_msg(msg_body: dict) -> dict:
return msg_bodyOriginated as a PR to FastStream.