Previously we spend some time preparing the code to support multiple kinds of events and data, rather than just supporting blog posts. In this part we’ll add user accounts, together with the required authentication and authorization code. We’ll again use event sourcing and the memory image to keep track of all users and currently active sessions. But the biggest changes to the application are related to security, and authorization in particular. It turns out event sourcing allows for an additional layer of authorization which allows us to whitelist any change a particular user is allowed to make.

Other parts

Code

You can find the code associated with this part on github on the part-6 branch.

Since quite some time has past since the previous part some other upgrades were made as well:

  • Scala 2.10.0 and Play 2.1-RC1 were released. The code has been updated accordingly. This mainly reduced the amount of code needed to work with JSON.
  • The stable version of Redis is now 2.6.7, which supports Lua scripts. So the 2.4.x Redis Watch/Multi/Exec event store implementation was removed.

User Accounts

Most applications need some level of user authentication and authorization support. Since we’re building an event sourced example application we’ll first define the events we need for managing users and active sessions:

sealed trait UserEvent extends DomainEvent {
  def userId: UserId
}
case class UserRegistered(userId: UserId, email: EmailAddress, displayName: String, password: Password) extends UserEvent
case class UserProfileChanged(userId: UserId, displayName: String) extends UserEvent
case class UserEmailAddressChanged(userId: UserId, email: EmailAddress) extends UserEvent
case class UserPasswordChanged(userId: UserId, password: Password) extends UserEvent
case class UserLoggedIn(userId: UserId, token: AuthenticationToken) extends UserEvent
case class UserLoggedOut(userId: UserId) extends UserEvent

From the event definitions it is quite clear what user related functionality our system supports. Besides registering, logging in and logging out, users can also change their profile information (display name), email address, and password.

To ensure we always hash passwords securely and never accidentally display them a custom Password class is defined, which uses the scrypt password hashing algorithm:

case class Password private (hash: String) {
  require(hash.startsWith("$s0$"), "invalid password hash")

  def verify(password: String): Boolean = SCryptUtil.check(password, hash)

  override def toString = "<PASSWORD-HASH>"
}
object Password {
  def fromHash(hash: String): Password = Password(hash)
  def fromPlainText(password: String): Password = Password(SCryptUtil.scrypt(password, 1 << 14, 8, 2))

  implicit val PasswordFormat: Format[Password] = valueFormat(fromHash)(_.hash)
}

Similar classes are defined for EmailAddress and AuthenticationToken. The authentication token is stored in the user’s session so we can lookup the current user when an HTTP request is made. This is implemented as part of the Users class:

case class Users(
  private val byId: Map[UserId, RegisteredUser] = Map.empty,
  private val byEmail: Map[EmailAddress, UserId] = Map.empty,
  private val byAuthenticationToken: Map[AuthenticationToken, UserId] = Map.empty) {

  // [... code omitted ...]

  def findByAuthenticationToken(token: AuthenticationToken): Option[RegisteredUser] =
    byAuthenticationToken.get(token).flatMap(byId.get)

  // [... code omitted ...]
}

Now that we not only store posts but users as well, the global state of the application is now captured in the ApplicationState class, which simply combines the Posts and Users classes and dispatches incoming updates to its parts:

case class ApplicationState(posts: Posts = Posts(), users: Users = Users()) {
  def update(event: DomainEvent, revision: StreamRevision) = event match {
    case event: PostEvent => copy(posts = posts.update(event, revision))
    case event: UserEvent => copy(users = users.update(event, revision))
    case _ => sys.error(s"unknown event: $event")
  }

  def updateMany(events: Seq[(DomainEvent, StreamRevision)]) = events.foldLeft(this) {
    case (state, (event, streamRevision)) => state.update(event, streamRevision)
  }
}

The User trait is defined as follows:

sealed trait User {
  def displayName: String

  /**
   * @return `Some(registeredUser)` if this is a registered user,
   * `None` otherwise.
   */
  def registered: Option[RegisteredUser] = None

  // [... authorization code ommitted ...]
}

By not having to worry about mapping classes to a (relational) database we can freely define our classes to match our application’s needs. In this case there are four different implementations of this trait. They all have a displayName and registered method in common, but are used in different situations:

/**
 * A deleted or non-existent user.
 */
case class UnknownUser(id: UserId) extends User {
  def displayName = "[deleted]"
}

