Source code for ascetic_ddd.seedwork.domain.aggregate.causal_dependency_exporter
import typing
from ascetic_ddd.seedwork.domain.aggregate.causal_dependency import ICausalDependencyExporter
__all__ = ("CausalDependencyExporter",)
[docs]
class CausalDependencyExporter(ICausalDependencyExporter):
    """Collect the fields of a causal dependency into a plain dictionary.

    Each ``set_stream_*`` call stores its argument in :attr:`data` under the
    matching ``aggregate_*`` key:

    * ``set_stream_id``       -> ``data["aggregate_id"]``
    * ``set_stream_type``     -> ``data["aggregate_type"]``
    * ``set_stream_position`` -> ``data["aggregate_version"]``

    Calling the same setter again overwrites the previously stored value.
    """

    def __init__(self) -> None:
        """Start with an empty ``data`` mapping."""
        # Public attribute: callers read the exported values from here.
        self.data: typing.Dict[str, typing.Any] = {}

    def set_stream_id(self, value: typing.Any) -> None:
        """Store *value* under the ``"aggregate_id"`` key of ``data``."""
        self.data.update(aggregate_id=value)

    def set_stream_type(self, value: str) -> None:
        """Store *value* under the ``"aggregate_type"`` key of ``data``."""
        self.data.update(aggregate_type=value)

    def set_stream_position(self, value: int) -> None:
        """Store *value* under the ``"aggregate_version"`` key of ``data``."""
        self.data.update(aggregate_version=value)