# Transactional Inbox

Transactional Inbox pattern for idempotent message processing with causal consistency and worker partitioning.
## Features

- **Idempotency**: Duplicate messages are ignored (same primary key = same message)
- **Causal Consistency**: Messages are processed only after their causal dependencies
- **Worker Partitioning**: Messages are distributed across workers by a partition key strategy
- **At-least-once delivery**: Messages are stored before processing
## Usage

```python
from ascetic_ddd.inbox import Inbox, InboxMessage

inbox = Inbox(session_pool)

async with session_pool.session() as session:
    async with session.atomic():
        await inbox.setup(session)
```
## Partition Key Strategies

Two strategies determine how messages are distributed across workers:
### 1. UriPartitionKeyStrategy (default)

Partition by URI. Use when ordering is based on topic/partition from the broker.

```python
from ascetic_ddd.inbox import Inbox, UriPartitionKeyStrategy

inbox = Inbox(session_pool, partition_key_strategy=UriPartitionKeyStrategy())
```
The URI may contain a partition key suffix (as in the Outbox):

- `kafka://orders`: all messages go to one worker
- `kafka://orders/order-123`: distributed by hash of the full URI
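The effect of the two URI forms can be pictured with a stable hash. This is only a sketch: the library computes the partition key on the database side, and `worker_for_uri` is a hypothetical helper, not part of the API.

```python
import hashlib

def worker_for_uri(uri: str, num_workers: int) -> int:
    # Stable hash of the full URI picks the worker (illustrative only).
    digest = hashlib.sha256(uri.encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_workers

# A bare topic URI always maps to the same single worker:
assert worker_for_uri("kafka://orders", 3) == worker_for_uri("kafka://orders", 3)

# Per-entity suffixes spread messages across workers:
workers = {worker_for_uri(f"kafka://orders/order-{i}", 3) for i in range(100)}
assert len(workers) > 1
```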
### 2. StreamPartitionKeyStrategy

Partition by stream identity (tenant_id, stream_type, stream_id). Use when messages have causal dependencies within a stream.

```python
from ascetic_ddd.inbox import Inbox, StreamPartitionKeyStrategy

inbox = Inbox(session_pool, partition_key_strategy=StreamPartitionKeyStrategy())
```
All messages for the same stream go to the same worker, preserving causal order.
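The same-stream-to-same-worker property can be sketched by hashing the stream identity instead of the URI. Again, `worker_for_stream` is a hypothetical helper for illustration; the library evaluates the partition key in SQL.

```python
import hashlib
import json

def worker_for_stream(tenant_id: str, stream_type: str,
                      stream_id: dict, num_workers: int) -> int:
    # Build a stable key from the stream identity; json.dumps with
    # sort_keys makes the dict representation deterministic.
    key = json.dumps([tenant_id, stream_type, stream_id], sort_keys=True)
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_workers

# Every position of the same stream lands on the same worker,
# so causal order within the stream can be preserved.
w1 = worker_for_stream("tenant1", "Order", {"id": "order-123"}, 3)
w2 = worker_for_stream("tenant1", "Order", {"id": "order-123"}, 3)
assert w1 == w2
```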
## Publishing Messages

Receive messages from an external source (e.g., a message broker):
```python
await inbox.publish(InboxMessage(
    tenant_id="tenant1",
    stream_type="Order",
    stream_id={"id": "order-123"},
    stream_position=1,
    uri="kafka://orders/order-123",
    payload={"type": "OrderCreated", "amount": 100},
    metadata={
        "event_id": "uuid-123",
        "causal_dependencies": [
            {"tenant_id": "tenant1", "stream_type": "User", "stream_id": {"id": "user-1"}, "stream_position": 5}
        ]
    }
))
```
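Idempotency here means a redelivered message is stored once. The following is an in-memory model of that behavior, assuming nothing about the library's internals: the real Inbox relies on the composite primary key in PostgreSQL, while this sketch uses a plain dict keyed the same way.

```python
import json

inbox_rows: dict = {}

def publish(message: dict) -> bool:
    # Same (tenant_id, stream_type, stream_id, stream_position) = same message.
    pk = (message["tenant_id"], message["stream_type"],
          json.dumps(message["stream_id"], sort_keys=True),
          message["stream_position"])
    if pk in inbox_rows:
        return False  # duplicate delivery: ignored
    inbox_rows[pk] = message
    return True

msg = {"tenant_id": "tenant1", "stream_type": "Order",
       "stream_id": {"id": "order-123"}, "stream_position": 1}
assert publish(msg) is True
assert publish(msg) is False  # second delivery is a no-op
```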
## Processing Messages

### Option 1: dispatch() - Single Message

```python
async def handle_message(session, message: InboxMessage) -> None:
    event = deserialize(message.payload)
    await process_event(session, event)

# Process one message
has_message = await inbox.dispatch(handle_message)

# With partitioning (for manual worker management)
has_message = await inbox.dispatch(handle_message, worker_id=0, num_workers=3)
```
### Option 2: run() - Continuous Loop

```python
stop_event = asyncio.Event()

# Single coroutine, single process (default)
await inbox.run(
    subscriber=handle_message,
    poll_interval=1.0,
    stop_event=stop_event,
)

# Multiple coroutines in a single process (4 partitions)
await inbox.run(handle_message, concurrency=4)

# Multiple processes (run in separate processes, 3 partitions total)
# Process 0:
await inbox.run(handle_message, process_id=0, num_processes=3)
# Process 1:
await inbox.run(handle_message, process_id=1, num_processes=3)
# Process 2:
await inbox.run(handle_message, process_id=2, num_processes=3)

# Multiple processes with multiple coroutines (2 processes × 2 coroutines = 4 partitions)
# Process 0: handles partitions 0, 1
await inbox.run(handle_message, process_id=0, num_processes=2, concurrency=2)
# Process 1: handles partitions 2, 3
await inbox.run(handle_message, process_id=1, num_processes=2, concurrency=2)
```
Each coroutine processes its own partitions:

```python
effective_id = process_id * concurrency + local_id
effective_total = num_processes * concurrency
```
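Applying that formula, a small helper (hypothetical, for illustration only) can enumerate which (process, coroutine) pair owns which partition:

```python
def partition_assignment(num_processes: int, concurrency: int) -> dict:
    # Maps (process_id, local_id) -> effective partition id.
    return {
        (process_id, local_id): process_id * concurrency + local_id
        for process_id in range(num_processes)
        for local_id in range(concurrency)
    }

# 2 processes × 2 coroutines cover partitions 0..3 exactly once:
assignment = partition_assignment(2, 2)
assert assignment[(0, 0)] == 0
assert assignment[(1, 0)] == 2
assert sorted(assignment.values()) == [0, 1, 2, 3]
```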
### Option 3: Async Iterator

```python
async for session, message in inbox:
    await handle_message(session, message)
    # Message is automatically marked as processed after yield
```
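The "marked as processed after yield" semantics can be pictured with a plain async generator. This is a sketch of the control flow, not the library's implementation: the acknowledgment step runs only after the `async for` body has completed.

```python
import asyncio

async def iterate(messages):
    # Ack-after-yield: control returns here once the loop body finished.
    for msg in messages:
        yield msg
        msg["processed"] = True

async def main():
    msgs = [{"id": 1, "processed": False}, {"id": 2, "processed": False}]
    async for m in iterate(msgs):
        pass  # handle the message here
    return msgs

assert all(m["processed"] for m in asyncio.run(main()))
```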
## Causal Dependencies

Messages can declare dependencies on other messages. A message is only processed after all its dependencies have been processed:
```python
metadata={
    "causal_dependencies": [
        {
            "tenant_id": "tenant1",
            "stream_type": "User",
            "stream_id": {"id": "user-1"},
            "stream_position": 5
        }
    ]
}
```
If a message's dependencies are not yet satisfied, the Inbox skips it, moves on to the next message, and retries it later.

**Important**: When using causal dependencies, use `StreamPartitionKeyStrategy` to ensure causally related messages go to the same worker.
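One way to picture the dependency check (the library evaluates it against the inbox table; `dependencies_satisfied` below is a hypothetical helper over an in-memory map):

```python
import json

def dependencies_satisfied(deps: list, processed: dict) -> bool:
    # `processed` maps (tenant_id, stream_type, serialized stream_id)
    # to the highest stream_position already processed.
    for dep in deps:
        key = (dep["tenant_id"], dep["stream_type"],
               json.dumps(dep["stream_id"], sort_keys=True))
        if processed.get(key, 0) < dep["stream_position"]:
            return False
    return True

deps = [{"tenant_id": "tenant1", "stream_type": "User",
         "stream_id": {"id": "user-1"}, "stream_position": 5}]
key = ("tenant1", "User", json.dumps({"id": "user-1"}, sort_keys=True))

assert dependencies_satisfied(deps, {key: 5}) is True   # dependency processed
assert dependencies_satisfied(deps, {key: 4}) is False  # not yet: skip and retry
```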
## Database Schema

The Inbox uses PostgreSQL with the following schema:

```sql
CREATE SEQUENCE inbox_received_position_seq;

CREATE TABLE inbox (
    tenant_id varchar(128) NOT NULL,
    stream_type varchar(128) NOT NULL,
    stream_id jsonb NOT NULL,
    stream_position integer NOT NULL,
    uri varchar(60) NOT NULL,
    payload jsonb NOT NULL,
    metadata jsonb NULL,
    received_position bigint NOT NULL UNIQUE DEFAULT nextval('inbox_received_position_seq'),
    processed_position bigint NULL,
    CONSTRAINT inbox_pk PRIMARY KEY (tenant_id, stream_type, stream_id, stream_position)
);
```
Primary Key:

- `(tenant_id, stream_type, stream_id, stream_position)` ensures idempotency
- `received_position`: order of message arrival
- `processed_position`: set when the message is processed (NULL = unprocessed)
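With this primary key, the duplicate-ignoring insert can be expressed in PostgreSQL roughly as follows. This is a sketch of the idea; the statement the library actually issues may differ.

```sql
INSERT INTO inbox (tenant_id, stream_type, stream_id, stream_position, uri, payload, metadata)
VALUES ('tenant1', 'Order', '{"id": "order-123"}', 1,
        'kafka://orders/order-123', '{"type": "OrderCreated"}', NULL)
ON CONFLICT ON CONSTRAINT inbox_pk DO NOTHING;
```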
## API Reference

### InboxMessage

```python
@dataclass
class InboxMessage:
    tenant_id: str                    # Tenant identifier
    stream_type: str                  # Event stream type (e.g., aggregate type)
    stream_id: dict[str, Any]         # Stream identifier (e.g., aggregate ID)
    stream_position: int              # Position in the stream
    uri: str                          # Routing URI (e.g., 'kafka://orders/order-123')
    payload: dict[str, Any]           # Event payload
    metadata: dict[str, Any] | None   # Optional metadata (causal_dependencies, event_id)
    received_position: int | None     # Auto-assigned by the database
    processed_position: int | None    # Set when processed
```
### IInbox Interface

| Method | Description |
|---|---|
| `publish()` | Store incoming message in inbox |
| `dispatch()` | Process next message |
| `run()` | Run continuous processing loop |
| `__aiter__()` | Async iterator for message streaming |
| `setup()` | Create tables and sequences |
| | Cleanup resources |
### dispatch() Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `subscriber` | `ISubscriber` | required | Callback to handle each message |
| `worker_id` | `int` | | This worker's ID (0 to num_workers-1) |
| `num_workers` | `int` | | Total number of workers for partitioning |
### run() Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `subscriber` | `ISubscriber` | required | Callback to handle each message |
| `process_id` | `int` | | This process's ID (0 to num_processes-1) |
| `num_processes` | `int` | | Total number of processes |
| `concurrency` | `int` | | Number of coroutines in this process |
| `poll_interval` | `float` | | Seconds to wait when no messages |
| `stop_event` | `asyncio.Event` | | For graceful shutdown |
### ISubscriber

```python
ISubscriber: TypeAlias = Callable[[ISession, InboxMessage], Awaitable]
```
### Partition Key Strategies

| Strategy | SQL Expression | Use Case |
|---|---|---|
| `UriPartitionKeyStrategy` | | Topic-based routing (default) |
| `StreamPartitionKeyStrategy` | | Causal consistency within stream |
## Notes

`tenant_id` is typed as `typing.Any` – the actual type is determined by the user's DDL schema (varchar, integer, etc.). The DDL can include `REFERENCES` to enforce referential integrity.
## References

See also:

- "Domain Events in DDD" by Ivan Zakrevsky
- "About the message race in terms of competing subscribers" by Ivan Zakrevsky
- "Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions" by Gregor Hohpe, Bobby Woolf, "Chapter 10. Messaging Endpoints :: Transactional Client"
- "Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka" by Vaughn Vernon, "Chapter 9. Message Endpoints :: Transactional Client/Actor"