/**
 * A user that we know the name of, but nothing else. Consider a comment posted
 * by a guest (where they only have to specify their name).
 */
case class PseudonymousUser(displayName: String) extends User

/**
 * A visitor to the site is represented by the guest user object.
 */
case object GuestUser extends User {
  def displayName = "Guest"

  // [... authorization code ommitted ...]
}

/**
 * A registered user who may have an active session identified by the
 * `authenticationToken`.
 */
case class RegisteredUser(
  id: UserId,
  revision: StreamRevision,
  email: EmailAddress,
  displayName: String,
  password: Password,
  authenticationToken: Option[AuthenticationToken] = None
) extends User {

override def registered = Some(this)

  // [... authorization code ommitted ...]
}

The current user

In most applications there is a current user. We want this user to be available to all our controller actions and views. We also want to store the current user’s id as part of a commit to our event store, so that we can easily trace who performed a specific action.

Instead of reimplementing this in every controller action we’ll add some helper methods, so that controller actions can stay focused. Play! makes it easy to do so using action composition.

First a new trait is defined so that we can decouple the controller from the raw memory image API. An instance of this trait will be passed to each controller when constructed:

/**
 * Actions available to controllers that make use of a memory image.
 */
trait ControllerActions[State, -Event] extends Controller { outer =>
  /**
   * The type of blocks that only need to query the current state of the
   * memory image.
   */
  type QueryBlock[A] = State => ApplicationRequest[A] => Result

  /**
   * The type of blocks that wish to read and modify the memory image.
   */
  type CommandBlock[A] = State => ApplicationRequest[A] => Transaction[Event, Result]

  /**
   * Runs the given `block` using the current state of the memory image.
   */
  def QueryAction(block: QueryBlock[AnyContent]): Action[AnyContent]

  /**
   * Runs the given `block` using the current state of the memory image
   * and applies the resulting transaction.
   */
  def CommandAction(block: CommandBlock[AnyContent]): Action[AnyContent]

  // [... code omitted ...]
}

As you can see two kinds of actions are provided:

  1. Query actions – used when you want to access the current user and application state, but do not need to modify it.
  2. Command actions – used when you want to generate and commit new events.

Each action expects a callback block as a parameter. The block will be invoked with the current memory image state and an ApplicationRequest, which wraps the standard Play! Request and adds additional user related information:

trait CurrentUserContext {
  /**
   * The current authenticated user or the guest user.
   */
  def currentUser: User
}

trait UsersContext {
  /**
   * All known users and sessions.
   */
  def users: Users
}

/**
 * Context for use as an implicit parameter to views.
 */
trait ViewContext extends CurrentUserContext {
  def flash: Flash
}

/**
 * Extend Play’s `Request` with user context information.
*/
class ApplicationRequest[A](
  request: Request[A],
  val currentUser: User,
  val users: Users
) extends WrappedRequest(request) with ViewContext with UsersContext

So instead of passing the Play! Request to views we now pass it a ViewContext, so that we can easily control which information is available to a view (in this case, just the current user and the flash message information).

The implementation of this trait uses a MemoryImage based on the ApplicationState class and can be found at MemoryImageActions.scala. Here’s the implementation of the QueryAction method:

/**
 * Implements `ControllerActions` using the (global) memory image containing
 * the `ApplicationState`.
 */
class MemoryImageActions
  (memoryImage: MemoryImage[ApplicationState, DomainEvent])
extends ControllerActions[ApplicationState, DomainEvent] {

  override def QueryAction(block: QueryBlock[AnyContent]) = Action { request =>
    val state = memoryImage.get
    block(state)(buildApplicationRequest(request, state))
  }

  // [... CommandAction omitted ...]

  private def buildApplicationRequest[A](request: Request[A], state: ApplicationState) = {
    val currentUser = request.session.get("authenticationToken")
      .flatMap(AuthenticationToken.fromString)
      .flatMap(state.users.findByAuthenticationToken)
      .getOrElse(GuestUser)

    new ApplicationRequest(request, currentUser, state.users)
  }
}

As you can see a QueryAction is implemented as a Play! Action that reads the current state of the memory image and uses it to build the ApplicationRequest with the current user. If there is no authentication token the GuestUser is used instead. The provided block is then invoked. The CommandAction is similar, but uses the MemoryImage.modify to also commit the generated events.

