Aug 082012
 

After our deep dive into a Redis event store implementation we’re now getting back to actually adding functionality to the blogging application. Like the Getting started with Rails guide we’ll add the capability to add comments to blog posts. Adding this functionality is straightforward, but it will require us to look into resolving conflicts when multiple people make modifications to the same blog post or comment concurrently.

Other Parts

Events

To add the new functionality to our application we will first define the new events and supporting data types. Notice that by focusing on the events we are actually thinking about the behaviour of the application first, instead of just the data model. In such a simple application as this blogging application it’s a rather subtle distinction, but when your application is more complicated events can help you to get a good understanding of what your application is supposed to do. Here are the event definitions for adding and deleting comments:

case class CommentId(value: Int)
object CommentId {
  implicit val CommentIdFormat: Format[CommentId] = valueFormat(apply)(unapply)
  implicit val CommentIdOrdering: Ordering[CommentId] = Ordering.by(_.value)
}

case class CommentContent(commenter: String, body: String)

// [...]

sealed trait PostCommentEvent extends PostEvent {
  def commentId: CommentId
}
case class CommentAdded(postId: PostId, commentId: CommentId, content: CommentContent) extends PostCommentEvent
case class CommentDeleted(postId: PostId, commentId: CommentId) extends PostCommentEvent

object PostEvent {
  // [...]

  implicit val CommentContentFormat: Format[CommentContent] = objectFormat("commenter", "body")(CommentContent.apply)(CommentContent.unapply)

  implicit val PostEventFormat: Format[PostEvent] = typeChoiceFormat(
    "PostAdded"      -> objectFormat("postId", "content")(PostAdded.apply)(PostAdded.unapply),
    "PostEdited"     -> objectFormat("postId", "content")(PostEdited.apply)(PostEdited.unapply),
    "PostDeleted"    -> objectFormat("postId")(PostDeleted.apply)(PostDeleted.unapply),
    "CommentAdded"   -> objectFormat("postId", "commentId", "content")(CommentAdded.apply)(CommentAdded.unapply),
    "CommentDeleted" -> objectFormat("postId", "commentId")(CommentDeleted.apply)(CommentDeleted.unapply))
}

Since comments are always part of a blog post, we’ll can use a simple sequential comment identifier. The first comment of a post will have id CommentId(1), the second one CommentId(2), etc. Obviously, we could also generate UUIDs for comments (and it would actually simplify the code), but by working with a sequential identifier we’ll slowly introduce some “domain” logic into our example application. We’ll also use CommentId as a key in a sorted map, so we need to define the Ordering (line 4), which is based on the underlying numeric value.

CommentContent is a simple container for the name of the commenter and the comment text.

The CommentAdded and CommentDeleted events are both subtypes of the PostCommentEvent trait, which in turn extends PostEvent. This makes it easier to distinguish comment related events which will be useful for resolving conflicts automatically.

To store the events in the event store, we also define and extend the necessary Format instances.

Models

Now that we have defined the new events, we can adjust our view models to keep track of comments as part of the post class:

case class Post(
  id: PostId,
  revision: StreamRevision,
  content: PostContent,
  nextCommentId: CommentId = CommentId(1),
  comments: SortedMap[CommentId, CommentContent] = SortedMap.empty)

case class Posts(byId: Map[PostId, Post] = Map.empty, orderedByTimeAdded: Seq[PostId] = Vector.empty) {
  // [...]

  def update(event: PostEvent, revision: StreamRevision): Posts = event match {
    // [...]
    case CommentAdded(id, commentId, content) =>
      modify(id) { post =>
        post.copy(
          revision = revision,
          nextCommentId = CommentId(commentId.value + 1),
          comments = post.comments.updated(commentId, content))
      }
    case CommentDeleted(id, commentId) =>
      modify(id) { post =>
        post.copy(
          revision = revision,
          comments = post.comments - commentId)
      }
  }

  private[this] def modify(id: PostId)(f: Post => Post) = 
    this.copy(byId = byId.updated(id, f(byId(id))))
}

The Post class is modified to keep track of the next available CommentId (starting at 1) and also tracks the comments, using a SortedMap[1] from CommentId to CommentContent. Remember that the representation of the view model is not tied to any kind of database schema, so we can easily change it whenever we need to. It will be automatically rebuild whenever we restart the application!

The update method also has to be changed to match against the new events and update the Post class accordingly. Updating nested, immutable data structures is a bit more involved than the mutable equivalent, so we use a simple helper method to take care of the first two levels of nesting. There are more general solutions (PDF) to this problem, but for now the modify method will do.

