domain.model¶
The domain model package contains classes and functions that can help develop an event sourced domain model.
events¶
Base classes for domain events of different kinds.
-
class
eventsourcing.domain.model.events.
DomainEvent
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.versioning.Upcastable
,eventsourcing.whitehead.ActualOccasion
,typing.Generic
Base class for domain model events.
Implements methods to make instances read-only, comparable for equality in Python, and have recognisable representations. To make domain events hashable, this class also implements a method to create a cryptographic hash of the state of the event.
-
__repr__
() → str[source]¶ Creates a string representing the type and attribute values of the event.
Return type: str
-
__mutate__
(obj: Optional[TEntity]) → Optional[TEntity][source]¶ Updates ‘obj’ with values from ‘self’.
Calls the ‘mutate()’ method.
Can be extended, but subclasses must call super and return an object to their caller.
Parameters: obj – object (normally a domain entity) to be mutated Returns: mutated object
-
mutate
(obj: TEntity) → None[source]¶ Updates (“mutates”) given ‘obj’.
Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).
The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.
Parameters: obj – domain entity to be mutated
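The relationship between the two methods can be sketched without the library. The classes below (SketchEvent, Counter, Incremented, Reset) are hypothetical stand-ins, not the library's own classes; the sketch only assumes that __mutate__ delegates to mutate() and returns the object, as described above.

```python
from typing import Optional


class SketchEvent:
    """Hypothetical stand-in for a domain event (not the library class)."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __mutate__(self, obj: Optional[object]) -> Optional[object]:
        # Default projection: delegate to mutate(), then return the object.
        if obj is not None:
            self.mutate(obj)
        return obj

    def mutate(self, obj: object) -> None:
        # Intended to be overridden; no super call or return value needed.
        pass


class Counter:
    def __init__(self):
        self.value = 0


class Incremented(SketchEvent):
    def mutate(self, obj: Counter) -> None:
        # Concise style: just update the object.
        obj.value += self.amount


class Reset(SketchEvent):
    def __mutate__(self, obj):
        # Extending __mutate__ instead: must call super and return the object.
        obj = super().__mutate__(obj)
        if obj is not None:
            obj.value = 0
        return obj


counter = Counter()
counter = Incremented(amount=3).__mutate__(counter)
counter = Incremented(amount=2).__mutate__(counter)
value_before_reset = counter.value
counter = Reset().__mutate__(counter)
```

Overriding mutate() keeps the event class short, while extending __mutate__ is only needed when the projection has to replace or wrap the object being returned.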
-
__setattr__
(key: Any, value: Any) → None[source]¶ Inhibits event attributes from being updated by assignment.
-
__hash__
() → int[source]¶ Computes a Python integer hash for an event.
Supports Python equality and inequality comparisons.
Returns: Python integer hash Return type: int
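As a rough illustration of the behaviours listed for this class (read-only attributes, equality, hashing, and a recognisable representation), here is a minimal standalone sketch. ReadOnlyEvent is a hypothetical class written for this example; it does not reproduce the library's implementation and assumes attribute values are hashable.

```python
class ReadOnlyEvent:
    """Hypothetical stand-in, not the library's DomainEvent."""

    def __init__(self, **kwargs):
        # Write to the instance dict directly, bypassing __setattr__.
        self.__dict__.update(kwargs)

    def __setattr__(self, key, value):
        # Inhibit updates by assignment.
        raise AttributeError("event attributes are read-only")

    def __eq__(self, other):
        return type(self) is type(other) and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Hash the type and the sorted attribute items.
        return hash((type(self), tuple(sorted(self.__dict__.items()))))

    def __repr__(self):
        args = ", ".join(f"{k}={v!r}" for k, v in sorted(self.__dict__.items()))
        return f"{type(self).__name__}({args})"


e1 = ReadOnlyEvent(name="a", amount=1)
e2 = ReadOnlyEvent(name="a", amount=1)

try:
    e1.amount = 2
    assignment_blocked = False
except AttributeError:
    assignment_blocked = True
```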
-
-
class
eventsourcing.domain.model.events.
EventWithHash
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Base class for domain events with a cryptographic event hash.
Extends DomainEvent by setting a cryptographic event hash when the event is originated, and checking the event hash whenever its default projection mutates an object.
-
__event_hash__
¶ Returns SHA-256 hash of the original state of the event.
Returns: SHA-256 as hexadecimal string. Return type: str
-
__hash__
() → int[source]¶ Computes a Python integer hash for an event, using its pre-computed event hash.
Supports Python equality and inequality comparisons only.
Returns: Python integer hash Return type: int
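The idea of setting a hash when the event is originated and checking it later can be sketched as follows. This is an assumption-laden illustration: the JSON serialisation, the helper compute_event_hash, the check_hash method, and the HashedEvent class are all hypothetical, and the library's actual encoding of event state may differ.

```python
import hashlib
import json


def compute_event_hash(state: dict) -> str:
    # Serialise deterministically (sorted keys) so equal state gives equal hash.
    serialised = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(serialised.encode("utf8")).hexdigest()


class HashedEvent:
    """Hypothetical stand-in, not the library's EventWithHash."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)
        # Record the hash of the original state when the event is originated.
        self.__dict__["__event_hash__"] = compute_event_hash(kwargs)

    def check_hash(self) -> None:
        # Recompute the hash from the current state and compare.
        state = {k: v for k, v in self.__dict__.items() if k != "__event_hash__"}
        if compute_event_hash(state) != self.__dict__["__event_hash__"]:
            raise ValueError("event hash mismatch")

    def __hash__(self) -> int:
        # Reuse the pre-computed event hash for the Python integer hash.
        return hash(self.__dict__["__event_hash__"])


event = HashedEvent(name="a", amount=1)
event_hash = event.__dict__["__event_hash__"]
event.check_hash()  # passes: state is unchanged

event.__dict__["amount"] = 2  # simulate tampering with the stored state
try:
    event.check_hash()
    tampering_detected = False
except ValueError:
    tampering_detected = True
```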
-
-
class
eventsourcing.domain.model.events.
EventWithOriginatorID
(originator_id: uuid.UUID, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have an originator ID.
-
__init__
(originator_id: uuid.UUID, **kwargs)[source]¶ Initialises event attribute values directly from constructor kwargs.
-
originator_id
¶ Originator ID is the identity of the object that originated this event.
Returns: A UUID representing the identity of the originator. Return type: UUID
-
-
class
eventsourcing.domain.model.events.
EventWithTimestamp
(timestamp: Optional[decimal.Decimal] = None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have a timestamp value.
-
__init__
(timestamp: Optional[decimal.Decimal] = None, **kwargs)[source]¶ Initialises event attribute values directly from constructor kwargs.
-
timestamp
¶ A UNIX timestamp as a Decimal object.
-
-
class
eventsourcing.domain.model.events.
EventWithOriginatorVersion
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have an originator version number.
-
__init__
(originator_version: int, **kwargs)[source]¶ Initialises event attribute values directly from constructor kwargs.
-
originator_version
¶ Originator version is the version of the object that originated this event.
Returns: An integer representing the version of the originator.
-
-
class
eventsourcing.domain.model.events.
EventWithTimeuuid
(event_id: Optional[uuid.UUID] = None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have a UUIDv1 event ID.
-
class
eventsourcing.domain.model.events.
CreatedEvent
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Happens when something is created.
-
class
eventsourcing.domain.model.events.
AttributeChangedEvent
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Happens when the value of an attribute changes.
-
class
eventsourcing.domain.model.events.
DiscardedEvent
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Happens when something is discarded.
-
class
eventsourcing.domain.model.events.
LoggedEvent
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Happens when something is logged.
-
eventsourcing.domain.model.events.
subscribe
(handler: Callable[[Sequence[TEvent]], None], predicate: Optional[Callable[[Sequence[TEvent]], bool]] = None) → None[source]¶ Adds ‘handler’ to list of event handlers to be called if ‘predicate’ is satisfied.
If predicate is None, the handler will be called whenever an event is published.
Parameters: - handler (callable) – Will be called when an event is published.
- predicate (callable) – Conditions whether the handler will be called.
-
eventsourcing.domain.model.events.
unsubscribe
(handler: Callable[[Sequence[TEvent]], None], predicate: Optional[Callable[[Sequence[TEvent]], bool]] = None) → None[source]¶ Removes ‘handler’ from list of event handlers to be called if ‘predicate’ is satisfied.
Parameters: - handler (callable) – Previously subscribed handler.
- predicate (callable) – Previously subscribed predicate.
-
eventsourcing.domain.model.events.
publish
(events: Sequence[TEvent]) → None[source]¶ Publishes the given ‘events’ by calling subscribed event handlers with the given ‘events’, except those handlers whose predicates are not satisfied by the events.
Handlers are called in the order they are subscribed.
Parameters: events – Sequence of domain events to be published.
-
eventsourcing.domain.model.events.
assert_event_handlers_empty
() → None[source]¶ Raises EventHandlersNotEmptyError, unless there are no event handlers subscribed.
-
eventsourcing.domain.model.events.
clear_event_handlers
() → None[source]¶ Removes all previously subscribed event handlers.
-
class
eventsourcing.domain.model.events.
AbstractSnapshot
[source]¶ Bases:
eventsourcing.whitehead.ActualOccasion
-
topic
¶ Path to the class of the snapshotted entity.
-
state
¶ State of the snapshotted entity.
-
originator_id
¶ ID of the snapshotted entity.
-
originator_version
¶ Version of the last event applied to the entity.
-
entity¶
Base classes for domain model entities.
-
class
eventsourcing.domain.model.entity.
MetaDomainEntity
(name: str, *args, **kwargs)[source]¶ Bases:
abc.ABCMeta
-
class
eventsourcing.domain.model.entity.
DomainEntity
(id: uuid.UUID)[source]¶ Bases:
eventsourcing.domain.model.versioning.Upcastable
,eventsourcing.whitehead.EnduringObject
Supertype for domain model entity.
-
class
Event
(originator_id: uuid.UUID, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithOriginatorID
Supertype for events of domain model entities.
-
__check_obj__
(obj: TDomainEntity) → None[source]¶ Checks state of obj before mutating.
Parameters: obj – Domain entity to be checked. Raises: OriginatorIDError – if the originator_id is mismatched
-
-
classmethod
__create__
(originator_id: Optional[uuid.UUID] = None, event_class: Optional[Type[DomainEntity.Created[TDomainEntity]]] = None, **kwargs) → TDomainEntity[source]¶ Creates a new domain entity.
Constructs a “created” event, constructs the entity object from the event, publishes the “created” event, and returns the new domain entity object.
Parameters: - cls (DomainEntity) – Class of domain entity
- originator_id – ID of the new domain entity (defaults to uuid4()).
- event_class – Domain event class to be used for the “created” event.
- kwargs – Other named attribute values of the “created” event.
Returns: New domain entity object.
Return type:
-
class
Created
(originator_topic: str, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.CreatedEvent
,eventsourcing.domain.model.entity.Event
Triggered when an entity is created.
-
originator_topic
¶ Topic (a string) representing the class of the originating domain entity.
Return type: str
-
-
id
¶ The immutable ID of the domain entity.
This value is set using the originator_id of the “created” event constructed by __create__().
An entity ID allows an instance to be referenced and distinguished from others, even though its state may change over time.
This attribute has the normal “public” format for a Python object attribute name, because by definition all domain entities have an ID.
-
__change_attribute__
(name: str, value: Any, **kwargs) → None[source]¶ Changes named attribute with the given value, by triggering an AttributeChanged event.
-
class
AttributeChanged
(originator_id: uuid.UUID, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.events.AttributeChangedEvent
Triggered when a named attribute is assigned a new value.
-
__mutate__
(obj: Optional[TDomainEntity]) → Optional[TDomainEntity][source]¶ Updates ‘obj’ with values from ‘self’.
Calls the ‘mutate()’ method.
Can be extended, but subclasses must call super and return an object to their caller.
Parameters: obj – object (normally a domain entity) to be mutated Returns: mutated object
-
-
class
Discarded
(originator_id: uuid.UUID, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DiscardedEvent
,eventsourcing.domain.model.entity.Event
Triggered when a DomainEntity is discarded.
-
__mutate__
(obj: Optional[TDomainEntity]) → Optional[TDomainEntity][source]¶ Updates ‘obj’ with values from ‘self’.
Calls the ‘mutate()’ method.
Can be extended, but subclasses must call super and return an object to their caller.
Parameters: obj – object (normally a domain entity) to be mutated Returns: mutated object
-
-
__assert_not_discarded__
() → None[source]¶ Asserts that this entity has not been discarded.
Raises EntityIsDiscarded exception if entity has been discarded already.
-
__trigger_event__
(event_class: Type[TDomainEvent], **kwargs) → None[source]¶ Constructs, applies, and publishes a domain event.
-
__mutate__
(event: TDomainEvent) → None[source]¶ Mutates this entity with the given event.
This method calls on the event object to mutate this entity, because the mutation behaviour of different types of events was usefully factored onto the event classes, and the event mutate() method is the most convenient way to define behaviour in domain models.
However, as an alternative to implementing the mutate() method on domain model events, this method can be extended with a method that is capable of mutating an entity for all the domain event classes introduced by the entity class.
Similarly, this method can be overridden entirely in subclasses, so long as all of the mutation behaviour is implemented in the mutator function, including the mutation behaviour of the events defined on the library event classes that would no longer be invoked.
However, if the entity class defines a mutator function, or if a separate mutator function is used, then it must be involved in the event sourced repository used to replay events, which by default knows nothing about the domain entity class. In practice, this means having a repository for each kind of entity, rather than the application just having one repository, with each repository having a mutator function that can project the entity events into an entity.
-
__publish__
(event: Sequence[TDomainEvent]) → None[source]¶ Publishes given event for subscribers in the application.
Parameters: event – domain event or list of events
-
-
class
eventsourcing.domain.model.entity.
EntityWithHashchain
(*args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.DomainEntity
-
class
Event
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithHash
,eventsourcing.domain.model.entity.Event
Supertype for events of domain entities.
-
class
Created
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.Created
-
class
AttributeChanged
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.AttributeChanged
-
class
Discarded
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.Discarded
-
classmethod
__create__
(originator_id: Optional[uuid.UUID] = None, event_class: Optional[Type[DomainEntity.Created[TEntityWithHashchain]]] = None, **kwargs) → TEntityWithHashchain[source]¶ Creates a new domain entity.
Constructs a “created” event, constructs the entity object from the event, publishes the “created” event, and returns the new domain entity object.
Parameters: - cls (DomainEntity) – Class of domain entity
- originator_id – ID of the new domain entity (defaults to uuid4()).
- event_class – Domain event class to be used for the “created” event.
- kwargs – Other named attribute values of the “created” event.
Returns: New domain entity object.
Return type:
-
-
class
eventsourcing.domain.model.entity.
VersionedEntity
(__version__: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.DomainEntity
-
__trigger_event__
(event_class: Type[TDomainEvent], **kwargs) → None[source]¶ Increments the version number when an event is triggered.
The event carries the version number that the originator will have when the originator is mutated with this event. (The event’s “originator” version isn’t the version of the originator before the event was triggered, but represents the result of the work of incrementing the version, which is then set in the event as normal. The Created event has version 0, and a newly created instance is at version 0. The second event has originator version 1, and so will the originator when the second event has been applied.)
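The version numbering described above can be illustrated with a small standalone sketch. VersionedSketch, its trigger_event and apply methods, and the dictionary-based events are hypothetical and only mimic the numbering rule: the “created” event carries version 0, and each later event carries the version the entity will have after it is applied.

```python
class VersionedSketch:
    """Hypothetical stand-in for a versioned entity (not the library class)."""

    def __init__(self):
        # A newly created entity is at version 0, matching its "created" event.
        self.version = 0
        self.events = [{"type": "Created", "originator_version": 0}]

    def trigger_event(self, event_type: str) -> None:
        # The event carries the version the entity will have once mutated.
        event = {"type": event_type, "originator_version": self.version + 1}
        self.apply(event)
        self.events.append(event)

    def apply(self, event: dict) -> None:
        # Applying the event moves the entity to the event's version.
        self.version = event["originator_version"]


entity = VersionedSketch()
entity.trigger_event("Renamed")
entity.trigger_event("Renamed")
event_versions = [e["originator_version"] for e in entity.events]
```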
-
class
Event
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithOriginatorVersion
,eventsourcing.domain.model.entity.Event
Supertype for events of versioned entities.
-
__mutate__
(obj: Optional[TVersionedEntity]) → Optional[TVersionedEntity][source]¶ Updates ‘obj’ with values from ‘self’.
Calls the ‘mutate()’ method.
Can be extended, but subclasses must call super and return an object to their caller.
Parameters: obj – object (normally a domain entity) to be mutated Returns: mutated object
-
-
class
Created
(originator_version: int = 0, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
Published when a VersionedEntity is created.
-
-
class
eventsourcing.domain.model.entity.
EntityWithECC
(*, event_id, correlation_id, causation_id, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.DomainEntity
Entity whose events have event ID, correlation ID, and causation ID.
-
class
Event
(*, processed_event=None, application_name, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
-
class
Created
(originator_topic: str, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
-
-
class
eventsourcing.domain.model.entity.
TimestampedEntity
(__created_on__: decimal.Decimal, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.DomainEntity
-
class
Event
(originator_id: uuid.UUID, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.events.EventWithTimestamp
Supertype for events of timestamped entities.
-
class
Created
(originator_topic: str, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
Published when a TimestampedEntity is created.
-
-
class
eventsourcing.domain.model.entity.
TimestampedVersionedEntity
(__created_on__: decimal.Decimal, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedEntity
,eventsourcing.domain.model.entity.VersionedEntity
-
class
Event
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.Event
Supertype for events of timestamped, versioned entities.
-
class
Created
(originator_version: int = 0, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
Published when a TimestampedVersionedEntity is created.
-
-
class
eventsourcing.domain.model.entity.
TimeuuidedVersionedEntity
(event_id: uuid.UUID, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimeuuidedEntity
,eventsourcing.domain.model.entity.VersionedEntity
aggregate¶
Base classes for aggregates in a domain driven design.
-
class
eventsourcing.domain.model.aggregate.
BaseAggregateRoot
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedVersionedEntity
,typing.Generic
Root entity for an aggregate in a domain driven design.
-
class
Event
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
Supertype for base aggregate root events.
-
class
Created
(originator_version: int = 0, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.aggregate.Event
Triggered when an aggregate root is created.
-
class
AttributeChanged
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
,eventsourcing.domain.model.entity.AttributeChanged
Triggered when an aggregate root attribute is changed.
-
class
Discarded
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
,eventsourcing.domain.model.entity.Discarded
Triggered when an aggregate root is discarded.
-
-
class
eventsourcing.domain.model.aggregate.
AggregateRootWithHashchainedEvents
(*args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.EntityWithHashchain
,eventsourcing.domain.model.aggregate.BaseAggregateRoot
Extends aggregate root base class with hash-chained events.
-
class
Event
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.aggregate.Event
Supertype for aggregate events.
-
class
Created
(originator_version: int = 0, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.aggregate.Created
,eventsourcing.domain.model.aggregate.Event
Triggered when an aggregate root is created.
-
-
class
eventsourcing.domain.model.aggregate.
AggregateRoot
(*args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.AggregateRootWithHashchainedEvents
Original name for aggregate root base class with hash-chained events.
-
class
Event
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
Supertype for aggregate events.
-
class
Created
(originator_version: int = 0, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
,eventsourcing.domain.model.aggregate.Created
Triggered when an aggregate root is created.
-
command¶
Commands as aggregates.
-
class
eventsourcing.domain.model.command.
Command
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.BaseAggregateRoot
-
class
Event
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
-
class
Created
(originator_version: int = 0, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.command.Event
,eventsourcing.domain.model.aggregate.Created
-
class
AttributeChanged
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.command.Event
,eventsourcing.domain.model.aggregate.AttributeChanged
-
class
Discarded
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.command.Event
,eventsourcing.domain.model.aggregate.Discarded
-
class
Done
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.command.Event
-
mutate
(obj: eventsourcing.domain.model.command.Command) → None[source]¶ Updates (“mutates”) given ‘obj’.
Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).
The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.
Parameters: obj – domain entity to be mutated
-
-
decorators¶
Decorators useful in domain models based on the classes in this library.
-
eventsourcing.domain.model.decorators.
subscribe_to
(*args) → Callable[source]¶ Decorator for making a custom event handler function subscribe to a certain class of event.
The decorated function will be called once for each matching event that is published, and will be given one argument, the event, when it is called. If events are published in lists, for example the AggregateRoot publishes a list of pending events when its __save__() method is called, then the decorated function will be called once for each event that is an instance of the given event_class.
Please note, this decorator isn’t suitable for use with object class methods. The decorator receives in Python 3 an unbound function, and defines a handler which it subscribes that calls the decorated function for each matching event. However the method isn’t called on the object, so the object instance is never available in the decorator, so the decorator can’t call a normal object method because it doesn’t have a value for ‘self’.
Parameters: event_class – type used to match published events; an event matches if it is an instance of this type.
The following example shows a custom handler that reacts to a Todo.Created event and saves a projection of a Todo model object.

    @subscribe_to(Todo.Created)
    def new_todo_projection(event):
        todo = TodoProjection(id=event.originator_id, title=event.title)
        todo.save()
-
eventsourcing.domain.model.decorators.
mutator
(arg: Optional[Callable] = None) → Callable[source]¶ Structures mutator functions by allowing handlers to be registered for different types of event. When the decorated function is called with an initial value and an event, it will call the handler that has been registered for that type of event.
It works like singledispatch, which it uses. The difference is that when the decorated function is called, this decorator dispatches according to the type of last call arg, which fits better with reduce(). The builtin Python function reduce() is used by the library to replay a sequence of events against an initial state. If a mutator function is given to reduce(), along with a list of events and an initializer, reduce() will call the mutator function once for each event in the list, but the initializer will be the first value, and the event will be the last argument, and we want to dispatch according to the type of the event. It happens that singledispatch is coded to switch on the type of the first argument, which makes it unsuitable for structuring a mutator function without the modifications introduced here.
The other aspect introduced by this decorator function is the option to set the type of the handled entity in the decorator. When an entity is replayed from scratch, in other words when all its events are replayed, the initial state is None. The handler which handles the first event in the sequence will probably construct an object instance. It is possible to write the type into the handler, but that makes the entity more difficult to subclass because you will also need to write a handler for it. If the decorator is invoked with the type, when the initial value passed as a call arg to the mutator function is None, the handler will instead receive the type of the entity, which it can use to construct the entity object.

    class Entity(object):
        class Created(object):
            pass

    @mutator(Entity)
    def mutate(initial, event):
        raise NotImplementedError(type(event))

    @mutate.register(Entity.Created)
    def _(initial, event):
        return initial(**event.__dict__)

    entity = mutate(None, Entity.Created())
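To make the dispatch-on-last-argument idea concrete, the following standalone sketch builds a similar decorator on top of functools.singledispatch and replays events with functools.reduce. The names mutator_sketch and Account are hypothetical, and the wrapper is an assumption about how such a decorator could be written, not the library's code.

```python
from functools import reduce, singledispatch


def mutator_sketch(entity_class):
    """Hypothetical decorator: dispatch on the type of the last argument."""

    def decorator(default_handler):
        dispatcher = singledispatch(default_handler)

        def wrapper(initial, event):
            # When replaying from scratch, pass the entity class to the handler.
            if initial is None:
                initial = entity_class
            # singledispatch switches on the first argument, so look up the
            # handler explicitly using the type of the event instead.
            return dispatcher.dispatch(type(event))(initial, event)

        wrapper.register = dispatcher.register
        return wrapper

    return decorator


class Account:
    class Opened:
        def __init__(self, owner):
            self.owner = owner

    class Deposited:
        def __init__(self, amount):
            self.amount = amount

    def __init__(self, owner):
        self.owner = owner
        self.balance = 0


@mutator_sketch(Account)
def mutate(initial, event):
    raise NotImplementedError(type(event))


@mutate.register(Account.Opened)
def _(initial, event):
    # 'initial' is the Account class here, because the initial value was None.
    return initial(owner=event.owner)


@mutate.register(Account.Deposited)
def _(initial, event):
    initial.balance += event.amount
    return initial


events = [Account.Opened("alice"), Account.Deposited(5), Account.Deposited(7)]
account = reduce(mutate, events, None)
```

Because the handler for the first event receives the class rather than None, a subclass of Account could reuse the same handlers with its own decorated mutator function.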
-
eventsourcing.domain.model.decorators.
attribute
(getter: Callable) → property[source]¶ When used as a method decorator, returns a property object with the method as the getter and a setter defined to call instance method __change_attribute__(), which publishes an AttributeChanged event.
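A property with such a setter can be sketched as below. The decorator attribute_sketch, the Person class, and the convention of storing the value under an underscore-prefixed name are assumptions made for this illustration; the stand-in __change_attribute__ method records changes in a list instead of triggering a real event.

```python
def attribute_sketch(getter):
    """Hypothetical decorator returning a property with an event-backed setter."""
    # Assumed convention: the value is stored under '_' + the getter's name.
    name = "_" + getter.__name__

    def fget(self):
        return getattr(self, name)

    def fset(self, value):
        # Route assignment through the change method so an event is recorded.
        self.__change_attribute__(name, value)

    return property(fget, fset, doc=getter.__doc__)


class Person:
    def __init__(self, full_name):
        self._full_name = full_name
        self.changes = []

    def __change_attribute__(self, name, value):
        # Stand-in for triggering an AttributeChanged event.
        self.changes.append((name, value))
        setattr(self, name, value)

    @attribute_sketch
    def full_name(self):
        """Full name of the person."""


person = Person("Ann")
person.full_name = "Ann Lee"
```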
-
eventsourcing.domain.model.decorators.
retry
(exc: Union[Type[Exception], Sequence[Type[Exception]]] = <class 'Exception'>, max_attempts: int = 1, wait: float = 0, stall: float = 0, verbose: bool = False) → Callable[source]¶ Retry decorator.
Parameters: - exc – List of exceptions that will cause the call to be retried if raised.
- max_attempts – Maximum number of attempts to try.
- wait – Amount of time to wait before retrying after an exception.
- stall – Amount of time to wait before the first attempt.
- verbose – If True, prints a message to STDOUT when retries occur.
Returns: The value returned by the decorated function.
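The parameters above suggest a decorator along the following lines. This is a minimal sketch under stated assumptions: retry_sketch is a hypothetical name, the verbose option is omitted, and the exact retry semantics of the library's decorator may differ.

```python
import time
from functools import wraps


def retry_sketch(exc=Exception, max_attempts=1, wait=0.0, stall=0.0):
    """Hypothetical retry decorator, loosely following the parameters above."""
    # Accept either a single exception type or a sequence of types.
    exc_types = exc if isinstance(exc, type) else tuple(exc)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if stall:
                time.sleep(stall)  # wait before the first attempt
            attempts = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except exc_types:
                    attempts += 1
                    if attempts >= max_attempts:
                        raise  # give up after the last allowed attempt
                    time.sleep(wait)  # wait before retrying

        return wrapper

    return decorator


calls = []


@retry_sketch(exc=(ValueError,), max_attempts=3)
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("not yet")
    return "ok"


result = flaky()
```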
-
eventsourcing.domain.model.decorators.
subclassevents
(cls: type) → type[source]¶ Decorator that avoids “boilerplate” subclassing of domain events.
For example, with this decorator you can do this:

    @subclassevents
    class Example(AggregateRoot):
        class SomethingHappened(DomainEvent):
            pass

rather than this:

    class Example(AggregateRoot):
        class Event(AggregateRoot.Event):
            pass

        class Created(Event, AggregateRoot.Created):
            pass

        class Discarded(Event, AggregateRoot.Discarded):
            pass

        class AttributeChanged(Event, AggregateRoot.AttributeChanged):
            pass

        class SomethingHappened(Event):
            pass

You can apply this to a tree of domain event classes by defining the base class with attribute ‘__subclassevents__ = True’.
snapshot¶
Snapshotting is implemented in the domain layer as an event.
-
class
eventsourcing.domain.model.snapshot.
Snapshot
(originator_id: uuid.UUID, originator_version: int, topic: str, state: Optional[Dict[KT, VT]])[source]¶ Bases:
eventsourcing.domain.model.events.EventWithTimestamp
,eventsourcing.domain.model.events.EventWithOriginatorVersion
,eventsourcing.domain.model.events.EventWithOriginatorID
,eventsourcing.domain.model.events.AbstractSnapshot
-
__init__
(originator_id: uuid.UUID, originator_version: int, topic: str, state: Optional[Dict[KT, VT]])[source]¶ Initialises event attribute values directly from constructor kwargs.
-
topic
¶ Path to the class of the snapshotted entity.
-
state
¶ State of the snapshotted entity.
-
__mutate__
(obj: Optional[TEntity]) → Optional[TEntity][source]¶ Updates ‘obj’ with values from ‘self’.
Calls the ‘mutate()’ method.
Can be extended, but subclasses must call super and return an object to their caller.
Parameters: obj – object (normally a domain entity) to be mutated Returns: mutated object
-
versioning¶
Support for upcasting the state of older versions of domain events is implemented in the base class Upcastable.
-
class
eventsourcing.domain.model.versioning.
Upcastable
[source]¶ Bases:
eventsourcing.whitehead.Event
For things that are upcastable.
http://code.fed.wiki.org/view/wyatt-software/view/remote-database-schema-migration
“I was sure that we could not get the schema for WyCash Plus right on the first try. I was familiar with Smalltalk-80’s object migration mechanisms. I designed a version that could serve us in a commercial software distribution environment.
“I chose to version each class independently and write that as a sequential integer in the serialized versions. Objects would be mutated to the current version on read. We supported all versions we ever had forever.
“I recorded mutation vectors for each version to the present. These could add, remove and reorder fields within an object. One-off mutation methods handled the rare case where the vectors were not enough description.
“We shipped migrations to our customers with each release to be performed on their own machines when needed without any intervention.”
Ward Cunningham
-
classmethod
__upcast_state__
(obj_state: Dict[KT, VT]) → Dict[KT, VT][source]¶ Upcasts obj_state from the version of the class when the object state was recorded, to be compatible with current version of the class.
-
classmethod
__upcast__
(obj_state: Dict[KT, VT], class_version: int) → Dict[KT, VT][source]¶ Must be overridden in domain event classes that set class attribute ‘__class_version__’ to a positive value.
This method is expected to upcast obj_state from method arg ‘class_version’ to the next version, and return obj_state.
One style of implementation is an if-else block, in which the conditional expressions match on the value of ‘class_version’.
The implementation of this method must support upcasting each version of the class from 0 to one less than the current version. For example: a domain class with “__class_version__ = 1” will need to support upcasting “class_version == 0” to version 1; and a domain class with “__class_version__ = 2” will need to support both upcasting “class_version == 0” to version 1, and also upcasting “class_version == 1” to version 2.
To support backward compatibility, the recorded state of old versions of an event class will need to be upcast to work with new versions of the software. Commonly, this involves supplying default values for attributes missing on old versions of events, after an event class has been changed by adding a new attribute.
In a situation where a new version of an event class needs to be used by existing software, it would be necessary also to support forward compatibility. Examples of this situation include the situation of deploying with rolling updates, and the situation where consumers would receive and attempt to handle the new version of the event but cannot be updated before the new event class version is deployed.
To support forward compatibility, it is necessary to restrict changes to be merely additive: either adding new attributes to an event, or adding new behaviours to the type of an existing attribute. Event attributes shouldn’t be removed (or renamed), existing attributes should not have aspects of their behaviour removed, and the semantics of an existing attribute should not change. If the type of an attribute value is changed, the new type should be substitutable for the old type. The old software will then simply ignore new attributes, and it will simply ignore new aspects of new types of value.
For example, the type of an attribute value can be changed to use a subtype of the previous attribute value type. The possible range of a value can be reduced when changing the type of an attribute value, since all the values of the new type will be supported by the old software.
Increasing the possible range of a value will introduce values that cannot be used by the old software, and changing the type of an attribute value to a supertype will remove behaviour that the old software may depend on.
Of course, if the old software doesn’t in fact depend on aspects that are removed, then those aspects can be removed without actually breaking anything.
If backward and forward compatibility are required, and it is felt that an event class needs a change that would cause an attribute to be removed, or the type of an attribute to be more general, or the range of values of an attribute to increase, then according to Greg Young’s book ‘Versioning in an Event Sourced System’ it is better to add a new event type. However, if the old software would break because this new event type is not supported, then supporting forward compatibility would be elusive.
In summary, supporting forward compatibility restricts model changes to adding attributes to existing model classes (which implies the versioned event class’ upcast method will supply default values when upcasting the state of old versions of the event).
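The if-else style of implementation mentioned above can be sketched as follows. UpcastableSketch and NameChanged are hypothetical classes, and storing the recorded version in the state under the key "__class_version__" is an assumption of this sketch rather than a statement about how the library records it.

```python
class UpcastableSketch:
    """Hypothetical stand-in for an upcastable base class."""

    __class_version__ = 0

    @classmethod
    def __upcast_state__(cls, obj_state: dict) -> dict:
        # Apply __upcast__ once per version step until the state is current.
        class_version = obj_state.get("__class_version__", 0)
        while class_version < cls.__class_version__:
            obj_state = cls.__upcast__(obj_state, class_version)
            class_version += 1
            obj_state["__class_version__"] = class_version
        return obj_state

    @classmethod
    def __upcast__(cls, obj_state: dict, class_version: int) -> dict:
        return obj_state


class NameChanged(UpcastableSketch):
    __class_version__ = 2

    @classmethod
    def __upcast__(cls, obj_state: dict, class_version: int) -> dict:
        if class_version == 0:
            # Version 1 added 'language'; supply a default for old state.
            obj_state["language"] = "en"
        elif class_version == 1:
            # Version 2 added 'verified'; supply a default for old state.
            obj_state["verified"] = False
        return obj_state


# State recorded before any versioning was introduced (version 0).
from_v0 = NameChanged.__upcast_state__({"name": "Ann"})

# State recorded at version 1 only needs the last step.
from_v1 = NameChanged.__upcast_state__(
    {"name": "Bo", "language": "fr", "__class_version__": 1}
)
```

Both changes in this sketch are additive, supplying defaults for attributes that old state lacks, which is the kind of change the discussion above identifies as compatible in both directions.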
-
timebucketedlog¶
Time-bucketed logs allow a sequence of items that is ordered by timestamp to be split across a number of different database partitions, which avoids one partition becoming very large (and then unworkable).
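One way to picture the partitioning is to derive a bucket identifier from the log name and the timestamp of each item, so that items in the same time window share a partition. The function make_bucket_id, the BUCKET_FORMATS table, and the bucket size names below are hypothetical choices for this sketch and are not taken from the library.

```python
from datetime import datetime, timezone

# Hypothetical bucket sizes mapped to strftime patterns.
BUCKET_FORMATS = {
    "year": "%Y",
    "month": "%Y-%m",
    "day": "%Y-%m-%d",
    "hour": "%Y-%m-%d_%H",
}


def make_bucket_id(log_name: str, timestamp: float, bucket_size: str) -> str:
    # Truncate the UTC time to the bucket size and combine with the log name.
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return f"{log_name}_{moment.strftime(BUCKET_FORMATS[bucket_size])}"


day_bucket = make_bucket_id("log", 0, "day")
same_day_bucket = make_bucket_id("log", 3600, "day")
next_month_bucket = make_bucket_id("log", 31 * 86400, "month")
```

Items logged within the same day map to the same bucket identifier, while a smaller bucket size such as "hour" spreads the same items over more partitions.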
-
class
eventsourcing.domain.model.timebucketedlog.
Timebucketedlog
(name: uuid.UUID, bucket_size: Optional[str] = None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedVersionedEntity
-
class
Event
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
Supertype for events of time-bucketed log.
-
class
Started
(originator_version: int = 0, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.timebucketedlog.Event
-
class
eventsourcing.domain.model.timebucketedlog.
TimebucketedlogRepository
[source]¶ Bases:
eventsourcing.domain.model.repository.AbstractEntityRepository
-
class
eventsourcing.domain.model.timebucketedlog.
MessageLogged
(message: str, originator_id: uuid.UUID)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithTimestamp
,eventsourcing.domain.model.events.EventWithOriginatorID
,eventsourcing.domain.model.events.LoggedEvent
collection¶
Collections.
-
class
eventsourcing.domain.model.collection.
Collection
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedVersionedEntity
-
class
Event
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
Supertype for events of collection entities.
-
class
Created
(originator_version: int = 0, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.collection.Event
,eventsourcing.domain.model.entity.Created
Published when collection is created.
-
class
Discarded
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.collection.Event
,eventsourcing.domain.model.entity.Discarded
Published when collection is discarded.
-
class
EventWithItem
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.collection.Event
-
class
ItemAdded
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.collection.EventWithItem
-
mutate
(obj: eventsourcing.domain.model.collection.Collection) → None[source]¶ Updates (“mutates”) given ‘obj’.
Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).
The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.
Parameters: obj – domain entity to be mutated
-
-
class
ItemRemoved
(originator_version: int, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.collection.EventWithItem
-
mutate
(obj: eventsourcing.domain.model.collection.Collection) → None[source]¶ Updates (“mutates”) given ‘obj’.
Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).
The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.
Parameters: obj – domain entity to be mutated
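The relationship between ‘mutate()’ and ‘__mutate__()’ described above might be sketched as follows. The classes here are simplified stand-ins with hypothetical names (`SketchCollection`, `SketchEvent`, `SketchItemAdded`, `SketchItemRemoved`); they do not reproduce the library's classes, only the calling convention.

```python
class SketchCollection:
    """Stand-in for a collection entity holding a set of items."""

    def __init__(self) -> None:
        self.items = set()


class SketchEvent:
    """Stand-in base event: __mutate__ calls mutate() and returns the object."""

    def __init__(self, item: str) -> None:
        self.item = item

    def __mutate__(self, obj: SketchCollection) -> SketchCollection:
        self.mutate(obj)
        return obj

    def mutate(self, obj: SketchCollection) -> None:
        pass  # default projection does nothing


class SketchItemAdded(SketchEvent):
    def mutate(self, obj: SketchCollection) -> None:
        obj.items.add(self.item)  # no super call, no return value needed


class SketchItemRemoved(SketchEvent):
    def mutate(self, obj: SketchCollection) -> None:
        obj.items.remove(self.item)


# Replay a short sequence of events to obtain the current state.
collection = SketchCollection()
for event in (SketchItemAdded("a"), SketchItemAdded("b"), SketchItemRemoved("a")):
    collection = event.__mutate__(collection)
```

Each subclass only states how it changes the object, while the shared ‘__mutate__()’ takes care of returning the object to the caller.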
-
array¶
A kind of collection, indexed by integer. Doesn’t need to replay all events to exist.
-
class
eventsourcing.domain.model.array.
ItemAssigned
(item, index, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
Occurs when an item is set at a position in an array.
-
class
eventsourcing.domain.model.array.
BigArray
(array_id, repo)[source]¶ Bases:
eventsourcing.domain.model.array.Array
A virtual array holding items in indexed positions, across a number of Array instances.
Getting and setting items at index position is supported. Slices are supported, and operate across the underlying arrays. Appending is also supported.
BigArray is designed to overcome the concern of needing a single large sequence that may not be suitably stored in any single partition. In simple terms, if events of an aggregate can fit in a partition, we can use the same size partition to make a tree of arrays that will certainly be capable of sequencing all the events of the application in a single stream.
With normal size base arrays, enterprise applications can expect read and write time to be approximately constant with respect to the number of items in the array.
The array is composed of a tree of arrays, which gives a capacity equal to the size of each array raised to the power of the size of each array. If the arrays are limited to about the maximum size of an aggregate event stream (a large number, but not so large that there would be too much data in any one partition; let’s say thousands, to be safe), then it would be possible to fit such a large number of aggregates in the corresponding BigArray that we can be confident it would never be full.
Write access time in the worst case, and the time to identify the index of the last item in the big array, are proportional to the logarithm of the highest assigned index, taken to the base of the underlying array size. Write time on average, and read time given an index, are constant with respect to the number of items in a BigArray.
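The capacity and logarithmic bound stated above can be checked with a little arithmetic. This is only a sketch of the numbers involved, not of how the library navigates the tree; the function names `capacity` and `levels_needed` are hypothetical.

```python
def capacity(array_size: int) -> int:
    """Capacity as stated above: the array size raised to the power of itself."""
    return array_size ** array_size


def levels_needed(highest_index: int, array_size: int) -> int:
    """Smallest number of tree levels whose reach covers the highest index.

    This is the integer form of a logarithm to the base of the array size,
    computed by repeated multiplication to avoid floating-point rounding.
    """
    levels = 1
    reach = array_size  # one level addresses indexes 0 .. array_size - 1
    while reach <= highest_index:
        reach *= array_size
        levels += 1
    return levels


# With arrays of 1000 items, a million items need only two levels.
print(levels_needed(999_999, 1000))
```

With an array size in the thousands, the number of levels grows so slowly that the worst-case cost stays small for any realistic number of items.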
Items can be appended in log time in a single thread. However, the time between reading the current last index and claiming the next position leads to contention and retries when there are lots of threads of execution all attempting to append items, which inherently limits throughput.
Todo: Not possible in Cassandra, but maybe do it in a transaction in SQLAlchemy?
An alternative to reading the last item before writing the next is to use an integer sequence generator to generate a stream of integers. Items can be assigned to index positions in a big array, according to the integers that are issued. Throughput will then be much better, and will be limited only by the rate at which the database can have events written to it (unless the number generator is quite slow).
An external integer sequence generator, such as Redis’ INCR command, or an auto-incrementing database column, may constitute a single point of failure.
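The sequence-generator approach can be sketched with an in-process counter standing in for an external generator. Everything here is an assumption made for illustration: `SequenceGenerator`, `append` and the `items` dictionary are hypothetical, and a plain dictionary takes the place of the big array.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor


class SequenceGenerator:
    """Issues each integer exactly once, even when called from many threads."""

    def __init__(self) -> None:
        self._counter = itertools.count()
        self._lock = threading.Lock()

    def next_index(self) -> int:
        with self._lock:
            return next(self._counter)


generator = SequenceGenerator()
items = {}  # stands in for the big array: index position -> item


def append(item: str) -> int:
    """Claim the next index from the generator, then assign the item to it."""
    index = generator.next_index()
    items[index] = item  # no read of the last index, so no retry is needed
    return index


with ThreadPoolExecutor(max_workers=8) as pool:
    indexes = list(pool.map(append, [f"item-{n}" for n in range(100)]))
```

Since every writer receives a distinct index, no two writers compete for the same position. The trade-off noted above remains: if the generator is unavailable, nothing can be appended.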
-
class
eventsourcing.domain.model.array.
AbstractArrayRepository
(array_size=10000, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.repository.AbstractEntityRepository
Repository for sequence objects.