Source code for ascetic_ddd.seedwork.domain.aggregate.event_meta_exporter

import datetime
import typing
import uuid

from ascetic_ddd.seedwork.domain.aggregate.causal_dependency import CausalDependency
from ascetic_ddd.seedwork.domain.aggregate.causal_dependency_exporter import CausalDependencyExporter
from ascetic_ddd.seedwork.domain.aggregate.event_meta import IEventMetaExporter

__all__ = ("EventMetaExporter",)


[docs] class EventMetaExporter(IEventMetaExporter): def __init__(self) -> None: self.data: dict[str, typing.Any] = { "causal_dependencies": [], }
[docs] def set_event_id(self, value: uuid.UUID | None) -> None: self.data["event_id"] = value
[docs] def set_causation_id(self, value: uuid.UUID | None) -> None: self.data["causation_id"] = value
[docs] def set_correlation_id(self, value: uuid.UUID | None) -> None: self.data["correlation_id"] = value
[docs] def set_reason(self, value: str | None) -> None: self.data["reason"] = value
[docs] def set_occurred_at(self, value: datetime.datetime | None) -> None: self.data["occurred_at"] = value
[docs] def add_causal_dependency(self, value: CausalDependency) -> None: ex = CausalDependencyExporter() value.export(ex) self.data["causal_dependencies"].append(ex.data)