Source code for ascetic_ddd.seedwork.domain.aggregate.causal_dependency
import typing
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
__all__ = (
"CausalDependency",
"ICausalDependencyExporter",
)
[docs]
@dataclass(frozen=True)
class CausalDependency:
"""
This is enough to extract aggregate with required version from repository.
And this is enough to check causal dependencies in Inbox.
"""
tenant_id: typing.Any # aggregate.id.tenant_id
stream_id: typing.Any # aggregate.id.internal_id
stream_type: str # bounded_context_name.aggregate_name
stream_position: int # aggregate.version
[docs]
def export(self, exporter: "ICausalDependencyExporter") -> None:
exporter.set_tenant_id(self.tenant_id)
exporter.set_stream_id(self.stream_id)
exporter.set_stream_type(self.stream_type)
exporter.set_stream_position(self.stream_position)
[docs]
class ICausalDependencyExporter(metaclass=ABCMeta):
[docs]
@abstractmethod
def set_tenant_id(self, value: typing.Any) -> None:
raise NotImplementedError
[docs]
@abstractmethod
def set_stream_id(self, value: typing.Any) -> None:
raise NotImplementedError
[docs]
@abstractmethod
def set_stream_type(self, value: str) -> None:
raise NotImplementedError
[docs]
@abstractmethod
def set_stream_position(self, value: int) -> None:
raise NotImplementedError