Views and controller

Next we add the required routes to conf/routes:

POST    /posts/:postId/comments                    ⏎
  controllers.PostsController.comments.add(postId: PostId, r: StreamRevision)
POST    /posts/:postId/comments/:commentId/delete  ⏎
  controllers.PostsController.comments.delete(postId: PostId, r: StreamRevision, commentId: CommentId)

Listing, deleting, and adding comments is all part of the show.scala.html template.

  <!-- ... -->

  @if(post.comments.nonEmpty) {
    <h2>Comments</h2>
    @for((commentId, commentContent) <- post.comments) {
      <br/>
      <div style="display: inline-block;">
        <form class="form-inline" style="display: inline-block;" action="@routes.PostsController.comments.delete(post.id, post.revision, commentId)" method="POST">
          <fieldset><button>&times;</button></fieldset>
        </form>
        &nbsp; @commentContent.body -- <em>by @commentContent.commenter</em>
      </div>
    }
  }

  <br/>
  <h2>Add a comment:</h2>

  @helper.form(action = routes.PostsController.comments.add(post.id, post.revision)) {
    @globalErrorsPanel(form)
    @conflictsMessagePanel(conflicts)
    <fieldset>
      @helper.inputText(form("commenter"), '_label -> "Commenter", 'required -> "required")
      @helper.textarea(form("body"), '_label -> "Body", 'cols -> 80, 'rows -> 10)
    </fieldset>

    <fieldset>
      <button class="btn btn-primary">Submit</button>
    </fieldset>
  }

As you can see, the template is quite straightforward. The only thing that might be unfamiliar is the invocation of the conflictsMessagePanel template, which we’ll get back to later.

Finally, we need to implement two new controller methods, which can be found in the comments singleton object inside of PostsController:

  // [...]

  private[this] def withPost(postId: PostId)(found: Post => Result)(implicit request: Request[_]) = {
    posts().get(postId).map(found).getOrElse(notFound(request))
  }
      
  object comments {
    val commentContentForm = Form(mapping(
      "commenter" -> trimmedText.verifying(minLength(3)),
      "body" -> trimmedText.verifying(minLength(3)))(CommentContent.apply)(CommentContent.unapply))

    def add(postId: PostId, expected: StreamRevision) = Action { implicit request =>
      withPost(postId) { post =>
        commentContentForm.bindFromRequest.fold(
          formWithErrors => BadRequest(views.html.posts.show(post, formWithErrors)),
          commentContent =>
            commit(expected, CommentAdded(postId, post.nextCommentId, commentContent))(
              onCommit = Redirect(routes.PostsController.show(postId)).flashing("info" -> "Comment added."),
              onConflict = (actual, conflicts) => Conflict(views.html.posts.show(post, commentContentForm.fill(commentContent), conflicts))))
      }
    }

    def delete(postId: PostId, expected: StreamRevision, commentId: CommentId) = Action { implicit request =>
      withPost(postId) { post =>
        def deletedResult = Redirect(routes.PostsController.show(postId)).flashing("info" -> "Comment deleted.")
        post.comments.get(commentId) match {
          case None => deletedResult
          case Some(comment) =>
            commit(expected, CommentDeleted(postId, commentId))(
              onCommit = deletedResult,
              onConflict = (actual, conflicts) => Conflict(views.html.posts.show(post, commentContentForm, conflicts)))
        }
      }
    }
  }

  // [...]

The code is a bit more complicated than the basic post actions, since adding and deleting comments requires the presence of a blog post instance. The withPost helper method is used to read the required post from the memory image, or render a 404 Not Found result if the post is not present.

In the case of the add method we then validate the submitted form (line 14). If incorrect, we rerender the page with the error messages (line 15). Otherwise we commit a new CommentAdded event using the provided content and the next available comment id (line 17). If the commit succeeds without conflict, we redirect the user to the blog post with the added comment (line 18). In case there is a conflict, we rerender the form but add some information on the conflicts that occurred (line 19). The delete action is very similar.

This basically completes the addition of some new functionality. The effort required is comparable to a traditional database backed application, which is good to know. But there is one fly in the ointment we need to fix before we can call it a day…

Conflict resolution

Now that we can add comments to post you’ll soon discover that multiple concurrent users will quickly get a conflict. In part 2 we added conflict detection and rendered a placeholder page whenever a conflict occurred. Now that conflicts are more likely to occur, we need to be a bit smarter about resolving these conflicts.

The basic idea is that comment events usually do not conflict, even when they apply to the same post and therefore the same event stream. We can capture this knowledge in a method like this:

  def conflictsWith(committed: PostEvent, attempted: PostEvent) = 
    (committed, attempted) match {
      case (a: PostCommentEvent, b: PostCommentEvent) => a.commentId == b.commentId
      case (_: PostCommentEvent, _)                   => false
      case _                                          => true
    }

This method states that any two PostCommentEvents only conflict when they affect the same comment (line 3). Furthermore, any new blog post related event does not conflict with an already committed PostCommentEvent (line 4). This allows the author to edit the blog post without getting conflicts on added or removed comments. Any other event combination is considered to conflict (line 5). So if you add a comment while someone edited the post, the system will give you a warning and ask you to resubmit your comment, if it is still applicable.

Notice that the main goal of this method is to decide whether we should ask the user for confirmation when a conflict occurs, or whether we should just proceed with the commit even though changes were made while the user was working on their request. This is rather subjective and the details will vary depending on your domain, your users, etc.

So now that we have a way to decide if two events conflict or not we need to modify our commit method to take this into account. This method is defined in the PostsController:

  /**
   * Commits an event and applies it to the current state. If successful the
   * provided `onCommit` callback is invoked and its result returned. Otherwise
   * the `onConflict` is callback is invoked and its result returned.
   */
  private[this] def commit
        (expected: StreamRevision, event: PostEvent)
        (onCommit: => Result,
         onConflict: (StreamRevision, Seq[PostEvent]) => Result): Result = {
    def resolveConflict(committed: Seq[PostEvent], attempted: PostEvent) = {
      val conflicting = committed.filter(PostEvent.conflictsWith(_, attempted))
      if (conflicting.isEmpty) Right(attempted)
      else Left(conflicting)
    }

    @tailrec def run(expected: StreamRevision, event: PostEvent): Result =
      memoryImage.tryCommit(event.postId.toString, expected, event) match {
        case Right(commit) => 
          onCommit
        case Left(conflict) => 
          resolveConflict(conflict.conflicting.flatMap(_.events), event) match {
            case Right(event)      => run(conflict.actual, event)
            case Left(conflicting) => onConflict(conflict.actual, conflicting)
          }
      }

    run(expected, event)
  }

The new commit method implementation first defines a resolveConflict helper method (line 10), which takes a list of already committed, potentially conflicting events and uses the PostEvent.conflictsWith method to see if there are any real conflicts. If there are none, the attempted event is returned. Otherwise the conflicting events are returned.

The run method (line 16) runs in a (tail-recursive) loop. It tries to commit against the expected revision of the event stream. If there is no conflict, it returns the result of the provided onCommit callback. Otherwise it tries to resolve the conflicts. If this succeeds, it tries invokes itself again[2], but now with the latest known event stream revision as the expected revision. If the conflict cannot be resolved, the result of the onConflict callback is returned instead.

Finally, line 27 simply kicks off the entire process using the provided revision and event.

With this in place actual conflicts should be quite rare. But we can still do better than just showing a generic “there was a conflict” error page. This is the job of the conflictsMessagePanel.scala.html template. This template shows a human readable version of the conflicts that occurred:

Conflict alert panel

Summary

Besides adding support for conflict resolution, the implementation of the blog post comment functionality was quite straightforward. In a blogging application conflicts are probably quite rare, so it may not make sense to build in extensive UI support for this, but having this as an example hopefully gives you some idea of what is possible. In more collaborative applications this kind of functionality is much more interesting and you may prefer to immediately push updates directly to the client, instead of waiting for the client to submit a form. An example of this is Pivotal Tracker or Apache Wave.

If your application has extreme availability requirements, similar conflict resolution can also help you deal with recovering from network partitioning. Or applications that need to be able to run in disconnected mode. In these cases you will need to write an event stream function that merges divergent event streams. Version control systems are examples of this, and can be a source of inspiration, although they usually don’t have intention revealing events to work with.

But the main point is that the level of conflict resolution you need depends on your users and your application. Event sourcing gives you a great tool to make conflicts easier to resolve, without necessarily complicating your application if you do not need this kind of functionality.

In the next part we’ll take a look at another kind of concurrency conflict that can occur in the current application when two or more users committing events to the same event stream nearly simultaneously.

Footnotes:

[1] The immutable SortedMap implementation is rather broken before Scala 2.10, but it’ll probably do for this example.

[2] In practice you should put some limit on the number of retries, to avoid bugs causing infinite loops.

 Leave a Reply

(required)

(required)

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>