The workhorse of our application is the fetcher, the actor responsible for fetching the follower details from GitHub. In the first instance, our actor will accept a single message, Fetch(user). It will fetch the followers corresponding to user and log the response to the screen. We will use the recipes developed in Chapter 7, Web APIs, to query the GitHub API with an OAuth token. We will inject the token through the actor constructor.
Let's start with the companion object. This will contain the definition of the Fetch(user) message and two factory methods to create the Props instances. You can find the code examples for this section in the chap09/fetchers_alone directory in the sample code provided with this book (https://github.com/pbugnion/s4ds):
    // Fetcher.scala
    import akka.actor._
    import scalaj.http._
    import scala.concurrent.Future

    object Fetcher {
      // message definitions
      case class Fetch(login:String)

      // Props factory definitions
      def props(token:Option[String]):Props =
        Props(classOf[Fetcher], token)
      def props():Props = Props(classOf[Fetcher], None)
    }
Let's now define the fetcher itself. We will wrap the call to the GitHub API in a future. This prevents a single slow request from blocking the actor. When our actor receives a Fetch request, it wraps this request into a future, sends it off, and can then process the next message. Let's go ahead and implement our actor:
    // Fetcher.scala
    class Fetcher(val token:Option[String])
    extends Actor with ActorLogging {
      import Fetcher._ // import message definition

      // We will need an execution context for the future.
      // Recall that the dispatcher doubles up as execution
      // context.
      import context.dispatcher

      def receive = {
        case Fetch(login) => fetchUrl(login)
      }

      private def fetchUrl(login:String):Unit = {
        val unauthorizedRequest = Http(
          s"https://api.github.com/users/$login/followers")
        val authorizedRequest = token.map { t =>
          unauthorizedRequest.header("Authorization", s"token $t")
        }

        // Prepare the request: try to use the authorized request
        // if a token was given, and fall back on an unauthorized
        // request
        val request = authorizedRequest.getOrElse(unauthorizedRequest)

        // Fetch from github
        val response = Future { request.asString }
        response.onComplete { r =>
          log.info(s"Response from $login: $r")
        }
      }
    }
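To see why wrapping the request in a future keeps the caller responsive, the following minimal sketch uses only the standard library. The slowRequest function is a hypothetical stand-in for request.asString and is not part of the fetcher code; the sketch also uses the global execution context rather than the actor's dispatcher.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureSketch {
  // Hypothetical stand-in for a slow, blocking HTTP call.
  def slowRequest(login: String): String = {
    Thread.sleep(500)
    s"followers of $login"
  }

  // Returns immediately; the blocking call runs on another thread.
  def fetchAsync(login: String): Future[String] =
    Future { slowRequest(login) }

  def main(args: Array[String]): Unit = {
    val start = System.nanoTime
    val response = fetchAsync("odersky")
    val elapsedMs = (System.nanoTime - start) / 1000000
    // The elapsed time should be far below the 500 ms the request takes.
    println(s"fetchAsync returned after $elapsedMs ms")

    // Block here only for the purposes of the demonstration.
    println(Await.result(response, 5.seconds))
  }
}
```

The actor never calls Await: it registers a callback with onComplete and moves on to the next message in its mailbox.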
Let's instantiate an actor system and four fetchers to check whether our actor is working as expected. We will read the GitHub token from the environment, as described in Chapter 7, Web APIs, then create four actors and ask each one to fetch the followers of a particular GitHub user. We wait five seconds for the requests to complete, and then shut the system down:
    // FetcherDemo.scala
    import akka.actor._

    object FetcherDemo extends App {
      import Fetcher._ // Import the messages

      val system = ActorSystem("fetchers")

      // Read the github token if present.
      val token = sys.env.get("GHTOKEN")

      val fetchers = (0 until 4).map { i =>
        system.actorOf(Fetcher.props(token))
      }

      fetchers(0) ! Fetch("odersky")
      fetchers(1) ! Fetch("derekwyatt")
      fetchers(2) ! Fetch("rkuhn")
      fetchers(3) ! Fetch("tototoshi")

      Thread.sleep(5000) // Wait for API calls to finish
      system.shutdown // Shut system down
    }
Let's run the code through SBT:
    $ GHTOKEN="2502761..." sbt run
    [INFO] [11/08/2015 16:28:06.500] [fetchers-akka.actor.default-dispatcher-2] [akka://fetchers/user/$d] Response from tototoshi: Success(HttpResponse([{"login":"akr4","id":10892,"avatar_url":"https://avatars.githubusercontent.com/u/10892?v=3","gravatar_id":""...
Notice how we explicitly need to shut the actor system down using system.shutdown. The program hangs until the system is shut down. However, shutting down the system will stop all the actors, so we need to make sure that they have finished working. We do this by inserting a call to Thread.sleep.
Using Thread.sleep to wait until the API calls have finished before shutting down the actor system is a little crude. A better approach would be to let the actors signal back to the system that they have completed their task. We will see examples of this pattern later when we implement the fetcher manager actor.
Akka includes a feature-rich scheduler to schedule events. We can use the scheduler to replace the call to Thread.sleep by scheduling a system shutdown five seconds in the future. This is preferable as the scheduler does not block the calling thread, unlike Thread.sleep. To use the scheduler, we need to import a global execution context and the duration module:
    // FetcherDemoWithScheduler.scala
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
We can then schedule a system shutdown by replacing our call to Thread.sleep with the following:
    system.scheduler.scheduleOnce(5.seconds) {
      system.shutdown
    }
Besides scheduleOnce, the scheduler also exposes a schedule method that lets you schedule events to happen regularly (every two seconds, for instance). This is useful for heartbeat checks or monitoring systems. For more information, read the API documentation on the scheduler available at http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html.
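As an illustration of a recurring event, here is a minimal heartbeat sketch. It assumes the same Akka version as the rest of this section (one in which schedule and system.shutdown are still available; later releases rename or deprecate both). The HeartbeatSketch object and its beats counter are hypothetical names introduced for this example, and the sketch borrows the actor system's dispatcher as the execution context instead of importing the global one.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import scala.concurrent.duration._

object HeartbeatSketch {
  // Counts how many heartbeats have fired so far.
  val beats = new AtomicInteger(0)

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("heartbeat")
    // The scheduler needs an execution context; reuse the system's dispatcher.
    import system.dispatcher

    // Fire immediately, then every two seconds.
    val heartbeat = system.scheduler.schedule(0.seconds, 2.seconds) {
      println(s"heartbeat ${beats.incrementAndGet()}")
    }

    // After five seconds, cancel the heartbeat and shut the system down.
    system.scheduler.scheduleOnce(5.seconds) {
      heartbeat.cancel()
      system.shutdown()
    }
  }
}
```

The schedule method returns a Cancellable, which is why the heartbeat can be stopped before the system shuts down.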
Note that we are actually cheating a little bit here by not fetching every follower. The response to the followers query is paginated, so we would need to fetch several pages to retrieve all the followers. Adding logic to the actor to do this is not terribly complicated. We will ignore this for now and assume that users are capped at 100 followers each.
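For reference, GitHub advertises further pages through a Link response header whose entries take the form <url>; rel="name". The sketch below shows one way to extract the URL of the next page from such a header using only the standard library. The LinkHeader object is a hypothetical helper that is not part of the book's sample code, and reading the header from the HTTP response is left out.

```scala
object LinkHeader {
  // Matches one entry of the form: <url>; rel="name"
  private val Entry = """<([^>]+)>;\s*rel="?([a-z]+)"?""".r

  // Parse a Link header into a map from relation name to URL.
  def parse(header: String): Map[String, String] =
    Entry.findAllMatchIn(header).map { m =>
      m.group(2) -> m.group(1)
    }.toMap

  // URL of the next page, or None on the last page.
  def nextPage(header: String): Option[String] =
    parse(header).get("next")

  def main(args: Array[String]): Unit = {
    // Made-up header value, for illustration only.
    val header =
      "<https://api.github.com/users/odersky/followers?page=2>; rel=\"next\", " +
      "<https://api.github.com/users/odersky/followers?page=5>; rel=\"last\""
    println(nextPage(header))
  }
}
```

An actor that follows pagination could send itself a new message for the next URL until nextPage returns None.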