Finally, we do not want to expose the entire application state to each controller. The UsersController only needs to see the users, while the PostsController only needs to see the posts. The view method on ControllerActions takes care of that:

trait ControllerActions[State, -Event] extends Controller { outer =>
  // [... action methods omitted ...]

  /**
   * Only expose part of the state of the memory image using the provided
   * function `f`.
   */
  def view[S, E <: Event](f: State => S) = new ControllerActions[S, E] {
    def QueryAction(block: QueryBlock[AnyContent]) = outer.QueryAction { state => request =>
      block(f(state))(request)
    }
    def CommandAction(block: CommandBlock[AnyContent]) = outer.CommandAction { state => request =>
      block(f(state))(request)
    }
}

Controllers that use the memory image can now simply have an instance of ControllerActions passed in at construction time to define actions. For example, the PostsController.index action now becomes

object PostsController
  extends PostsController(Global.MemoryImageActions.view(_.posts))
class PostsController(actions: ControllerActions[Posts, PostEvent]) {
  // Import the action and controller methods directly.
  import actions._

  /**
   * Show an overview of the most recent blog posts.
   */
  def index = QueryAction { posts => implicit request =>
    Ok(views.html.posts.index(posts.mostRecent(20)))
  }

  // [... other code omitted ...]
}

Uniqueness of email addresses

One common requirement is that email addresses must be unique across all users. In an application backed by a relational database this is as easy as adding a unique constraint1, but an event store does not provide anything similar. We could use the email address as the user’s event stream identifier, but that makes it hard for a user to change their email address without creating a brand new event stream. So what we’ll do instead is to store a map of email address to user ids and fill this map on demand. For unit testing the following implementation works fine:

val claimedEmailAddresses = collection.mutable.Map.empty[EmailAddress, UserId]
def claim(email: EmailAddress, requestedUserId: UserId): UserId =
  claimedEmailAddress.getOrElseUpdate(email, requestedUserId)

In other words, the first time an email address is used a new user id is assigned. Afterwards, the same user id is always returned. For the production implementation we use a Redis hash together with the HSETNX command:

class RedisEmailRegistry(jedis: Jedis, redisKey: String) {
  def claim(email: EmailAddress, requestedUserId: UserId): UserId = {
    val result: Long = jedis.hsetnx(redisKey, email.toString, requestedUserId.toString)
    result match {
      case 0L =>
        val existingUserId = jedis.hget(redisKey, email.toString)
        UserId.fromString(existingUserId).getOrElse(sys.error(s"cannot parse user id: $existingUserId"))
      case 1L =>
        requestedUserId
      case _ =>
        sys.error(s"unexpected Redis return value: $result")
    }
  }
}

So when we register a new user we first map the email address to a user id and then generate the UserRegistered event. If the email address is already taken the event store will return a conflict, since we always expect the event stream to be empty for a new user:

class UsersController(
  actions: ControllerActions[Users, UserEvent],
  claimEmailAddress: (EmailAddress, UserId) => UserId
) {
  import actions._

  // [... other actions omitted ...]

  def register = CommandAction { state => implicit request =>
    val form = registrationForm.bindFromRequest
    form.fold(
      formWithErrors =>
        abort(BadRequest(views.html.users.register(formWithErrors))),
      registration => {
        val (email, displayName, password) = registration
        val userId = claimEmailAddress(email, UserId.generate)
        Changes(
          StreamRevision.Initial,
          UserRegistered(userId, email, displayName, password): UserEvent)
          .commit(
            onCommit = Redirect(routes.UsersController.registered),
            onConflict = _ => BadRequest(views.html.users.register(form.withGlobalError("duplicate.account"))))
    })
  }

Authors and authorization

When it comes to security authentication is often the “easy” part, mainly limiting itself to user registration and logging in. Authorization is often much harder, since it affects the entire application. Now that we have the concept of a user we change blog posts and comments to include the user id of the author or commenter. We then implement authorization rules to ensure only the author can delete a blog post, etc. We add the authorization methods the User trait:

trait User {
  // [... code omitted ...]

  def canAddPost: Boolean = false
  def canAddComment(post: Post): Boolean = false
  def canEditPost(post: Post): Boolean = false
  def canDeletePost(post: Post): Boolean = false
  def canDeleteComment(post: Post, comment: Comment): Boolean = false

  // [... code omitted ...]
}

By default a user is not authorized to do anything, except for a RegisteredUser or a GuestUser:

case object GuestUser extends User {
  // [... other code omitted ...]

  override def canAddComment(post: Post) = true
}

case class RegisteredUser(/* ... code omitted ... */) extends User {
  // [... other code omitted ...]

  override def canAddPost = true
  override def canAddComment(post: Post) = true
  override def canEditPost(post: Post) = post.isAuthoredBy(this)
  override def canDeletePost(post: Post) = post.isAuthoredBy(this)
  override def canDeleteComment(post: Post, comment: Comment) =
  post.isAuthoredBy(this) || comment.isAuthoredBy(this)

}

As you can see a guest user can only add comments to a post. Registered users can add posts, comments, and can edit or delete posts and comments authored by themselves.

Now controllers and views can easily invoke the correct authorization method to see if the user is allowed to see a certain button or to perform a certain action. Here’s is an example when adding a post (from PostsController), which uses the AuthenticatedCommandAction to require a logged in user:

def edit(id: PostId, expected: StreamRevision) = AuthenticatedCommandAction {
  user => posts => implicit request =>
    posts.get(id).filter(user.canEditPost) map { post =>
      // [... code omitted ...]
    } getOrElse {
      abort(notFound)
    }
  }

Event authorization

Unfortunately, it is easy to forget to add the correct authorization check to a controller action, since any action is allowed by default. And an attacker only has to find a single mistake to break the security of an application. With event sourcing we can optionally add an additional layer of security: we can check the events being committed against the authorization level of a user. To do this we add one additional method to the User trait:

trait User {
  // [... code omitted ...]

  def authorizeEvent(state: ApplicationState): DomainEvent => Boolean =
    event => false
}

This method takes the current ApplicationState and returns a function that checks if this user is allowed to commit a specific domain event. The default implementation always forbids any change.

For guest users and registered users we override this method. Here’s the code for the guest user:

case object GuestUser extends User {
  // [... code omitted ...]

  override def authorizeEvent(state: ApplicationState) = {
    case event: UserRegistered => true
    case event: UserLoggedIn => true
    case event: CommentAdded => state.posts.get(event.postId).exists(this.canAddComment)
    case _ => false
  }
}

Now we have everything to implement the CommandAction helper method from the MemoryImageActions class:

class MemoryImageActions(memoryImage: MemoryImage[ApplicationState, DomainEvent])
  extends ControllerActions[ApplicationState, DomainEvent] {
  // [... code omitted ...]

  override def CommandAction(block: CommandBlock[AnyContent]) = Action { request =>
    memoryImage.modify { state =>
      val applicationRequest = buildApplicationRequest(request, state)
      val currentUser = applicationRequest.currentUser
      val transaction = block(state)(applicationRequest)
      if (transaction.events.forall(currentUser.authorizeEvent(state))) {
        transaction.withHeaders(currentUser.registered.map(user => "currentUserId" -> user.id.toString).toSeq: _*)
      } else {
        Transaction.abort(notFound(request))
      }
    }
  }

  // [... code omitted ...]
}

As you can see it first builds the ApplicationRequest, just like the QueryAction method. But it then inspects the transaction returned by the provided block to see if the current user is authorized to commit all the generated events. If so, it adds the current user id as a header to the commit and applies it to the memory image. Otherwise, the transaction is aborted with a 404 Not Found response.

The main advantage of this level of authorization is that events must be explicitly whitelisted. So when new functionality is added it will simply not work until the whitelist is updated. This is the opposite of views and controllers, where new functionality works by default and authorization checks must be explicitly added. A step that is easy to forget!

Summary

Adding users to our example application caused some major re-design to manage authentication and authorization. Compared to this event sourcing was just a minor detail, the same kind of re-design would have been needed if users were stored in a traditional database. Event sourcing does provide an additional layer of security by allowing us to authorize committed events. We can also fully audit all changes, as every commit includes the committing user’s id.

We have now build a fairly complete application. In the next part we’ll start making use of our event sourced model to integrate with the outside world. We’ll do this by reacting to the events as they are committed to the event store.

Footnotes:

  1. Actually handling a unique constraint violation can be quite hard however. Often you’ll need to catch an exception from an ORM framework when the transaction is committed and then try to derive which unique constraint was violated.