Let's insert some documents into our newly created database. We want to store information about GitHub users, using the following document structure:
{
  id: <mongodb object id>,
  login: "pbugnion",
  github_id: 1392879,
  repos: [
    { name: "scikit-monaco", id: 14821551, language: "Python" },
    { name: "contactpp", id: 20448325, language: "Python" }
  ]
}
Casbah provides a DBObject class to represent MongoDB documents (and subdocuments) in Scala. Let's start by creating a DBObject instance for each repository subdocument:
scala> val repo1 = DBObject("name" -> "scikit-monaco", "id" -> 14821551, "language" -> "Python")
repo1: DBObject = { "name" : "scikit-monaco" , "id" : 14821551, "language" : "Python"}
As you can see, a DBObject is just a list of key-value pairs, where the keys are strings. The values have compile-time type AnyRef, but Casbah will fail (at runtime) if you try to add a value that cannot be serialized.
We can also create DBObject instances from lists of key-value pairs directly. This is particularly useful when converting from a Scala map to a DBObject:
scala> val fields:Map[String, Any] = Map(
  "name" -> "contactpp",
  "id" -> 20448325,
  "language" -> "Python"
)
fields: Map[String, Any] = Map(name -> contactpp, id -> 20448325, language -> Python)

scala> val repo2 = DBObject(fields.toList)
repo2: DBObject = { "name" : "contactpp" , "id" : 20448325, "language" : "Python"}
The DBObject class provides many of the same methods as a map. For instance, we can address individual fields:
scala> repo1("name")
AnyRef = scikit-monaco
We can construct a new object by adding a field to an existing object:
scala> repo1 + ("fork" -> true)
mutable.Map[String,Any] = { "name" : "scikit-monaco" , "id" : 14821551, "language" : "Python", "fork" : true}
Note the return type: mutable.Map[String,Any]. Rather than implementing methods such as + directly, Casbah adds them to DBObject by providing an implicit conversion to and from mutable.Map.
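The mechanism can be illustrated without Casbah. The following is a minimal sketch using only the standard library; the Doc class and the docToMap conversion are hypothetical names invented for this illustration and do not reproduce Casbah's actual implementation:

```scala
import scala.collection.mutable
import scala.language.implicitConversions

object ImplicitConversionSketch {
  // Hypothetical stand-in for a document class; it defines no map methods itself.
  class Doc(val fields: List[(String, Any)])

  // Implicit view: wherever a Doc is used as a mutable.Map, the compiler
  // inserts a call to this conversion.
  implicit def docToMap(doc: Doc): mutable.Map[String, Any] =
    mutable.Map.empty[String, Any] ++= doc.fields

  val repo = new Doc(List("name" -> "scikit-monaco", "language" -> "Python"))

  // Doc has neither `apply` nor `contains`; both calls go through docToMap.
  val name: Any = repo("name")
  val hasFork: Boolean = repo.contains("fork")
}
```

Because Doc defines neither apply nor contains, the compiler looks for an implicit view to a type that does, and finds docToMap. In this sketch, name holds scikit-monaco and hasFork is false.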
New DBObject instances can also be created by concatenating two existing instances:
scala> repo1 ++ DBObject(
  "locs" -> 6342,
  "description" -> "Python library for Monte Carlo integration"
)
DBObject = { "name" : "scikit-monaco" , "id" : 14821551, "language" : "Python", "locs" : 6342 , "description" : "Python library for Monte Carlo integration"}
DBObject instances can then be inserted into a collection using the += operator. Let's insert our first document into the users collection:
scala> val userDocument = DBObject(
  "login" -> "pbugnion",
  "github_id" -> 1392879,
  "repos" -> List(repo1, repo2)
)
userDocument: DBObject = { "login" : "pbugnion" , ... }

scala> val coll = MongoClient()("github")("users")
coll: com.mongodb.casbah.MongoCollection = users

scala> coll += userDocument
com.mongodb.casbah.TypeImports.WriteResult = WriteResult{, n=0, updateOfExisting=false, upsertedId=null}
A database containing a single document is a bit boring, so let's add a few more documents queried directly from the GitHub API. You learned how to query the GitHub API in the previous chapter, so we won't dwell on how to do this here.
In the code examples for this chapter, we have provided a class called GitHubUserIterator that queries the GitHub API (specifically the /users endpoint) for user documents, converts them to a case class, and offers them as an iterator. You will find the class in the code examples for this chapter (available on GitHub at https://github.com/pbugnion/s4ds/tree/master/chap08) in the GitHubUserIterator.scala file. The easiest way to have access to the class is to open an SBT console in the directory of the code examples for this chapter. The API then fetches users in increasing order of their login ID:
scala> val it = new GitHubUserIterator
it: GitHubUserIterator = non-empty iterator

scala> it.next // Fetch the first user
User = User(mojombo,1,List(Repo(...
GitHubUserIterator returns instances of the User case class, defined as follows:
// User.scala
case class User(login:String, id:Long, repos:List[Repo])

// Repo.scala
case class Repo(name:String, id:Long, language:String)
Let's write a short program to fetch 500 users and insert them into the MongoDB database. We will need to authenticate with the GitHub API to retrieve these users. The constructor for GitHubUserIterator takes the GitHub OAuth token as an optional argument. We will inject the token through the environment, as we did in the previous chapter.
We first give the entire code listing before breaking it down. If you are typing this out, you will need to copy GitHubUserIterator.scala from the code examples for this chapter to the directory in which you are running this, so that the GitHubUserIterator class is available. The class relies on scalaj-http and json4s, so either copy the build.sbt file from the code examples or specify those packages as dependencies in your own build.sbt file.
// InsertUsers.scala

import com.mongodb.casbah.Imports._

object InsertUsers {

  /** GitHub token read from the environment, if present. */
  lazy val token:Option[String] = sys.env.get("GHTOKEN") orElse {
    println("No token found: continuing without authentication")
    None
  }

  /** Transform a Repo instance to a DBObject */
  def repoToDBObject(repo:Repo):DBObject = DBObject(
    "github_id" -> repo.id,
    "name" -> repo.name,
    "language" -> repo.language
  )

  /** Transform a User instance to a DBObject */
  def userToDBObject(user:User):DBObject = DBObject(
    "github_id" -> user.id,
    "login" -> user.login,
    "repos" -> user.repos.map(repoToDBObject)
  )

  /** Insert a list of users into a collection. */
  def insertUsers(coll:MongoCollection)(users:Iterable[User]) {
    users.foreach { user => coll += userToDBObject(user) }
  }

  /** Fetch users from GitHub and pass them to `inserter` */
  def ingestUsers(nusers:Int)(inserter:Iterable[User] => Unit) {
    val it = new GitHubUserIterator(token)
    val users = it.take(nusers).toList
    inserter(users)
  }

  def main(args:Array[String]) {
    val coll = MongoClient()("github")("users")
    val nusers = 500
    coll.dropCollection()
    val inserter = insertUsers(coll)_
    // The user count comes first, matching the signature of ingestUsers.
    ingestUsers(nusers)(inserter)
  }
}
Before diving into the details of how this program works, let's run it through SBT. You will want to query the API with authentication to avoid hitting the rate limit. Recall that we need to set the GHTOKEN environment variable:
$ GHTOKEN="e83638..." sbt
> runMain InsertUsers
The program will take about five minutes to run (depending on your Internet connection). To verify that the program works, we can query the number of documents in the users collection of the github database:
$ mongo github --quiet --eval "db.users.count()"
500
Let's break the code down. We first load the OAuth token to authenticate with the GitHub API. The token is stored as an environment variable, GHTOKEN. The token variable is a lazy val, so the token is loaded only when we formulate the first request to the API. We have already used this pattern in Chapter 7, Web APIs.
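The deferred loading is easy to observe in isolation. The following is a minimal sketch, not part of the program itself: the loads counter and the HYPOTHETICAL_TOKEN variable name are assumptions added purely to make the behavior visible.

```scala
object LazyTokenSketch {
  // Counts how many times the initializer runs (for illustration only).
  var loads = 0

  // Same shape as `token` in InsertUsers; the environment variable name
  // HYPOTHETICAL_TOKEN is an assumption made for this sketch.
  lazy val token: Option[String] = {
    loads += 1
    sys.env.get("HYPOTHETICAL_TOKEN") orElse {
      println("No token found: continuing without authentication")
      None
    }
  }
}
```

Until LazyTokenSketch.token is first read, loads stays at 0. The first read runs the initializer once, and every later read reuses the cached value, so loads never goes above 1.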
We then define two methods to transform from classes in the domain model to DBObject instances:
def repoToDBObject(repo:Repo):DBObject = ...
def userToDBObject(user:User):DBObject = ...
Armed with these two methods, we can add users to our MongoDB collection easily:
def insertUsers(coll:MongoCollection)(users:Iterable[User]) {
  users.foreach { user => coll += userToDBObject(user) }
}
We used currying to split the arguments of insertUsers. This lets us use insertUsers as a function factory:
val inserter = insertUsers(coll)_
This creates a new function, inserter, with signature Iterable[User] => Unit that inserts users into coll. To see how this might come in useful, let's write a function to wrap the whole data ingestion process. This is how a first attempt at this function could look:
def ingestUsers(nusers:Int)(inserter:Iterable[User] => Unit) {
  val it = new GitHubUserIterator(token)
  val users = it.take(nusers).toList
  inserter(users)
}
Notice how ingestUsers takes, as its second argument, a function that specifies how the list of users is inserted into the database. This function encapsulates all the code specific to insertion into a MongoDB collection. If we decide, at some later date, that we hate MongoDB and must insert the documents into a SQL database or write them to a flat file, all we need to do is pass a different inserter function to ingestUsers. The rest of the code remains the same. This demonstrates the increased flexibility afforded by using higher-order functions: we can easily build a framework and let the client code plug in the components that it needs.
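To see the plug-in idea at work without a database, here is a minimal sketch that swaps the MongoDB inserter for one that appends CSV lines to an in-memory buffer. Everything in it is an assumption made for illustration: the trimmed-down User class, the ingest and appendTo functions, and the sample users are hypothetical and use only the standard library.

```scala
import scala.collection.mutable.ArrayBuffer

object InserterSketch {
  // Trimmed-down stand-in for the chapter's User class (no repos).
  case class User(login: String, id: Long)

  // Same shape as ingestUsers, but reading from any iterator of users
  // rather than from the GitHub API.
  def ingest(source: Iterator[User], nusers: Int)(inserter: Iterable[User] => Unit): Unit =
    inserter(source.take(nusers).toList)

  // Curried like insertUsers: fixing the first argument list yields an inserter.
  def appendTo(buffer: ArrayBuffer[String])(users: Iterable[User]): Unit =
    users.foreach { user => buffer += s"${user.login},${user.id}" }

  val csvLines = ArrayBuffer.empty[String]

  // The expected function type triggers the same conversion as `appendTo(csvLines) _`.
  val csvInserter: Iterable[User] => Unit = appendTo(csvLines)

  // Made-up sample data.
  val sampleUsers = Iterator(User("alice", 1L), User("bob", 2L), User("carol", 3L))

  val done: Unit = ingest(sampleUsers, 2)(csvInserter)
}
```

Only the inserter changes: ingest is unaware of where the users end up. After initialization, csvLines holds the two lines alice,1 and bob,2.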
The ingestUsers method, as defined previously, has one problem: if the nusers value is large, it will consume a lot of memory in constructing the entire list of users. A better solution would be to break it down into batches: we fetch a batch of users from the API, insert them into the database, and move on to the next batch. This allows us to control memory usage by changing the batch size. It is also more fault tolerant: if the program crashes, we can just restart from the last successfully inserted batch.
The .grouped method, available on all iterables and iterators, is useful for batching. It returns an iterator over fragments of the original collection:
scala> val it = (0 to 10)
it: Range.Inclusive = Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> it.grouped(3).foreach { println } // In batches of 3
Vector(0, 1, 2)
Vector(3, 4, 5)
Vector(6, 7, 8)
Vector(9, 10)
Let's rewrite our ingestUsers method to use batches. We will also add a progress report after each batch in order to give the user some feedback:
/** Fetch users from GitHub and pass them to `inserter` */
def ingestUsers(nusers:Int)(inserter:Iterable[User] => Unit) {
  val batchSize = 100
  val it = new GitHubUserIterator(token)
  print("Inserted #users: ")
  it.take(nusers).grouped(batchSize).zipWithIndex.foreach {
    case (users, batchNumber) =>
      print(s"${batchNumber*batchSize} ")
      inserter(users)
  }
  println()
}
Let's look more closely at the line that starts with it.take(nusers). We start from the user iterator, it. We then take the first nusers users. This returns an Iterator[User] that, instead of happily churning through every user in the GitHub database, will terminate after nusers elements. We then group this iterator into batches of 100 users. The .grouped method returns an Iterator[Seq[User]]: each batch is a fully materialized sequence, which is why it can be passed to an inserter that expects an Iterable[User]. We then zip each batch with its index so that we know which batch we are currently processing (we use this in the print statement). The .zipWithIndex method returns an Iterator[(Seq[User], Int)]. We unpack this tuple in the loop using a case statement that binds users to a Seq[User] and batchNumber to the index. Let's run this through SBT:
$ GHTOKEN="2502761..." sbt
> runMain InsertUsers
[info] Running InsertUsers
Inserted #users: 0 100 200 300 400
[success] Total time: 215 s, completed 01-Nov-2015 18:44:30
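The shape of the take, grouped, and zipWithIndex pipeline can be checked on a small scale. The following sketch uses only the standard library and replaces the GitHub iterator with an endless stream of integers (an assumption made for illustration), collecting the numbered batches into a list:

```scala
object BatchingSketch {
  // An endless source standing in for GitHubUserIterator (hypothetical).
  val ids: Iterator[Int] = Iterator.from(1)

  val batchSize = 3

  // take bounds the stream, grouped cuts it into Seq batches,
  // and zipWithIndex numbers the batches starting from 0.
  val batches: List[(Seq[Int], Int)] =
    ids.take(8).grouped(batchSize).zipWithIndex.toList
}
```

With eight elements and a batch size of three, this yields three batches: two full ones and a final batch holding the two remaining elements. Nothing beyond the eighth integer is ever requested from the source, which is what keeps the real program from walking through every GitHub user.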