In the first part of this series we developed a simple blogging application that uses event sourcing to keep track of all changes made by the users. However, we did not yet write these events to durable storage. This means all data is lost when the application is restarted, which is clearly not acceptable. Saving and restoring events will be the responsibility of the event store, which we’ll start implementing in this part. But before we get to actually writing events to disk, we must first tackle the problem of maintaining data consistency when using event sourcing.

The code

You can find the code on GitHub in the part-2 branch. Use git clone -b part-2 https://github.com/zilverline/event-sourced-blog-example.git to check out the correct branch. If you already retrieved the previous part, then git pull followed by git checkout part-2 should do the trick.

Living in a distributed world

Something you’ll quickly have to learn when you develop web applications is that the internet is a truly distributed environment. You’ll often run your application across multiple servers (for performance and/or fault-tolerance), while users access your application from different devices all across the globe. All of these servers and devices will have a different view of the world. In a distributed world there is no such thing as a single, global truth. As soon as a web page is rendered by a server and the page is displayed by the client, the data on the page is already out of date. If you do not handle this correctly, some surprising things may happen.

An easy way to see this is to start editing a blog post, then use another browser window to delete the same blog post, and then submit the original edit form. In the application from part 1 this results in a NoSuchElementException while processing the event to update the current state. In the Rails getting started application you get an ActiveRecord::RecordNotFound. Other frameworks that I know of aren’t any better at handling this. Back button usage and double-clicking also cause many of the same problems. And the more collaborative your application is, the more likely it is that these inconsistencies and conflicts will occur for real.

Many current web applications deal with this by relying on a combination of ad-hoc solutions, prayer, and luck [1]. You just have to hope that the occasional lost update, exception or other glitch won’t cause too much trouble.

Write consistency

Since events are forever, we cannot afford to let these glitches go undetected. Otherwise we could get strange, meaningless event sequences, such as having a blog post being edited after it has been deleted! So one major responsibility of an event store is to detect these conflicts and prevent storing inconsistent events.

To implement this our event store is going to keep track of multiple event streams. Each event stream has an associated stream revision, which is defined to be equal to the number of commits for that particular event stream. This makes the stream revision a gapless, strictly increasing sequence number.

When you commit additional events to an event stream, you will have to specify the stream revision that you expect the stream to currently have. If the expected stream revision is lower than the current stream revision, there is a conflict. This approach is also known as optimistic concurrency control.

We use multiple event streams so that commits that go to different event streams do not conflict: each event stream defines a consistency boundary. In our example application we’ll map each blog post to its own event stream. So editing or deleting two different posts never causes a conflict.
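The revision check described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: the Streams class, its commit method, and the use of plain Longs and Strings for revisions and events are hypothetical and are not part of the example application.

```scala
object OptimisticConcurrencySketch {
  // Hypothetical, simplified model: each stream is just the list of its
  // commits, and the stream revision is the number of commits so far.
  final case class Streams(byId: Map[String, Vector[String]] = Map.empty) {
    def revision(streamId: String): Long =
      byId.getOrElse(streamId, Vector.empty).size.toLong

    // Returns the updated streams on success, or the actual revision when
    // the caller's expected revision is stale.
    def commit(streamId: String, expected: Long, event: String): Either[Long, Streams] = {
      val actual = revision(streamId)
      if (expected < actual) Left(actual)
      else {
        require(expected == actual, s"expected revision $expected is ahead of actual revision $actual")
        Right(Streams(byId.updated(streamId, byId.getOrElse(streamId, Vector.empty) :+ event)))
      }
    }
  }

  // Helper for the example below: unwrap a commit that must succeed.
  private def successful(result: Either[Long, Streams]): Streams =
    result.fold(actual => sys.error(s"unexpected conflict at revision $actual"), identity)

  val afterAdd: Streams = successful(Streams().commit("post-1", 0L, "PostAdded"))
  val afterDelete: Streams = successful(afterAdd.commit("post-1", 1L, "PostDeleted"))

  // The edit form was rendered at revision 1, but the post is now at revision 2.
  val staleEdit: Either[Long, Streams] = afterDelete.commit("post-1", 1L, "PostEdited")

  // A different stream is unaffected by what happened to post-1.
  val otherPost: Either[Long, Streams] = afterDelete.commit("post-2", 0L, "PostAdded")
}
```

Here staleEdit is rejected because the edit was based on an outdated revision, while the commit to the unrelated stream post-2 succeeds.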

Read consistency

There is another problem that we have to solve, related to using a memory image. In a database-oriented application the central database is considered to give a single, up-to-date, consistent view of the current state of the application. Database transactions are used to control concurrency conflicts (but make sure you choose the correct isolation level!). Since the database is a single, centralized component, any read after a transaction commit will return the updated data (until you introduce master-slave replication or caching).

With a memory image the situation is slightly different. After changes are successfully committed to the event store, the memory image of each application server is not immediately updated! The newly committed events must first be transferred to the application servers and applied to the memory image before the effects of the change become visible.

So to avoid a fast client not seeing the results of the action it just performed, we’ll also keep track of the event store revision, which is defined to be equal to the total number of commits to the event store. Again, this produces a gapless, strictly increasing sequence number. Whenever a client is redirected to see the results of a commit, we’ll use the current store revision to check whether our memory image is up-to-date.

There are a couple of ways to implement this:

  • Don’t track the store revision at all. The application servers will usually be notified of the change before the client completes the HTTP redirect, so the client will usually see the changes it just made. This may be good enough in situations where highly volatile data is the norm, such as financial markets. For a blog posting application, users expect their changes to be immediately visible, so we need to look at other solutions.
  • Only run a single application server or ensure all clients always access the same application server (for example, by using sticky sessions, or having one primary server and another server just for failover purposes). The client should always see the result of its actions if we update the server’s memory image before responding.
  • Save the store revision as part of the user’s session state, such as a cookie. Before accessing the memory image ensure it is up-to-date with respect to the revision saved in the cookie or session. Unfortunately, using mutating cookies can lead to race conditions.
  • Pass the most recent store revision as a parameter in the redirect URL. This means that every POST action needs to take care of correctly passing in the new store revision whenever a commit succeeds.
  • Read the current event store revision whenever the memory image is accessed. Wait until at least this many commits have been applied to the memory image before rendering the response. This is the easiest to implement, so we’ll use this. It is however not the most performant!
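The last option boils down to "block the reader until enough commits have been applied". A minimal sketch of that idea is shown here, using plain JVM monitors (synchronized, wait, notifyAll) rather than the STM used in the example application. The AppliedRevision class and its method names are hypothetical.

```scala
object ReadConsistencySketch {
  // Hypothetical stand-in for a memory image: it only tracks how many
  // commits have been applied so far.
  final class AppliedRevision {
    private var applied = 0L

    // Called by the notification thread after it has applied a commit.
    def advanceTo(revision: Long): Unit = synchronized {
      applied = math.max(applied, revision)
      notifyAll() // Wake up any readers waiting for this revision.
    }

    // Blocks the caller until at least `minimum` commits have been applied,
    // then returns the revision that was actually reached.
    def awaitAtLeast(minimum: Long): Long = synchronized {
      while (applied < minimum) wait()
      applied
    }
  }
}
```

A request handler would first read the current store revision from the event store and then call awaitAtLeast with that value before rendering. The extra read of the store revision on every request is the reason this option is simple but not the fastest.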

The Event Store API

If the above got you thoroughly confused, don’t worry too much. The event store implementation is actually not that complicated. To make the first implementation easier to understand we will not worry about durability yet. In the next part of this series we’ll build an implementation that uses an actual database (Redis). We’ll then compare the fake implementation to the Redis implementation to check that both behave the same.

Store and stream revisions

The store and stream revisions are represented by the StoreRevision and StreamRevision classes and can be found in EventStore.scala. They simply wrap a Long value. The main reason not to use plain Longs is to avoid accidental mixups. Plain numbers that are related but actually mean different things are hard to keep straight otherwise!
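As a rough idea of what such wrappers can look like, here is a sketch. It is not a copy of the classes in EventStore.scala: the members shown (value, next, Initial and the ordering) are the ones used by the listings in this article, but their exact definitions here are assumptions.

```scala
object RevisionSketch {
  // Counts commits to the whole event store.
  final case class StoreRevision(value: Long) extends Ordered[StoreRevision] {
    require(value >= 0, "store revision cannot be negative")
    def next: StoreRevision = StoreRevision(value + 1)
    override def compare(that: StoreRevision): Int = java.lang.Long.compare(value, that.value)
  }
  object StoreRevision {
    val Initial: StoreRevision = StoreRevision(0L)
  }

  // Counts commits to a single event stream.
  final case class StreamRevision(value: Long) extends Ordered[StreamRevision] {
    require(value >= 0, "stream revision cannot be negative")
    def next: StreamRevision = StreamRevision(value + 1)
    override def compare(that: StreamRevision): Int = java.lang.Long.compare(value, that.value)
  }
  object StreamRevision {
    val Initial: StreamRevision = StreamRevision(0L)
  }

  // Mixing the two up is now a compile-time error instead of a subtle bug:
  // StoreRevision(1) < StreamRevision(1)   // does not compile
}
```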

Conflicts and commits

Committing events to the event store uses the EventCommitter interface:

/**
 * Commits events to an event store.
 */
trait EventCommitter[Event] {
  def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event]
}

The tryCommit method will attempt to commit the given event to the event stream. The result type is defined as follows:

/**
 * The result of a commit attempt is either a `Conflict` or a successful `Commit`.
 */
type CommitResult[+Event] = Either[Conflict[Event], Commit[Event]]

/**
 * A successful commit to `streamId`.
 */
case class Commit[+Event](
  storeRevision: StoreRevision,
  timestamp: Long,
  streamId: String,
  streamRevision: StreamRevision,
  events: Seq[Event])

/**
 * The conflict that occurred while trying to commit to `streamId`.
 */
case class Conflict[+Event](
  streamId: String,
  actual: StreamRevision,
  expected: StreamRevision,
  conflicting: Seq[Commit[Event]])

In other words, tryCommit returns either a Conflict or a Commit. The conflict will contain the actual stream revision and all conflicting commits (any commit that happened since the expected stream revision). When a commit succeeds, it will contain the committed event as well as some metadata, such as the timestamp of the commit.
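Because the result is a plain Either, callers can handle both outcomes with a pattern match. The sketch below illustrates this with simplified copies of the types that use plain Longs for the revisions; the describe function is hypothetical and only exists for this example.

```scala
object CommitResultSketch {
  // Simplified copies of the types above, with plain Longs as revisions.
  final case class Commit[+Event](
    storeRevision: Long,
    timestamp: Long,
    streamId: String,
    streamRevision: Long,
    events: Seq[Event])

  final case class Conflict[+Event](
    streamId: String,
    actual: Long,
    expected: Long,
    conflicting: Seq[Commit[Event]])

  type CommitResult[+Event] = Either[Conflict[Event], Commit[Event]]

  // Turns either outcome into a human-readable message.
  def describe[Event](result: CommitResult[Event]): String = result match {
    case Right(commit) =>
      s"committed ${commit.events.size} event(s) as revision ${commit.streamRevision} of ${commit.streamId}"
    case Left(conflict) =>
      // All events the caller did not know about when it made its attempt.
      val missed = conflict.conflicting.flatMap(_.events)
      s"conflict on ${conflict.streamId}: expected ${conflict.expected}, actual ${conflict.actual}, missed ${missed.mkString(", ")}"
  }
}
```

The conflicting commits are what make it possible to tell the user what changed in the meantime, rather than only reporting that something did.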

You may have noticed that the Commit class allows for multiple events in a single commit. This can often be useful if you want a group of events to be treated atomically by subscribers. Another reason for this is that as your application evolves some events may no longer be of interest. These events can then be filtered out without altering the store or stream revisions the commit is part of. In other words, you may have commits that no longer hold any events!

Event notification

To rebuild the memory image and receive commits as they happen the CommitPublisher interface is defined:

/**
 * Publishes successful commits to subscribers.
 */
trait CommitPublisher[Event] {
  /**
   * Notifies `listener` of all commits that happened `since`. Notification happens asynchronously.
   */
  def subscribe(since: StoreRevision)(listener: Commit[Event] => Unit): Subscription
}

/**
 * A subscription that can be cancelled.
 */
trait Subscription {
  def cancel(): Unit
}

After subscribing to the event store, all commits that happened after the since store revision will be passed to the listener callback. The event store will ensure that commits are sent to the listener in order and without any gaps or duplicates. Notice that notification of commits happens asynchronously.

Reading events and the event store API

Two more interfaces are defined that complete the event store API:

/**
 * Reads commits from the event store.
 */
trait CommitReader[Event] {
  def storeRevision: StoreRevision
  def readCommits(since: StoreRevision, to: StoreRevision): Stream[Commit[Event]]
  def streamRevision(streamId: String): StreamRevision
  def readStream(streamId: String, since: StreamRevision, to: StreamRevision): Stream[Commit[Event]]
}

/**
 * The event store API.
 */
trait EventStore[Event] {
  def reader: CommitReader[Event]
  def committer: EventCommitter[Event]
  def publisher: CommitPublisher[Event]
  def close(): Unit
}

The CommitReader interface allows access to stored commits (using Scala’s lazy Streams to avoid having to load all commits in memory at once) and the EventStore interface simply combines the other interfaces and a close method into a single unit.

The fake event store implementation

The FakeEventStore implementation can be found in FakeEventStore.scala. It uses a simple Vector (Scala’s immutable equivalent to Java’s ArrayList) to store all commits made to the store and a map from streamId to Vector for the commits associated with each event stream. All commits are stored in their original order. Scala’s Software Transactional Memory (STM) is again used to ensure consistency:

class FakeEventStore[Event] extends EventStore[Event] {
  // [...]
  private[this] val commits = Ref(Vector.empty[Commit[Event]]).single
  private[this] val streams = Ref(Map.empty[String, Vector[Commit[Event]]]).single

  override object reader extends CommitReader[Event] {
    override def storeRevision = StoreRevision(commits().size)

    override def readCommits(since: StoreRevision, to: StoreRevision): Stream[Commit[Event]] = {
      commits().slice(
        (since.value min Int.MaxValue).toInt,
        (to.value min Int.MaxValue).toInt).toStream
    }
    // [...]
  }
  // [...]
}

The CommitReader implementation just uses regular Scala collection manipulation functions. Notice that the implementation of storeRevision is precisely the definition of a store revision as discussed in “read consistency” above! The streamRevision and readStream methods (not shown) are implemented similarly.

The EventCommitter tryCommit method is also straightforward:

override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] = {
  require(Txn.findCurrent.isEmpty, "the fake event store cannot participate in an STM transaction, just like a real event store")

  atomic { implicit txn =>
    val actual = streamRevision(streamId)

    if (expected < actual) {
      val conflicting = readStream(streamId, since = expected)
      Left(Conflict(streamId, actual, expected, conflicting))
    } else if (expected > actual) {
      throw new IllegalArgumentException("expected revision %d greater than actual revision %d" format (expected.value, actual.value))
    } else {
      val commit = Commit(storeRevision.next, DateTimeUtils.currentTimeMillis, streamId, actual.next, Seq(event))
      commits.transform(_ :+ commit)
      streams.transform(streams => streams.updated(streamId, streams.getOrElse(streamId, Vector.empty) :+ commit))
      Right(commit)
    }
  }
}

First a check is done to ensure that no STM transaction is currently active, to make sure that we correctly mimic a real event store. Then the expected stream revision is checked against the actual revision. If there is a mismatch, a conflict or error is reported. If they match, a new Commit is instantiated and appended to the commits and streams collections. The new commit is then returned. All this is done inside an STM transaction to ensure atomicity.

The final part of the event store implementation deals with notification and is the trickiest part. STM again helps to keep complexity under control:

private[this] val closed = Ref(false).single
private[this] val executor = Executors.newCachedThreadPool

override object publisher extends CommitPublisher[Event] {
  override def subscribe(since: StoreRevision)(listener: Commit[Event] => Unit): Subscription = {
    val cancelled = Ref(false).single
    val last = Ref(since).single

    executor.execute(new Runnable {
      @tailrec override def run {
        // Wait for new commits or subscription termination.
        val pending = atomic { implicit txn =>
          if (closed() || cancelled()) None else {
            val pending = commits().drop(last().value.toInt)
            if (pending.isEmpty) retry
            else Some(pending)
          }
        }
        pending match {
          case None => // Stop.
          case Some(commits) =>
            // Notify listener and go back to run.
            commits.foreach { commit =>
              listener(commit)
              last() = commit.storeRevision
            }
            run
        }
      }
    })

    // Return a subscription instance that can be used for cancellation.
    new Subscription {
      override def cancel() = cancelled.set(true)
      override def toString = "Subscription(" + last() + ", " + cancelled() + ", " + FakeEventStore.this + ")"
    }
  }
}

The closed reference is used to communicate that the event store has been closed. The executor thread pool is used to run each subscription on its own background thread [2] so that we can correctly mimic a real event store.

When a subscription is made, we create two more references: cancelled to communicate that this subscription has been cancelled and last to keep track of the last commit that has been passed to the listener.

A notification thread is then started that continuously checks if there are any new commits that the listener needs to be notified of. If there are none, the STM transaction is retried. Retry internally uses blocking to avoid needless looping to check for new event store commits.

If the event store is closed or the subscription is cancelled the subscription thread terminates.

The tests for the event store can be found in EventStoreSpec.scala. The tests are implemented as a trait and the FakeEventStoreSpec simply extends this trait. This allows us to use the same tests for all event store implementations.
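The "tests as a trait" pattern can be illustrated without a test framework. Everything in this sketch is hypothetical: RevisionedStore is a deliberately tiny stand-in for the event store API, and plain assert calls take the place of the real specifications in EventStoreSpec.scala.

```scala
object SharedSpecSketch {
  // Hypothetical, minimal store interface used only for this illustration.
  trait RevisionedStore {
    def revision: Long
    def append(event: String): Long
  }

  // The checks are written once against the interface...
  trait RevisionedStoreChecks {
    // Each concrete check suite says how to create the store under test.
    def newStore(): RevisionedStore

    def check(): Unit = {
      val store = newStore()
      assert(store.revision == 0L, "a new store starts at revision 0")
      assert(store.append("first") == 1L, "the first commit gets revision 1")
      assert(store.append("second") == 2L, "revisions are gapless")
      assert(store.revision == 2L, "the revision equals the number of commits")
    }
  }

  // ...and are reused for every implementation.
  final class InMemoryStore extends RevisionedStore {
    private var events = Vector.empty[String]
    def revision: Long = events.size.toLong
    def append(event: String): Long = { events :+= event; revision }
  }

  object InMemoryStoreChecks extends RevisionedStoreChecks {
    def newStore(): RevisionedStore = new InMemoryStore
  }
}
```

A second implementation would only need its own small object that overrides newStore to run exactly the same checks.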

Memory Image

The memory image itself is now implemented in its own class. It basically wraps an EventStore while allowing access to the current state.

/**
 * A `MemoryImage` tracks an underlying event store and uses the provided
 * `initialState` and `update` to project the committed events onto the
 * current state.
 */
class MemoryImage[State, Event] private
  (eventStore: EventStore[Event])
  (initialState: State)
  (update: (State, Commit[Event]) => State)
extends EventCommitter[Event] {
  private[this] val state = Ref(initialState)
  private[this] val revision = Ref(StoreRevision.Initial)

  /**
   * The current state of the memory image with at least all commits applied
   * that have been committed to the underlying event store.
   */
  def get: State = {
    val minimum = eventStore.reader.storeRevision
    atomic { implicit txn =>
      if (revision() < minimum) retry
      else state()
    }
  }

  /**
   * Commits an event to the underlying event store. The memory image will be
   * updated if the commit succeeds.
   */
  override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] =
    eventStore.committer.tryCommit(streamId, expected, event)

  override def toString = "MemoryImage(%s, %s)".format(revision.single.get, eventStore)

  // Subscribe to the underlying event store and apply every commit to the
  // current state using the provided `update` function.
  eventStore.publisher.subscribe(StoreRevision.Initial) { commit =>
    atomic { implicit txn =>
      require(revision().next == commit.storeRevision, "expected: " + revision().next + ", got " + commit.storeRevision)

      state.transform(s => update(s, commit))
      revision() = commit.storeRevision
    }
  }
}

The memory image takes three constructor parameters: the event store, the initial state, and the update function that takes the current state and a commit to produce the updated state. The memory image also implements the EventCommitter interface and simply passes any commit attempts to the underlying event store.

The get method first checks if the current memory image is up-to-date. If it isn’t, it blocks until the required number of commits have been applied. Otherwise it simply returns the current state. This ensures the required level of read consistency.

When the memory image is instantiated it also subscribes to the underlying event store and uses the provided update function to ensure the current state reflects all commits received from the event store.
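Conceptually, the state held by the memory image is a left fold of the update function over all commits, in store revision order. The sketch below shows that idea in isolation, without STM or subscriptions. The simplified Commit class, the replay function and the string-encoded events are assumptions made for this example.

```scala
object ReplaySketch {
  // Hypothetical, simplified commit: a store revision plus the events it holds.
  final case class Commit[+Event](storeRevision: Long, events: Seq[Event])

  // Applies commits in order, checking that there are no gaps or duplicates,
  // and returns the resulting state together with the last applied revision.
  def replay[State, Event](initialState: State, commits: Seq[Commit[Event]])(
      update: (State, Commit[Event]) => State): (State, Long) =
    commits.foldLeft((initialState, 0L)) { case ((state, revision), commit) =>
      require(commit.storeRevision == revision + 1,
        s"expected: ${revision + 1}, got ${commit.storeRevision}")
      (update(state, commit), commit.storeRevision)
    }

  val commits: Seq[Commit[String]] = Seq(
    Commit(1L, Seq("add:Hello")),
    Commit(2L, Seq("add:World")),
    Commit(3L, Seq("delete:Hello")))

  // Example state: the titles of all posts that currently exist.
  val (titles, revision) = replay(Set.empty[String], commits) { (current, commit) =>
    commit.events.foldLeft(current) { (acc, event) =>
      event.split(":", 2) match {
        case Array("add", title)    => acc + title
        case Array("delete", title) => acc - title
        case _                      => acc // Ignore events we do not understand.
      }
    }
  }
}
```

The same fold is what lets a memory image be rebuilt from scratch after a restart: subscribing from the initial store revision simply feeds every commit through update again.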

The Posts Controller

Now that we have a (fake) implementation of the event store, we can start adjusting the PostsController to use it to store events. Previously we simply had a posts reference to the latest data and a commit method to commit events. We’ll re-implement these to use the memory image:

class PostsController(memoryImage: MemoryImage[Posts, PostEvent]) extends Controller {
  /**
   * The current blog posts from the memory image.
   */
  def posts(): Posts = memoryImage.get

  /**
   * Commits an event and applies it to the current state. If successful the
   * provided callback `f` is applied to the `commit`. Otherwise a conflict
   * result is returned.
   */
  private[this] def commit(expected: StreamRevision, event: PostEvent)
                          (f: Commit[PostEvent] => Result): Result = {
    memoryImage.tryCommit(event.postId.toString, expected, event) match {
      case Left(conflict) => Conflict(todo())
      case Right(commit)  => f(commit)
    }
  }

As you can see, the commit method parameter list was expanded to include the expected stream revision and a callback used to render an HTTP response if the commit succeeds. If a conflict is detected, an HTTP 409 (Conflict) status code is returned instead. Giving the client detailed information on the conflict is not implemented yet, so we simply render a todo page.

The controller actions that generate events are all related to HTTP POST requests and need to be adjusted to handle the new expected stream revision parameter and conflict response. Since only the client knows the revision used to render the page by the time the POST request is submitted, the expected stream revision is added as a new URL parameter. Listed below are the show and submit actions related to the “edit post” page:

def show(id: PostId) = Action { implicit request =>
  posts().get(id) match {
    case Some(post) => Ok(views.html.posts.edit(post.id, post.revision, postContentForm.fill(post.content)))
    case None       => NotFound(notFound(request, None))
  }
}

def submit(id: PostId, expected: StreamRevision) = Action { implicit request =>
  postContentForm.bindFromRequest.fold(
    formWithErrors => BadRequest(views.html.posts.edit(id, expected, formWithErrors)),
    postContent =>
      commit(expected, PostEdited(id, postContent)) { commit =>
        Redirect(routes.PostsController.show(id)).flashing("info" -> "Post saved.")
      }
  )
}

Compared to the previous version there are two main changes:

  1. The revision of the post is passed to the “edit post” view.
  2. The expected revision is added as a parameter to the form submit action, so will need to be passed in as part of the URL. This is configured in the conf/routes file.

Since Play! 2 view templates and URLs are strongly typed, it is also impossible to forget to pass this additional expected revision parameter, something that can easily happen with many other solutions, such as hidden “version” form fields.

The blog posts models

Since we need to know the current revision of a blog post when rendering the edit or delete actions we need to keep track of this in our model classes. This is done by adding a new field to the Post class and setting this based on the stream revision associated with the event:

def update(event: PostEvent, revision: StreamRevision): Posts = event match {
  case PostAdded(id, content) =>
    this.copy(byId = byId.updated(id, Post(id, revision, content)), orderedByTimeAdded = orderedByTimeAdded :+ id)
  case PostEdited(id, content) =>
    this.copy(byId = byId.updated(id, byId(id).copy(revision = revision, content = content)))
  case PostDeleted(id) =>
    this.copy(byId = byId - id, orderedByTimeAdded = orderedByTimeAdded.filterNot(_ == id))
}

This concludes our tour of the code. There were only minor changes to the code related to the application functionality, as most of the code is related to introducing the event store as a piece of application infrastructure.

Performance

Introducing the fake event store causes about a 3%-10% drop in performance compared to having no event store at all. Writes were affected the most, while read performance is almost the same. This is to be expected, since writes now use a background thread to update the memory image, while reads only add a quick check to the memory image to see if it is up-to-date with respect to the event store. Of course, we’re still not persisting the events to durable storage so we are mostly CPU bound. Writing to disk will obviously introduce new bottlenecks and we’ll go into a more detailed performance analysis in the next part.

Summary

With the introduction of the fake event store implementation our application’s architecture is now in place. The fake event store has the same behavior as a real event store (except for durability) and is very useful in combination with automated testing. The tests for the fake event store will also be used to check that the real event store behaves correctly. No architectural changes will be needed to our application when this new event store is available.

In the next part we’ll use Redis to implement an event store that will actually write the events to durable storage and will be used to replay the events to restore the memory image when the application server is started.

Footnotes:

  1. Primarily by defining various database constraints and by using row-level optimistic locking. These can help detect and prevent consistency problems, but do not help much with resolving these conflicts, something we’ll come back to in a later part of this series. 

  2. Since we do not expect to have many concurrent subscriptions to a single event store from a single JVM, threads are perfectly fine and do not limit scalability or performance. If you need many concurrent subscribers (for example, to push events to clients) then you should probably put an event bus or actor between the event store subscription and the clients.