Source code for ascetic_ddd.seedwork.domain.aggregate.event_meta
import datetime
import uuid
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from ascetic_ddd.seedwork.domain.aggregate.causal_dependency import CausalDependency
__all__ = (
"EventMeta",
"IEventMetaExporter",
)
[docs]
@dataclass(frozen=True)
class EventMeta:
event_id: uuid.UUID | None = None
causation_id: uuid.UUID | None = None
correlation_id: uuid.UUID | None = None
reason: str | None = None
occurred_at: datetime.datetime | None = None
causal_dependencies: tuple[CausalDependency] = ()
[docs]
def export(self, exporter: "IEventMetaExporter") -> None:
exporter.set_event_id(self.event_id)
exporter.set_causation_id(self.causation_id)
exporter.set_correlation_id(self.correlation_id)
exporter.set_reason(self.reason)
exporter.set_occurred_at(self.occurred_at)
for causal_dependency in self.causal_dependencies:
exporter.add_causal_dependency(causal_dependency)
[docs]
class IEventMetaExporter(metaclass=ABCMeta):
[docs]
@abstractmethod
def set_event_id(self, value: uuid.UUID | None) -> None:
raise NotImplementedError
[docs]
@abstractmethod
def set_causation_id(self, value: uuid.UUID | None) -> None:
raise NotImplementedError
[docs]
@abstractmethod
def set_correlation_id(self, value: uuid.UUID | None) -> None:
raise NotImplementedError
[docs]
@abstractmethod
def set_occurred_at(self, value: datetime.datetime | None) -> None:
raise NotImplementedError
[docs]
@abstractmethod
def add_causal_dependency(self, value: CausalDependency) -> None:
raise NotImplementedError