Source code for ascetic_ddd.seedwork.domain.aggregate.persistent_domain_event

from abc import ABCMeta, abstractmethod
from dataclasses import dataclass

from ascetic_ddd.seedwork.domain.aggregate.domain_event import DomainEvent
from ascetic_ddd.seedwork.domain.aggregate.event_meta import EventMeta

# Public names of this module; this tuple is what `from ... import *` exposes.
__all__ = (
    "PersistentDomainEvent",
    "IPersistentDomainEventExporter",
)


@dataclass(frozen=True, kw_only=True)
class PersistentDomainEvent(DomainEvent):
    """Domain event that carries the extra fields written out on export.

    Instances are immutable (``frozen=True``) and every field must be passed
    by keyword (``kw_only=True``).

    Attributes:
        event_version: Integer version of the event; defaults to ``1``.
        event_meta: Optional ``EventMeta`` attached to the event; defaults
            to ``None``.
        aggregate_version: Integer aggregate version associated with the
            event; defaults to ``0``.
    """

    event_version: int = 1
    event_meta: EventMeta | None = None
    aggregate_version: int = 0
    # NOTE: an ``occurred_at: datetime.datetime`` field was considered here
    # (for partitioning?), but it is unclear where the domain level would get
    # that value from, so it is left in the event meta instead.

    @property
    def event_type(self):
        """Return the name of the concrete event class."""
        return self.__class__.__name__

    def export(self, exporter: "IPersistentDomainEventExporter") -> None:
        """Push this event's persistence fields into ``exporter``.

        The setters are invoked in a fixed order: event type, event version,
        event meta, aggregate version.

        Args:
            exporter: Receiver that gets one setter call per field.
        """
        exporter.set_event_type(self.event_type)
        exporter.set_event_version(self.event_version)
        exporter.set_event_meta(self.event_meta)
        exporter.set_aggregate_version(self.aggregate_version)
class IPersistentDomainEventExporter(metaclass=ABCMeta):
    """Abstract receiver for the fields exported by ``PersistentDomainEvent``.

    ``PersistentDomainEvent.export`` calls each setter exactly once, passing
    the corresponding field of the event. Concrete exporters must override
    all four methods; the class cannot be instantiated otherwise.
    """

    @abstractmethod
    def set_event_type(self, value: str) -> None:
        """Receive the event type name.

        Args:
            value: Name of the event type.

        Raises:
            NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError

    @abstractmethod
    def set_event_version(self, value: int) -> None:
        """Receive the event version.

        Args:
            value: Integer version of the event.

        Raises:
            NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError

    @abstractmethod
    def set_event_meta(self, meta: "EventMeta | None") -> None:
        """Receive the event meta.

        Args:
            meta: Meta attached to the event, or ``None`` when the event has
                none (``PersistentDomainEvent.event_meta`` defaults to
                ``None`` and is passed through unchanged by ``export``).

        Raises:
            NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError

    @abstractmethod
    def set_aggregate_version(self, value: int) -> None:
        """Receive the aggregate version.

        Args:
            value: Integer aggregate version associated with the event.

        Raises:
            NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError