Event Sourcing With Equinox FSharp

Apr 22nd, 2020 - written by Kimserey.

One of the concepts which has gained a lot of traction over the last five years is Event Sourcing. Event Sourcing changes the way data are handled, how data are stored, and how the current state is constructed. In this post, we will explain what Event Sourcing is and see how it can be implemented using Equinox, a set of .NET libraries for building event-sourced applications.

Event Sourcing

When we deal with data, the quickest way is to store data with a one-to-one mapping between the object model and the database schema. For example, if we are creating a todo list, we would hold the todos in an object containing a title, content, date and status. That object would have the exact same representation in a database table. If we need to update the todo item, we would go ahead and update the todo in the database.

With event sourcing, we look at events rather than data. Instead of looking at the concrete todo state in the database, we look at the events that occurred and produced the latest state. In this example, we will have two events, Todo added and Todo updated. While a todo item can only be added once, it can be updated multiple times, so the state of a specific todo will be constructed from one todo added event and potentially multiple todo updated events. Each state is reconstructed from a stream of events, which is also referred to as an aggregate in Domain Driven Design (DDD) terms.

An operation on a stream involves the following steps:

  1. reconstruct the latest state by retrieving all events and applying them one by one,
  2. once we have the state, we validate the change, representing the intent as events,
  3. save the events in the aggregate’s stream,
  4. behind the scenes, serialize and deserialize events to and from our data storage.
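
The steps above can be sketched in a few lines of plain F#, without any library. This is only an illustrative sketch under simplifying assumptions: the `store` dictionary, the `TodoAdded` event and the `readStream`, `load`, `decide`, `append` and `addTodo` helpers are hypothetical names made up for this example and are not part of Equinox. Step 4 (serialization) is left out by keeping the events in memory.

```fsharp
open System.Collections.Generic

// Hypothetical event and state for this sketch: the state is just a list of titles
type Event =
    | TodoAdded of string

// In-memory stand-in for the storage: stream name -> events, oldest first
let store = Dictionary<string, Event list>()

let readStream (streamName : string) =
    match store.TryGetValue streamName with
    | true, events -> events
    | false, _ -> []

// Step 1: rebuild the latest state by applying the events one by one
let evolve (state : string list) ev =
    match ev with
    | TodoAdded title -> title :: state

let load streamName = readStream streamName |> List.fold evolve []

// Step 2: validate the change and express the accepted intent as events
let decide title (state : string list) =
    if title <> "" && not (List.contains title state) then [ TodoAdded title ] else []

// Step 3: save the new events at the end of the stream
let append streamName newEvents =
    store.[streamName] <- readStream streamName @ newEvents

// One full operation on a stream; returns the state after the change
let addTodo streamName title =
    let state = load streamName
    let events = decide title state
    append streamName events
    List.fold evolve state events
```

Calling `addTodo "Todos-demo" "Write post"` twice leaves a single event in the stream, because the second call is rejected by `decide` and produces no events.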

This is where Equinox comes into play. Equinox is a set of libraries abstracting the event sourcing mechanism, and specifically an abstraction over streams. In the following example, we will look closely at how we can use Equinox to build an event-sourced todo application.

Equinox

Equinox can be installed from NuGet. In this example, we will use the MemoryStore to demonstrate what an implementation with Equinox looks like. We start by installing the following:

  • Equinox
  • Equinox.MemoryStore

Building a stream implementation follows four steps:

  1. Define the events,
  2. Define the folding logic of the events,
  3. Map incoming requests to events representing the accepted changes,
  4. Create a service which will serve as a single point of contact for the stream.

The events being the centre of the application, we start by creating our two events, Added and Updated. In the following example, I use F#, as discriminated unions make it easy to model the different events.

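module Events =
    // Data carried by both events
    type TodoData = { id: int; title: string; completed: bool }

    type Event =
        | Added         of TodoData
        | Updated       of TodoData
        // Marker interface required by the FsCodec union codec
        interface TypeShape.UnionContract.IUnionContract

    // Serializes and deserializes the Event cases as JSON
    let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()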

We define a TodoData type which contains the data carried by the events. We also implement the TypeShape interface IUnionContract, as we want to make our discriminated union compatible with the default serializer provided by Equinox, FsCodec. We define a codec value which will be required later.

Once we have defined our events, we can define our folding logic.

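module Fold =
    // In-memory state, rebuilt from events and never persisted
    type State = { items : Events.TodoData list; nextId : int }

    let initial = { items = []; nextId = 0 }

    // Applies a single event to the previous state
    let evolve s e =
        match e with
        | Events.Added item ->
            { s with
                items = item :: s.items
                nextId = s.nextId + 1 }

        | Events.Updated value ->
            // Replace the item with the matching id, keep the others as they are
            let items =
                s.items |> List.map (function { id = id } when id = value.id -> value | item -> item)

            { s with items = items }

    // Applies a sequence of events, which is the signature Equinox expects
    let fold : State -> Events.Event seq -> State =
        Seq.fold evolve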

We start by defining a State. The state is an in-memory object which we construct by applying events, one by one, starting from an initial state (or from a snapshot). Applying an event is done in the evolve function, which has the following signature:

evolve: State -> Event -> State

It takes the previous state plus an event and returns the next state.

We can see that the state is independent of the Events.TodoData type: it is not stored anywhere and is only reconstructed from events. This is why we can introduce nextId, which serves as a counter to assign an ID to each new todo. The nextId counter is not business data, so it isn’t stored in the events, but it is a necessary piece of logic.

For Events.Added, we simply add the new item to the current list and bump the nextId. For Events.Updated, we simply find the todo with that particular ID and replace it.

Lastly, we create a fold function which takes the state plus a sequence of events rather than a single event. This function follows the signature needed by Equinox.
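
To make the replay concrete, here is a small self-contained sketch that folds a handful of events into a state. It repeats simplified copies of the types above so that it runs on its own, without the TypeShape or FsCodec dependencies, and the `history` value is made-up sample data.

```fsharp
type TodoData = { id : int; title : string; completed : bool }

type Event =
    | Added of TodoData
    | Updated of TodoData

type State = { items : TodoData list; nextId : int }

let initial = { items = []; nextId = 0 }

// Same logic as Fold.evolve, written with an explicit lambda
let evolve (s : State) e =
    match e with
    | Added item ->
        { s with items = item :: s.items; nextId = s.nextId + 1 }
    | Updated value ->
        { s with items = s.items |> List.map (fun item -> if item.id = value.id then value else item) }

let fold : State -> Event seq -> State = Seq.fold evolve

// Sample stream content (assumed for this example): two additions, then one update
let history =
    [ Added { id = 0; title = "Write post"; completed = false }
      Added { id = 1; title = "Publish post"; completed = false }
      Updated { id = 0; title = "Write post"; completed = true } ]

// Replaying the whole history from the initial state gives the latest state
let state = fold initial history
```

After the replay, `state.nextId` is 2 and the todo with ID 0 is marked as completed. New items are prepended, so the most recently added todo comes first in `state.items`.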

Once we have defined the folding, we are able to repopulate the latest state. We can then create a Command type which represents the actions available in our system.

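type Command =
    | Add of Events.TodoData
    | Update of Events.TodoData

// Decides which events (if any) a command produces given the current state
let interpret c (state : Fold.State) =
    match c with
    | Add value ->
        // The ID is assigned from the state, whatever the caller passed in
        [Events.Added { value with id = state.nextId }]

    | Update value ->
        // Only emit an event when the todo exists and actually changes
        match state.items |> List.tryFind (function { id = id } -> id = value.id) with
        | Some current when current <> value -> [Events.Updated value]
        | _ -> []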

Similarly to the Events, we create a discriminated union for the Command (following the command pattern is not a requirement, but it often fits well with event sourcing). Note, however, that unlike events, commands are not stored and hence don’t need to be serialized or deserialized, so we are free to use any type representation.

Here we define Add and Update. Our Events are in the past tense while our Commands are in the imperative mood. Events have already occurred and have already been saved as truth, while Commands represent a desire or intent that’s yet to be accepted into the aggregate’s timeline.

In interpret, we take the current state and the command, and generate the events that result from them. We don’t act on the state; we generate events which will be folded onto the state later on.
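
The following self-contained sketch shows how interpret behaves for a few commands. It repeats simplified copies of the types so that it runs on its own, and the `state0` value and the sample commands are assumptions made up for this example.

```fsharp
type TodoData = { id : int; title : string; completed : bool }
type Event = Added of TodoData | Updated of TodoData
type State = { items : TodoData list; nextId : int }
type Command = Add of TodoData | Update of TodoData

// Same decision logic as above, on the simplified types
let interpret c (state : State) =
    match c with
    | Add value -> [ Added { value with id = state.nextId } ]
    | Update value ->
        match state.items |> List.tryFind (fun x -> x.id = value.id) with
        | Some current when current <> value -> [ Updated value ]
        | _ -> []

// Assumed current state: one existing todo, so the next ID is 1
let state0 = { items = [ { id = 0; title = "Write post"; completed = false } ]; nextId = 1 }

// The ID supplied by the caller (99) is replaced by state0.nextId
let added = interpret (Add { id = 99; title = "Publish"; completed = false }) state0

// Updating a todo with identical values produces no event
let unchanged = interpret (Update { id = 0; title = "Write post"; completed = false }) state0

// Updating a todo which does not exist produces no event either
let missing = interpret (Update { id = 7; title = "Ghost"; completed = true }) state0
```

Only the first command yields an event, an Added carrying ID 1. Returning an empty list for the other two means that nothing is appended to the stream, which keeps repeated or invalid requests from polluting the history.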

And that concludes our definition of a stream. The last part will be to use the Events, Fold, Command and interpret to build a Service which will contain the functionality:

type Service (resolve : string -> Equinox.Stream<Events.Event, Fold.State>) =

    // Runs a command against the client's stream and returns the resulting items
    let handle clientId command =
        let stream = resolve clientId
        stream.Transact(fun state ->
            let events = interpret command state
            let newState = Fold.fold state events
            newState.items, events)

    member __.List(clientId) : Async<Events.TodoData seq> =
        let stream = resolve clientId
        stream.Query (fun s -> s.items |> Seq.ofList)

    member __.Create(clientId, template: Events.TodoData) : Async<Events.TodoData> = async {
        let! newState = handle clientId (Command.Add template)
        // New items are prepended, so the created todo is the head of the list
        return List.head newState }

    member __.Patch(clientId, item: Events.TodoData) : Async<Events.TodoData> = async {
        let! newState = handle clientId (Command.Update item)
        return List.find (fun x -> x.id = item.id) newState }

The Service exposes List, Create and Patch, which respectively list all todos, create a new todo and update an existing todo. The resolve argument is a function which takes a stream ID and returns the stream instance. Given the stream, we can either use #.Query to query the latest state of the stream, or #.Transact to perform an action on the stream.

#.Transact manages accepting changes into the aggregate. It obtains the current state (using initial, fold, events, snapshots and/or caches etc), presents that to a function you supply, and takes care of saving the changes the function decided are appropriate given that state and the input Command. There’s also an overload of #.Transact that allows the function passed as argument to yield a response.
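
To illustrate the idea, here is a conceptual sketch of such a transact loop against an in-memory stream, with a version check and a bounded number of retries. It is not Equinox’s actual implementation: `InMemoryStream`, `TrySync` and `transact` are hypothetical names made up for this example, and the sketch is synchronous and single-threaded for brevity.

```fsharp
// Hypothetical in-memory stream: an append only succeeds if the caller
// has seen the latest version (optimistic concurrency)
type InMemoryStream<'event>() =
    let mutable events : 'event list = []

    // Returns the current version (the number of events) and the events
    member __.Load() = events.Length, events

    // Appends only if nothing was appended since expectedVersion
    member __.TrySync(expectedVersion, newEvents : 'event list) =
        if expectedVersion = events.Length then
            events <- events @ newEvents
            true
        else
            false

let transact (fold : 'state -> 'event list -> 'state) (initial : 'state) maxAttempts
             (stream : InMemoryStream<'event>)
             (decide : 'state -> 'result * 'event list) : 'result =
    let rec attempt n =
        // Obtain the current state by folding the stored events
        let version, events = stream.Load()
        let state = fold initial events
        // Present the state to the supplied function
        let result, newEvents = decide state
        // Save the events; on a version conflict, reload and decide again
        if List.isEmpty newEvents || stream.TrySync(version, newEvents) then result
        elif n < maxAttempts then attempt (n + 1)
        else failwithf "Gave up after %d attempts" maxAttempts
    attempt 1
```

The key design point is that the function passed in may run more than once: if another writer appended to the stream in the meantime, the decision is made again against the fresh state, so it should be free of side effects.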

Lastly we can register the service as a singleton:

services.AddSingleton<Todo.Service>(fun sc ->

    // The Events for each Stream are held in here (NOTE: Volatile: not saved when Host is stopped)
    let memoryStore = Equinox.MemoryStore.VolatileStore()

    let resolver = 
        Equinox.MemoryStore.Resolver(memoryStore, Todo.Events.codec, Todo.Fold.fold, Todo.Fold.initial)

    // Builds a stream name of the form "Todos-<id>"
    let streamName id =
        FsCodec.StreamName.create "Todos" id

    Todo.Service(fun id -> Equinox.Stream(Serilog.Log.Logger, resolver.Resolve (streamName id), maxAttempts = 3)))

A stream is created using a resolver, and a resolver is provided by a concrete implementation of an Equinox store. In this example we use the MemoryStore, so we create a resolver with the VolatileStore. We construct the stream name using the FsCodec.StreamName.create function.

We can now use the service in a regular ASP.NET Core API Controller:

open System
open Microsoft.AspNetCore.Mvc

[<CLIMutable>]
type TodoDto = { id: int; title: string; completed: bool }

[<ApiController>]
[<Route("[controller]")>]
type TodoController (service: Service) =
    inherit ControllerBase()

    // Single hard-coded client for this demo: every request uses the same stream
    let clientId = Guid.Empty.ToString("N")

    let toDto (x : Events.TodoData) : TodoDto =
        { id = x.id; title = x.title; completed = x.completed }

    [<HttpGet>]
    member __.Get() = async {
        let! xs = service.List(clientId)
        return xs
    }

    [<HttpPost>]
    member __.Post([<FromBody>]value : TodoDto) : Async<TodoDto> = async {
        // The ID is assigned by the stream, so return the stored todo rather than the input
        let! created = service.Create(clientId, { id = 0; title = value.title; completed = false })
        return toDto created
    }

    [<HttpPatch "{id}">]
    member __.Patch(id, [<FromBody>]value : TodoDto) : Async<TodoDto> = async {
        let! updated = service.Patch(clientId, { id = id; title = value.title; completed = value.completed })
        return toDto updated
    }

The events for a given aggregate are stored in a stream. Each #.Query or #.Transact references a StreamName, which in our example takes a clientId and maps it to the individual stream that will hold that client’s todo list, e.g. “Todos-Customer1234”. And that’s it: we now have an event-sourced todo application with two events, Added and Updated.

Conclusion

Today we looked into Equinox, a .NET Event Sourcing platform. We started by giving a brief introduction to event sourcing and then moved on to implementing a todo application using Equinox and its memory store. We took a step-by-step approach: defining a model via events, then defining the logic of folding events to rebuild the state, then interpreting commands to generate events, and lastly binding everything together with the memory store. I hope you liked this post and I’ll see you in the next one!

Designed, built and maintained by Kimserey Lam.