Aug 26, 2012
 

In the previous part we added blog post comment functionality. In this part we’ll do some refactoring and change the memory image implementation to automatically retry domain logic on optimistic locking conflicts, giving us a simplified form of transactions. We’ll also change the event store to support multiple types of event streams in a single event store.

Code

You can find the code associated with this part on GitHub, on the part-5 branch.

Refactoring

The main change we want to make before improving the current commit code to handle transient conflicts is to move it out of the PostsController, so that we can use it from other parts of the application. The current implementation, however, is tied directly to Posts and PostEvents. So first we remove the following dependencies from this method:

  1. PostEvent conflict resolution.
  2. How to derive the PostId from the PostEvent.
  3. How to turn a PostId into an event stream identifier (currently just using toString).

To solve the first problem we introduce a new trait ConflictsWith:

/**
 * Compares committed events against the attempted events to check for
 * conflicts.
 */
trait ConflictsWith[-Event] {
  /**
   * Checks each committed event from `conflict` for conflicts with the `attempted` events.
   * Any committed events that conflict are returned.
   */
  def conflicting[A <: Event, B <: Event]
                 (conflict: Conflict[A], attempted: Seq[B]): Option[Conflict[A]]
}
object ConflictsWith {
  /**
   * Builds a new `ConflictsWith` based on the `conflicts` predicate.
   */
  def apply[Event](conflicts: (Event, Event) => Boolean) = new ConflictsWith[Event] {
    override def conflicting[A <: Event, B <: Event]
                            (conflict: Conflict[A], attempted: Seq[B]): Option[Conflict[A]] =
      conflict.filter(a => attempted.exists(b => conflicts(a.event, b)))
  }
}

ConflictsWith is a type class that defines how to resolve conflicts for a certain type of events. By making it a separate trait (instead of a function) the Scala compiler can automatically pass implementations as an implicit parameter where needed. The definition of the Conflict class has also changed slightly, to make it easier to filter the events while preserving meta-information.

The apply method of the ConflictsWith companion object is helpful to create instances of ConflictsWith based on a simple predicate. The predicate must check if two events conflict or not. The PostEvent conflict resolution implementation becomes:

  implicit val PostEventConflictsWith: ConflictsWith[PostEvent] = ConflictsWith {
    case (a: PostCommentEvent, b: PostCommentEvent) => a.commentId == b.commentId
    case (_: PostCommentEvent, _)                   => false
    case _                                          => true
  }

We use Scala’s convenient partial function syntax to define all the cases, without having to explicitly use the match keyword.
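To see how this predicate behaves, here is a minimal, self-contained sketch. The event classes are simplified stand-ins (their fields are assumptions, not the real definitions), and the hypothetical conflicting helper works on plain sequences instead of the real Conflict class:

```scala
object ConflictsWithSketch {
  // Simplified stand-ins for the real event types (hypothetical shapes).
  sealed trait PostEvent
  sealed trait PostCommentEvent extends PostEvent { def commentId: Int }
  final case class PostEdited(content: String) extends PostEvent
  final case class CommentAdded(commentId: Int, text: String) extends PostCommentEvent
  final case class CommentDeleted(commentId: Int) extends PostCommentEvent

  // The same cases as above: (committed event, attempted event) => conflict?
  val conflicts: (PostEvent, PostEvent) => Boolean = {
    case (a: PostCommentEvent, b: PostCommentEvent) => a.commentId == b.commentId
    case (_: PostCommentEvent, _)                   => false
    case _                                          => true
  }

  // Keeps the committed events that conflict with at least one attempted event.
  def conflicting(committed: Seq[PostEvent], attempted: Seq[PostEvent]): Seq[PostEvent] =
    committed.filter(a => attempted.exists(b => conflicts(a, b)))

  // Comments with different ids do not conflict.
  val differentComments = conflicting(Seq(CommentAdded(1, "a")), Seq(CommentAdded(2, "b")))
  // A committed edit conflicts with anything attempted afterwards.
  val editThenComment = conflicting(Seq(PostEdited("t")), Seq(CommentAdded(1, "a")))
}
```

Note that the predicate is not symmetric: a committed comment never conflicts with an attempted edit, but a committed edit conflicts with every attempted event.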

To solve the second and third problems, and to support different types of event streams in a single event store, we introduce another trait EventStreamType:

/**
 * Event stream type information.
 */
trait EventStreamType[StreamId, Event] {
  /**
   * Convert a stream identifier to a string. Used by the event store to persist
   * the stream identifier.
   */
  def toString(streamId: StreamId): String

  /**
   * Extract stream identifier from `event`.
   */
  def streamId(event: Event): StreamId

  /**
   * Cast `event` to the `Event` type.
   *
   * @throws ClassCastException if `event` is not of type `Event`.
   */
  def cast(event: Any): Event
}
object EventStreamType {
  def apply[StreamId, Event]
           (writeStreamId: StreamId => String, 
            eventToStreamId: Event => StreamId)
           (implicit manifest: Manifest[Event]) =
     // [...]
}

This trait captures the relationship between an event stream identifier type and the type of events that can be stored in the associated stream. It also provides methods to extract the event stream identifier from an event and to turn an event stream identifier into its string representation. Finally it allows casting a value of any type to the type of the event. With these two traits defined we can move the commit method from the PostsController into the MemoryImage class. But first we look into using the event store for different event stream types.
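The body of EventStreamType.apply is elided above. Purely as an illustration (this is not the code from the repository), one way such a factory could be written is sketched below. It assumes a ClassTag in place of the Manifest in the original signature, and the PostId and PostEvent definitions are simplified stand-ins:

```scala
import scala.reflect.ClassTag

object EventStreamTypeSketch {
  trait EventStreamType[StreamId, Event] {
    def toString(streamId: StreamId): String
    def streamId(event: Event): StreamId
    def cast(event: Any): Event
  }

  object EventStreamType {
    // Hypothetical body for the elided `apply`; uses a ClassTag where the
    // original signature takes a Manifest.
    def apply[StreamId, Event](writeStreamId: StreamId => String,
                               eventToStreamId: Event => StreamId)
                              (implicit tag: ClassTag[Event]): EventStreamType[StreamId, Event] =
      new EventStreamType[StreamId, Event] {
        def toString(streamId: StreamId): String = writeStreamId(streamId)
        def streamId(event: Event): StreamId = eventToStreamId(event)
        // ClassTag.unapply returns Some(event) only for instances of Event.
        def cast(event: Any): Event = tag.unapply(event).getOrElse(
          throw new ClassCastException(s"$event is not a ${tag.runtimeClass.getName}"))
      }
  }

  // Simplified stand-ins for the real identifier and event types.
  final case class PostId(value: Int)
  sealed trait PostEvent { def postId: PostId }
  final case class PostAdded(postId: PostId, title: String) extends PostEvent

  val postStreamType: EventStreamType[PostId, PostEvent] =
    EventStreamType[PostId, PostEvent](_.value.toString, _.postId)
}
```

The cast method is what lets the event store fail fast when a stream contains something other than the expected event type.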

Multiple event stream types

Now that we’ve associated an event stream identifier type with the event type using the EventStreamType trait, we can make our event store contravariant. In other words, when we have an event store of type EventStore[DomainEvent] we can use it as an EventStore[PostEvent], if PostEvent is a subtype of DomainEvent.

But we can only do this by changing event store methods that return events to take into account the EventStreamType, otherwise the Scala compiler will not allow us to treat an EventStore[DomainEvent] as an EventStore[PostEvent] (since reading from such a store could return DomainEvents, while the caller would only expect PostEvents). So the definition of CommitReader becomes:

/**
 * Reads commits from the event store.
 */
trait CommitReader[-Event] {
  // [...]

  /**
   * Reads all commits `since` (exclusive) up `to` (inclusive). Events are
   * filtered to only include events of type `E`.
   */
  def readCommits[E <: Event]
                 (since: StoreRevision, to: StoreRevision)
                 (implicit manifest: Manifest[E]): Stream[Commit[E]]

  /**
   * Reads all commits from the stream identified by `streamId` that occurred
   * `since` (exclusive) up `to` (inclusive).
   *
   * @throws ClassCastException the stream contained commits that did not
   *                            have the correct type `E`.
   */
  def readStream[StreamId, E <: Event]
                (streamId: StreamId, 
                 since: StreamRevision = StreamRevision.Initial, 
                 to: StreamRevision = StreamRevision.Maximum)
                (implicit descriptor: EventStreamType[StreamId, E]):
                Stream[Commit[E]]
}

Other methods in the event store traits are similarly changed. The methods can be used much like before, except that you’ll have to provide the expected event type (readCommits) or that Scala will look for an appropriate instance of EventStreamType (readStream):

implicit val PostEventStreamType: EventStreamType[PostId, PostEvent] = 
  EventStreamType(streamId => streamId.toString, event => event.postId)

val myCommits = eventStore.reader.readStream(myPostId)

Here the Scala compiler knows the type of myPostId and, based on the existence of PostEventStreamType, infers that the events must be of type PostEvent[1]. The type of myCommits is therefore inferred as Stream[Commit[PostEvent]], and you do not have to do any type casting.
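The combination of contravariance and filtered reads can be tried out in isolation. The following is a rough sketch under simplifying assumptions: a hypothetical in-memory reader, a ClassTag instead of a Manifest, a plain Seq instead of a Stream, and no revision range parameters:

```scala
import scala.reflect.ClassTag

object CommitReaderSketch {
  sealed trait DomainEvent
  sealed trait PostEvent extends DomainEvent
  final case class PostAdded(title: String) extends PostEvent
  sealed trait UserEvent extends DomainEvent
  final case class UserRegistered(email: String) extends UserEvent

  final case class Commit[+Event](revision: Long, events: Seq[Event])

  // Contravariant in Event: a reader of DomainEvents can be used wherever
  // a reader of PostEvents is expected.
  trait CommitReader[-Event] {
    def readCommits[E <: Event](implicit tag: ClassTag[E]): Seq[Commit[E]]
  }

  // Hypothetical in-memory reader; every commit is kept, but its events are
  // filtered down to the requested type `E`.
  final class InMemoryReader(stored: Seq[Commit[DomainEvent]]) extends CommitReader[DomainEvent] {
    def readCommits[E <: DomainEvent](implicit tag: ClassTag[E]): Seq[Commit[E]] =
      stored.map(c => Commit(c.revision, c.events.flatMap(e => tag.unapply(e).toList)))
  }

  val store = new InMemoryReader(Seq(
    Commit(1L, Seq(PostAdded("Hello"))),
    Commit(2L, Seq(UserRegistered("a@example.com")))))

  // Allowed because CommitReader is contravariant.
  val postReader: CommitReader[PostEvent] = store
  val postCommits: Seq[Commit[PostEvent]] = postReader.readCommits[PostEvent]
}
```

Because the caller names the expected type E, the reader can only ever hand back events of that type, which is what makes treating the broader store as a narrower one safe.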

Commit parameters

Another change is that we introduce a new Changes type that combines the three parameters provided to the event store committer’s tryCommit method (streamId, expected stream revision, and the event to append). The interface is:

/**
 * Represents the changes that can be committed atomically to the event store.
 */
sealed trait Changes[Event] {
  type StreamId
  def eventStreamType: EventStreamType[StreamId, Event]

  def streamId: StreamId
  def expected: StreamRevision
  def events: Seq[Event]

  def withExpectedRevision(expected: StreamRevision): Changes[Event]
}
object Changes {
  // [...]

  def apply[StreamId, Event]
           (expected: StreamRevision, event: Event)
           (implicit streamType: EventStreamType[StreamId, Event]): Changes[Event] =
    apply(streamType.streamId(event), expected, event)
}

So besides the three parameters it also captures the type of the stream identifier and the associated EventStreamType instance, so that only events of the correct type can be committed to the event stream. The withExpectedRevision method returns a new copy of the changes with an updated expected StreamRevision, which is useful for retrying a commit in the case of resolved conflicts.
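As an illustration of how these pieces fit together, here is a small sketch. The EventStreamChanges class, the shape of StreamRevision, and the PostId and PostEdited definitions are assumptions made for this example; the real implementation class is elided in the listing above:

```scala
object ChangesSketch {
  final case class StreamRevision(value: Long)

  trait EventStreamType[StreamId, Event] {
    def toString(streamId: StreamId): String
    def streamId(event: Event): StreamId
  }

  sealed trait Changes[Event] {
    type StreamId
    def eventStreamType: EventStreamType[StreamId, Event]
    def streamId: StreamId
    def expected: StreamRevision
    def events: Seq[Event]
    def withExpectedRevision(expected: StreamRevision): Changes[Event]
  }

  // Hypothetical implementation class; it fixes the StreamId type member.
  private final case class EventStreamChanges[A, Event](
      streamId: A, expected: StreamRevision, events: Seq[Event],
      eventStreamType: EventStreamType[A, Event]) extends Changes[Event] {
    type StreamId = A
    def withExpectedRevision(expected: StreamRevision): Changes[Event] =
      copy(expected = expected)
  }

  object Changes {
    def apply[A, Event](expected: StreamRevision, event: Event)
                       (implicit streamType: EventStreamType[A, Event]): Changes[Event] =
      EventStreamChanges(streamType.streamId(event), expected, Seq(event), streamType)
  }

  // Simplified stand-ins for the real identifier and event types.
  final case class PostId(value: Int)
  sealed trait PostEvent { def postId: PostId }
  final case class PostEdited(postId: PostId, content: String) extends PostEvent

  implicit val postStreamType: EventStreamType[PostId, PostEvent] =
    new EventStreamType[PostId, PostEvent] {
      def toString(streamId: PostId): String = streamId.value.toString
      def streamId(event: PostEvent): PostId = event.postId
    }

  // The stream identifier is derived from the event through the implicit instance.
  val attempt: Changes[PostEvent] =
    Changes(StreamRevision(3), PostEdited(PostId(1), "new text"): PostEvent)
  // Same stream and events, retried against a newer revision.
  val retry: Changes[PostEvent] = attempt.withExpectedRevision(StreamRevision(4))
}
```

Keeping the stream identifier as a type member means callers only need to mention the event type, while the identifier and its EventStreamType still travel together.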

With all these changes in place we can get back to solving transient transaction conflicts.

Atomic memory image modifications

Going back to our application’s storage architecture the following picture emerges:

Remember that in the blog post application there is one event store with many memory images (one per application server), all concurrently committing new events, based on the actions performed by the users. We already detect the conflicts that can occur when users work with (potentially) outdated information by comparing the expected stream revision to the actual revision (see the previous part for more details). Let’s call this kind of conflict a user conflict.

But by adding the blog comment functionality we introduced a potential conflict in the interaction between the memory image and the event store. In the add comment action of PostsController we first read the current state of a blog post and use it to assign a CommentId to the new comment. We then commit a CommentAdded event. If multiple users happen to add comments to a blog post simultaneously, an unresolvable conflict will occur: both new comments will have the same id. This is a transaction conflict[2][3].

In the current version of the application the commit fails and we’ll notify the user that someone else added a comment with the same id. The user must then resubmit the comment, which will then succeed. But why ask the user to retry an action for this kind of transient conflict, when computers are much better at doing repetitive work automatically?

What we have to do is to automatically rerun the code that reads the memory image and then generates the event. So we will replace the MemoryImage’s tryCommit method with a new modify method:

class MemoryImage[State, Event] /* [...] */ {
  // [...]

  def modify[A](body: State => Transaction[Event, A]): A = ...
}

The body parameter is a function that uses the current state of the memory image and returns a Transaction value. This transaction value contains all the information we need to try to commit to the event store. If it turns out that a transient transaction conflict occurred, we will rerun the body function and retry the commit. When the transaction succeeds we return the value returned by body.

The possible values for a Transaction are shown below:

/**
 * The transaction to commit to the event store when modifying the memory image.
 */
sealed trait Transaction[+Event, +A] {
  /**
   * Maps the result of this transaction from `A` to `B` using `f`.
   */
  def map[B](f: A => B): Transaction[Event, B]
}
object Transaction {
  /**
   * Transaction result that completes with `onAbort` when run,
   * without committing anything to the event store.
   */
  def abort[A](onAbort: => A): Transaction[Nothing, A] = 
    new TransactionAbort(() => onAbort)

  implicit def ChangesOps[Event](changes: Changes[Event]) = new ChangesOps(changes)
  class ChangesOps[Event](changes: Changes[Event]) {
    /**
     * Transaction result that will commit the `changes` to the event store.
     */
    def commit[A](onCommit: => A, onConflict: Conflict[Event] => A)
                 (implicit conflictsWith: ConflictsWith[Event]): 
                 Transaction[Event, A] =
      new TransactionCommit(changes, () => onCommit, onConflict, conflictsWith)
  }

  implicit def OptionTransactionOps[Event, A](m: Option[Transaction[Event, A]]) = new OptionTransactionOps(m)
  class OptionTransactionOps[Event, A](value: Option[Transaction[Event, A]]) {
    /**
     * Turns an `Option[Transaction[Event, A]]` into `Transaction[Event, Option[A]]`.
     */
    def sequence: Transaction[Event, Option[A]] = value match {
      case None              => abort(None)
      case Some(transaction) => transaction.map(Some(_))
    }
  }
}

TransactionCommit and TransactionAbort (not listed) are simple case classes that hold parameters until we are ready to commit (or abort) the transaction. ChangesOps enriches the Changes class with a new commit method that instantiates a TransactionCommit (Scala 2.10 will offer a nicer syntax for “extending” existing classes with new methods using implicit classes). Values of type Option[Transaction[Event, A]] are also extended with the sequence method, which flips the optionality to return a Transaction[Event, Option[A]].
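For comparison, here is roughly what the sequence extension looks like with the implicit class syntax available from Scala 2.10 on. The Abort and Commit case classes are simplified stand-ins for TransactionAbort and TransactionCommit (the real ones carry more parameters), and updateTitle is a hypothetical helper that mirrors how an optional lookup is turned into a transaction:

```scala
object TransactionSketch {
  sealed trait Transaction[+Event, +A] {
    def map[B](f: A => B): Transaction[Event, B]
  }
  // Simplified stand-ins for TransactionAbort and TransactionCommit.
  final case class Abort[+A](onAbort: () => A) extends Transaction[Nothing, A] {
    def map[B](f: A => B): Transaction[Nothing, B] = Abort(() => f(onAbort()))
  }
  final case class Commit[+Event, +A](events: Seq[Event], onCommit: () => A)
      extends Transaction[Event, A] {
    def map[B](f: A => B): Transaction[Event, B] = Commit(events, () => f(onCommit()))
  }

  def abort[A](onAbort: => A): Transaction[Nothing, A] = Abort(() => onAbort)

  // One declaration replaces the implicit def plus wrapper class pair.
  implicit class OptionTransactionOps[Event, A](value: Option[Transaction[Event, A]]) {
    def sequence: Transaction[Event, Option[A]] = value match {
      case None              => abort(None)
      case Some(transaction) => transaction.map(Some(_))
    }
  }

  // A missing entry becomes an abort that yields None; a found entry
  // becomes a commit whose result is wrapped in Some.
  def updateTitle(titles: Map[Int, String], id: Int): Transaction[String, Option[String]] =
    titles.get(id)
      .map[Transaction[String, String]](t => Commit(Seq(s"renamed $id"), () => t.toUpperCase))
      .sequence
}
```

An implicit class must be nested inside an object, class, or trait, which is why the sketch is wrapped in TransactionSketch.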

To modify the memory image safely, we can now write code like:

memoryImage.modify { posts => Changes(/* ... */).commit(/* ... */) }

The flow chart below describes the full MemoryImage modify algorithm, including user and transaction conflict resolution:

That’s quite a few steps! Let’s go through each step in the left column:

  1. Read the current state of the memory image.
  2. Pass the state to the provided transaction body.
  3. Is the result of the transaction an abort? If yes, return onAbort result and stop.
  4. Try to commit the Changes to the event store using the expected stream revision.
  5. Did the commit succeed? If yes, return the onCommit result and stop.
  6. Was there an update to the event stream that was not yet applied to the memory image state used to run this transaction? If yes, we have a transaction conflict and go back to step 1 to retry.
  7. Is there an unresolvable user conflict? If yes, return the result of the onConflict handler and stop.
  8. Try to commit again, but now use the actual stream revision.
  9. Did the commit succeed? If yes, return the onCommit result and stop. If not, we have a transaction conflict, so go back to step 1 to retry.
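The steps above can be sketched as a small single-threaded model. This is not the MemoryImage.modify code from the repository. It assumes a single event stream with Int revisions, one event per commit, and a memory image that catches up with the store synchronously. EventStore, Conflict, Abort, and Commit are simplified stand-ins for the real types:

```scala
object ModifySketch {
  // Hypothetical single-stream store: the revision is the number of committed events.
  final case class Conflict[E](actual: Int, events: Seq[E])

  final class EventStore[E] {
    private var log = Vector.empty[E]
    def revision: Int = log.size
    def eventsSince(since: Int): Seq[E] = log.drop(since)
    def tryCommit(expected: Int, event: E): Either[Conflict[E], Int] =
      if (expected == log.size) { log = log :+ event; Right(log.size) }
      else Left(Conflict(log.size, log.drop(expected)))
  }

  sealed trait Transaction[E, A]
  final case class Abort[E, A](onAbort: () => A) extends Transaction[E, A]
  final case class Commit[E, A](expected: Int, event: E, onCommit: () => A,
                                onConflict: Conflict[E] => A) extends Transaction[E, A]

  final class MemoryImage[S, E](store: EventStore[E], initial: S)
                               (update: (S, E) => S)
                               (conflicts: (E, E) => Boolean) {
    private var state = initial
    private var applied = 0 // store revision already folded into `state`

    // Assumption: the image catches up synchronously by reading from the store.
    private def catchUp(): Unit = {
      state = store.eventsSince(applied).foldLeft(state)(update)
      applied = store.revision
    }

    def modify[A](body: S => Transaction[E, A]): A = {
      val seen = applied                                      // step 1
      body(state) match {                                     // step 2
        case Abort(onAbort) => onAbort()                      // step 3
        case Commit(expected, event, onCommit, onConflict) =>
          store.tryCommit(expected, event) match {            // step 4
            case Right(_) => catchUp(); onCommit()            // step 5
            case Left(conflict) if conflict.actual > seen =>  // step 6
              catchUp(); modify(body)
            case Left(conflict) =>
              val clashing = conflict.events.filter(conflicts(_, event))
              if (clashing.nonEmpty)                          // step 7
                onConflict(conflict.copy(events = clashing))
              else store.tryCommit(conflict.actual, event) match { // step 8
                case Right(_) => catchUp(); onCommit()        // step 9
                case Left(_)  => catchUp(); modify(body)
              }
          }
      }
    }
  }

  // Simplified post model for trying it out.
  sealed trait PostEvent
  final case class PostEdited(content: String) extends PostEvent
  final case class CommentAdded(commentId: Int, text: String) extends PostEvent
  final case class Post(content: String = "", comments: Vector[CommentAdded] = Vector.empty)

  val update: (Post, PostEvent) => Post = {
    case (post, PostEdited(content)) => post.copy(content = content)
    case (post, c: CommentAdded)     => post.copy(comments = post.comments :+ c)
  }
  val conflicts: (PostEvent, PostEvent) => Boolean = {
    case (a: CommentAdded, b: CommentAdded) => a.commentId == b.commentId
    case (_: CommentAdded, _)               => false
    case _                                  => true
  }

  // The comment id is derived from the state, so the body must be rerun on retry.
  def addComment(image: MemoryImage[Post, PostEvent], seenRevision: Int, text: String): String =
    image.modify[String] { post =>
      val event = CommentAdded(post.comments.size + 1, text)
      Commit[PostEvent, String](seenRevision, event, () => s"added #${event.commentId}", _ => "conflict")
    }
}
```

With two memory images on the same store, a comment added through the stale image first collides on comment id 1. The image then catches up, the body is rerun, and the comment is committed with id 2 without involving the user. A real implementation would also have to deal with concurrent access to the memory image, which this sketch ignores.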

This entire process is implemented in MemoryImage.modify. The method is a bit larger than I would like, but it seems to be one of those cases that don’t get any more understandable when split up into smaller pieces.

Now that we have basic transaction support in place we can modify the controller methods to work with this. Here’s the post edit action together with the updatePost helper method:

object edit {
  // [...]
  def submit(id: PostId, expected: StreamRevision) = Action { implicit request =>
    updatePost(id) { post =>

      postContentForm.bindFromRequest.fold(
        formWithErrors =>
          abort(BadRequest(views.html.posts.edit(id, expected, formWithErrors))),

        postContent =>
          Changes(expected, PostEdited(id, postContent): PostEvent).commit(
            onCommit = 
              Redirect(routes.PostsController.show(id)).flashing("info" -> "Post saved."),
            onConflict = conflict => 
              Conflict(views.html.posts.edit(id, conflict.actual, postContentForm.fill(postContent), conflict.events))))

    } getOrElse {
      notFound
    }
  }
}

/**
 * Runs the transaction `body` against the post identified by `postId` and
 * returns the result, if it exists. Otherwise `None` is returned.
 */
def updatePost[A](id: PostId)
                 (body: Post => Transaction[PostEvent, A]): Option[A] =
  memoryImage.modify { _.get(id).map(body).sequence }

As you can see, if the form validation fails, we abort the transaction and return a BadRequest as usual. Otherwise we return a commit result with the PostEdited event and the appropriate onCommit and onConflict handlers.

Summary

With the addition of the modify method to the MemoryImage we now have a place to put domain logic. Since the example application’s logic is still extremely simple, we can just keep it inside the PostsController. As your application gets more complicated, you should extract the domain logic into separate functions and classes.

The event store (and memory image) now also support multiple types of event streams. We’ll use that in the next part to add user accounts to the example application. We’ll also look into ensuring uniqueness of the email addresses associated with these user accounts.

Footnotes:

[1] Of course, the event store is a boundary. The stored events are outside of the application's control and there is no guarantee that the events in a stream are of the correct type, or can be deserialized. This is why we fail fast when we get bad data out of the event store, so the problem can be detected and fixed quickly.

[2] Transient transaction conflicts are not specific to memory images or event stores. In relational databases with multi-version concurrency control (such as Oracle or PostgreSQL) you can get a "non-serializable transaction" error, in which case you should also retry your transaction.

[3] Instead of trying to resolve transient conflicts, we can also prevent them by adhering to the Single Writer Principle. By making sure only one application server can write to the event store, there can be no conflicts. All Changes can then be sent to this writer process to be committed to the event store. This is actually a great solution, but it does require some kind of cluster communication to decide which server is the "writer". Currently cluster communication (and leadership election) is not provided out-of-the-box by Play! or Akka, but Akka 2.1 should have built-in cluster management.

  2 Responses to “Simple event sourcing – refactoring and transactions (part 5)”

  1. Hi Erik,

    I really enjoyed this series and am pleased that you went through the trouble to do this. Some of my comments/questions:

    1) In your comments for part 5 you mention the single writer principle -> I know LMAX definitely followed this for their trading system but you should also point out the complexity added with things like proper failover when adhering to it.
    2) How would you go about implementing ‘snapshots’ of your memory image? This is a concept that I almost always see when discussing event sourcing and it would be interesting to see an implementation.
    3) Further, do you feel your work as is makes upgrading simple when significant new features are requested?

    • Hi Gary,

      Glad you like this series. I’m still working on more parts, but house renovations take a big chunk of my time at the moment. Anyway, to try to answer your questions:

      1. With the single writer principle you’ll need some form of leadership election in the cluster to handle fail-over. That’s why I didn’t implement this in this example, since you’d need something like Zookeeper, Hazelcast, or Akka’s upcoming clustering support to do this correctly. Of course, now the example relies on a centralised Redis database, so the problem is still present. Hopefully Redis will soon become fully fault-tolerant (see Redis Sentinel). Still, with a single writer parts of the code become quite a bit simpler (less need for retries or locks) and performance is probably better when there is high contention.

      2. Without a memory image, aggregates need to be restored from events on every transaction. It makes sense to add snapshots to avoid replaying many events. If you can store everything in memory, you only need to replay events on startup. If this starts to take up significant time, you can use rolling restarts to hide this time. If this is still too slow, you can try taking a snapshot of the entire memory image. This is similar to what LMAX is doing (load latest memory image snapshot + apply any newer events). There’s also the option of not loading the entire memory image on startup, but to try to do so on demand. This gets closer to traditional caching, except that the cache is always up to date (you can push events to the cache immediately). This doesn’t work very well for the parts of the memory image that need global information (like the order of added blog posts in the example). For this you could replay events on startup, use snapshots, or use an external (on-disk) index. Lots of opportunities :)

      3. I’m not sure yet. Dealing with older events can be really painful. I’ll try to show an example in the next part of this series. However, recently I’ve been dealing with quite a bit of schema/data migrations in a traditional RDBMS application, and this is not a piece of cake either. Especially since so much information was never stored, so you’ll have to try to reverse engineer or “invent” information to deal with newer features. It’s also very hard to ensure the correctness of these migrations and to not destroy anything permanently. With events you can always go back to the original meaning or just fix things with some new code and a restart.

      Regards,
      Erik
