Source code for ascetic_ddd.seedwork.domain.aggregate.causal_dependency_exporter

import typing

from ascetic_ddd.seedwork.domain.aggregate.causal_dependency import ICausalDependencyExporter

# Public API of this module: only the exporter class is re-exported by
# ``from ... import *``.
__all__ = ("CausalDependencyExporter",)


class CausalDependencyExporter(ICausalDependencyExporter):
    """Exporter that collects causal-dependency fields into a plain dict.

    Each ``set_stream_*`` call stores its argument in :attr:`data` under the
    corresponding ``aggregate_*`` key:

    * ``set_stream_id``       -> ``data["aggregate_id"]``
    * ``set_stream_type``     -> ``data["aggregate_type"]``
    * ``set_stream_position`` -> ``data["aggregate_version"]``

    Calling a setter again overwrites the previously stored value. Keys are
    only present once their setter has been called.
    """

    def __init__(self) -> None:
        """Create an exporter with an empty ``data`` mapping."""
        # Filled in incrementally by the setters below.
        self.data: typing.Dict[str, typing.Any] = {}

    def set_stream_id(self, value: typing.Any) -> None:
        """Store *value* under the ``"aggregate_id"`` key of ``data``."""
        self.data["aggregate_id"] = value

    def set_stream_type(self, value: str) -> None:
        """Store *value* under the ``"aggregate_type"`` key of ``data``."""
        self.data["aggregate_type"] = value

    def set_stream_position(self, value: int) -> None:
        """Store *value* under the ``"aggregate_version"`` key of ``data``."""
        self.data["aggregate_version"] = value