Friday, May 27, 2011

Beyond Chat: Lift, Comet and Akka

Earlier this week Diego Medina put together a really nice blog post about Comet in Lift, and that reminded me that I've been meaning to write this one.

A few weeks ago, as part of the Learn Scala in 2 Days workshop, I wrote a little demo of Lift as a frontend to the adventure game we were developing for the class project. At that point in the workshop we had already gone from using a simple text-input client to using Akka to decouple I/O from game logic, so adding in a CometActor was a no-brainer. In this post I'm going to go over that Akka interaction as well as detailing how I wrote the CometActor.

Game Overview



Before we get into the Lift side of things, I want to provide an overview of the game components that we're working with. Full source for the demo is available on GitHub, but I'll pull relevant snippets out.

First, let's start with the interaction model. We have a single GameActor engine (Akka) that is responsible for receiving messages from the frontend and dispatching them. The very first message the frontend must send to the GameActor is a Join message:

case class Join(name : String, input : ActorRef, display : ActorRef)


The Join message carries three values: the desired name of the joining player, plus references to the two frontend actors: the one responsible for displaying and processing choices (input) and the one responsible for displaying status messages (display). Once the GameActor has these references, everything can be done asynchronously because at that point it's all just a system of actors. We do make one exception and use a synchronous request/reply when sending the Join message, so that the GameActor can indicate to the frontend that the Join succeeded.
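
For example, the initial handshake from the frontend might look roughly like this. This is only a sketch: gameActor, inputRef and displayRef stand in for ActorRefs you already hold, and "!!" is the send-and-wait-for-a-reply operator we'll use later in the real login code.

// Sketch only: gameActor, inputRef and displayRef are assumed to be
// pre-existing Akka ActorRefs. "!!" blocks and returns the reply as an Option.
gameActor !! Join("frodo", inputRef, displayRef) match {
  case Some(reply) => println("Joined! First reply from the engine: " + reply)
  case None => println("Join timed out or failed")
}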

In any case, after the initial Join the GameActor can send one of two messages to the frontend. First, the "input" actor can be sent a Prompt message:

case class Prompt(status : String, message: Option[String], choices : List[Choice])
case class Choice(description : String, agent : ActorRef, action : Action)
sealed trait Action


The Prompt message basically contains the current status of the player, an optional message (e.g. "You have moved to X"), and a List of the player's current Choices. A Choice simply binds a description to some Action as well as the actor that will perform the Action. Remember, everything here is an actor (including other players), so this allows us to do direct chats between users, among other things. Action is just a marker trait so that we can constrain which messages count as game actions. On the frontend side, when a choice is made the frontend simply sends the "action" instance to the "agent" instance as specified in the Choice.

We also want the GameActor to be able to asynchronously update the player's display, so we'll add a Display message:

case class Display(message : String) extends Action


Note that we also make this an Action so that it can pull double-duty as the message that players send to one another for chat.
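
That means sending a chat line to another player is just a matter of firing a Display at that player's actor. A hypothetical one-liner, where otherPlayerDisplay stands in for whatever ActorRef a Choice handed us:

// otherPlayerDisplay is hypothetical: the display ActorRef we got from a Choice
otherPlayerDisplay ! Display("Frodo says: hello there")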

On to Lift



Now that we have an outline of interaction between the frontend and the game engine, let's look at how we can realize the frontend in Lift using a CometActor. This source is in GameDisplay.scala.

The first thing we want to do is define a case class to hold our current state. In Lift, Comet components live for the duration of the page view, so re-rendering the page will create a new component. In our case we want the state to stay in a user's session so that a refresh doesn't end their game.

case class ClientState(name : String,
                       status : String,
                       messages : List[String],
                       choices : List[Choice])


In our GameDisplay, we set up a SessionVar to hold this state in a Box so that we can represent an unjoined state as Empty.

class GameDisplay extends CometActor {
  ...
  object currentState extends
    SessionVar[Box[ClientState]](Empty)
  ...


Bridging in Akka



Before we get into the render and message-processing logic of our GameDisplay, we have one more piece of plumbing to create: a bridge between Akka and Lift actors. Lift has its own Actor library, and while it's based on the same core concepts, the two aren't directly interchangeable. For the purposes of this demo I wrote a small Akka proxy Actor to forward messages from the Akka side of things back to the CometActor (remember, we pass Akka ActorRefs to the GameEngine):

class BridgeActor extends Actor {
  private var target : Option[CometActor] = None

  def receive = {
    case comet : CometActor => target = Some(comet)
    case msg => target.foreach(_ ! msg)
  }
}


Basically you can send a reference to a CometActor to the BridgeActor and it registers it as the forwarding target. Any other message is sent to the forwarding target, if defined. There's actually a cleaner way to do this by using structural types and redefining the Join case class (and GameActor's internal code) as:

object ActorType {
  type Basic = { def ! (msg : Any) : Unit }
}
case class Join(name : String, input : ActorType.Basic, display : ActorType.Basic)


Just remember that structural typing uses reflection, so you need to be aware of performance and functionality constraints if you want to use it.

In any case, our code uses the BridgeActor, so we need to set that up in our GameDisplay constructor by instantiating it Akka-style (Actors.actorOf(...).start()) and then sending it a "this" reference to register our GameDisplay as the forwarding target:

class GameDisplay extends CometActor {
  // A bridge between the Lift and Akka actor libraries
  private val bridge = Actors.actorOf(classOf[BridgeActor]).start()
  bridge ! this

  // Make sure to stop our BridgeActor when we clean up the Comet component
  override protected def localShutdown() {
    bridge.stop()
  }
  ...

Note that we also override our localShutdown method to make sure we clean up the BridgeActor when our own Comet component is cleaned up (iron9light, thanks for pointing this out).

Comet Rendering



At this point we've defined how we'll store our Comet state and how it will receive messages from the game engine, so now it's time to actually look at the rendering and message processing. In Lift, the CometActor's main render is handled by the "render" method (surprise!). In our case, we want to render differently based on whether the player has joined the game. First, let's look at what happens when the player hasn't joined the game.

def render = currentState.is match {
  case Empty => {
    /* We need to prompt the player for their name in order to join
     * the game. The ajaxForm method wraps a regular HTML form
     * (specified here directly with Scala's XML literals) and processes
     * the form as an AJAX submission. */
    SHtml.ajaxForm(
      <span>What's your name?</span> ++
      /* SHtml.text generates a text input that invokes a Scala
       * callback (in this case, the login method) with the text
       * it contains when the form is submitted. */
      SHtml.text("", login) ++
      <input type="submit" value="Log in" />
    )
  }
  ...
}


Hopefully the comments are self-explanatory, but I do want to point out how nice Lift's AJAX support is. Simply wrapping a normal form in SHtml.ajaxForm(...) gets me a form that submits via an AJAX call. AJAX and Comet work really well together, and Lift has first-class support for both.

Sending Messages Synchronously



At this point we've rendered a simple form out to the user, and when they put in their name and hit the "Log in" button we'll get a callback (from the SHtml.text input) with the submitted name in the login method:

def login (name : String) {
  Controller.game !! Join(name, bridge, bridge) match {
    case Some(Prompt(status, message, choices)) =>
      currentState.set(Full(ClientState(name, status, message.map(List(_)) getOrElse Nil, choices)))
    case other => error("Error: " + other)
  }
  reRender()
}


The "!!" is actor-ese (supported by both Lift and Akka actors) to send a message to an actor and wait for a reply. In this example we're sending the Join message to our game engine and wait for the game engine's response to pattern match. Note that the return type fo "!!" is an Option (or Box in LiftActor) because it's possible that an error may occur during processing. We only care about the game engine returning our first Prompt, so I've skimped a bit on error handling (errors are simply displayed as a Lift error message), but in a real app you might want something more involved.

If we do get back a prompt, we want to initialize our state by setting a new Full box containing said state. We simply copy in the name and current choices, and we map the optional message to a List. In either the success or error case we re-render the whole page since everything is going to change.

Rendering with CSS bindings



Now that we have some state, we use the second match in our render method to produce some markup:

def render = currentState.is match {
  ...
  case Full(state @ ClientState(name, status, messages, choices)) => {
    /* When we have state to render, utilize Lift's
     * CSS binding Domain-Specific Language (DSL) to
     * process the template we were given. More on CSS Bindings
     * can be found here:
     *
     * http://www.assembla.com/wiki/show/liftweb/Binding_via_CSS_Selectors
     */
    "#status *" #> status &
    "#messages *" #> messages.reverse.map{ Text(_) ++ <br/> } &
    ".choice" #> generateChoices(state)
  }
}


This is simply a set of CSS selector transforms that fill in our template (src/main/webapp/index.html):

<div class="lift:comet?type=GameDisplay">
<h1>Status: <span id="status">Nothing</span></h1>
<h1>Messages:</h1>
<div id="messages"></div>
<h1>You may:</h1>
<div id="choices">
<ul>
<li class="choice">Something</li>
</ul>
</div>
</div>


We delegate to the "generateChoices" method to create the actual choices since we'll want to use this in other places:

/**
 * Because we need to generate choices in both our main render and our
 * updateChoices methods, we refactor out the common generation here.
 */
def generateChoices(state : ClientState) : NodeSeq = state.choices.flatMap {
  /* Special handling for chat messages. Here we create a new form
   * that allows us to customize our message */
  case Choice(description, agent, Display(otherPlayer)) => {
    SHtml.ajaxForm(
      <li>Say "{ SHtml.text("hi", message => agent ! Display(state.name + " says: " + message)) }" to {otherPlayer}
        <input type="submit" value="Send!" /></li>
    )
  }
  case choice =>
    SHtml.a(() => perform(choice), <li>{choice.description}</li>)
}


Here's where we get a little fancy. We're going to flatMap over our list of choices to produce a NodeSeq of the markup. If the choice happens to be a "Display" choice (remember, this is for chat or one-way notification), we generate a text input inside an AJAX form so that we can send user-specified text to the other player's display actor. If it's a normal choice, we just set up an AJAX link to perform the choice when clicked. Performing the choice simply sends the Choice's action to the Choice's actor and waits for a Prompt response. If it receives a proper Prompt response it sends it to itself (this allows for uniform handling of Prompt changes):

choice.agent !! ClientChoice(state.name, choice.action) match {
  case Some(p : Prompt) => this ! p
  case other => error(other.toString)
}


Reacting to External Events



Now, up to this point everything is user-driven. However, the fact that Lift's Comet support is actor-driven means we can respond to events triggered in the game engine, too. We achieve this by hooking into the actor processing on the mediumPriority method. This is a PartialFunction[Any,Unit] that can process selected messages. Remember, to work with Akka we've provided a bridge, but otherwise there's no difference if you decide to stick entirely with Lift actors (or some other impl).

Our mediumPriority handler wants to cover two main cases. First, we need to handle display messages sent to us, either from other players or from the game engine:

override def mediumPriority = {
  case Display(message) => currentState.foreach {
    state => {
      currentState.set(Full(state.copy(messages = message :: state.messages)))
      partialUpdate(updateMessages())
    }
  }
  ...
}


If we receive a new Display message, we update our state (using the ClientState case class's copy method) and then call partialUpdate. The partialUpdate method allows us to send arbitrary JavaScript commands to the client asynchronously to update the page. In our case, the updateMessages method uses JsCmds.SetHtml to replace the "messages" div with our new list of messages:

def updateMessages() : JsCmd = {
  currentState.is.map {
    state => JsCmds.SetHtml("messages", state.messages.reverse.flatMap{ Text(_) ++ <br/> })
  } openOr JsCmds.Noop
}


Similarly, if we receive a new Prompt message, we update our state and use partialUpdate to not only render the new messages, but a new list of Choices, too:

override def mediumPriority = {
  ...
  case Prompt(newStatus, message, newChoices) => currentState.foreach {
    state => {
      // Optionally prepend the provided message
      val newMessages = message.map(_ :: state.messages) getOrElse state.messages

      currentState.set(Full(state.copy(status = newStatus,
                                       messages = newMessages,
                                       choices = newChoices)))
      partialUpdate(updateMessages() & updateChoices())
    }
  }
}


The updateChoices method is similar to updateMessages, except we can use the CSS transforms again on the "defaultHtml" member. This member is initialized to whatever the content of our Comet template tag is, so we can reuse it later. Remember that CSS transforms are really just NodeSeq => NodeSeq functions, which is why we can use "apply" to perform the transform in place:

def updateChoices() : JsCmd = {
  currentState.is.map {
    state =>
      JsCmds.SetHtml("choices",
        ("#choices ^^" #> "ignore" &
         ".choice" #> generateChoices(state)).apply(defaultHtml)
      ) &
      JsCmds.SetHtml("status", Text(state.status))
  } openOr JsCmds.Noop
}


Wrapping Up



As you can see, we can add a lot of rich functionality with very little code. Excluding the game engine itself, the Comet actor is 170 lines of code including comments, and the template is about 30. We're able to perform any action that the game engine sends us, including customized chat with other players. Lift's use of the Actor model makes event-driven rendering simple and allows for some fantastic client-side functionality.

I hope that people find this post informative, and I welcome feedback. Happy Lifting!

Tuesday, February 8, 2011

It's Not the Size, It's How You Use It

Yesterday I saw a flurry of discussion surrounding a post deriding not just message queuing systems but the people who choose to use them. Normally, considering the tone of the post, I would just ignore it as a rant (did he get saddled with a poor implementation somewhere?), but in this case I actually use queues in a moderate-sized system and I'd like to think I'm not a complete idiot. The post makes some statements about queues that have some truth to them (as well as some that appear misinformed), but mostly it overgeneralizes and doesn't attempt any in-depth discussion of the issues you need to consider when using (or choosing to use) queues. It certainly doesn't talk about any benefits. That's what this post is for.

First, a caveat: by no means am I an expert in queuing systems, but I'd like to think that I'm a decent programmer and I've been working with queuing systems in one form or another for about 5 years. If I misstate something here please let me know so I can correct it.

The Router Analogy



As a network engineer, I deal with queues all of the time. These aren't the message passing queues that Ted is talking about per se (I'll get to those shortly), but they share some characteristics. First and foremost, the queues in routers and switches provide elasticity, which in turn provides better link utilization. Network traffic can be quite bursty, and without queues to smooth out the bursts congestion would have a much more significant (and negative) effect than it otherwise does. The same holds true for a job processing system or any other message passing scenario: a queue allows you to plan for the steady state but deal with the occasional spike.

Another use of queues, coming from the networking perspective, is prioritization. In network equipment prioritization is typically done via 2 or more queues for a given egress port, with a variety of scheduling algorithms used to pull messages from the different queues. You could do the exact same thing with a queuing system, although many brokers support some form of prioritization within a given queue.
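
To make that concrete, here's a rough sketch of weighted scheduling across two in-memory queues. It's only meant to illustrate the idea; real brokers implement this (and priority within a queue) for you.

import scala.collection.mutable

// Illustration only: drain a high-priority queue and a low-priority queue
// with a 3-to-1 weighting, so low-priority traffic still makes progress.
object WeightedDrain {
  val high = mutable.Queue[String]()
  val low  = mutable.Queue[String]()

  def nextBatch() : Seq[String] = {
    val fromHigh = (1 to 3).flatMap(_ => if (high.nonEmpty) Some(high.dequeue()) else None)
    val fromLow  = if (low.nonEmpty) Seq(low.dequeue()) else Nil
    fromHigh ++ fromLow
  }
}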

As Ted points out, you can achieve both of these goals with a database, syslog, or even flat text files (I repeat myself), but the question is, do you want to? Sure, I can see scenarios where you might have relatively few/relaxed requirements, but a modern message broker is not the simplistic tool that Ted makes them out to be. For all but the simplest cases you're going to be duplicating a lot of work that others have done (NIH syndrome?) and you probably won't write something as scalable or robust as what's already out there.

Dude, Where's My Message?



One of the claims that the post makes is that


Depending on your queue implementation, when you pop a message off, it's gone. The consumer acknowledges receipt of it, and the queue forgets about it. So, if your data processor fails, you've got data loss.


Well, the key there is "Depending on your queue implementation". Most brokers I've worked with offer the choice between auto-ACK and explicit ACK. They also can provide things like transactions and durable messages, but I'm not going to get into too much detail here. The way you typically deal with work that you don't want to lose is to pop the message, process it, and then ACK it after you're finished. This is called "at-least-once" processing because if your consumer falls over after it finishes processing but before the ACK, the message will go back on the queue and some other consumer will grab the message and reprocess it. For this reason it's recommended that the processing is idempotent. Without the explicit ACK messaging is basically "at-most-once". There are also cases where you really don't care if you lose a message, so no one is trying to say that one size fits all.
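
Here's what the at-least-once loop looks like in sketch form. I'm using a made-up minimal client trait rather than any particular broker's API, just to show the ordering of work and ACK:

// Made-up minimal client interface; not any real broker's API.
trait QueueClient {
  def receive() : Message       // blocks until a message is available
  def ack(msg : Message) : Unit // tell the broker we're done with it
}
case class Message(id : String, body : String)

def consume(client : QueueClient, process : Message => Unit) {
  while (true) {
    val msg = client.receive()
    process(msg)    // do the (ideally idempotent) work first...
    client.ack(msg) // ...and only ACK once it succeeds. Crash before this
                    // line and the broker redelivers the message.
  }
}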

You can also try for "exactly-once", but things get significantly more complex. In fact, you really can't achieve it 100% of the time because, hey, we live in a non-deterministic world. There will always be some failure mode, although you can certainly approximate 100% guarantees to successive decimal points. In other words, your solution depends on what's good enough for your requirements.

As an example, let's look at the post's suggestion of using a database or syslog for collecting job messages. Publishing to syslog is unacknowledged, and clients often have buffers sitting between them and the syslog daemon (network, I/O, etc.). It's entirely possible that work gets lost because the message never makes it to the syslog daemon in the first place. Once it does make it to the log, you need to keep track of which messages you've processed and which ones remain. You're keeping state somewhere, so after a failure it's still possible that your consumer could reprocess messages. The same goes for databases: you can use locking, transactions, and so on to try to guard against lost or reprocessed messages, but you're still in the same boat as with message queuing; you're just writing a lot of the logic yourself.

Shades of Blur



Another point that the post tries to make is the intractability of monitoring a queuing system. I think that the claim that using a message queue


...blurs your mental model of what's going on. You end up expecting synchronous behavior out of a system that's asynchronous.


is missing the point. What I would say instead is that you need to really understand the systems you're using. That means failure modes, limitations, and fundamentals of operation. This is me speaking not just as a developer, but as a network engineer and sysadmin. Disks will crash, RAID arrays will get corrupted, and BGP will mysteriously route your traffic from Saint Louis to Charlotte via a heavily congested T3 in DC (thanks Cogent!). I've seen syslog crash and fall over on Solaris and I've seen databases corrupt in all sorts of unique ways.

If you need strict synchronous behavior then a messaging system is probably the wrong tool. But if you understand the limitations going in then you make an informed decision. System engineering and architecture is seldom as black-and-white as this post portrays.

Keeping an Eye on Things



As I said before, queues can provide elasticity for the occasional spike, but if you're constantly running behind you need to reassess what you're doing. Add more consumers, prioritize, etc. Re-architecting for scale is not something unique to message queues. Database sharding, load balancing, etc, all attest to this.

A major component of this is monitoring, and monitoring well. Ted asks how you can monitor a queue with just the length. For starters, throughput, and therefore wait time (from production to consumption), is usually a more important metric in the systems I've worked on. Queue length can vary significantly from moment to moment depending on your production patterns, but if there's a good bound on how long messages take to process then you don't really care. If your per-message processing time stays fairly constant then queue length can be a good predictor for queue wait, but this isn't always the case.
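
One cheap way to get at wait time, assuming you control the producers, is to stamp each message with its enqueue time and have the consumer record the difference. A sketch, not tied to any particular broker:

import java.util.concurrent.atomic.AtomicLong

// Sketch: producers stamp jobs at enqueue time; the consumer accumulates
// total wait and a count so we can report average wait and throughput
// independently of the instantaneous queue length.
case class Job(payload : String, enqueuedAt : Long = System.currentTimeMillis)

object QueueStats {
  private val totalWaitMs = new AtomicLong(0)
  private val processed   = new AtomicLong(0)

  def record(job : Job) {
    totalWaitMs.addAndGet(System.currentTimeMillis - job.enqueuedAt)
    processed.incrementAndGet()
  }

  def averageWaitMs : Long = {
    val n = processed.get
    if (n == 0) 0 else totalWaitMs.get / n
  }
}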

For example, at my job I write and maintain an image processing system that renders images for our orders. Each resulting image can be composed of one or more source images and one or more operations on those images. The time to render varies depending on the script, but generally stays within a certain bound. In our usage we monitor not only the queue length but the average queue throughput. We also record the duration of each render as well as each render's individual operations (crop, compose, etc), but that's more for tuning and QA on the render engines than for indicating the health of the system at this point. So far this system has served us well. We have a good idea of how big the queue gets depending on time of year, day of week, etc, and we know that with our current set of servers we should be seeing roughly the same throughput at any given time with a non-empty queue. What constitutes a healthy system is going to be different for every system whether or not it uses a queue (how big is my table, how large is the logfile, etc), so I don't see how not using a queue is going to help. In fact, most brokers I've worked with have good monitoring facilities built in (via SNMP, REST, etc), so again, this is something where you can either leverage other people's work or end up re-writing it all yourself.

Additionally, you should be monitoring the consumers. Even if you don't care about how long things take to process, you should know that things are being processed. This is not the difficult problem that the post makes it out to be, and again, you're going to want to do this no matter what mechanism you're using to move jobs around. Do you think that not using a queue is going to magically obviate the need to keep an eye on the whole system? Really?

Why I Use Queues



Thus far I've basically worked at refuting or expanding on some of the points (accusations?) made in the post. Now I would like to cover some of the things that I think make queuing systems beneficial.

First, queues not only decouple the producer and consumer, but they decouple the routing between the producer and consumer. In some systems the routing is configured on the broker itself, in others the routing can be specified by the consumers (AMQP, for example). This allows a lot of flexibility in terms of how you process messages. Because the transport between producer and consumer is abstracted, you can do interesting things like message transformation by re-routing the messages via a middle-man (see Apache Camel, among others). Not everyone will need this flexibility, but when you need it you don't have to make any changes to your producers or consumers.
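
As a rough illustration of the middle-man idea (again using a made-up client interface, not Camel's actual DSL), a transformer simply consumes from one queue and republishes to another, leaving producers and consumers untouched:

// Made-up broker interface, purely to illustrate a transforming middle-man.
trait Broker {
  def receive(queue : String) : String
  def publish(queue : String, body : String) : Unit
}

def transformLoop(broker : Broker) {
  while (true) {
    val legacy  = broker.receive("orders.legacy")
    val cleaned = legacy.trim.toUpperCase  // stand-in for a real transformation
    broker.publish("orders.v2", cleaned)   // neither endpoint needs to change
  }
}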

Another aspect of this decoupling is the ability to transparently inspect traffic going through the queue. Much as you would configure a mirror port on a switch to do a packet capture, we can tap into a queue and see the traffic that's going through production. In my case I use this to sample the work queue to build QA data sets. That way we can run a representative sample of work without having to build our own artificial jobs to mimic the real work.

The second thing I like about queuing systems is that they can provide a unified infrastructure service for job management. Now, there is always the danger of holding a hammer and seeing everything as a nail. In our enterprise, however, we have a lot of disparate systems that do batch processing, and with queuing we can get a better overall picture of the processing via a single service. We're also looking at federating 3000+ brokers so that our various sites can use a global addressing scheme to support disconnected operation (a limitation of our environment) without having to put all of the logic into each and every client and service. Given the option of writing a federated job system by myself or writing a small federation agent (about 600 lines of Scala), I'm always going to choose the latter.

Is He Done Yet?



Almost. Message queuing systems, like any other arrow in an engineer's quiver, can be misapplied. But the mis-/overuse of a given tool by the naive is no reason to tar it (or the people who use it) as useless. In the 14 or so years I've been programming I've seen misuse of a lot of techniques and tools, but that just prompts me to learn from those mistakes and try to pass what I've learned on to others. In that spirit, I hope this post succeeds in making developers better informed about queuing systems and where they work and where they don't.