One of the best features of Hadoop is the Hadoop Distributed File System (HDFS). It creates a network of computers that replicate data among themselves, making our input data available to all the nodes in the cluster. Not having to worry about how the data gets distributed makes our lives much easier.
For this recipe, we'll put a file into HDFS and read it back out using Cascalog, line by line.
The previous recipes in this chapter used the version of Hadoop that Leiningen downloaded as one of Cascalog's dependencies. For this recipe, however, we'll need to have Hadoop installed and running separately. Go to http://hadoop.apache.org/ and download and install it. You might also be able to use your operating system's package manager. Alternatively, Cloudera has a VM with a 1-node Hadoop cluster that you can download and use (https://ccp.cloudera.com/display/SUPPORT/CDH+Downloads#CDHDownloads-CDH4PackagesandDownloads).
You'll still need to configure everything. Take a look at the Hadoop website for your version's Getting Started documentation, and get a single-node setup working.
Once it's installed and configured, go ahead and start the servers. There's a script in the bin directory to do this:

$ ./bin/start-dfs.sh
We still need to have everything working with Clojure. For this, use the same dependencies and references as we did in the Initializing Cascalog and Hadoop for distributed processing recipe. This time, however, don't worry about starting the REPL; we'll take care of that separately.
For data, we'll use a dataset of the U.S. domestic flights from 1990–2009. You can download this dataset yourself from Infochimps at http://www.ericrochester.com/clj-data-analysis/data/flights_with_colnames.csv.gz. I've unzipped it into the data directory.
For this recipe, we'll insert a file into the distributed file system, run the Clojure REPL inside Hadoop, and read the data back out.
First, we need to get the input data into the distributed file system. I have it in the data/16285/flights_with_colnames.csv file. We can insert it into HDFS with this command:

$ hadoop fs -put data/16285/flights_with_colnames.csv flights_with_colnames.csv
Next, we'll run the Clojure REPL through the hadoop command on a JAR file created from our project. Create an empty namespace to give the JAR file a little content. For example, I created a file named src/distrib_data/cascalog_setup.clj with this content:

(ns distrib-data.cascalog-setup
  (:require [cascalog.logic.ops :as c]
            [clojure.string :as string])
  (:use cascalog.api))
Generate the JAR file with Leiningen:

$ lein uberjar
Created /Users/err8n/p/cljbook/distrib-data/target/distrib-data-0.1.0.jar
Created /Users/err8n/p/cljbook/distrib-data/target/distrib-data-0.1.0-standalone.jar
If you're using Windows, Mac OS X, or another OS with a case-insensitive filesystem, you'll need to remove the LICENSE file from the JAR, because it will clash with a license directory. To do this, you can use this command:

$ zip -d target/distrib-data-0.1.0-standalone.jar META-INF/LICENSE
deleting: META-INF/LICENSE
Now we can start the Clojure REPL by running the hadoop command on the JAR file we just created:

$ hadoop jar target/distrib-data-0.1.0-standalone.jar
Clojure 1.6.0
user=>
Inside the REPL, require the libraries we'll use:

user=> (require '[clojure.string :as string]
                '[cascalog.logic.ops :as c])
nil
user=> (use 'cascalog.api)
nil
Now we can execute a query against the flights_with_colnames.csv file:

user=> (?<- (stdout) [?line]
            ((hfs-textline "hdfs:///user/err8n/flights_with_colnames.csv")
             :> ?line))
…
RESULTS
origin_airport,destin_airport,passengers,flights,month
MHK,AMW,21,1,200810
EUG,RDM,41,22,199011
EUG,RDM,88,19,199012
EUG,RDM,11,4,199010
…
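Building on this, here's a sketch (not from the recipe) of a query that parses each line and counts rows per origin airport. The origin-airport helper is hypothetical, the comma split assumes no quoted fields, and the naive parse also passes the CSV header row through as data:

```clojure
;; Hypothetical helper: pull the first CSV field (the origin airport)
;; out of a raw line. Naive split -- assumes no quoted fields.
(defn origin-airport [line]
  (first (string/split line #",")))

;; Count rows per origin airport, using c/count as the aggregator.
;; The header row is counted too: its "origin" is the literal
;; string "origin_airport".
(?<- (stdout) [?origin ?count]
     ((hfs-textline "hdfs:///user/err8n/flights_with_colnames.csv") :> ?line)
     (origin-airport ?line :> ?origin)
     (c/count ?count))
```

This reuses the string and c aliases required earlier in the REPL session.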
There are several moving parts in this recipe. The primary one is Hadoop. It has its own configuration, environment variables, and libraries, all of which the process executing the Cascalog queries must be able to access. The easiest way to manage this is to run everything through the hadoop command, so that's what we did.
The hadoop command has an fs task, which provides access to a whole range of operations for working with HDFS. In this case, we used its -put option to move a data file into HDFS.
Once the file is there, we can refer to it using the hdfs: URI scheme. Hadoop knows how to resolve these URIs.
In the Cascalog query, hfs-textline reads the file line by line. We use the :> operator to bind each line to the ?line name, which is returned as the output of the query.
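Since each line comes back as a single raw string, the clojure.string namespace we required earlier can split it into fields before further processing. A minimal sketch — the parse-line name is hypothetical, and the naive comma split assumes no quoted fields containing commas:

```clojure
(require '[clojure.string :as string])

;; Hypothetical helper: split a raw CSV line from the flights file
;; into a vector of fields.
(defn parse-line [line]
  (string/split line #","))

(parse-line "MHK,AMW,21,1,200810")
;; => ["MHK" "AMW" "21" "1" "200810"]
```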