Queue control and the pull pattern

We have now defined the three worker actors in our crawler application. The next step is to define the manager. The fetcher manager is responsible for keeping a queue of logins to fetch as well as a set of login names that we have already seen in order to avoid fetching the same logins more than once.

A first attempt might involve building an actor that keeps a set of users that we have already seen and, when it is given a new user to fetch, just dispatches that user to a round-robin router for the fetchers. The problem with this approach is that messages would accumulate quickly in the fetchers' mailboxes: each API query is likely to return tens of followers, and each of these is likely to make it back to a fetcher's mailbox. This gives us very little control over the amount of work piling up.

The first problem that this is likely to cause involves the GitHub API rate limit: even with authentication, we are limited to 5,000 requests per hour. It would be useful to stop queries as soon as we hit this threshold. We cannot respond quickly if each fetcher already has a backlog of hundreds of users waiting in its mailbox, since those messages will still be processed after we decide to stop.
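To get a feel for the numbers, here is a back-of-the-envelope calculation. Only the 5,000 requests per hour figure comes from the text; the number of fetchers and the backlog per fetcher are assumed values chosen purely for illustration.

```scala
object RateLimitSketch {
  // Authenticated GitHub API limit quoted in the text.
  val requestsPerHour = 5000

  // Assumptions for illustration only: four fetchers, each with
  // 300 users already waiting in its mailbox.
  val fetchers = 4
  val backlogPerFetcher = 300

  // Requests that are already committed and cannot easily be recalled.
  val committed = fetchers * backlogPerFetcher

  // Fraction of the hourly budget tied up in the mailboxes.
  val shareOfBudget = committed.toDouble / requestsPerHour
}
```

Under these assumptions, 1,200 requests (roughly a quarter of the hourly budget) are already sitting in mailboxes where the manager can no longer hold them back.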

A better alternative is to use a pull system: the fetchers request work from a central queue when they find themselves idle. Pull systems are common in Akka when we have a producer that produces work faster than consumers can process it (refer to http://www.michaelpollmeier.com/akka-work-pulling-pattern/).

Conversations between the manager and fetchers will proceed as follows:

  • If the manager goes from a state of having no work to having work, it sends a WorkAvailable message to all the fetchers.
  • Whenever a fetcher receives a WorkAvailable message or when it completes an item of work, it sends a GiveMeWork message to the queue manager.
  • When the queue manager receives a GiveMeWork message, it ignores the request if no work is available or if it is throttled. If it has work, it sends a Fetch(user) message to the fetcher that made the request.
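The manager's side of this conversation can be sketched without any actors at all. The following is a minimal, actor-free model, not the FetcherManager that we will actually build: the names PullSketch, ManagerState, Reply, and Ignored are hypothetical and exist only for this illustration, and the Fetch message here is a stand-in for the one in the Fetcher companion object.

```scala
import scala.collection.immutable.Queue

object PullSketch {
  // What the manager decides to do with a GiveMeWork request.
  sealed trait Reply
  case class Fetch(login: String) extends Reply // stand-in for Fetcher.Fetch
  case object Ignored extends Reply             // hypothetical: request dropped

  // Hypothetical immutable model of the manager's state.
  final case class ManagerState(
      queue: Queue[String] = Queue.empty,
      seen: Set[String] = Set.empty,
      throttled: Boolean = false) {

    // Queue a login unless we have already seen it. The Boolean is true
    // when the queue went from empty to non-empty, which is the moment
    // a WorkAvailable message should be sent to all the fetchers.
    def add(login: String): (ManagerState, Boolean) =
      if (seen.contains(login)) (this, false)
      else (copy(queue = queue :+ login, seen = seen + login), queue.isEmpty)

    // Handle a GiveMeWork request: ignore it when throttled or out of
    // work, otherwise hand out the login at the head of the queue.
    def giveMeWork: (ManagerState, Reply) =
      if (throttled || queue.isEmpty) (this, Ignored)
      else {
        val (login, rest) = queue.dequeue
        (copy(queue = rest), Fetch(login))
      }
  }
}
```

In this model, adding "odersky" to an empty ManagerState signals that WorkAvailable should be broadcast, adding the same login again changes nothing, the first giveMeWork yields Fetch("odersky"), and a second giveMeWork is ignored because the queue is empty. Because work only leaves the queue in response to a request, setting throttled to true stops new queries immediately.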

Let's start by modifying our fetcher. You can find the code examples for this section in the chap09/ghub_crawler directory in the sample code provided with this book (https://github.com/pbugnion/s4ds). We will pass a reference to the fetcher manager through the constructor. We need to change the companion object to add the WorkAvailable message and the props factory to include the reference to the manager:

// Fetcher.scala
object Fetcher {
  case class Fetch(login:String)
  case object WorkAvailable

  def props(
    token:Option[String], 
    fetcherManager:ActorRef, 
    responseInterpreter:ActorRef):Props =
      Props(classOf[Fetcher], 
        token, fetcherManager, responseInterpreter)
}

We also need to change the receive method so that it queries the FetcherManager asking for more work once it's done processing a request or when it receives a WorkAvailable message.

This is the final version of the fetchers:

class Fetcher(
  val token:Option[String], 
  val fetcherManager:ActorRef,
  val responseInterpreter:ActorRef) 
extends Actor with ActorLogging {
  import Fetcher._
  import context.dispatcher

  def receive = {
    case Fetch(login) => fetchFollowers(login)
    case WorkAvailable => 
      fetcherManager ! FetcherManager.GiveMeWork
  }

  private def fetchFollowers(login:String):Unit = {
    val unauthorizedRequest = Http(
      s"https://api.github.com/users/$login/followers")
    val authorizedRequest = token.map { t =>
      unauthorizedRequest.header("Authorization", s"token $t")
    }
    val request = authorizedRequest.getOrElse(unauthorizedRequest)
    val response = Future { request.asString }

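    // This callback runs outside the actor's message loop, so it only
    // touches immutable ActorRefs and sends messages.
    response.onComplete { r =>
      responseInterpreter ! 
        ResponseInterpreter.InterpretResponse(login, r)
      // Ask for the next item of work, whether the request
      // succeeded or failed.
      fetcherManager ! FetcherManager.GiveMeWork
    }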
  }

}

Now that we have a working definition of the fetchers, let's build the FetcherManager. This is the most complex actor that we have built so far, and, before we dive into building it, we need to learn a bit more about the components of the Akka toolkit.
