# Source code for ascetic_ddd.seedwork.domain.aggregate.event_meta_exporter
import datetime
import typing
import uuid
from ascetic_ddd.seedwork.domain.aggregate.causal_dependency import CausalDependency
from ascetic_ddd.seedwork.domain.aggregate.causal_dependency_exporter import CausalDependencyExporter
from ascetic_ddd.seedwork.domain.aggregate.event_meta import IEventMetaExporter
# Names exported by ``from <this module> import *``: only the exporter class.
__all__ = ("EventMetaExporter",)
[docs]
class EventMetaExporter(IEventMetaExporter):
    """Exporter that gathers event metadata into a plain dictionary.

    Every ``set_*`` call stores its argument in ``self.data`` under a
    fixed key; ``add_causal_dependency`` appends one exported entry to
    the ``"causal_dependencies"`` list.  Keys other than
    ``"causal_dependencies"`` are absent until their setter is called.
    """

    def __init__(self) -> None:
        """Start with an empty list of causal dependencies."""
        initial_state: dict[str, typing.Any] = {"causal_dependencies": []}
        self.data: dict[str, typing.Any] = initial_state

    def set_causation_id(self, value: uuid.UUID | None) -> None:
        """Store ``value`` (may be ``None``) under ``"causation_id"``."""
        key = "causation_id"
        self.data[key] = value

    def set_correlation_id(self, value: uuid.UUID | None) -> None:
        """Store ``value`` (may be ``None``) under ``"correlation_id"``."""
        key = "correlation_id"
        self.data[key] = value

    def set_occurred_at(self, value: datetime.datetime | None) -> None:
        """Store ``value`` (may be ``None``) under ``"occurred_at"``."""
        key = "occurred_at"
        self.data[key] = value

    def add_causal_dependency(self, value: CausalDependency) -> None:
        """Export ``value`` and append the result to ``"causal_dependencies"``.

        A fresh ``CausalDependencyExporter`` is handed to ``value.export``;
        whatever that exporter exposes as ``data`` afterwards is appended.
        """
        dependency_exporter = CausalDependencyExporter()
        value.export(dependency_exporter)
        collected = self.data["causal_dependencies"]
        collected.append(dependency_exporter.data)