Saga Pattern¶
A Python implementation of the Saga pattern using the Routing Slip approach, based on Clemens Vasters’ article.
What is a Saga?¶
A Saga is a failure management pattern for handling long-lived and distributed transactions across systems that cannot use traditional ACID transactions with two-phase commit.
Rather than holding locks across multiple services, a Saga splits work into individual activities whose effects can be compensated (reversed) after work has been performed and committed.
The Saga module provides coordination for distributed transactions using compensating actions.
When to Use Sagas¶
Traditional distributed transactions with locks are impractical when:
Work spans multiple independent services with different trust boundaries
Transactions are long-lived and geographically distributed
Participants cannot be enlisted in a single ACID transaction
Services are autonomous and may be temporarily unavailable
How It Works¶
The pattern operates through a routing slip mechanism:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ Activity 1  │───>│ Activity 2  │───>│ Activity 3  │
│  (do_work)  │    │  (do_work)  │    │  (do_work)  │
└─────────────┘    └─────────────┘    └─────────────┘
       │                  │                  │
       │                  │                  │ FAILURE!
       │                  │                  ▼
       │                  │           ┌─────────────┐
       │                  │<──────────│ compensate  │
       │                  ▼           └─────────────┘
       │           ┌─────────────┐
       │<──────────│ compensate  │
       ▼           └─────────────┘
┌─────────────┐
│ compensate  │
└─────────────┘
Forward Path: The routing slip progresses through sequential work items
Backward Path: On failure, the routing slip reverses through completed steps for compensation
Key Characteristics¶
No centralized coordinator
All work remains local to individual nodes
The routing slip carries all transaction context
Decisions occur locally at each step
Can be serialized and transmitted between distributed systems
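The forward and backward paths can be illustrated with a small, self-contained sketch. It does not use the library: MiniSlip, Step and make_step are hypothetical stand-ins, and the sketch assumes that only steps which actually completed are compensated.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class Step:
    """Hypothetical stand-in for an activity: a forward action and its undo."""
    name: str
    do_work: Callable[[], Awaitable[bool]]
    compensate: Callable[[], Awaitable[None]]


@dataclass
class MiniSlip:
    """Hypothetical routing slip: pending steps plus a log of completed ones."""
    pending: list[Step]
    completed: list[Step] = field(default_factory=list)

    async def run(self) -> bool:
        # Forward path: execute steps in order until one fails.
        while self.pending:
            step = self.pending.pop(0)
            if await step.do_work():
                self.completed.append(step)
                continue
            # Backward path: compensate completed steps in reverse order.
            while self.completed:
                await self.completed.pop().compensate()
            return False
        return True


def make_step(name: str, ok: bool, trace: list[str]) -> Step:
    async def do_work() -> bool:
        trace.append(f"do:{name}")
        return ok

    async def compensate() -> None:
        trace.append(f"undo:{name}")

    return Step(name, do_work, compensate)


trace: list[str] = []
slip = MiniSlip([
    make_step("car", True, trace),
    make_step("hotel", True, trace),
    make_step("flight", False, trace),  # the third step fails
])
succeeded = asyncio.run(slip.run())
print(succeeded, trace)
```

In this run the flight step fails, so the hotel and then the car are undone, in that order. All state lives on the slip object itself, which is what makes it possible to hand the slip from one node to the next.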
Components¶
Activity¶
Each activity encapsulates two operations:
# WorkResult is assumed to be exported alongside the other names
from ascetic_ddd.saga import Activity, RoutingSlip, WorkItem, WorkLog, WorkResult
class MyActivity(Activity):
    async def do_work(self, work_item: WorkItem) -> WorkLog:
        # Perform the business operation (perform_operation is a placeholder)
        result = await perform_operation(work_item.arguments)
        return WorkLog(self, WorkResult({"id": result.id}))
    async def compensate(self, work_log: WorkLog, routing_slip: RoutingSlip) -> bool:
        # Reverse the operation (cancel_operation is a placeholder)
        await cancel_operation(work_log.result["id"])
        return True  # Continue backward
    @property
    def work_item_queue_address(self) -> str:
        return "sb://./myActivityQueue"
    @property
    def compensation_queue_address(self) -> str:
        return "sb://./myActivityCompensation"
RoutingSlip¶
The document flowing through the system:
from ascetic_ddd.saga import RoutingSlip, WorkItem, WorkItemArguments
routing_slip = RoutingSlip([
    WorkItem(Activity1, WorkItemArguments({"key": "value1"})),
    WorkItem(Activity2, WorkItemArguments({"key": "value2"})),
    WorkItem(Activity3, WorkItemArguments({"key": "value3"})),
])
Properties:
is_completed: True if all work items processed
is_in_progress: True if some work completed (can compensate)
progress_uri: Next activity’s work queue address
compensation_uri: Last completed activity’s compensation address
Methods:
process_next(): Execute next work item, returns success/failure
undo_last(): Compensate last completed work
ActivityHost¶
Manages message processing for a specific activity type:
from ascetic_ddd.saga import ActivityHost, RoutingSlip
def send(uri: str, routing_slip: RoutingSlip):
    # Route to appropriate host based on URI
    ...
host = ActivityHost(MyActivity, send)
await host.accept_message(uri, routing_slip)
Example: Travel Booking Saga¶
from ascetic_ddd.saga import RoutingSlip, WorkItem, WorkItemArguments
from ascetic_ddd.saga.examples import (
    ReserveCarActivity,
    ReserveHotelActivity,
    ReserveFlightActivity,
)
# Create routing slip with activities ordered by risk (least risky first)
routing_slip = RoutingSlip([
    WorkItem(ReserveCarActivity, WorkItemArguments({"vehicleType": "Compact"})),
    WorkItem(ReserveHotelActivity, WorkItemArguments({"roomType": "Suite"})),
    WorkItem(ReserveFlightActivity, WorkItemArguments({"destination": "DUS"})),
])
# Process the saga
while not routing_slip.is_completed:
    if not await routing_slip.process_next():
        # Activity failed - compensate all completed work
        print("Failure! Starting compensation...")
        while routing_slip.is_in_progress:
            await routing_slip.undo_last()
        break
else:
    print("Saga completed successfully!")
Risk Ordering Strategy¶
Activities should be sequenced by success probability (least risky first):
Car reservations - Highest success rate, easily cancellable
Hotel bookings - Moderate risk, cancellable until 24 hours before
Airfare - Highest risk due to refund restrictions
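Placing the step that is hardest to undo last means it runs only after everything else has succeeded, so the compensations that do occur are mostly the cheap, easily reversible ones.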
Compensation Semantics¶
The compensate() method returns a boolean:
True: Compensation succeeded, continue backward path
False: Compensation added new work, resume forward path
This allows for sophisticated recovery strategies where compensation might involve retrying with different parameters.
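A driver loop that honors both return values might look like the sketch below. It is self-contained and does not use the library: MiniSlip and step are hypothetical stand-ins, and the sketch assumes that undo_last() passes through the boolean returned by compensate(), which the text above does not state.

```python
import asyncio


def step(name, ok=True, retry_with=None):
    """Hypothetical step: name, whether the forward work succeeds, and
    replacement steps that its compensation schedules (if any)."""
    return {"name": name, "ok": ok, "retry_with": retry_with or []}


class MiniSlip:
    """Hypothetical stand-in with the same member names as RoutingSlip."""

    def __init__(self, steps):
        self.pending = list(steps)
        self.completed = []
        self.trace = []

    @property
    def is_completed(self):
        return not self.pending

    @property
    def is_in_progress(self):
        return bool(self.completed)

    async def process_next(self) -> bool:
        current = self.pending.pop(0)
        self.trace.append("do:" + current["name"])
        if current["ok"]:
            self.completed.append(current)
        return current["ok"]

    async def undo_last(self) -> bool:
        last = self.completed.pop()
        self.trace.append("undo:" + last["name"])
        if last["retry_with"]:
            # Compensation scheduled new work: report False to resume forward.
            self.pending = list(last["retry_with"])
            return False
        return True  # nothing new to try, keep moving backward


async def run(slip: MiniSlip) -> bool:
    while not slip.is_completed:
        if await slip.process_next():
            continue
        resumed = False
        while slip.is_in_progress:
            if not await slip.undo_last():
                resumed = True  # forward path resumes with the new work
                break
        if not resumed:
            return False  # everything compensated, saga failed
    return True


slip = MiniSlip([
    step("hotel", retry_with=[step("hotel"), step("flight-alt")]),
    step("flight", ok=False),
])
result = asyncio.run(run(slip))
print(result, slip.trace)
```

Here the flight fails, the hotel compensation cancels the booking and schedules a second attempt with an alternative flight, and the saga then completes on the forward path.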
Idempotency Requirement¶
Activities MUST be idempotent. This is a fundamental requirement for saga reliability.
Why Idempotency?¶
In distributed systems with message queues, messages are typically delivered with “at least once” semantics:
Worker receives message from queue
Worker processes the saga step
Worker sends Ack (acknowledgment) to queue
If worker crashes before Ack, message returns to queue and is redelivered
This means do_work() and compensate() may be called multiple times for the same logical operation.
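The redelivery scenario can be simulated in a few lines. This is a sketch under stated assumptions: the in-memory dict stands in for a database, and reserve, deliver_at_least_once and the key format are hypothetical names chosen for illustration.

```python
import asyncio

reservations: dict[str, int] = {}  # idempotency key -> reservation id
side_effects = 0                   # counts bookings that were really made


async def reserve(idempotency_key: str) -> int:
    """Idempotent handler: a repeated key returns the earlier result."""
    global side_effects
    if idempotency_key in reservations:
        return reservations[idempotency_key]
    side_effects += 1
    reservations[idempotency_key] = 1000 + side_effects
    return reservations[idempotency_key]


async def deliver_at_least_once(key: str, crash_before_ack: bool) -> int:
    """Simulate a queue that redelivers when the worker dies before the Ack."""
    result = await reserve(key)
    if crash_before_ack:
        # No Ack reached the queue, so the same message is delivered again.
        result = await reserve(key)
    return result


reservation_id = asyncio.run(
    deliver_at_least_once("saga-42/hotel", crash_before_ack=True)
)
print(reservation_id, side_effects)
```

The handler runs twice for the same message, but only one reservation is created and both deliveries observe the same id.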
Ensuring Idempotency¶
class ReserveHotelActivity(Activity):
    async def do_work(self, work_item: WorkItem) -> WorkLog:
        reservation_key = work_item.arguments["idempotency_key"]
        # Check if already processed
        existing = await db.find_reservation(reservation_key)
        if existing:
            return WorkLog(self, WorkResult({"id": existing.id}))
        # Create new reservation with idempotency key
        reservation = await db.create_reservation(
            key=reservation_key,
            room_type=work_item.arguments["roomType"],
        )
        return WorkLog(self, WorkResult({"id": reservation.id}))
    async def compensate(self, work_log: WorkLog, routing_slip: RoutingSlip) -> bool:
        reservation_id = work_log.result["id"]
        # Idempotent cancellation - safe to call multiple times
        await db.cancel_reservation_if_exists(reservation_id)
        return True
ParallelActivity and FallbackActivity¶
Both ParallelActivity and FallbackActivity execute branches in-process using asyncio:
ParallelActivity: Uses asyncio.gather() for concurrent execution
FallbackActivity: Uses sequential loop to try alternatives
If the process crashes during execution:
No Ack is sent to the message queue
The message is redelivered
The entire step (including all branches) is re-executed
Idempotent activities ensure correctness on retry
This design avoids the complexity of distributed coordination while maintaining reliability through idempotency.
Parallel Execution (Fork/Join)¶
Based on Section 8 of the original Sagas paper, ParallelActivity executes multiple RoutingSlips concurrently.
Each branch is a full saga with its own forward/backward paths:
from ascetic_ddd.saga import (
    RoutingSlip, WorkItem, WorkItemArguments, ParallelActivity
)
# T1 → (T2a,T2b || T3) → T4
routing_slip = RoutingSlip([
    WorkItem(ReserveFlightActivity, WorkItemArguments({"destination": "DUS"})),
    WorkItem(ParallelActivity, WorkItemArguments({
        "branches": [
            RoutingSlip([
                WorkItem(ReserveHotelActivity, WorkItemArguments({"room": "Suite"})),
                WorkItem(ConfirmHotelActivity, WorkItemArguments({})),
            ]),
            RoutingSlip([
                WorkItem(ReserveCarActivity, WorkItemArguments({"type": "Compact"})),
            ]),
        ]
    })),
    WorkItem(SendConfirmationActivity, WorkItemArguments({})),
])
# Process the saga
while not routing_slip.is_completed:
    if not await routing_slip.process_next():
        while routing_slip.is_in_progress:
            await routing_slip.undo_last()
        break
Behavior:
Each branch is a full RoutingSlip (multi-step saga)
All branches execute concurrently
Fail-fast: On first failure, all branches are compensated
Compensation: All branches compensated in parallel (reverse order within each)
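The fork/join behavior can be approximated with asyncio.gather() as in the sketch below. It is not the library's implementation: run_branch, undo_branch and fork_join are hypothetical helpers, branches are plain lists of (name, succeeds) pairs, and this simplified version waits for every branch to finish before compensating, so it does not model fail-fast cancellation.

```python
import asyncio

Branch = list[tuple[str, bool]]  # (step name, does the step succeed?)


async def run_branch(steps: Branch, log: list[str]) -> tuple[bool, list[str]]:
    """Run one branch forward; report success and the steps that completed."""
    completed: list[str] = []
    for name, ok in steps:
        await asyncio.sleep(0)  # yield control so branches can interleave
        if not ok:
            return False, completed
        log.append(f"do:{name}")
        completed.append(name)
    return True, completed


async def undo_branch(completed: list[str], log: list[str]) -> None:
    """Compensate one branch, newest step first."""
    for name in reversed(completed):
        await asyncio.sleep(0)
        log.append(f"undo:{name}")


async def fork_join(branches: list[Branch], log: list[str]) -> bool:
    # Fork: start every branch concurrently and wait for all of them (join).
    results = await asyncio.gather(*(run_branch(b, log) for b in branches))
    if all(ok for ok, _ in results):
        return True
    # At least one branch failed: compensate all branches in parallel.
    await asyncio.gather(*(undo_branch(done, log) for _, done in results))
    return False


log: list[str] = []
ok = asyncio.run(fork_join(
    [
        [("hotel", True), ("confirm-hotel", True)],
        [("car", False)],  # this branch fails on its only step
    ],
    log,
))
print(ok, log)
```

The car branch fails, so the hotel branch is compensated even though it succeeded on its own, with confirm-hotel undone before hotel.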
Recovery Blocks (Fallback)¶
Based on Section 6 of the original paper, FallbackActivity tries alternative RoutingSlips until one succeeds:
from ascetic_ddd.saga import (
    RoutingSlip, WorkItem, WorkItemArguments, FallbackActivity
)
# Try primary payment flow, if fails try backup flow
routing_slip = RoutingSlip([
    WorkItem(PrepareOrderActivity, WorkItemArguments({})),
    WorkItem(FallbackActivity, WorkItemArguments({
        "alternatives": [
            RoutingSlip([
                WorkItem(PrimaryPaymentActivity, WorkItemArguments({"amount": 100})),
                WorkItem(ConfirmPaymentActivity, WorkItemArguments({})),
            ]),
            RoutingSlip([
                WorkItem(BackupPaymentActivity, WorkItemArguments({"amount": 100})),
            ]),
        ]
    })),
    WorkItem(ShipOrderActivity, WorkItemArguments({})),
])
Behavior:
Each alternative is a full RoutingSlip (multi-step saga)
Tries each alternative in order
If an alternative fails, it compensates itself before the next one is tried
Only the successful alternative needs compensation later
If all alternatives fail, returns None (the saga can then compensate previous steps)
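The sequential loop described above can be sketched as follows. This is an illustration rather than the library's code: try_alternative and fallback are hypothetical helpers, and alternatives are plain lists of (name, succeeds) pairs.

```python
import asyncio
from typing import Optional

Alternative = list[tuple[str, bool]]  # (step name, does the step succeed?)


async def try_alternative(steps: Alternative, log: list[str]) -> bool:
    """Run one alternative; on failure undo its own completed steps."""
    completed: list[str] = []
    for name, ok in steps:
        if not ok:
            for done in reversed(completed):
                log.append(f"undo:{done}")
            return False
        log.append(f"do:{name}")
        completed.append(name)
    return True


async def fallback(alternatives: list[Alternative], log: list[str]) -> Optional[int]:
    """Return the index of the first alternative that succeeds, else None."""
    for index, steps in enumerate(alternatives):
        if await try_alternative(steps, log):
            return index
    return None


log: list[str] = []
chosen = asyncio.run(fallback(
    [
        [("primary-payment", True), ("confirm-payment", False)],
        [("backup-payment", True)],
    ],
    log,
))
print(chosen, log)
```

The primary flow fails at confirmation and cleans up after itself, so by the time the backup flow runs no partial work from the primary flow is left behind.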
Combining Parallel and Fallback¶
Activities can be nested for complex scenarios:
# Flight → (Hotel saga || Car with fallback) → Confirmation
routing_slip = RoutingSlip([
    WorkItem(ReserveFlightActivity, WorkItemArguments({"dest": "DUS"})),
    WorkItem(ParallelActivity, WorkItemArguments({
        "branches": [
            # Branch 1: Hotel saga
            RoutingSlip([
                WorkItem(ReserveHotelActivity, WorkItemArguments({"room": "Suite"})),
                WorkItem(ConfirmHotelActivity, WorkItemArguments({})),
            ]),
            # Branch 2: Car with fallback providers
            RoutingSlip([
                WorkItem(FallbackActivity, WorkItemArguments({
                    "alternatives": [
                        RoutingSlip([WorkItem(ReserveHertzActivity, WorkItemArguments({}))]),
                        RoutingSlip([WorkItem(ReserveAvisActivity, WorkItemArguments({}))]),
                    ]
                })),
            ]),
        ]
    })),
    WorkItem(SendConfirmationActivity, WorkItemArguments({})),
])
Distributed Execution¶
For distributed systems, use ActivityHost with message queues:
from ascetic_ddd.saga import ActivityHost, RoutingSlip
# Defined first so the name exists when it is passed to ActivityHost below;
# `hosts` is only looked up when a message is actually sent.
async def send_message(uri: str, routing_slip: RoutingSlip):
    # In production: serialize and send to message queue
    for host in hosts:
        if await host.accept_message(uri, routing_slip):
            break
# Each service hosts its own activities
car_host = ActivityHost(ReserveCarActivity, send_message)
hotel_host = ActivityHost(ReserveHotelActivity, send_message)
flight_host = ActivityHost(ReserveFlightActivity, send_message)
hosts = [car_host, hotel_host, flight_host]
# Start the saga
await send_message(routing_slip.progress_uri, routing_slip)
RoutingSlip Serialization¶
To run a saga across services, the RoutingSlip must travel with the message that
hands work off to the next participant. RoutingSlip references activity classes
directly, which are not JSON-serializable, so an additional indirection is needed:
each side of the wire keeps a registry that translates between activity classes
and their canonical names (strings).
Overview¶
The serialization mechanism follows an interface-based approach with dependency
injection. There is no global registry: each ActivityTypeResolver instance is
independent, which prevents state pollution across tests and across services that
should not know about each other’s activities.
Components¶
1. ActivityTypeResolver¶
Abstract base class providing bidirectional mapping between activity type names and activity classes:
from abc import ABCMeta, abstractmethod
class ActivityTypeResolver(metaclass=ABCMeta):
    @abstractmethod
    def resolve(self, type_name: str) -> type[Activity]: ...
    @abstractmethod
    def get_name(self, activity_type: type[Activity]) -> str: ...
2. MapBasedResolver¶
Default in-memory implementation backed by a pair of dicts (name → type,
type → name):
from ascetic_ddd.saga import MapBasedResolver
resolver = MapBasedResolver()
resolver.register("ReserveCarActivity", ReserveCarActivity)
resolver.register("ReserveHotelActivity", ReserveHotelActivity)
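To make the "pair of dicts" idea concrete, here is a minimal sketch of how such a resolver could be written. TwoWayResolver is a hypothetical name, not the library's MapBasedResolver, and the error text merely mirrors the messages shown under Error Handling.

```python
class TwoWayResolver:
    """Hypothetical resolver backed by two dicts kept in sync."""

    def __init__(self) -> None:
        self._by_name: dict[str, type] = {}
        self._by_type: dict[type, str] = {}

    def register(self, type_name: str, activity_type: type) -> None:
        # Store both directions so lookups are O(1) either way.
        self._by_name[type_name] = activity_type
        self._by_type[activity_type] = type_name

    def resolve(self, type_name: str) -> type:
        try:
            return self._by_name[type_name]
        except KeyError:
            raise KeyError(f"activity type not registered: {type_name}") from None

    def get_name(self, activity_type: type) -> str:
        try:
            return self._by_type[activity_type]
        except KeyError:
            raise KeyError(
                f"activity type not registered: {activity_type.__name__}"
            ) from None


class ReserveCarActivity:  # placeholder class for the demo
    pass


resolver = TwoWayResolver()
resolver.register("ReserveCarActivity", ReserveCarActivity)
name = resolver.get_name(ReserveCarActivity)
resolved = resolver.resolve(name)
print(name, resolved is ReserveCarActivity)
```

Because each instance owns its two dicts, two resolvers created side by side never see each other's registrations.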
3. NamedActivity Protocol (Optional)¶
Activities can optionally implement the NamedActivity protocol to expose their
canonical name:
from ascetic_ddd.saga import Activity
class ReserveCarActivity(Activity):
    def type_name(self) -> str:
        return "ReserveCarActivity"
    # ... do_work, compensate, work_item_queue_address, compensation_queue_address
Benefits:
Enables fallback name resolution in get_name() even without explicit registration
Deserialization still requires registration (no name → type mapping otherwise)
Makes activity type names self-documenting on the class itself
NamedActivity is a runtime_checkable Protocol, so any class that defines a
type_name() method satisfies it structurally — no inheritance required.
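The structural check can be demonstrated with the standard typing module alone. In this sketch the NamedActivity protocol is redefined locally as a stand-in for the library's, and the two activity classes are placeholders.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class NamedActivity(Protocol):
    """Local stand-in for the library's protocol."""

    def type_name(self) -> str: ...


class ReserveCarActivity:
    # No inheritance from NamedActivity: defining the method is enough.
    def type_name(self) -> str:
        return "ReserveCarActivity"


class AnonymousActivity:
    pass


named = isinstance(ReserveCarActivity(), NamedActivity)
anonymous = isinstance(AnonymousActivity(), NamedActivity)
print(named, anonymous)  # True False
```

Note that a runtime isinstance() check against a protocol only verifies that the method exists; it does not validate the method's signature or return type.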
Basic Usage¶
Step 1: Create a Resolver and Register Activities¶
from ascetic_ddd.saga import MapBasedResolver
resolver = MapBasedResolver()
resolver.register("ReserveCarActivity", ReserveCarActivity)
resolver.register("ReserveHotelActivity", ReserveHotelActivity)
resolver.register("ReserveFlightActivity", ReserveFlightActivity)
Step 2: Serialize RoutingSlip¶
import json
from ascetic_ddd.saga import (
    RoutingSlip, WorkItem, WorkItemArguments, to_serializable,
)
routing_slip = RoutingSlip([
    WorkItem(ReserveCarActivity, WorkItemArguments({"vehicleType": "SUV"})),
    WorkItem(ReserveHotelActivity, WorkItemArguments({"roomType": "Suite"})),
])
await routing_slip.process_next()
serializable = to_serializable(routing_slip, resolver)
wire = json.dumps(serializable.to_dict())
# Pseudocode -- substitute your message bus client here.
await bus.publish(session, "saga/routing-slip", wire)
Step 3: Deserialize RoutingSlip¶
import json
from ascetic_ddd.saga import (
    SerializableRoutingSlip, from_serializable,
)
# Pseudocode -- substitute your message bus client here.
wire = await bus.receive("saga/routing-slip")
serializable = SerializableRoutingSlip.from_dict(json.loads(wire))
routing_slip = from_serializable(serializable, resolver)
await routing_slip.process_next()
Wire Format¶
SerializableRoutingSlip.to_dict() produces JSON-compatible dicts with camelCase
keys, identical to the format produced by the Go implementation. This allows mixed
Python/Go services to participate in the same distributed saga.
{
  "completedWorkLogs": [
    {
      "activityTypeName": "ReserveCarActivity",
      "result": {
        "reservationId": 12345
      }
    }
  ],
  "nextWorkItems": [
    {
      "activityTypeName": "ReserveHotelActivity",
      "arguments": {
        "roomType": "Suite",
        "checkInDate": "2024-01-15"
      }
    },
    {
      "activityTypeName": "ReserveFlightActivity",
      "arguments": {
        "destination": "LAX",
        "flightDate": "2024-01-15"
      }
    }
  ]
}
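Since the wire format is plain JSON, it can be inspected without the library. The sketch below uses only the standard json module to list the activity type names a message mentions, which can help when deciding what a receiver's resolver needs to have registered; required_activity_names is a hypothetical helper, not part of the library.

```python
import json

# A shortened message in the format shown above.
wire = json.dumps({
    "completedWorkLogs": [
        {"activityTypeName": "ReserveCarActivity",
         "result": {"reservationId": 12345}},
    ],
    "nextWorkItems": [
        {"activityTypeName": "ReserveHotelActivity",
         "arguments": {"roomType": "Suite"}},
    ],
})


def required_activity_names(wire_text: str) -> set[str]:
    """Collect every activityTypeName mentioned in a serialized routing slip."""
    payload = json.loads(wire_text)
    entries = payload["completedWorkLogs"] + payload["nextWorkItems"]
    return {entry["activityTypeName"] for entry in entries}


names = required_activity_names(wire)
print(sorted(names))
```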
Advanced Patterns¶
Multiple Resolvers for Different Services¶
Service-specific resolvers limit which activities each service can deserialize — useful as a defense-in-depth measure:
# Orchestrator knows every activity
orchestrator_resolver = MapBasedResolver()
orchestrator_resolver.register("ReserveCarActivity", ReserveCarActivity)
orchestrator_resolver.register("ReserveHotelActivity", ReserveHotelActivity)
orchestrator_resolver.register("ReserveFlightActivity", ReserveFlightActivity)
# Car service only knows car activities
car_service_resolver = MapBasedResolver()
car_service_resolver.register("ReserveCarActivity", ReserveCarActivity)
# Hotel service only knows hotel activities
hotel_service_resolver = MapBasedResolver()
hotel_service_resolver.register("ReserveHotelActivity", ReserveHotelActivity)
Compensation Serialization¶
The same serialization works for the backward path:
serializable = to_serializable(routing_slip, resolver)
wire = json.dumps(serializable.to_dict())
# Pseudocode -- substitute your message bus client here.
await bus.publish(session, routing_slip.compensation_uri, wire)
# On the compensation service:
serializable = SerializableRoutingSlip.from_dict(json.loads(wire))
routing_slip = from_serializable(serializable, resolver)
while routing_slip.is_in_progress:
    await routing_slip.undo_last()
Testing with Isolated Resolvers¶
Each test creates its own resolver — no global state to clean up:
import unittest
from ascetic_ddd.saga import MapBasedResolver
class MySagaTestCase(unittest.IsolatedAsyncioTestCase):
    def setUp(self):
        self.resolver = MapBasedResolver()
        self.resolver.register("TestActivity", TestActivity)
Design Rationale¶
Why Not a Global Registry?¶
The interface-based approach was preferred over a global module-level registry because:
No global state: each resolver is independent, preventing pollution between tests and between services.
Better testability: tests construct isolated resolvers with no cleanup.
Explicit dependencies: resolvers are passed explicitly, making the dependency on the registry visible at the call site.
Service isolation: different services can have different resolver configurations without shared mutable state.
Thread safety: there is no global mutable map to guard with locks. MapBasedResolver itself is not thread-safe either, so register everything at startup.
Trade-offs¶
Pros: clean dependency injection, isolation, no global state.
Cons: slightly more verbose (the resolver must be passed explicitly), and a resolver is required on both ends of the wire.
Error Handling¶
Unregistered Activity (Deserialization)¶
from_serializable(serializable, resolver)
# KeyError: activity type not registered: UnknownActivity
Solution: register all expected activities before deserializing.
Unregistered Activity Without NamedActivity (Serialization)¶
to_serializable(routing_slip, resolver)
# KeyError: activity type not registered: AnonymousActivity
Solution: register the activity, or implement type_name() on the class.
Best Practices¶
Register activities at module-load time:
def make_resolver() -> MapBasedResolver:
    resolver = MapBasedResolver()
    resolver.register("Activity1", Activity1)
    resolver.register("Activity2", Activity2)
    return resolver
Implement type_name() on every activity so serialization keeps working even if a registration is forgotten:
class MyActivity(Activity):
    def type_name(self) -> str:
        return "MyActivity"
Use descriptive names rather than abbreviations, because they end up on the wire and in logs:
resolver.register("ReserveCarActivity", ReserveCarActivity)
# not: resolver.register("car", ReserveCarActivity)
Share resolver configuration through a single factory function so every service spins up an identically configured resolver.
Test round-trip serialization to catch missing registrations and non-JSON-safe arguments early.
A complete, executable end-to-end demo covering both the forward path (with a mid-saga handoff) and the compensation path is shipped with the library:
Module: ascetic_ddd/saga/examples/serialization_example.py
Run it: python -m ascetic_ddd.saga.examples.serialization_example
Importable entry points: make_orchestrator_resolver(), run_travel_booking_with_serialization(), run_compensation_with_serialization()
References¶
Sagas - Clemens Vasters
Sagas (Original Paper) - Garcia-Molina & Salem, 1987