-
Notifications
You must be signed in to change notification settings - Fork 5
EventSourcing
Table of content
- Packages
- Models (aggregates & processes)
- Events
- Commands
- Command handlers
- Formatters
- Rebuilding read-models
- Snapshots
EventSourcing defines these NuGet packages (with dependencies):
Neptuo.EventSourcing.Domains for modeling domain (aggregates, events & commands).
Neptuo.EventSourcing for command and event delivery (hosting infrastructure).
Neptuo.EventSourcing.Formatters.Composite for serializing/deserializing events/commands using composite formatters.
Neptuo.EventSourcing.Formatters.Composite.Json for serializing/deserializing to the JSON.
EventSourcing is a set of implementation packages for concepts defined in Commands, Events and Models using full CQRS and ES principles.
The root namespace of this package is Neptuo and implementation classes are placed in corresponding namespaces where contracts are placed.
In this package, and in the event sourcing pattern, state of the object is represented as stream of events that the object passed through. Using this approach your objects remain all of its history.
An application domain project should reference only
Neptuo.EventSourcing.Domains
project/package.
AggregateRoot. Aggregates are implemented through the interface IAggregateRoot
and package contains base class, called AggregateRoot
. An object of this type contains key for unique identification. Any change of its state is made by public (or internal) methods which execute business logic and express new state by stream of new events.
These business methods can be either public or internal. This is based on the intended interaction with aggregate roots.
One approach is to define all public interactions with the domain through set of commands and place these commands in the some project as aggregate roots. Here we can define whole aggregate roots as internal describe all inputs as commands and outputs as events. No one outside of the domain knows anything about aggregates.
Another approach is to make aggregates public and use commands only from the client projects (web, desktop applications).
When extending AggregateRoot
, business method can use Publish
method to push new events. This method enqueues the events in the list of events for saving and also applies it to the appropriate Handle
method. As mentioned above, aggregate state described as stream of events, so business methods only evaluates parameters and creates new events. These events are then passed to the Handle
methods with concrete event type.
Persisting instance of an AggregateRoot
is as simple as storing stream of (new) events to the underlying storage. On the other hand, loading current state means loading all saved events associated with key of the AggregateRoot
and reapplying them on a fresh instance of a particular aggregate type. This two operations are implemented int the AggregateRootRepository<T>
.
ProcessRoot. Aggregates can't interact with other aggregates, nor they can schedule another operations. To solve these needs, CQRS comes with processes or sagas. This pattern is implemented through ProcessRoot
(and ProcessRootRepository<T>
) which extends AggregateRoot
with methods for scheduling new commands. These commands can internal in domains, where there is no reason to be published from outside of the domain. These commands scheduled after saving and publishing new events
Keys. Aggregates (and Processes) are identified by the property
Key
which is of typeIKey
. All events has propertyAggregateKey
which identifies where the event was created. UsingAggregateKey.Type
can be found the of the aggregate the event belongs to.
It is suitable to name events that it contains name of the aggregate, where it belongs. So, all events for aggregate
Order
should be named likeOrderCreated
,OrderPayed
orOrderCancelled
.
An example aggregate could look this way:
public class Order : AggregateRoot,
IEventHandler<OrderCreated>,
IEventHandler<OrderPayed>
{
public DateTime CreatedAt { get; private set; }
public bool IsPayed { get; private set; }
// This constructor is used to load existing state to the fresh instance of the Order.
public Order(IKey key, IEnumerable<IEvent> events)
: base(key, events)
{ }
public Order()
{
Publish(new OrderCreated(Key, DateTime.Now));
}
Task IEventHandler<OrderCreated>.HandleAsync(OrderCreated payload)
{
CreatedAt = payload.CreatedAt;
}
public void Pay()
{
if (IsPayed)
throw new OrderAlreadPayedException();
Publish(new OrderPayed());
}
Task IEventHandler<OrderPayed>.HandleAsync(OrderPayed payload)
{
IsPayed = true;
}
}
This part is dedicated to implementation of Events. As described in the above section, application state is described as a stream of events. To let other parts of an application (read models or processes) know about changes, we need to publish the events on an event bus. Also, this publishing must follow ACID of transactions with the repository save operation.
All events used to describe state of the domain must implementant IEvent
, also it is suitable to inherit from base class Event
, because infrastructure can set properties like AggregateKey
and Version
automatically in the Publish
method of the aggregate.
Components used by persistent implementation of event delivery
To accomplish this the AggregateRootRepository
uses IEventStore
to save events to an underlying store. Than all events are passed one by one to the PersistentEventDispatcher
. Each is passed to the registered handlers and if such handler is decorated by IdentifierAttribute
, information about delivery is persisted using IEventPublishingStore
.
The implementation of IEventStore
should store events for two purposes. The first is the permanent application state. The second is for persistent delivery to the handlers decorated with IdentifierAttribute
. If the application crashes during publishing events, than after rebooting this temporal store is process using IEventPublishingStore
and events are published to the remaining handlers. The value passed to IdentifierAttribute
must be unique in the scope of the an application, so GUID
is ideal candidate.
After application restart, events are delivered only to handlers decorated with
[Identifier]
attribute.
To accomplish this requirement the event bus is implemented as background process that has shared queue for storing and processing all events. Also there is a persistent storage where is saved for each event a list of handlers that was successfully fired. If the application crashes in the middle of publishing events, on next startup this storage is read through and checked if there are some events that weren't distributed to all handlers.
Commands represents user (or system) input to the domain. As for Events, any Command that was accepted by the infrastructure is saved to the persistent storage and so its execution is guaranteed.
Components used by persistent implementation of command delivery
An implementation of ICommandStore
is used for persisting all commands. Every time a command is passed to the PersistenCommandDispatcher
, the instance of ICommandStore
is used to save serialized instance of the command.
After the command is handled, the instance of ICommandPublishingStore
is used to save an information about successful delivery. This way we can guarantee that any infrastructure exception or system failure doesn't abort command processing, because when the application is rebooted, the command is handled again.
The command handlers are a thin mapping layer between aggregate root methods and commands. Their only responsibility is to load the instance of an aggregate root, call method on it and store back to storage. This process is wrapped in AggregateRootCommandHandler<T>
. Command handling method can be simple as:
public Task HandleAsync(PayOrder command)
{
return Execute(command.OrderKey, order => order.Pay(command.CardNumber));
}
Any domain exception should inherit from AggregateRootException
. There are properties for transferring AggregateKey, where the exception raised, and for CommandKey. This property is useful for pairing any domain exception with a key of the command that cause it. In a Handle
method we can simply call WithCommand(commandKey)
and then fluently Execute(aggregateKey, (aggregate) => ...)
to decorate execution of a method on the aggregate with code, that fills CommandKey on any raised domain exception.
Command persistent delivery doesn't require decorating handlers with IdentifierAttribute
(as with event handlers). As each command can has only one handler, persistent delivery is automatically handled by the PersistentCommandDispatcher
and instance of ICommandPublishingStore
that is passed in.
The IFormatter
is in the heart of the EventSourcing. Any implementation of IFormatter
can be used, but CompositeTypeFormatter
is best suited, because it supports versioned events/commands and has support for metadata defined in the envelope.
When using CompositeTypeFormatter
we should use different instance for events and different for commands. This is because of different internal properties (defined on Command
and Event
default implementations). It supports skipping these properties in the constructor of events.
public class OrderCreated : Event
{
public OrderCreated()
{ ... }
}
The event class should in fact define constructor which takes (in addition to the standard event parameters) version
and aggregateKey
and this constructor will be used during the deserialization.
This behavior is encapsulated in the CompositeEventFormatter
(and CompositeCommandFormatter
). So no special deserialization constructor is needed.
Events and commands can contain properties of any type, but for each type, there must be registered converter in the Converts.Repository
between the type and the JToken
(from the Newtonsoft.Json).
Converts.Repository
.AddJsonEnumSearchHandler()
.AddJsonObjectSearchHandler()
.AddJsonPrimitivesSearchHandler()
.AddJsonKey()
.AddJsonTimeSpan();
The Neptuo.EventSourcing.Formatters.Composite.Json
and Neptuo.Formatters.Composite.Json
packages comes with a bunch of useful converters. Also, there is source code of how to implement such a converter.
With this registration in the bootstrap of the application, you can use properties of any enum
type, GuidKey
, StringKey
and these keys in the properties of type IKey
.
Then some primitive types like int
, long
, double
, float
, string
, bool
, decimal
, DateTime
, TimeSpan
, Uri
and nullable versions of these types.
When we introduce new read model, we need to re-apply all events on it. Not really all events, but all that the model is subscribed at. To accomplish this the package EventSourcing
introduces service Rebuilder
(in the Neptuo.ReadModels
namespace). It is simple component that takes instance of IEventRebuilderStore
, as source of the truth (= events). Than we can register any number of event handler, as it implements IEventHandlerCollection
. When everything is set up, we can call RunAsync
to apply events on handlers.
Under the hood to finds all event types for which passed handlers implements IEventHandler
. Then it asks underlying event to load all events of those types and apply them on handlers.
In addition we can call AddAggregateVersionFilter
to filter out some concrete aggregates or older events raised on them.
Snapshot is a memento created from current aggregate/process state. It's purpose is to act as a cheap-to-load cache of an aggregate/process. It is bound to the concrete version of aggregate/process.
When loading aggregate with snapshot, only the newer events are re-applied. So the snapshot must hold the whole state of the aggregate/process.
Strategies for snapshot taking frequency can vary based on the aggregate type. We can even use strategy that creates snapshots after every command, in such a case the event store is only a 'backup' for the aggregate state. Snapshots are considered as a cache, so loosing snapshot must not be critical for the application.
To make aggregate root snapshot compatible, we must define constructor which takes IKey
, ISnapshot
and IEnumerable<IEvent>
. The additional parameter is the instance of the snapshot, which is applied to the method LoadSnapshot
from the base class AggregateRoot
and this method must be overridden in the concrete aggregate/process. Then, we need to create ISnapshotProvider
which is responsible of creating snapshot instance. It is used by the infrastructure right after persisting events to the store.
After snapshot is created, it is passed to the ISnapshotStore
which is responsible of saving it the storage, and, then in the aggregate loading phase, loading the snapshot back.
To summarize it, four things must be done to support snapshots:
- Define constructor in the aggregate/process which takes instance of the snapshot.
- Override
LoadSnapshot
method in the aggregate/process. - Provide implementation of
ISnapshotProvider
as a service for creating instances of snapshots. - Provide implementation of
ISnapshotStore
for saving and load snapshot instances.