
Packages

EventSourcing defines these NuGet packages (with dependencies):

Neptuo.EventSourcing.Domains for modeling the domain (aggregates, events & commands).

Neptuo.EventSourcing for command and event delivery (hosting infrastructure).

Neptuo.EventSourcing.Formatters.Composite for serializing/deserializing events/commands using composite formatters.

Neptuo.EventSourcing.Formatters.Composite.Json for serializing/deserializing to JSON.

Overview

EventSourcing is a set of implementation packages for the concepts defined in Commands, Events and Models, using full CQRS and ES principles.

The root namespace of this package is Neptuo, and implementation classes are placed in the same namespaces as the contracts they implement.

In this package, and in the event sourcing pattern in general, the state of an object is represented as the stream of events the object has passed through. With this approach your objects retain all of their history.

Models (aggregates & processes)

An application domain project should reference only Neptuo.EventSourcing.Domains project/package.

AggregateRoot. Aggregates are implemented through the interface IAggregateRoot, and the package contains a base class called AggregateRoot. An object of this type carries a key for unique identification. Any change of its state is made by public (or internal) methods which execute business logic and express the new state as a stream of new events.

These business methods can be either public or internal, depending on the intended interaction with aggregate roots.

One approach is to define all public interactions with the domain through a set of commands and place these commands in the same project as the aggregate roots. Here we can make the aggregate roots internal and describe all inputs as commands and all outputs as events. No one outside of the domain knows anything about the aggregates.

Another approach is to make aggregates public and use commands only from the client projects (web, desktop applications).

When extending AggregateRoot, a business method can use the Publish method to push new events. This method enqueues the event in the list of events to be saved and also applies it to the appropriate Handle method. As mentioned above, the aggregate state is described as a stream of events, so business methods only validate parameters and create new events. These events are then passed to the Handle methods for the concrete event type.

Persisting an instance of an AggregateRoot is as simple as storing the stream of (new) events to the underlying storage. On the other hand, loading the current state means loading all saved events associated with the key of the AggregateRoot and reapplying them on a fresh instance of the particular aggregate type. These two operations are implemented in the AggregateRootRepository<T>.

ProcessRoot. Aggregates can't interact with other aggregates, nor can they schedule other operations. To cover these needs, CQRS comes with processes (or sagas). This pattern is implemented through ProcessRoot (and ProcessRootRepository<T>), which extends AggregateRoot with methods for scheduling new commands. These commands can be internal to the domain when there is no reason for them to be issued from outside of it. The scheduled commands are dispatched after the new events are saved and published.
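
For illustration, a process could look roughly like this. This is only a sketch: the Schedule method name and the SendOrderConfirmation command are assumptions for illustration, not part of the documented API.

public class OrderPaymentProcess : ProcessRoot
{
    public OrderPaymentProcess(IKey key, IEnumerable<IEvent> events)
        : base(key, events)
    { }

    public void OnOrderPayed(IKey orderKey)
    {
        // "Schedule" is an assumed name for the command-scheduling capability
        // that ProcessRoot adds on top of AggregateRoot; SendOrderConfirmation
        // is a hypothetical internal command handled elsewhere in the domain.
        Schedule(new SendOrderConfirmation(orderKey));
    }
}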

Keys. Aggregates (and processes) are identified by the property Key, which is of type IKey. All events have a property AggregateKey which identifies where the event was created. Using AggregateKey.Type, the type of the aggregate the event belongs to can be determined.

It is a good practice to include the name of the aggregate an event belongs to in its name. So, all events for the aggregate Order should be named like OrderCreated, OrderPayed or OrderCancelled.

An example aggregate could look this way:

public class Order : AggregateRoot, 
    IEventHandler<OrderCreated>, 
    IEventHandler<OrderPayed>
{
    public DateTime CreatedAt { get; private set; }
    public bool IsPayed { get; private set; }

    // This constructor is used to load existing state into a fresh instance of the Order.
    public Order(IKey key, IEnumerable<IEvent> events)
        : base(key, events)
    { }

    public Order()
    {
        Publish(new OrderCreated(Key, DateTime.Now));
    }

    Task IEventHandler<OrderCreated>.HandleAsync(OrderCreated payload)
    {
        CreatedAt = payload.CreatedAt;
        return Task.CompletedTask;
    }

    public void Pay()
    {
        if (IsPayed)
            throw new OrderAlreadyPayedException();

        Publish(new OrderPayed());
    }

    Task IEventHandler<OrderPayed>.HandleAsync(OrderPayed payload)
    {
        IsPayed = true;
        return Task.CompletedTask;
    }
}
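
Working with such an aggregate through AggregateRootRepository<Order> could then look roughly like this. This is a sketch only: the repository instance is assumed to be already wired with an event store, dispatcher and formatter, and the Save/Get method names are assumptions.

// Create a new order and persist its (new) events.
Order order = new Order();
repository.Save(order);

// Later: load a fresh instance by key (stored events are re-applied),
// execute a business method and persist the newly published events.
Order loaded = repository.Get(order.Key);
loaded.Pay();
repository.Save(loaded);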

Events

This part is dedicated to the implementation of events. As described in the section above, the application state is described as a stream of events. To let other parts of an application (read models or processes) know about changes, we need to publish the events on an event bus. Also, this publishing must be transactionally consistent (ACID) with the repository save operation.

All events used to describe the state of the domain must implement IEvent; it is also advisable to inherit from the base class Event, because the infrastructure can then set properties like AggregateKey and Version automatically in the Publish method of the aggregate.

Components used by the persistent implementation of event delivery

To accomplish this, the AggregateRootRepository uses IEventStore to save events to an underlying store. Then all events are passed one by one to the PersistentEventDispatcher. Each is passed to the registered handlers, and if a handler is decorated with IdentifierAttribute, information about the delivery is persisted using IEventPublishingStore.

The implementation of IEventStore should store events for two purposes. The first is the permanent application state. The second is the persistent delivery to handlers decorated with IdentifierAttribute. If the application crashes while publishing events, then after rebooting this temporary store is processed using IEventPublishingStore and the events are published to the remaining handlers. The value passed to IdentifierAttribute must be unique within the scope of the application, so a GUID is an ideal candidate.

After an application restart, events are delivered only to handlers decorated with the [Identifier] attribute.

To accomplish this requirement, the event bus is implemented as a background process with a shared queue for storing and processing all events. There is also a persistent storage where, for each event, the list of handlers that were successfully fired is saved. If the application crashes in the middle of publishing events, this storage is read through on the next startup and checked for events that weren't distributed to all handlers.
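
A read-model handler taking part in this persistent delivery might look like this. This is a sketch: the handler name, the GUID value and the handler body are illustrative only.

// The value passed to Identifier must be unique within the application, so a
// GUID is an ideal candidate; it lets the infrastructure track which handlers
// have already received which events.
[Identifier("d2f4a2e9-0d6b-4f7e-9c1a-3b5c8e7a1f20")]
public class OrderListReadModelHandler : IEventHandler<OrderPayed>
{
    public Task HandleAsync(OrderPayed payload)
    {
        // Update the read model (e.g. a database row or projection) here.
        return Task.CompletedTask;
    }
}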

Commands

Commands represent user (or system) input to the domain. As with events, any command that was accepted by the infrastructure is saved to persistent storage, and so its execution is guaranteed.
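
A command is typically a simple message carrying everything needed for its execution. The PayOrder command used in the handler example below might be declared like this (a sketch; the exact property set is illustrative):

public class PayOrder : Command
{
    public IKey OrderKey { get; private set; }
    public string CardNumber { get; private set; }

    public PayOrder(IKey orderKey, string cardNumber)
    {
        OrderKey = orderKey;
        CardNumber = cardNumber;
    }
}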

Components used by the persistent implementation of command delivery

An implementation of ICommandStore is used for persisting all commands. Every time a command is passed to the PersistentCommandDispatcher, the instance of ICommandStore is used to save a serialized instance of the command.

After the command is handled, the instance of ICommandPublishingStore is used to save information about the successful delivery. This way we can guarantee that an infrastructure exception or system failure doesn't abort command processing, because when the application is rebooted, the command is handled again.

Command handlers

The command handlers are a thin mapping layer between aggregate root methods and commands. Their only responsibility is to load the instance of an aggregate root, call a method on it and store it back to the storage. This process is wrapped in AggregateRootCommandHandler<T>. A command handling method can be as simple as:

public Task HandleAsync(PayOrder command)
{
    return Execute(command.OrderKey, order => order.Pay(command.CardNumber));
}

Any domain exception should inherit from AggregateRootException. It has properties for transferring the AggregateKey, where the exception was raised, and the CommandKey. The latter is useful for pairing a domain exception with the key of the command that caused it. In a Handle method we can simply call WithCommand(commandKey) and then fluently Execute(aggregateKey, (aggregate) => ...) to decorate the execution of a method on the aggregate with code that fills CommandKey on any raised domain exception.
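
Building on the handler above, this could look like the following sketch; it assumes the command exposes its own key via a Key property.

public Task HandleAsync(PayOrder command)
{
    // WithCommand records the command key so that any AggregateRootException
    // thrown inside Pay() carries the matching CommandKey.
    return WithCommand(command.Key)
        .Execute(command.OrderKey, order => order.Pay(command.CardNumber));
}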

Persistent command delivery doesn't require decorating handlers with IdentifierAttribute (as is needed for event handlers). Because each command can have only one handler, persistent delivery is handled automatically by the PersistentCommandDispatcher and the instance of ICommandPublishingStore that is passed in.

Formatters

The IFormatter is at the heart of the EventSourcing. Any implementation of IFormatter can be used, but CompositeTypeFormatter is best suited, because it supports versioned events/commands and metadata defined in the envelope.

When using CompositeTypeFormatter, we should use one instance for events and a different one for commands. This is because of the different internal properties (defined on the Command and Event default implementations). The formatter supports skipping these properties in the constructors of events.

public class OrderCreated : Event
{
    public OrderCreated()
    { ... }
}

The event class should in fact define a constructor which takes (in addition to the standard event parameters) version and aggregateKey, and this constructor will be used during deserialization.

This behavior is, however, encapsulated in the CompositeEventFormatter (and CompositeCommandFormatter), so no special deserialization constructor is needed.
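
For illustration, a complete event carrying business data can therefore stay this simple (a sketch; OrderCancelled and its Reason property are hypothetical):

public class OrderCancelled : Event
{
    public string Reason { get; private set; }

    // Only business data is passed in; AggregateKey and Version are handled
    // by the infrastructure, so no special deserialization constructor is needed.
    public OrderCancelled(string reason)
    {
        Reason = reason;
    }
}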

Converters and event/command serialization to JSON

Events and commands can contain properties of any type, but for each type there must be a converter registered in Converts.Repository between the type and JToken (from Newtonsoft.Json).

Converts.Repository
    .AddJsonEnumSearchHandler()
    .AddJsonObjectSearchHandler()
    .AddJsonPrimitivesSearchHandler()
    .AddJsonKey()
    .AddJsonTimeSpan();

The Neptuo.EventSourcing.Formatters.Composite.Json and Neptuo.Formatters.Composite.Json packages come with a bunch of useful converters. Their source code also shows how to implement such a converter.

With this registration in the bootstrap of the application, you can use properties of any enum type, of GuidKey and StringKey, and these keys also in properties of type IKey.

You can also use primitive types like int, long, double, float, string, bool, decimal, DateTime, TimeSpan, Uri and the nullable versions of these types.

Rebuilding read-models

When we introduce a new read model, we need to re-apply all events on it; not really all events, but all those the model is subscribed to. To accomplish this, the EventSourcing package introduces the service Rebuilder (in the Neptuo.ReadModels namespace). It is a simple component that takes an instance of IEventRebuilderStore as the source of truth (= events). Then we can register any number of event handlers, as it implements IEventHandlerCollection. When everything is set up, we can call RunAsync to apply the events on the handlers.

Under the hood, it finds all event types for which the passed handlers implement IEventHandler. Then it asks the underlying store to load all events of those types and applies them on the handlers.

In addition, we can call AddAggregateVersionFilter to filter out some concrete aggregates or older events raised on them.
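
A rough usage sketch follows; the constructor parameters and the Add method are assumptions based on the description above, and the store, formatter and handler instances are placeholders.

// Re-apply all relevant stored events on a newly introduced read model.
Rebuilder rebuilder = new Rebuilder(eventRebuilderStore, eventFormatter);
rebuilder.Add(new OrderListReadModelHandler());   // e.g. the read-model handler sketched earlier
await rebuilder.RunAsync();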

Snapshots

A snapshot is a memento created from the current aggregate/process state. Its purpose is to act as a cheap-to-load cache of an aggregate/process. It is bound to a concrete version of the aggregate/process.

When loading an aggregate with a snapshot, only the newer events are re-applied, so the snapshot must hold the whole state of the aggregate/process.

Strategies for how often to take snapshots can vary based on the aggregate type. We can even use a strategy that creates a snapshot after every command; in such a case the event store is only a 'backup' for the aggregate state. Snapshots are considered a cache, so losing a snapshot must not be critical for the application.

To make an aggregate root snapshot-compatible, we must define a constructor which takes IKey, ISnapshot and IEnumerable<IEvent>. The additional parameter is the instance of the snapshot, which is applied via the method LoadSnapshot from the base class AggregateRoot; this method must be overridden in the concrete aggregate/process. Then we need to create an ISnapshotProvider, which is responsible for creating snapshot instances. It is used by the infrastructure right after persisting events to the store.

After a snapshot is created, it is passed to the ISnapshotStore, which is responsible for saving it to the storage and, later in the aggregate loading phase, for loading the snapshot back.

To summarize it, four things must be done to support snapshots (the first two are sketched in the example after the list):

  • Define a constructor in the aggregate/process which takes an instance of the snapshot.
  • Override the LoadSnapshot method in the aggregate/process.
  • Provide an implementation of ISnapshotProvider as a service for creating snapshot instances.
  • Provide an implementation of ISnapshotStore for saving and loading snapshot instances.
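
A minimal sketch of the aggregate side follows. It assumes ISnapshot can be implemented by a plain class carrying the state, that LoadSnapshot takes the ISnapshot instance, and that the base class offers a matching constructor; the exact signatures may differ.

// Hypothetical snapshot class carrying the whole Order state.
public class OrderSnapshot : ISnapshot
{
    public DateTime CreatedAt { get; set; }
    public bool IsPayed { get; set; }
}

public class Order : AggregateRoot
{
    public DateTime CreatedAt { get; private set; }
    public bool IsPayed { get; private set; }

    // Event handlers and business methods from the earlier example are omitted.

    // Constructor used when a snapshot exists: the snapshot is applied first,
    // then only the events newer than the snapshot are re-applied.
    public Order(IKey key, ISnapshot snapshot, IEnumerable<IEvent> events)
        : base(key, snapshot, events)
    { }

    protected override void LoadSnapshot(ISnapshot snapshot)
    {
        OrderSnapshot state = (OrderSnapshot)snapshot;
        CreatedAt = state.CreatedAt;
        IsPayed = state.IsPayed;
    }
}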