In Chapter 5, Scala and SQL through JDBC, we investigated how to access SQL databases with JDBC. As interacting with JDBC feels somewhat unnatural, we extended JDBC using custom wrappers. The wrappers were developed to provide a functional interface to hide the imperative nature of JDBC.
With the difficulty of interacting directly with JDBC from Scala and the ubiquity of SQL databases, you would expect there to be existing Scala libraries that wrap JDBC. Slick is such a library.
Slick styles itself as a functional-relational mapping library, a play on the more traditional object-relational mapping name used to denote libraries that build objects from relational databases. It presents a functional interface to SQL databases, allowing the client to interact with them in a manner similar to native Scala collections.
In this chapter, we will use a somewhat more involved example dataset. The Federal Election Commission of the United States (FEC) records all donations to presidential candidates greater than $200. These records are publicly available. We will look at the donations for the campaign leading up to the 2012 general elections that resulted in Barack Obama's re-election. The data includes donations to the two presidential candidates, Obama and Romney, and also to the other contenders in the Republican primaries (there were no Democratic primaries).
In this chapter, we will take the transaction data provided by the FEC, store it in a table, and learn how to query and analyze it.
The first step is to acquire the data. If you have downloaded the code samples from the Packt website, you should already have two CSVs in the data directory of the code samples for this chapter. If not, you can download the files using the following links:

- data.scala4datascience.com/fec/ohio.csv.gz (or ohio.csv.zip)
- data.scala4datascience.com/fec/us.csv.gz (or us.csv.zip)

Decompress the two files and place them in a directory called data/ in the same location as the source code examples for this chapter. The data files correspond to the following:

- The ohio.csv file is a CSV of all the donations made by donors in Ohio.
- The us.csv file is a CSV of all the donations made by donors across the country. This is quite a large file, with six million rows.

The two CSV files contain identical columns. Use the Ohio dataset for more responsive behavior, or the nationwide data file if you want to wrestle with a larger dataset. The dataset is adapted from a list of contributions downloaded from http://www.fec.gov/disclosurep/PDownload.do.
Let's start by creating a Scala case class to represent a transaction. In the context of this chapter, a transaction is a single donation from an individual to a candidate:
// Transaction.scala
import java.sql.Date

case class Transaction(
  id: Option[Int],                       // unique identifier
  candidate: String,                     // candidate receiving the donation
  contributor: String,                   // name of the contributor
  contributorState: String,              // contributor state
  contributorOccupation: Option[String], // contributor job
  amount: Long,                          // amount in cents
  date: Date                             // date of the donation
)
The code repository for this chapter includes helper functions in an FECData singleton object to load the data from CSVs:

scala> val ohioData = FECData.loadOhio
ohioData: s4ds.FECData = s4ds.FECData@718454de
Calling FECData.loadOhio or FECData.loadAll will create an FECData object with a single attribute, transactions, which is an iterator over all the donations coming from Ohio or the entire United States:

scala> val ohioTransactions = ohioData.transactions
ohioTransactions: Iterator[Transaction] = non-empty iterator

scala> ohioTransactions.take(5).foreach(println)
Transaction(None,Paul, Ron,BROWN, TODD W MR.,OH,Some(ENGINEER),5000,2011-01-03)
Transaction(None,Paul, Ron,DIEHL, MARGO SONJA,OH,Some(RETIRED),2500,2011-01-03)
Transaction(None,Paul, Ron,KIRCHMEYER, BENJAMIN,OH,Some(COMPUTER PROGRAMMER),20120,2011-01-03)
Transaction(None,Obama, Barack,KEYES, STEPHEN,OH,Some(HR EXECUTIVE / ATTORNEY),10000,2011-01-03)
Transaction(None,Obama, Barack,MURPHY, MIKE W,OH,Some(MANAGER),5000,2011-01-03)
Now that we have some data to play with, let's try and put it in the database so that we can run some useful queries on it.
To use Slick, add "com.typesafe.slick" %% "slick" % "2.1.0" to the list of dependencies in your build.sbt file. You will also need to make sure that Slick has access to a JDBC driver. In this chapter, we will connect to a MySQL database, and must, therefore, add the MySQL connector "mysql" % "mysql-connector-java" % "5.1.37" to the list of dependencies.
We bring Slick into scope by importing a specific database driver. As we are using MySQL, we must import the following:
scala> import slick.driver.MySQLDriver.simple._
import slick.driver.MySQLDriver.simple._
To connect to a different flavor of SQL database, import the relevant driver. The easiest way of seeing what drivers are available is to consult the API documentation for the slick.driver package, which is available at http://slick.typesafe.com/doc/2.1.0/api/#scala.slick.driver.package. All the common SQL flavors are supported (including H2, PostgreSQL, MS SQL Server, and SQLite).
Let's create a table to represent our transactions. We will use the following schema:
CREATE TABLE transactions(
    id INT(11) AUTO_INCREMENT PRIMARY KEY,
    candidate VARCHAR(254) NOT NULL,
    contributor VARCHAR(254) NOT NULL,
    contributor_state VARCHAR(2) NOT NULL,
    contributor_occupation VARCHAR(254),
    amount BIGINT(20) NOT NULL,
    date DATE
);
Note that the donation amount is in cents. This allows us to use an integer field (rather than a fixed point decimal, or worse, a float).
You should never use a floating point format to represent money or, in fact, any discrete quantity because floats cannot represent most fractions exactly:
scala> 0.1 + 0.2
Double = 0.30000000000000004
This seemingly nonsensical result occurs because there is no way to store 0.3 exactly as a double.
The following article gives an extensive discussion of the limitations of the floating-point format:
http://docs.oracle.com/cd/E19957-01/806-3568/ncg_goldberg.html
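To make the integer-cents representation concrete, here is a small self-contained sketch (the donation amounts are invented for illustration):

```scala
// Summing many small dollar amounts as doubles accumulates rounding
// error, while summing the same amounts as integer cents stays exact.
val amountsInDollars = List.fill(1000)(0.1) // 1000 donations of $0.10
val doubleTotal = amountsInDollars.sum      // close to, but not exactly, 100.0

val amountsInCents = List.fill(1000)(10L)   // the same donations, in cents
val centsTotal = amountsInCents.sum         // exactly 10000 cents

println(doubleTotal == 100.0) // false: rounding error has crept in
println(centsTotal == 10000L) // true
```

This is why the amount column in our schema is a BIGINT holding cents rather than a floating-point number of dollars.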
To use Slick with tables in our database, we first need to tell Slick about the database schema. We do this by creating a class that extends the Table abstract class. The way in which a schema is defined is quite straightforward, so let's dive straight into the code. We will store our schema in a Tables singleton. We define a Transactions class that provides the mapping to go from collections of Transaction instances to SQL tables structured like the transactions table:
// Tables.scala
import java.sql.Date
import slick.driver.MySQLDriver.simple._

/** Singleton object for table definitions */
object Tables {

  // Transactions table definition
  class Transactions(tag: Tag)
      extends Table[Transaction](tag, "transactions") {
    def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
    def candidate = column[String]("candidate")
    def contributor = column[String]("contributor")
    def contributorState = column[String](
      "contributor_state", O.DBType("VARCHAR(2)"))
    def contributorOccupation = column[Option[String]](
      "contributor_occupation")
    def amount = column[Long]("amount")
    def date = column[Date]("date")

    def * = (id.?, candidate, contributor, contributorState,
      contributorOccupation, amount, date) <> (
      Transaction.tupled, Transaction.unapply)
  }

  val transactions = TableQuery[Transactions]
}
Let's go through this line by line. We first define a Transactions class, which must take a Slick Tag object as its first argument. The Tag object is used by Slick internally to construct SQL statements. The Transactions class extends a Table object, passing it the tag and the name of the table in the database. We could, optionally, have added a database name by extending Table[Transaction](tag, Some("fec"), "transactions") rather than just Table[Transaction](tag, "transactions"). The Table type is parametrized by Transaction. This means that running SELECT statements on the database returns Transaction objects. Similarly, we will insert data into the database by passing a transaction or list of transactions to the relevant Slick methods.
Let's look at the Transactions class definition in more detail. The body of the class starts by listing the database columns. For instance, the id column is defined as follows:
def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
We tell Slick that it should read the column called id and transform it to a Scala integer. Additionally, we tell Slick that this column is the primary key and that it is auto-incrementing. The Slick documentation contains a list of available options for column.
The candidate and contributor columns are straightforward: we tell Slick to read these as String from the database. The contributor_state column is a little more interesting. Besides specifying that it should be read from the database as a String, we also tell Slick that it should be stored in the database with the type VARCHAR(2).
The contributor_occupation column in our table can contain NULL values. When defining the schema, we pass the Option[String] type to the column method:
def contributorOccupation = column[Option[String]]("contributor_occupation")
When reading from the database, a NULL field will get converted to None for columns specified as Option[T]. Conversely, if the field has a value, it will be returned as Some(value).
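This is the same convention that plain Scala's Option factory uses for possibly-null values; the following Slick-free sketch illustrates it (the example values are invented):

```scala
// Option(x) yields None when x is null and Some(x) otherwise,
// mirroring how Slick maps nullable columns to Option[T] fields.
val missingOccupation: Option[String] = Option(null: String)
val presentOccupation: Option[String] = Option("ENGINEER")

println(missingOccupation) // None
println(presentOccupation) // Some(ENGINEER)

// getOrElse supplies a default when the value is missing.
println(missingOccupation.getOrElse("UNKNOWN")) // UNKNOWN
```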
The last line of the class body is the most interesting part: it specifies how to transform the raw data read from the database into a Transaction object and how to convert a Transaction object to raw fields ready for insertion:
def * = (id.?, candidate, contributor, contributorState,
  contributorOccupation, amount, date) <> (
  Transaction.tupled, Transaction.unapply)
The first part is just a tuple of the fields to be read from the database: (id.?, candidate, contributor, contributorState, contributorOccupation, amount, date), with a small amount of metadata. The second part is a pair of functions that describe how to transform this tuple into a Transaction object and back. In this case, as Transaction is a case class, we can take advantage of the Transaction.tupled and Transaction.unapply methods automatically provided for case classes.
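To see what these two functions do, here is a round trip on a cut-down case class (Donation is an invented stand-in for Transaction, not part of the chapter's code):

```scala
// A simplified stand-in for the Transaction case class.
case class Donation(candidate: String, amount: Long)

// Build an instance from a tuple of fields, as Slick does when
// reading a row. On Scala 2, case class companions also expose
// this function directly as Donation.tupled.
val fromTuple = (Donation.apply _).tupled
val donation = fromTuple(("Obama, Barack", 200L))

// The compiler-generated extractor goes the other way, recovering
// the fields from an instance, as Slick does when writing a row.
val Donation(candidate, amount) = donation

println(donation)
println((candidate, amount))
```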
Notice how we followed the id entry with .?. In our Transaction class, the donation id has the Option[Int] type, but the column in the database has the INT type with the additional O.AutoInc option. The .? suffix tells Slick to use the default value provided by the database (in this case, the database's auto-increment) if id is None.
Finally, we define the value:
val transactions = TableQuery[Transactions]
This is the handle that we use to actually interact with the database. For instance, as we will see later, to get a list of donations to Barack Obama, we run the following query (don't worry about the details of the query for now):
Tables.transactions.filter {_.candidate === "Obama, Barack"}.list
Let's summarize the parts of our Transactions mapper class:

- The Transactions class must extend the Table abstract class parametrized by the type that we want to return: Table[Transaction].
- Each column in the table is defined using the column method, for example, def contributorState = column[String]("contributor_state", O.DBType("VARCHAR(2)")). The [String] type parameter defines the Scala type that this column gets read as. The first argument is the SQL column name. Consult the Slick documentation for a full list of additional arguments (http://slick.typesafe.com/doc/2.1.0/schemas.html).
- The * method describes how to map a tuple of column values to and from a Scala object: def * = (id.?, candidate, ...) <> (Transaction.tupled, Transaction.unapply).

So far, you have learned how to define Table classes that encode the transformation from rows in a SQL table to Scala case classes. To move beyond table definitions and start interacting with a database server, we must connect to a database. As in the previous chapter, we will assume that there is a MySQL server running on localhost on port 3306.
We will use the console to demonstrate the functionality in this chapter, but you can find an equivalent sample program in SlickDemo.scala. Let's open a Scala console and connect to the database running on port 3306:
scala> import slick.driver.MySQLDriver.simple._
import slick.driver.MySQLDriver.simple._

scala> val db = Database.forURL(
  "jdbc:mysql://127.0.0.1:3306/test",
  driver="com.mysql.jdbc.Driver"
)
db: slick.driver.MySQLDriver.backend.DatabaseDef = slick.jdbc.JdbcBackend$DatabaseDef@3632d1dd
If you have read the previous chapter, you will recognize the first argument as a JDBC-style URL. The URL starts by defining a protocol, in this case, jdbc:mysql, followed by the IP address and port of the database server, followed by the database name (test, here).

The second argument to forURL is the class name of the JDBC driver. This driver is imported at runtime using reflection. Note that the driver specified here must match the Slick driver imported statically.
Having defined the database, we can now use it to create a connection:
scala> db.withSession { implicit session =>
  // do something useful with the database
  println(session)
}
scala.slick.jdbc.JdbcBackend$BaseSession@af5a276
Slick functions that require access to the database take a Session argument implicitly: if a Session instance marked as implicit is available in scope, they will use it. Thus, preceding session with the implicit keyword saves us from having to pass session explicitly every time we run an operation on the database.
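The implicit mechanism itself is plain Scala; this Slick-free sketch (with an invented Session stand-in) shows how marking a value implicit spares us from passing it around:

```scala
// A toy stand-in for Slick's Session type.
case class Session(name: String)

// Like Slick's database operations, this function takes its
// session argument implicitly.
def runQuery(sql: String)(implicit session: Session): String =
  s"[${session.name}] $sql"

// Because the value is marked implicit, the compiler passes it for us.
implicit val session: Session = Session("session-1")

println(runQuery("SELECT 1"))          // [session-1] SELECT 1
println(runQuery("SELECT 2")(session)) // explicit passing still works
```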
If you have read the previous chapter, you will recognize that Slick deals with the need to close connections with the loan pattern: a database connection is created in the form of a session object and passed temporarily to the client. When the client code returns, the session is closed, ensuring that all open connections are closed. The client code is therefore spared the responsibility of closing the connection.
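The loan pattern is easy to express in plain Scala; here is a minimal Slick-free sketch with an invented Resource type:

```scala
// A toy resource that must be closed after use.
class Resource {
  var closed = false
  def query(): String = "result"
  def close(): Unit = { closed = true }
}

// The loan pattern: create the resource, lend it to the client
// function, and close it afterwards even if the client throws.
def withResource[A](f: Resource => A): A = {
  val resource = new Resource
  try f(resource)
  finally resource.close()
}

val result = withResource { r => r.query() }
println(result) // result

// The resource is closed once the client code returns.
var probe: Resource = null
withResource { r => probe = r }
println(probe.closed) // true
```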
The loan pattern is very useful in production code, but it can be somewhat cumbersome in the shell. Slick lets us create a session explicitly as follows:
scala> implicit val session = db.createSession
session: slick.driver.MySQLDriver.backend.Session = scala.slick.jdbc.JdbcBackend$BaseSession@2b775b49

scala> session.close
Let's use our new connection to create the transaction table in the database. We can access methods to create and drop tables using the ddl attribute on our TableQuery[Transactions] instance:
scala> db.withSession { implicit session =>
  Tables.transactions.ddl.create
}
If you jump into a mysql shell, you will see that a transactions table has been created:
mysql> describe transactions;
+------------------------+--------------+------+-----+
| Field                  | Type         | Null | Key |
+------------------------+--------------+------+-----+
| id                     | int(11)      | NO   | PRI |
| candidate              | varchar(254) | NO   |     |
| contributor            | varchar(254) | NO   |     |
| contributor_state      | varchar(2)   | NO   |     |
| contributor_occupation | varchar(254) | YES  |     |
| amount                 | bigint(20)   | NO   |     |
| date                   | date         | NO   |     |
+------------------------+--------------+------+-----+
7 rows in set (0.01 sec)
The ddl attribute also includes a drop method to drop the table. Incidentally, ddl stands for "data-definition language" and is commonly used to refer to the parts of SQL relevant to schema and constraint definitions.
Slick TableQuery instances let us interact with SQL tables with an interface similar to Scala collections.
Let's create a transaction first. We will pretend that a donation occurred on the 22nd of June, 2010. Unfortunately, the code to create dates in Scala and pass these to JDBC is particularly clunky. We first create a java.util.Date instance, which we must then convert to a java.sql.Date to use in our newly created transaction:
scala> import java.text.SimpleDateFormat
import java.text.SimpleDateFormat

scala> val date = new SimpleDateFormat("dd-MM-yyyy").parse("22-06-2010")
date: java.util.Date = Tue Jun 22 00:00:00 BST 2010

scala> val sqlDate = new java.sql.Date(date.getTime())
sqlDate: java.sql.Date = 2010-06-22

scala> val transaction = Transaction(
  None, "Obama, Barack", "Doe, John", "TX", None, 200, sqlDate
)
transaction: Transaction = Transaction(None,Obama, Barack,Doe, John,TX,None,200,2010-06-22)
Much of the interface provided by the TableQuery instance mirrors that of a mutable list. To insert a single row in the transaction table, we can use the += operator:
scala> db.withSession { implicit session =>
  Tables.transactions += transaction
}
Int = 1
Under the hood, this will create a JDBC prepared statement and run this statement's executeUpdate method.

If you are committing many rows at a time, you should use Slick's bulk insert operator, ++=. This takes a List[Transaction] as input and inserts all the transactions in a single batch by taking advantage of JDBC's addBatch and executeBatch functionality.
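The operator names deliberately mirror Scala's mutable collections; for comparison, here is the same pair of operations on a plain ListBuffer:

```scala
import scala.collection.mutable.ListBuffer

// += appends one element and ++= appends a whole batch, just as
// Slick's TableQuery inserts one row or a batch of rows.
val donations = ListBuffer[Long]()
donations += 200L
donations ++= List(500L, 1000L, 2500L)

println(donations.toList) // List(200, 500, 1000, 2500)
```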
Let's insert all the FEC transactions so that we have some data to play with when running queries in the next section. We can load an iterator of transactions for Ohio by calling the following:
scala> val transactions = FECData.loadOhio.transactions
transactions: Iterator[Transaction] = non-empty iterator
We can also load the transactions for the whole of the United States:

scala> val transactions = FECData.loadAll.transactions
transactions: Iterator[Transaction] = non-empty iterator
To avoid materializing all the transactions in one fell swoop (and potentially exceeding our computer's available memory), we will take batches of transactions from the iterator and insert them:
scala> val batchSize = 100000
batchSize: Int = 100000

scala> val transactionBatches = transactions.grouped(batchSize)
transactionBatches: transactions.GroupedIterator[Transaction] = non-empty iterator
An iterator's grouped method splits the iterator into batches. It is useful for splitting a long collection or iterator into manageable batches that can be processed one after the other. This is important when integrating or processing large datasets.
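The grouped method is available on any Scala iterator; here is a small self-contained illustration with toy data:

```scala
// Split an iterator of ten elements into batches of four: two full
// batches, then one final, shorter batch.
val elements = (1 to 10).iterator
val batches = elements.grouped(4).map(_.toList).toList

println(batches) // List(List(1, 2, 3, 4), List(5, 6, 7, 8), List(9, 10))
```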
All that we have to do now is iterate over our batches, inserting them into the database as we go:
scala> db.withSession { implicit session =>
  transactionBatches.foreach { batch =>
    Tables.transactions ++= batch.toList
  }
}
While this works, it is sometimes useful to see progress reports when doing long-running integration processes. As we have split the integration into batches, we know (to the nearest batch) how far into the integration we are. Let's print the progress information at the beginning of every batch:
scala> db.withSession { implicit session =>
  transactionBatches.zipWithIndex.foreach {
    case (batch, batchNumber) =>
      println(s"Processing row ${batchNumber*batchSize}")
      Tables.transactions ++= batch.toList
  }
}
Processing row 0
Processing row 100000
...
We use the .zipWithIndex method to transform our iterator over batches into an iterator of (batch, current index) pairs. In a full-scale application, the progress information would probably be written to a log file rather than to the screen.
Slick's well-designed interface makes inserting data very intuitive, integrating well with native Scala types.
In the previous section, we used Slick to insert donation data into our database. Let's explore this data now.
When defining the Transactions class, we defined a TableQuery object, transactions, that acts as the handle for accessing the transaction table. It exposes an interface similar to Scala iterators. For instance, to see the first five elements in our database, we can call take(5):
scala> db.withSession { implicit session =>
  Tables.transactions.take(5).list
}
List[Tables.Transactions#TableElementType] = List(Transaction(Some(1),Obama, Barack,Doe, ...
Internally, Slick implements the .take method using a SQL LIMIT. We can, in fact, get the SQL statement using the .selectStatement method on the query:
scala> db.withSession { implicit session =>
  println(Tables.transactions.take(5).selectStatement)
}
select x2.`id`, x2.`candidate`, x2.`contributor`, x2.`contributor_state`,
  x2.`contributor_occupation`, x2.`amount`, x2.`date`
from (
  select x3.`date` as `date`, x3.`contributor` as `contributor`,
    x3.`amount` as `amount`, x3.`id` as `id`,
    x3.`candidate` as `candidate`,
    x3.`contributor_state` as `contributor_state`,
    x3.`contributor_occupation` as `contributor_occupation`
  from `transactions` x3 limit 5
) x2
Our Slick query is made up of the following two parts:
- .take(n): This part is called the invoker. Invokers build up the SQL statement but do not actually fire it to the database. You can chain many invokers together to build complex SQL statements.
- .list: This part sends the statement prepared by the invoker to the database and converts the result to a Scala object. This takes a session argument, possibly implicitly.