Source code for ascetic_ddd.seedwork.domain.aggregate.event_sourced_aggregate

import dataclasses
import typing
from abc import ABCMeta

from ascetic_ddd.seedwork.domain.aggregate import PersistentDomainEvent
from ascetic_ddd.seedwork.domain.aggregate.eventive_entity import EventiveEntity
from ascetic_ddd.seedwork.domain.aggregate.interfaces import IEventSourcedAggregate
from ascetic_ddd.seedwork.domain.aggregate.versioned_aggregate import VersionedAggregate

__all__ = ("EventSourcedAggregate",)

IPDE = typing.TypeVar("IPDE", covariant=True)


[docs] class EventSourcedAggregate( typing.Generic[IPDE], EventiveEntity[IPDE], VersionedAggregate, IEventSourcedAggregate[IPDE], metaclass=ABCMeta, ):
[docs] class Handlers(dict):
[docs] def register(self, event_type: type[IPDE]): def do_register(handler: typing.Callable[["EventSourcedAggregate", IPDE], None]): self[event_type] = handler return handler return do_register
_handlers = Handlers() def __init__(self, **kwargs) -> None: super().__init__(**kwargs) def _load_from(self, past_events: typing.Iterable[IPDE]) -> None: for event in past_events: self.version = event.aggregate_version self._handlers[type(event)](self, event) def _update(self, event: IPDE) -> None: event = dataclasses.replace(event, aggregate_version=self.next_version()) self._add_domain_event(event) self._handlers[type(event)](self, event)
[docs] @classmethod def fold(cls, past_events: typing.Iterable[PersistentDomainEvent]): """ Or reduce. """ agg: typing.Self = cls.make_empty() agg._load_from(past_events) return agg