In this chapter, we’ll be focusing on what is probably the most often overlooked way to
improve the value of MapReduce: customizing input and output. You will not
always want to load or store data the way Hadoop MapReduce does out of the
box. Sometimes you can skip the time-consuming step of storing data in HDFS
and just accept data from some original source, or feed it directly to some
process that uses it after MapReduce is finished. Sometimes the basic Hadoop
paradigm of file blocks and input splits doesn’t do what you need, so this
is where a custom InputFormat
or OutputFormat
comes into play.
Three patterns in this chapter deal with input: generating data, external source input, and partition pruning. All three input patterns share an interesting property: the map phase is completely unaware that tricky things are going on before it gets its input pairs. Customizing an input format is a great way to abstract away details of the method you use to load data.
On the flip side, Hadoop will not always store data in the way you need it to. There is one pattern in this chapter, external source output, that writes data to a system outside of Hadoop and HDFS. Just like the custom input formats, custom output formats keep the map or reduce phase from realizing that tricky things are going on as the data is going out.
Hadoop allows you to modify the way data is loaded from disk in two major
ways: configuring how contiguous chunks of input are generated from blocks
in HDFS (or maybe more exotic sources), and configuring how records appear
in the map phase. The two classes you’ll be playing with to do this are
RecordReader
and InputFormat
. These work with the Hadoop MapReduce framework in a very similar
way to how mappers and reducers are plugged in.
Hadoop also allows you to modify the way data is stored in an
analogous way: with an OutputFormat
and a RecordWriter
.
Hadoop relies on the input format of the job to do three things:
Validate the input configuration for the job (i.e., checking that the data is there).
Split the input blocks and files into logical chunks of type
InputSplit
, each of which is
assigned to a map task for processing.
Create the RecordReader
implementation to be used to create key/value pairs from the
raw InputSplit
.
These pairs are sent one by one to their mapper.
The most common input formats are subclasses of FileInputFormat
,
with the Hadoop default being TextInputFormat
. The input format first validates the input into the job by
ensuring that all of the input paths exist. Then it logically splits
each input file based on the total size of the file in bytes, using the
block size as an upper bound. For example, with the default 64 MB block size, a 160 megabyte file in HDFS will generate three input splits along the byte ranges 0MB-64MB, 64MB-128MB, and 128MB-160MB. Each map task will be assigned exactly one of these input splits, and then the RecordReader implementation is responsible for generating key/value pairs out of all the bytes it has been assigned.
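The split size that FileInputFormat uses boils down to a one-line calculation, sketched below in simplified form; with the default minimum and maximum split sizes, the result is simply the HDFS block size.

// Simplified sketch of FileInputFormat's split-size calculation; with the
// default minimum and maximum split sizes this is just the HDFS block size.
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}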
Typically, the RecordReader
has
the additional responsibility of fixing boundaries, because the input
split boundary is arbitrary and probably will not fall on a record
boundary. For example, the TextInputFormat
reads text files using a
LineRecordReader
to create key/value pairs for each map task for each line of
text (i.e., separated by a newline character). The key is the number of
bytes read in the file so far and the value is a string of characters up
to a newline character. Because it is very unlikely that the chunk of
bytes for each input split will be lined up with a newline character,
the LineRecordReader
will read past
its given “end” in order to make sure a complete line is read. This bit
of data comes from a different data block and is therefore not stored on
the same node, so it is streamed from a DataNode hosting the block. This
streaming is all handled by an instance of the FSDataInputStream
class, and we (thankfully) don’t have to deal with any knowledge of
where these blocks are.
Don’t be afraid to go past split boundaries in your own formats; just be sure to test thoroughly so you aren’t duplicating or missing any data!
Custom input formats are not limited to file-based input. As
long as you can express the input as InputSplit
objects and key/value pairs,
custom or otherwise, you can read anything into the map phase of a
MapReduce job in parallel. Just be sure to keep in mind what an input
split represents and try to take advantage of data locality.
The InputFormat
abstract class
contains two abstract methods:
The implementation of getSplits
typically uses the given JobContext
object to retrieve the
configured input and return a List
of InputSplit
objects. The input splits
have a method to return an array of machines associated with the
locations of the data in the cluster, which gives clues to the
framework as to which TaskTracker should process the map task.
This method is also a good place to verify the configuration and
throw any necessary exceptions, because the method is used on the
front-end (i.e. before the job is submitted to the
JobTracker).
The createRecordReader method is used on the back-end to generate an implementation of
RecordReader
, which we’ll
discuss in more detail shortly. Typically, a new instance is
created and immediately returned, because the record reader has an
initialize
method that is
called by the framework.
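For reference, a simplified sketch of these two abstract methods is shown here; consult the Hadoop Javadoc for the authoritative signatures.

// Simplified sketch of org.apache.hadoop.mapreduce.InputFormat
public abstract class InputFormat<K, V> {

    // Split the input into logical chunks, one per map task
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Create the record reader that turns a split into key/value pairs
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException;
}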
The RecordReader
abstract class
creates key/value pairs from a given InputSplit
. While the InputSplit
represents the byte-oriented view
of the split, the RecordReader
makes
sense out of it for processing by a mapper. This is why Hadoop and
MapReduce are considered schema on read. It is in
the RecordReader
that the schema is
defined, based solely on the record reader implementation, which changes
based on what the expected input is for the job. Bytes are read from the
input source and turned into a WritableComparable
key and a Writable
value. Custom data
types are very common when creating custom input formats, as they are a
nice object-oriented way to present information to a mapper.
A RecordReader
uses the data
within the boundaries created by the input split to generate key/value
pairs. In the context of file-based input, the “start” is the byte
position in the file where the RecordReader
should start generating key/value
pairs. The “end” is where it should stop reading records. These are not
hard boundaries as far as the API is concerned—there is nothing stopping
a developer from reading the entire file for each map task. While
reading the entire file is not advised, reading outside of the
boundaries is often necessary to ensure that a complete record is
generated.
Consider the case of XML. While using a TextInputFormat
to grab each line works, XML elements are typically not on the same line
and will be split by a typical MapReduce input. By reading past the
“end” input split boundary, you can complete an entire record. After
finding the bottom of the record, you just need to ensure that each
record reader starts at the beginning of an XML element. After seeking
to the start of the input split, continue reading until the beginning of
the configured XML tag is read. This will allow the MapReduce framework
to cover the entire contents of an XML file, while not duplicating any
XML records. Any XML that is skipped by seeking forward to the start of
an XML element will be read by the preceding map task.
The RecordReader
abstract class
has a number of methods that must be overridden.
The initialize method takes as arguments the map task’s assigned InputSplit and TaskAttemptContext, and prepares the record reader. For file-based input formats, this is a good place to seek to the byte position in the file to begin reading.
The getCurrentKey and getCurrentValue methods are used by the framework to give generated key/value pairs to an implementation of Mapper. Be sure to reuse the objects returned by these methods if at all possible!
The nextKeyValue method reads the next key/value pair and returns true until the data is consumed.
The getProgress method is an optional method used by the framework for metrics gathering.
The close method is used by the framework for cleanup after there are no more key/value pairs to process.
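For reference, a simplified sketch of the RecordReader abstract class and the methods described above; consult the Hadoop Javadoc for the authoritative signatures.

// Simplified sketch of org.apache.hadoop.mapreduce.RecordReader
public abstract class RecordReader<KEYIN, VALUEIN> {

    public abstract void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException;

    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    public abstract float getProgress() throws IOException, InterruptedException;

    public abstract void close() throws IOException;
}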
Similarly to an input format, Hadoop relies on the output format of the job for two main tasks:
Validate the output configuration for the job.
Create the RecordWriter
implementation that will write the output of the job.
On the flip side of the FileInputFormat
,
there is a FileOutputFormat
to
work with file-based output. Because most output from a
MapReduce job is written to HDFS, the many file-based output formats
that come with the API will solve most of your needs. The default used
by Hadoop is the TextOutputFormat
,
which stores key/value pairs to HDFS at a configured output directory
with a tab delimiter. Each reduce task writes an individual part file to
the configured output directory. The TextOutputFormat
also validates that the output directory does not exist prior
to starting the MapReduce job.
The TextOutputFormat
uses a
LineRecordWriter
to write key/value pairs for each map task or reduce task,
depending on whether there is a reduce phase or not. This class uses the
toString
method to serialize each key/value pair to a part file in HDFS, delimited by a tab. This tab
delimiter is the default and can be changed via job
configuration.
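For example, a pipe delimiter can be set in the driver before the job is submitted. The property name shown here is the one assumed for the Hadoop 1.x line; newer releases renamed it to mapreduce.output.textoutputformat.separator, so verify the name against your release.

Configuration conf = new Configuration();
// Property name assumed for Hadoop 1.x; newer releases use
// "mapreduce.output.textoutputformat.separator"
conf.set("mapred.textoutputformat.separator", "|");
Job job = new Job(conf, "pipe-delimited output");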
Again, much like an InputFormat
, you are not restricted to storing
data to HDFS. As long as you can write key/value pairs to some other
source with Java (e.g., a JDBC database connection), you can use
MapReduce to do a parallel bulk write. Just make sure whatever you are
writing to can handle the large number of connections from the many
tasks.
The OutputFormat
abstract class
contains three abstract methods for implementation:
The checkOutputSpecs method is used to validate the output specification for the job, such as making sure the output directory does not already exist prior to the job being submitted. Otherwise, the output might be overwritten.
The getRecordWriter method returns a RecordWriter implementation that serializes key/value pairs to an output, typically a FileSystem object.
The getOutputCommitter method provides the job’s output committer, which sets up each task during initialization, commits the task upon successful completion, and cleans up each task when it finishes, successful or otherwise. For file-based output, a FileOutputCommitter can be used to handle all the heavy lifting. It will create temporary output directories for each map task and move the successful output to the configured output directory when necessary.
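A simplified sketch of the three abstract methods follows; see the Hadoop Javadoc for the authoritative signatures.

// Simplified sketch of org.apache.hadoop.mapreduce.OutputFormat
public abstract class OutputFormat<K, V> {

    public abstract void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException;

    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException;

    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException;
}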
The RecordWriter
abstract
class writes key/value pairs to a file system, or another
output. Unlike its RecordReader
counterpart, it does not contain an initialize phase. However, the
constructor can always be used to set up the record writer for whatever
is needed. Any parameters can be passed in during construction, because
the record writer instance is created via OutputFormat.getRecordWriter
.
The RecordWriter
abstract class
is a much simpler interface, containing only two methods:
The write method is called by the framework for each key/value pair that needs to be written. The implementation of this method depends very much on your use case. The examples we’ll show will write each key/value pair to an external in-memory key/value store rather than a file system.
The close method is used by the framework after there are no more key/value pairs to write out. It can be used to release any file handles, shut down any connections to other services, or perform any other cleanup tasks needed.
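For reference, a simplified sketch of the RecordWriter abstract class; consult the Hadoop Javadoc for the authoritative signatures.

// Simplified sketch of org.apache.hadoop.mapreduce.RecordWriter
public abstract class RecordWriter<K, V> {

    public abstract void write(K key, V value) throws IOException, InterruptedException;

    public abstract void close(TaskAttemptContext context)
            throws IOException, InterruptedException;
}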
The generating data pattern is interesting because instead of loading data that comes from somewhere outside, it generates that data on the fly and in parallel.
You want to generate a lot of data from scratch.
This pattern is different from all of the others in the book in that it doesn’t load data. With this pattern, you generate the data and store it back in the distributed file system.
Generating data isn’t common. Typically you’ll generate a bunch of the data at once then use it over and over again. However, when you do need to generate data, MapReduce is an excellent system for doing it.
The most common use case for this pattern is generating random data. Building some sort of representative data set could be useful for large scale testing for when the real data set is still too small. It can also be useful for building “toy domains” for researching a proof of concept for an analytic at scale.
Generating random data is also often used as part of a benchmark, such as the commonly used TeraGen/TeraSort and DFSIO.
Unfortunately, the implementation of this pattern isn’t straightforward in Hadoop because one of the foundational pieces of the framework is assigning one map task to an input split and one map function call to one record. In this case, there are no input splits and there are no records, so we have to fool the framework into thinking there are.
To implement this pattern in Hadoop, implement a
custom InputFormat
and let
a RecordReader
generate the random data. The map function is completely oblivious to
the origin of the data, so it can be built on the fly instead of being
loaded out of some file in HDFS. For the most part, using the identity
mapper is fine here, but you might want to do some post-processing in
the map task, or even analyze it right away. See Figure 7-1.
This pattern is map-only.
The InputFormat
creates
the fake splits from nothing. The number of splits it creates
should be configurable.
The RecordReader
takes
its fake split and generates random records from it.
In some cases, you can assign some information in the input split to tell the record reader what to generate. For example, to generate random date/time data, have each input split account for an hour.
In most cases, the IdentityMapper
is used to just write the data out as it comes in.
The lazy way of implementing this pattern is to seed the
job with many fake input files containing a single bogus record.
Then, you can just use a generic InputFormat
and RecordReader
and generate the data in the
map function. The empty input files are then deleted on application
exit.
Each mapper outputs a file containing random data.
There are a number of ways to create random data with SQL and Pig, but nothing that is particularly elegant or terse.
The major consideration here in terms of performance is how many worker map tasks are needed to generate the data. In general, the more map tasks you have, the faster you can generate data since you are better utilizing the parallelism of the cluster. However, it makes little sense to fire up more map tasks than you have map slots since they are all doing the same thing.
To generate random StackOverflow data, we’ll take a list of 1,000 words and just make random blurbs. We also have to generate a random score, a random row ID (we can ignore that it likely won’t be unique), a random user ID, and a random creation date.
The following descriptions of each code section explain the solution to the problem.
The driver parses the four command line arguments to configure this job. It sets our custom input format and calls the static methods to configure it further. All the output is written to the given output directory. The identity mapper is used for this job, and the reduce phase is disabled by setting the number of reduce tasks to zero.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    int numMapTasks = Integer.parseInt(args[0]);
    int numRecordsPerTask = Integer.parseInt(args[1]);
    Path wordList = new Path(args[2]);
    Path outputDir = new Path(args[3]);

    Job job = new Job(conf, "RandomDataGenerationDriver");
    job.setJarByClass(RandomDataGenerationDriver.class);

    job.setNumReduceTasks(0);

    job.setInputFormatClass(RandomStackOverflowInputFormat.class);

    RandomStackOverflowInputFormat.setNumMapTasks(job, numMapTasks);
    RandomStackOverflowInputFormat.setNumRecordPerTask(job, numRecordsPerTask);
    RandomStackOverflowInputFormat.setRandomWordList(job, wordList);

    TextOutputFormat.setOutputPath(job, outputDir);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 2);
}
The FakeInputSplit class simply extends InputSplit and implements Writable. None of the overridden methods do any real work; methods requiring return values just return basic values. This input split is used to trick the framework into assigning a task to generate the random data.
public static class FakeInputSplit extends InputSplit implements Writable {

    public void readFields(DataInput arg0) throws IOException {
    }

    public void write(DataOutput arg0) throws IOException {
    }

    public long getLength() throws IOException, InterruptedException {
        return 0;
    }

    public String[] getLocations() throws IOException, InterruptedException {
        return new String[0];
    }
}
The input format has two main purposes: returning the list of
input splits for the framework to generate map tasks from, and then
creating the RandomStackOverflowRecordReader
for the
map task. We override the getSplits
method to return a configured
number of FakeInputSplit
splits.
This number is pulled from the
configuration. When the framework calls createRecordReader
, a RandomStackOverflowRecordReader
is instantiated, initialized, and returned.
public static class RandomStackOverflowInputFormat extends InputFormat<Text, NullWritable> {

    public static final String NUM_MAP_TASKS = "random.generator.map.tasks";
    public static final String NUM_RECORDS_PER_TASK = "random.generator.num.records.per.map.task";
    public static final String RANDOM_WORD_LIST = "random.generator.random.word.file";

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // Get the number of map tasks configured for the job
        int numSplits = job.getConfiguration().getInt(NUM_MAP_TASKS, -1);

        // Create a number of input splits equivalent to the number of tasks
        ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
        for (int i = 0; i < numSplits; ++i) {
            splits.add(new FakeInputSplit());
        }

        return splits;
    }

    public RecordReader<Text, NullWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // Create a new RandomStackOverflowRecordReader and initialize it
        RandomStackOverflowRecordReader rr = new RandomStackOverflowRecordReader();
        rr.initialize(split, context);
        return rr;
    }

    public static void setNumMapTasks(Job job, int i) {
        job.getConfiguration().setInt(NUM_MAP_TASKS, i);
    }

    public static void setNumRecordPerTask(Job job, int i) {
        job.getConfiguration().setInt(NUM_RECORDS_PER_TASK, i);
    }

    public static void setRandomWordList(Job job, Path file) {
        DistributedCache.addCacheFile(file.toUri(), job.getConfiguration());
    }
}
This record reader is where the data is actually generated. It is given our FakeInputSplit during initialization, but simply ignores it. The number of records to
create is pulled from the job configuration, and the list of random
words is read from the DistributedCache
. For each call to
nextKeyValue
, a random record is
created using a simple random number generator. The body of the
comment is generated by a helper function that randomly selects
words from the list, between one and thirty words (also random). The
counter is incremented to keep track of how many records have been
generated. Once all the records are generated, the record reader
returns false
, signaling the
framework that there is no more input for the mapper.
public static class RandomStackOverflowRecordReader extends RecordReader<Text, NullWritable> {

    private int numRecordsToCreate = 0;
    private int createdRecords = 0;
    private Text key = new Text();
    private NullWritable value = NullWritable.get();
    private Random rndm = new Random();
    private ArrayList<String> randomWords = new ArrayList<String>();

    // This object will format the creation date string into a Date object
    private SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Get the number of records to create from the configuration
        this.numRecordsToCreate = context.getConfiguration().getInt(NUM_RECORDS_PER_TASK, -1);

        // Get the list of random words from the DistributedCache
        URI[] files = DistributedCache.getCacheFiles(context.getConfiguration());

        // Read the list of random words into a list
        BufferedReader rdr = new BufferedReader(new FileReader(files[0].toString()));

        String line;
        while ((line = rdr.readLine()) != null) {
            randomWords.add(line);
        }
        rdr.close();
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        // If we still have records to create
        if (createdRecords < numRecordsToCreate) {
            // Generate random data
            int score = Math.abs(rndm.nextInt()) % 15000;
            int rowId = Math.abs(rndm.nextInt()) % 1000000000;
            int postId = Math.abs(rndm.nextInt()) % 100000000;
            int userId = Math.abs(rndm.nextInt()) % 1000000;
            String creationDate = frmt.format(Math.abs(rndm.nextLong()));

            // Create a string of text from the random words
            String text = getRandomText();

            String randomRecord = "<row Id=\"" + rowId + "\" PostId=\"" + postId
                    + "\" Score=\"" + score + "\" Text=\"" + text
                    + "\" CreationDate=\"" + creationDate + "\" UserId=\"" + userId
                    + "\" />";

            key.set(randomRecord);
            ++createdRecords;
            return true;
        } else {
            // We are done creating records
            return false;
        }
    }

    private String getRandomText() {
        StringBuilder bldr = new StringBuilder();
        int numWords = Math.abs(rndm.nextInt()) % 30 + 1;

        for (int i = 0; i < numWords; ++i) {
            bldr.append(randomWords.get(Math.abs(rndm.nextInt()) % randomWords.size()) + " ");
        }
        return bldr.toString();
    }

    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    public NullWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return (float) createdRecords / (float) numRecordsToCreate;
    }

    public void close() throws IOException {
        // nothing to do here...
    }
}
As stated earlier in this chapter, the external source output pattern writes data to a system outside of Hadoop and HDFS.
You want to write MapReduce output to a nonnative location.
With this pattern, we are able to output data from the MapReduce framework directly to an external source. This is extremely useful for direct loading into a system instead of staging the data to be delivered to the external source. The pattern skips storing data in a file system entirely and sends output key/value pairs directly where they belong. MapReduce is rarely used to host an application as-is, so using MapReduce to bulk load into an external source in parallel has its uses.
In a MapReduce approach, the data is written out in parallel. As with using an external source for input, you need to be sure the destination system can handle the parallel ingest it is bound to endure with all the open connections.
Figure 7-2 shows the external source output structure, explained below.
The OutputFormat
verifies
the output specification of the job configuration
prior to job submission. This is a great place to ensure that the
external source is fully functional, as it won’t be good to process all the data only to find out the external source was unavailable when it was time to commit the data. This class is also
responsible for creating and initializing a RecordWriter
implementation.
The RecordWriter
writes
all key/value pairs to the external source. Much
like a RecordReader
, the
implementation varies depending on the external data source being
written to. During construction of the object, establish any
needed connections using the external source’s API. These
connections are then used to write out all the data from each map
or reduce task.
The output data has been sent to the external source and that external source has loaded it successfully.
Note that task failures are bound to happen, and when they do,
any key/value pairs written in the write
method can’t be reverted. In a
typical MapReduce job, temporary output is written to the file
system. In the event of a failure, this output is simply discarded.
When writing to an external source directly, it will receive the
data in a stream. If a task fails, the external source won’t
automatically know about it and discard all the data it received
from a task. If this is unacceptable, consider using a custom
OutputCommitter
to write
temporary output to the file system. This temporary output can then
be read, delivered to the external source, and deleted upon success,
or deleted from the file system outright in the event of a
failure.
From a MapReduce perspective, there isn’t much to worry about since the map and reduce are generic. However, you do have to be very careful that the receiver of the data can handle the parallel connections. Having a thousand tasks writing to a single SQL database is not going to work well. To avoid this, you may have to have each reducer handle a bit more data than you typically would to reduce the number of parallel writes to the data sink. This is not necessarily a problem if the destination of the data is parallel in nature and supports parallel ingestion. For example, for writing to a sharded SQL database, you could have each reducer write to a specific database instance.
This example is a basic means for writing to a number of Redis instances in parallel from MapReduce. Redis is an open-source, in-memory, key-value store. It is often referred to as a data structure server, since keys can contain strings, hashes, lists, sets, and sorted sets. Redis is written in ANSI C and works in most POSIX systems, such as Linux, without any external dependencies.
In order to work with the Hadoop framework, Jedis is used to communicate with Redis. Jedis is an open-source “blazingly small and sane Redis java client.” A list of clients written for other languages is available on their website.
Unlike other examples in this book, there is no actual analysis
in this example (along with the rest of the examples in this chapter).
It focuses on how to take a data set stored in HDFS and store it in an
external data source using a custom FileOutputFormat
. In this example, the Stack
Overflow users data set is written to a configurable number of Redis
instances, specifically the user-to-reputation mappings. These mappings are randomly and evenly distributed across the instances, all stored under a single configured Redis hash key.
A Redis hash is a map between string fields and string values,
similar to a Java HashMap
. Each
hash is given a key to identify the hash. Every hash can store more
than four billion field-value pairs.
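As a rough, hypothetical illustration of the hash operations these examples rely on, here is a minimal Jedis snippet; the host name, hash name, field, and value are made up for the illustration.

import redis.clients.jedis.Jedis;

public class JedisHashExample {
    public static void main(String[] args) {
        // Host and hash name are made up for this illustration
        Jedis jedis = new Jedis("localhost");
        jedis.connect();

        // Store field "1234" -> value "5678" in the hash named "user_to_rep"
        jedis.hset("user_to_rep", "1234", "5678");
        String reputation = jedis.hget("user_to_rep", "1234"); // "5678"
        long numMappings = jedis.hlen("user_to_rep");          // 1

        System.out.println(reputation + " (" + numMappings + " mapping)");
        jedis.disconnect();
    }
}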
The following sections and their corresponding code explain the solution to this problem.
Problem: Given a set of user information, randomly distribute user-to-reputation mappings to a configurable number of Redis instances in parallel.
The RedisHashOutputFormat
is responsible for establishing and verifying the job
configuration prior to being submitted to the JobTracker. Once the
job has been submitted, it also creates the RecordWriter
to serialize all the output
key/value pairs. Typically, this is a file in HDFS. However, we are
not bound to using HDFS, as we will see in the RecordWriter
later on.
The output format contains configuration variables that must
be set by the driver to ensure it has all the information required
to do its job. Here, we have a couple public static
methods to take some of the
guess work out of what a developer needs to set. This output format
takes in a list of Redis instance hosts as a CSV structure and a
Redis hash key to write all the output to. In the checkOutputSpecs
method, we ensure that
both of these parameters are set before we even bother launching the
job, as it will surely fail without them. This is where you’ll want
to verify your configuration!
The getRecordWriter
method
is used on the back end to create an instance of a RecordWriter
for the map or reduce task.
Here, we get the configuration variables required by the RedisHashRecordWriter
and return a new
instance of it. This record writer is a nested class of the RedisHashOutputFormat
, which is not
required but is more of a convention. The details of this class are
in the following section.
The final method of this output format is getOutputCommitter
. The output committer
is used by the framework to manage any temporary output before
committing in case the task fails and needs to be reexecuted. For
this implementation, we don’t typically care whether the task fails
and needs to be re-executed. As long as the job finishes we are
okay. An output committer is required by the framework, but the
NullOutputFormat
contains an
output committer implementation that doesn’t do anything.
public static class RedisHashOutputFormat extends OutputFormat<Text, Text> {

    public static final String REDIS_HOSTS_CONF = "mapred.redishashoutputformat.hosts";
    public static final String REDIS_HASH_KEY_CONF = "mapred.redishashinputformat.key";

    public static void setRedisHosts(Job job, String hosts) {
        job.getConfiguration().set(REDIS_HOSTS_CONF, hosts);
    }

    public static void setRedisHashKey(Job job, String hashKey) {
        job.getConfiguration().set(REDIS_HASH_KEY_CONF, hashKey);
    }

    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        return new RedisHashRecordWriter(job.getConfiguration().get(REDIS_HASH_KEY_CONF),
                job.getConfiguration().get(REDIS_HOSTS_CONF));
    }

    public void checkOutputSpecs(JobContext job) throws IOException {
        String hosts = job.getConfiguration().get(REDIS_HOSTS_CONF);

        if (hosts == null || hosts.isEmpty()) {
            throw new IOException(REDIS_HOSTS_CONF + " is not set in configuration.");
        }

        String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);

        if (hashKey == null || hashKey.isEmpty()) {
            throw new IOException(REDIS_HASH_KEY_CONF + " is not set in configuration.");
        }
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return (new NullOutputFormat<Text, Text>()).getOutputCommitter(context);
    }

    public static class RedisHashRecordWriter extends RecordWriter<Text, Text> {
        // code in next section
    }
}
The RedisHashRecordWriter
handles connecting to Redis via the Jedis client and writing
out the data. Each key/value pair is randomly written to a Redis
instance, providing an even distribution of all data across all
Redis instances. The constructor stores the hash key to write to and creates a new Jedis instance for each host. The code then connects to each Jedis instance and maps it to an integer. This map is used in the write method to get the assigned Jedis instance. The hash code of the key is taken modulo the number of configured Redis instances. The key/value pair is then written to the returned Jedis instance under the configured hash. Finally, all Jedis instances are disconnected in the close method.
public static class RedisHashRecordWriter extends RecordWriter<Text, Text> {

    private HashMap<Integer, Jedis> jedisMap = new HashMap<Integer, Jedis>();
    private String hashKey = null;

    public RedisHashRecordWriter(String hashKey, String hosts) {
        this.hashKey = hashKey;

        // Create a connection to Redis for each host
        // Map an integer 0-(numRedisInstances - 1) to the instance
        int i = 0;
        for (String host : hosts.split(",")) {
            Jedis jedis = new Jedis(host);
            jedis.connect();
            jedisMap.put(i, jedis);
            ++i;
        }
    }

    public void write(Text key, Text value) throws IOException, InterruptedException {
        // Get the Jedis instance that this key/value pair will be
        // written to
        Jedis j = jedisMap.get(Math.abs(key.hashCode()) % jedisMap.size());

        // Write the key/value pair
        j.hset(hashKey, key.toString(), value.toString());
    }

    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // For each jedis instance, disconnect it
        for (Jedis jedis : jedisMap.values()) {
            jedis.disconnect();
        }
    }
}
The Mapper instance is very straightforward and looks like any other mapper. The user ID and reputation are retrieved from the record and then output. The output format does all the heavy lifting for us, allowing it to be reused multiple times to write whatever we want to a Redis hash.
public static class RedisOutputMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    private Text outvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());

        String userId = parsed.get("Id");
        String reputation = parsed.get("Reputation");

        // Set our output key and values
        outkey.set(userId);
        outvalue.set(reputation);

        context.write(outkey, outvalue);
    }
}
The driver code parses the command line and calls our
public static
methods to set up
writing data to Redis. The job is then submitted just like any
other.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Path inputPath = new Path(args[0]);
    String hosts = args[1];
    String hashName = args[2];

    Job job = new Job(conf, "Redis Output");
    job.setJarByClass(RedisOutputDriver.class);

    job.setMapperClass(RedisOutputMapper.class);
    job.setNumReduceTasks(0);

    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.setInputPaths(job, inputPath);

    job.setOutputFormatClass(RedisHashOutputFormat.class);
    RedisHashOutputFormat.setRedisHosts(job, hosts);
    RedisHashOutputFormat.setRedisHashKey(job, hashName);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    int code = job.waitForCompletion(true) ? 0 : 2;
    System.exit(code);
}
The external source input pattern doesn’t load data from HDFS, but instead from some system outside of Hadoop, such as an SQL database or a web service.
You want to load data in parallel from a source that is not part of your MapReduce framework.
The typical model for using MapReduce to analyze your data is to store it into your storage platform first (i.e., HDFS), then analyze it. With this pattern, you can hook up the MapReduce framework into an external source, such as a database or a web service, and pull the data directly into the mappers.
There are a few reasons why you might want to analyze the data directly from the source instead of staging it first. It may be faster to load the data from outside of Hadoop without having to stage it into files first. For example, dumping a database to the file system is likely to be an expensive operation, and taking it from the database directly ensures that the MapReduce job has the most up-to-date data available. A lot can happen on a busy cluster, and dumping a database prior to running an analytic can also fail, causing a stall in the entire pipeline.
In a MapReduce approach, the data is loaded in parallel rather than in a serial fashion. The caveat to this is that the source needs to have well-defined boundaries on which data is read in parallel in order to scale. For example, in the case of a sharded database, each map task can be assigned a shard to load from a table, thus allowing for very quick parallel loads of data without requiring a database scan.
Figure 7-3 shows the external source input structure.
The InputFormat
creates
all the InputSplit
objects,
which may be based on a custom object. An input
split is a chunk of logical input, and that largely depends on the
format in which it will be reading data. In this pattern, the
input is not from a file-based input but an external source. The
input could be from a series of SQL tables or a number of
distributed services spread through the cluster. As long as the
input can be read in parallel, this is a good fit for
MapReduce.
The InputSplit
contains
all the knowledge of where the sources are and how much of each
source is going to be read. The framework uses the location
information to help determine where to assign the map task. A
custom InputSplit
must also
implement the Writable
interface, because the framework uses the methods of this
interface to transmit the input split information to a
TaskTracker. The number of map tasks distributed among
TaskTrackers is equivalent to the number of input splits generated
by the input format. The InputSplit
is then used to initialize a RecordReader
for processing.
The RecordReader
uses the
job configuration provided and InputSplit
information to read key/value
pairs. The implementation of this class depends on the data source
being read. It sets up any connections required to read data from
the external source, such as using JDBC to load from a database or
creating a REST call to access a RESTful service.
Data is loaded from the external source into the MapReduce job and the map phase doesn’t know or care where that data came from.
The bottleneck for a MapReduce job implementing this pattern is going to be the source or the network. The source may not scale well with multiple connections (e.g., a single-threaded SQL database isn’t going to like 1,000 mappers all grabbing data at once). Another problem may be the network infrastructure. Given that the source is probably not in the MapReduce cluster’s network backplane, the connections may be reaching out on a single connection on a slower public network. This should not be a problem if the source is inside the cluster.
This example demonstrates how to read data we just wrote to Redis. Again, we take in a CSV list of Redis instance hosts in order to connect to and read all the data from the hash. Since we distributed the data across a number of Redis instances, this data can be read in parallel. All we need to do is create a map task for each Redis instance, connect to Redis, and then create key/value pairs out of all the data we retrieve. This example uses the identity mapper to simply output each key/value pair received from Redis.
The following sections and their corresponding code explain the solution to this problem.
Problem: Given a list of Redis instances in CSV format, read all the data stored in a configured hash in parallel.
The RedisHashInputSplit
represents the data to be processed by an individual Mapper. In
this example, we store the Redis instance hostname as the location
of the input split, as well as the hash key. The input split
implements the Writable
interface, so that it is serializable by the framework, and includes
a default constructor in order for the framework to create a new
instance via reflection. We return the location via the getLocations
method, in the hopes that the
JobTracker will assign each map task to a TaskTracker that is
hosting the data.
public static class RedisHashInputSplit extends InputSplit implements Writable {

    private String location = null;
    private String hashKey = null;

    public RedisHashInputSplit() {
        // Default constructor for reflection
    }

    public RedisHashInputSplit(String redisHost, String hash) {
        this.location = redisHost;
        this.hashKey = hash;
    }

    public String getHashKey() {
        return this.hashKey;
    }

    public void readFields(DataInput in) throws IOException {
        this.location = in.readUTF();
        this.hashKey = in.readUTF();
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(location);
        out.writeUTF(hashKey);
    }

    public long getLength() throws IOException, InterruptedException {
        return 0;
    }

    public String[] getLocations() throws IOException, InterruptedException {
        return new String[] { location };
    }
}
The RedisHashInputFormat
mirrors that of the RedisHashOutputFormat
in many ways. It
contains configuration variables to know which Redis instances to
connect to and which hash to read from. In the getSplits
method, the configuration is
verified and a number of RedisHashInputSplits
are created based on
the number of Redis hosts. This will create one map task for each
configured Redis instance. The Redis hostname and hash key are
stored in the input split in order to be retrieved later by the
RedisHashRecordReader
. The
createRecordReader
method is
called by the framework to get a new instance of a record reader.
The record reader’s initialize
method is called by the framework, so we can just create a new
instance and return it. Again by convention, this class contains two
nested classes for the record reader and input split
implementations.
public static class RedisHashInputFormat extends InputFormat<Text, Text> {

    public static final String REDIS_HOSTS_CONF = "mapred.redishashinputformat.hosts";
    public static final String REDIS_HASH_KEY_CONF = "mapred.redishashinputformat.key";
    private static final Logger LOG = Logger.getLogger(RedisHashInputFormat.class);

    public static void setRedisHosts(Job job, String hosts) {
        job.getConfiguration().set(REDIS_HOSTS_CONF, hosts);
    }

    public static void setRedisHashKey(Job job, String hashKey) {
        job.getConfiguration().set(REDIS_HASH_KEY_CONF, hashKey);
    }

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        String hosts = job.getConfiguration().get(REDIS_HOSTS_CONF);
        if (hosts == null || hosts.isEmpty()) {
            throw new IOException(REDIS_HOSTS_CONF + " is not set in configuration.");
        }

        String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);
        if (hashKey == null || hashKey.isEmpty()) {
            throw new IOException(REDIS_HASH_KEY_CONF + " is not set in configuration.");
        }

        // Create an input split for each host
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (String host : hosts.split(",")) {
            splits.add(new RedisHashInputSplit(host, hashKey));
        }

        LOG.info("Input splits to process: " + splits.size());
        return splits;
    }

    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new RedisHashRecordReader();
    }

    public static class RedisHashRecordReader extends RecordReader<Text, Text> {
        // code in next section
    }

    public static class RedisHashInputSplit extends InputSplit implements Writable {
        // code in next section
    }
}
The RedisHashRecordReader
is where most of the work is done. The initialize
method is called by the
framework and provided with an input split we created in the input
format. Here, we get the Redis instance to connect to and the hash
key. We then connect to Redis and get the number of key/value pairs
we will be reading from Redis. The hash doesn’t have a means to
iterate or stream the data one at a time or in bulk, so we simply
pull everything over and disconnect from Redis. We store an iterator
over the entries and log some helpful statements along the
way.
In nextKeyValue
, we iterate
through the map of entries one at a time and set the record reader’s
writable objects for the key and value. A return value of true
informs the framework that there is a
key/value pair to process. Once we have exhausted all the key/value
pairs, false is returned
so the
map task can complete.
The other methods of the record reader are used by the
framework to get the current key and value for the mapper to
process. It is worthwhile to reuse this object whenever possible.
The getProgress
method is useful
for reporting gradual status to the JobTracker and should also be
reused if possible. Finally, the close
method is for finalizing the
process. Since we pulled all the information and disconnected from
Redis in the initialize
method,
there is nothing to do here.
public static class RedisHashRecordReader extends RecordReader<Text, Text> {

    private static final Logger LOG = Logger.getLogger(RedisHashRecordReader.class);
    private Iterator<Entry<String, String>> keyValueMapIter = null;
    private Text key = new Text(), value = new Text();
    private float processedKVs = 0, totalKVs = 0;
    private Entry<String, String> currentEntry = null;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Get the host location from the InputSplit
        String host = split.getLocations()[0];
        String hashKey = ((RedisHashInputSplit) split).getHashKey();

        LOG.info("Connecting to " + host + " and reading from " + hashKey);

        Jedis jedis = new Jedis(host);
        jedis.connect();
        jedis.getClient().setTimeoutInfinite();

        // Get all the key/value pairs from the Redis instance and store
        // them in memory
        totalKVs = jedis.hlen(hashKey);
        keyValueMapIter = jedis.hgetAll(hashKey).entrySet().iterator();
        LOG.info("Got " + totalKVs + " from " + hashKey);
        jedis.disconnect();
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        // If the key/value map still has values
        if (keyValueMapIter.hasNext()) {
            // Get the current entry and set the Text objects to the entry
            currentEntry = keyValueMapIter.next();
            key.set(currentEntry.getKey());
            value.set(currentEntry.getValue());
            return true;
        } else {
            // No more values?  return false.
            return false;
        }
    }

    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return processedKVs / totalKVs;
    }

    public void close() throws IOException {
        // nothing to do here
    }
}
Much like the previous example’s driver, we use the public static
methods provided by the
input format to modify the job configuration. Since we are just
using the identity mapper, we don’t need to set any special classes.
The number of reduce tasks is set to zero to specify that this is a
map-only job.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Parse any generic Hadoop options and keep the remaining arguments
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    String hosts = otherArgs[0];
    String hashKey = otherArgs[1];
    Path outputDir = new Path(otherArgs[2]);

    Job job = new Job(conf, "Redis Input");
    job.setJarByClass(RedisInputDriver.class);

    // Use the identity mapper
    job.setNumReduceTasks(0);

    job.setInputFormatClass(RedisHashInputFormat.class);
    RedisHashInputFormat.setRedisHosts(job, hosts);
    RedisHashInputFormat.setRedisHashKey(job, hashKey);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, outputDir);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    System.exit(job.waitForCompletion(true) ? 0 : 3);
}
Partition pruning configures the way the framework picks input splits and drops files from being loaded into MapReduce based on the name of the file.
You have a set of data that is partitioned by a predetermined value, which you can use to dynamically load the data based on what is requested by the application.
Typically, all the data loaded into a MapReduce job is assigned into map tasks and read in parallel. If entire files are going to be thrown out based on the query, loading all of the files is a large waste of processing time. By partitioning the data by a common value, you can avoid significant amounts of processing time by looking only where the data would exist. For example, if you are commonly analyzing data based on date ranges, partitioning your data by date will make it so you only need to load the data inside of that range.
The added caveat to this pattern is this should be handled transparently, so you can run the same MapReduce job over and over again, but over different data sets. This is done by simply changing the data you are querying for, rather than changing the implementation of the job. A great way to do this would be to strip away how the data is stored on the file system and instead put it inside an input format. The input format knows where to locate and get the data, allowing the number of map tasks generated to change based on the query.
This is exceptionally useful if the data storage is volatile and likely to change. If you have dozens of analytics using some type of partitioned input format, you can change the input format implementation and simply recompile all analytics using the new input format code. Since all your analytics get input from a query rather than a file, you don’t need to re-implement how the data is read into the analytic. This can save a massive amount of development time, making you look really good to your boss!
Figure 7-4 shows the structure for partition pruning, explained below.
The InputFormat
is
where this pattern comes to life. The getSplits
method is where we pay special
attention, because it determines the input splits that will be
created, and thus the number of map tasks. While the configuration
is typically a set of files, configuration turns into more of a
query than a set of file paths. For instance, if data is stored on
a file system by date, the InputFormat
can accept a date range as
input, then determine which folders to pull into the MapReduce
job. If data is sharded in an external service by date, say 12
shards for each month, only one shard needs to be read by the
MapReduce job when looking for data in March. The key here is that
the input format determines where the data comes from based on a
query, rather than passing in a set of files.
The RecordReader
implementation depends on how the data is being stored. If it is a
file-based input, something like a LineRecordReader
can be used to read
key/value pairs from a file. If it is an external source, you’ll
have to customize something more to your needs.
Partition pruning changes only the amount of data that is read by the MapReduce job, not the eventual outcome of the analytic. The main reason for partition pruning is to reduce the overall processing time to read in data. This is done by ignoring input that will not produce any output before it even gets to a map task.
Many modern relational databases handle partition pruning transparently. When you create the table, you specify how the database should partition the data and the database will handle the rest on inserts. Hive also supports partitioning.
CREATE TABLE parted_data
(
  foo_date DATE
)
PARTITION BY RANGE(foo_date)
(
  PARTITION foo_2012 VALUES LESS THAN (TO_DATE('01/01/2013', 'DD/MM/YYYY')),
  PARTITION foo_2011 VALUES LESS THAN (TO_DATE('01/01/2012', 'DD/MM/YYYY')),
  PARTITION foo_2010 VALUES LESS THAN (TO_DATE('01/01/2011', 'DD/MM/YYYY'))
);
Then, when you query with a specific value in the WHERE
clause, the database will
automatically use only the relevant partitions.
SELECT * FROM parted_data
WHERE foo_date = TO_DATE('31/01/2012', 'DD/MM/YYYY');
The data in this pattern is loaded into each map task as fast as in any other pattern. Only the number of tasks changes based on the query at hand. Utilizing this pattern can provide massive gains by not creating tasks that would not have generated any output anyway. Outside of the I/O, the performance depends on the other pattern being applied in the map and reduce phases of the job.
This example demonstrates a smarter way to store and read data in Redis. Rather than randomly distributing the user-to-reputation mappings, we can partition this data on particular criteria. The user-to-reputation mappings are partitioned based on last access date and stored in six different Redis instances. Two months of data are stored in separate hashes on each Redis instance. That is, January and February are stored in different hashes on Redis instance 0, March and April on instance 1, and so on.
By distributing the data in this manner, we can more intelligently read it based on a user query. Whereas the previous examples took in a list of Redis instances and a hash key via the command line, this pattern hardcodes all the logic of where and how to store the data in the output format, as well as in the input format. This completely strips away knowledge from the mapper and reducer of where the data is coming from, which has its advantages and disadvantages for a developer using our input and output formats.
It may not be the best idea to actually hardcode this information into the Java code itself, but instead to have a rarely changing configuration file that can be found by your formats. This way, things can still be changed if necessary without forcing a recompile. Environment variables work nicely, or the information can just be passed in via the command line.
The following sections and their corresponding code explain the solution to this problem.
Problem: Given a set of user data, partition the user-to-reputation mappings by last access date across six Redis instances.
To help better store information, a custom WritableComparable
is implemented in order
to allow the mapper to set information needed by the record writer.
This class contains methods to set and get the field name to be
stored in Redis, as well as the last access month. The last access
month accepts a zero-based integer value for the month, but is later
turned into a string representation for easier querying in the next
example. Take the time to implement the compareTo
, toString
, and hashCode
methods (like every good Java
developer!).
public static class RedisKey implements WritableComparable<RedisKey> {

    private int lastAccessMonth = 0;
    private Text field = new Text();

    public int getLastAccessMonth() {
        return this.lastAccessMonth;
    }

    public void setLastAccessMonth(int lastAccessMonth) {
        this.lastAccessMonth = lastAccessMonth;
    }

    public Text getField() {
        return this.field;
    }

    public void setField(String field) {
        this.field.set(field);
    }

    public void readFields(DataInput in) throws IOException {
        lastAccessMonth = in.readInt();
        this.field.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(lastAccessMonth);
        this.field.write(out);
    }

    public int compareTo(RedisKey rhs) {
        if (this.lastAccessMonth == rhs.getLastAccessMonth()) {
            return this.field.compareTo(rhs.getField());
        } else {
            return this.lastAccessMonth < rhs.getLastAccessMonth() ? -1 : 1;
        }
    }

    public String toString() {
        return this.lastAccessMonth + " " + this.field.toString();
    }

    public int hashCode() {
        return toString().hashCode();
    }
}
This output format is extremely basic, as all the grunt work is handled in
the record writer. The main thing to focus on is the generic type arguments used when extending the OutputFormat
class. This output format accepts our custom class as the output key
and a Text
object as the output
value. Any other classes will cause errors when trying to write any
output.
Since our record writer implementation is coded to a specific
and known output, there is no need to verify any output
specification of the job. An output committer is still required by
the framework, so we use NullOutputFormat
’s
output committer.
public static class RedisLastAccessOutputFormat extends OutputFormat<RedisKey, Text> {

    public RecordWriter<RedisKey, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        return new RedisLastAccessRecordWriter();
    }

    public void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException {
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return (new NullOutputFormat<Text, Text>()).getOutputCommitter(context);
    }

    public static class RedisLastAccessRecordWriter extends RecordWriter<RedisKey, Text> {
        // Code in next section
    }
}
The RedisLastAccessRecordWriter
is templated to accept the same classes as the output
format. The construction of the class connects to all six Redis
instances and puts them in a map. This map stores the
month-to-Redis-instance mappings and is used in the write
method to retrieve the proper
instance. The write
method then
uses a map from the month integer to a three-character month code for
serialization. This map is omitted for brevity, but looks something
like 0→JAN, 1→FEB, ..., 11→DEC. This means all the hashes in Redis are
named based on the three-character month code. The close
method disconnects all the Redis
instances.
public static class RedisLastAccessRecordWriter
        extends RecordWriter<RedisKey, Text> {

    private HashMap<Integer, Jedis> jedisMap = new HashMap<Integer, Jedis>();

    public RedisLastAccessRecordWriter() {
        // Create a connection to Redis for each host
        int i = 0;
        for (String host : MRDPUtils.REDIS_INSTANCES) {
            Jedis jedis = new Jedis(host);
            jedis.connect();
            jedisMap.put(i, jedis);
            jedisMap.put(i + 1, jedis);
            i += 2;
        }
    }

    public void write(RedisKey key, Text value)
            throws IOException, InterruptedException {
        // Get the Jedis instance that this key/value pair will be
        // written to -- (0,1)->0, (2-3)->1, ... , (10-11)->5
        Jedis j = jedisMap.get(key.getLastAccessMonth());

        // Write the key/value pair
        j.hset(MONTH_FROM_INT.get(key.getLastAccessMonth()),
                key.getField().toString(), value.toString());
    }

    public void close(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // For each jedis instance, disconnect it
        for (Jedis jedis : jedisMap.values()) {
            jedis.disconnect();
        }
    }
}
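The MONTH_FROM_INT map referenced in the write method is the omitted month-integer-to-month-code map described above. A minimal sketch of what its declaration and initialization might look like (the initialization style is our assumption, not the book's actual code) is:

// Sketch only: zero-based month index -> three-character Redis hash name
private static final HashMap<Integer, String> MONTH_FROM_INT =
        new HashMap<Integer, String>();

static {
    String[] monthCodes = { "JAN", "FEB", "MAR", "APR", "MAY", "JUN",
            "JUL", "AUG", "SEP", "OCT", "NOV", "DEC" };
    for (int m = 0; m < monthCodes.length; ++m) {
        MONTH_FROM_INT.put(m, monthCodes[m]);
    }
}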
The mapper code parses each input record and sets the values for the output RedisKey and the output value. The month of the last access date is parsed via the Calendar and SimpleDateFormat classes.
public static class RedisLastAccessOutputMapper
        extends Mapper<Object, Text, RedisKey, Text> {

    // This object will format the creation date string into a Date object
    private final static SimpleDateFormat frmt =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    private RedisKey outkey = new RedisKey();
    private Text outvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed =
                MRDPUtils.transformXmlToMap(value.toString());

        String userId = parsed.get("Id");
        String reputation = parsed.get("Reputation");

        // Grab the last access date
        String strDate = parsed.get("LastAccessDate");

        try {
            // Parse the string into a Calendar object
            Calendar cal = Calendar.getInstance();
            cal.setTime(frmt.parse(strDate));

            // Set our output key and values
            outkey.setLastAccessMonth(cal.get(Calendar.MONTH));
            outkey.setField(userId);
            outvalue.set(reputation);

            context.write(outkey, outvalue);
        } catch (ParseException e) {
            // frmt.parse throws a checked ParseException; rethrow it as an
            // IOException so a malformed date fails the task visibly
            throw new IOException(e);
        }
    }
}
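For reference, the mapper assumes each input value is a single XML row whose attributes include Id, Reputation, and LastAccessDate. A made-up record such as

<row Id="12345" Reputation="3187" LastAccessDate="2012-07-14T22:15:45.953" />

would produce a RedisKey with month 6 and field 12345, and an output value of 3187, which the record writer would then store in the JUL hash on Redis instance 3.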
The driver looks very similar to a more basic job configuration. All the special configuration is entirely handled by the output format class and record writer.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path inputPath = new Path(args[0]);

    Job job = new Job(conf, "Redis Last Access Output");
    job.setJarByClass(PartitionPruningOutputDriver.class);

    job.setMapperClass(RedisLastAccessOutputMapper.class);
    job.setNumReduceTasks(0);

    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.setInputPaths(job, inputPath);

    // Use the custom output format and record writer defined above
    job.setOutputFormatClass(RedisLastAccessOutputFormat.class);

    job.setOutputKeyClass(RedisKey.class);
    job.setOutputValueClass(Text.class);

    int code = job.waitForCompletion(true) ? 0 : 2;
    System.exit(code);
}
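With the six Redis instances running and the classes bundled into a job jar, the job is submitted like any other MapReduce job. The jar and input file names below are only placeholders:

hadoop jar mrdp.jar PartitionPruningOutputDriver users.xml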
This example demonstrates how to query for the information we just stored in Redis. Unlike most examples, where you provide some path to files in HDFS, we instead just pass in the months of data we want. Figuring out where to get the data is handled entirely by the input format.
The heart of partition pruning is to avoid reading data that you don't have to read. By storing the user-to-reputation mappings across six different Redis servers, we need to connect only to the instances that are hosting the requested month's data. Even better, we need to read only from the hashes that are holding the specific month. For instance, passing in "JAN,FEB,MAR,NOV" on the command line will create three input splits, one for each Redis instance hosting the data (0, 1, and 5). All the data on Redis instance 0 will be read, but only one of the two month hashes on Redis instances 1 and 5 will be pulled. This is much better than having to connect to all the desired instances and read all the data, only to throw most of it away!
The sections below, along with the corresponding code, address the following problem.
Problem: Given a query for user to reputation mappings by months, read only the data required to satisfy the query in parallel.
The input split shown here is very similar to the input split in External Source Input Example. Instead of storing one hash key, we are going to store multiple hash keys. This is because the data is partitioned based on month, instead of all the data being randomly distributed in one hash.
public static class RedisLastAccessInputSplit
        extends InputSplit implements Writable {

    private String location = null;
    private List<String> hashKeys = new ArrayList<String>();

    public RedisLastAccessInputSplit() {
        // Default constructor for reflection
    }

    public RedisLastAccessInputSplit(String redisHost) {
        this.location = redisHost;
    }

    public void addHashKey(String key) {
        hashKeys.add(key);
    }

    public void removeHashKey(String key) {
        hashKeys.remove(key);
    }

    public List<String> getHashKeys() {
        return hashKeys;
    }

    public void readFields(DataInput in) throws IOException {
        location = in.readUTF();
        int numKeys = in.readInt();
        hashKeys.clear();
        for (int i = 0; i < numKeys; ++i) {
            hashKeys.add(in.readUTF());
        }
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(location);
        out.writeInt(hashKeys.size());
        for (String key : hashKeys) {
            out.writeUTF(key);
        }
    }

    public long getLength() throws IOException, InterruptedException {
        return 0;
    }

    public String[] getLocations() throws IOException, InterruptedException {
        return new String[] { location };
    }
}
This input format class intelligently creates RedisLastAccessInputSplit objects from the selected months of data. Much like the output format shown earlier writes RedisKey objects, this input format reads the same objects and is templated to enforce this on mapper implementations. It initially builds a hash map of host to input split so that hash keys for months hosted on the same Redis instance end up in the same split, rather than one split being created per month. If a split has not yet been created for a particular month's host, a new one is created and the month hash key is added to it. Otherwise, the hash key is added to the split that has already been created for that host. A List is then created out of the values stored in the map. This creates a number of input splits equal to the number of Redis instances required to satisfy the query.
There are a couple of helper hash maps: one converts a month string to an integer, and the other determines which Redis instance hosts which month of data. The initialization of these hash maps is omitted from the static block for brevity, but a sketch appears after the listing.
public static class RedisLastAccessInputFormat
        extends InputFormat<RedisKey, Text> {

    public static final String REDIS_SELECTED_MONTHS_CONF =
            "mapred.redilastaccessinputformat.months";

    private static final HashMap<String, Integer> MONTH_FROM_STRING =
            new HashMap<String, Integer>();

    private static final HashMap<String, String> MONTH_TO_INST_MAP =
            new HashMap<String, String>();

    private static final Logger LOG =
            Logger.getLogger(RedisLastAccessInputFormat.class);

    static {
        // Initialize month to Redis instance map
        // Initialize month 3 character code to integer
    }

    public static void setRedisLastAccessMonths(Job job, String months) {
        job.getConfiguration().set(REDIS_SELECTED_MONTHS_CONF, months);
    }

    public List<InputSplit> getSplits(JobContext job) throws IOException {

        String months = job.getConfiguration().get(REDIS_SELECTED_MONTHS_CONF);

        if (months == null || months.isEmpty()) {
            throw new IOException(REDIS_SELECTED_MONTHS_CONF
                    + " is null or empty.");
        }

        // Create input splits from the input months
        HashMap<String, RedisLastAccessInputSplit> instanceToSplitMap =
                new HashMap<String, RedisLastAccessInputSplit>();

        for (String month : months.split(",")) {
            String host = MONTH_TO_INST_MAP.get(month);
            RedisLastAccessInputSplit split = instanceToSplitMap.get(host);
            if (split == null) {
                split = new RedisLastAccessInputSplit(host);
                split.addHashKey(month);
                instanceToSplitMap.put(host, split);
            } else {
                split.addHashKey(month);
            }
        }

        LOG.info("Input splits to process: "
                + instanceToSplitMap.values().size());

        return new ArrayList<InputSplit>(instanceToSplitMap.values());
    }

    public RecordReader<RedisKey, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new RedisLastAccessRecordReader();
    }

    public static class RedisLastAccessRecordReader
            extends RecordReader<RedisKey, Text> {
        // Code in next section
    }
}
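The omitted static block simply fills the two helper maps. A minimal sketch of what it might look like, assuming MRDPUtils.REDIS_INSTANCES is a String array holding the six hosts in month order (JAN/FEB on the first instance, and so on, as described for the record writer above), is:

static {
    // Sketch only: initialize the month code lookups
    String[] monthCodes = { "JAN", "FEB", "MAR", "APR", "MAY", "JUN",
            "JUL", "AUG", "SEP", "OCT", "NOV", "DEC" };
    for (int m = 0; m < monthCodes.length; ++m) {
        // Three-character month code to zero-based month integer
        MONTH_FROM_STRING.put(monthCodes[m], m);
        // Consecutive month pairs share an instance:
        // JAN/FEB -> instance 0, MAR/APR -> 1, ..., NOV/DEC -> 5
        MONTH_TO_INST_MAP.put(monthCodes[m], MRDPUtils.REDIS_INSTANCES[m / 2]);
    }
}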
The RedisLastAccessRecordReader is a bit more complicated than the other record readers we have seen. It needs to read from multiple hashes, rather than reading everything at once in the initialize method; here, initialize simply reads the host and hash keys from the input split. In nextKeyValue, a new connection to Redis is created if the iterator through the hash is null, or if we have reached the end of all the hashes to read. If the iterator through the hash keys does not have a next value, we immediately return false, as there is no more data for the map task. Otherwise, we connect to Redis and pull all the data from the specific hash. The hash iterator is then used to exhaust all the field/value pairs from Redis. A do-while loop is used to ensure that once a hash iterator is complete, it will loop back around to get data from the next hash or inform the task there is no more data to be read. The implementation of the remaining methods is identical to that of the RedisHashRecordReader and is omitted; a sketch of what they might look like follows the listing.
public static class RedisLastAccessRecordReader
        extends RecordReader<RedisKey, Text> {

    private static final Logger LOG =
            Logger.getLogger(RedisLastAccessRecordReader.class);
    private Entry<String, String> currentEntry = null;
    private float processedKVs = 0, totalKVs = 0;
    private int currentHashMonth = 0;
    private Iterator<Entry<String, String>> hashIterator = null;
    private Iterator<String> hashKeys = null;
    private RedisKey key = new RedisKey();
    private String host = null;
    private Text value = new Text();

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Get the host location from the InputSplit
        host = split.getLocations()[0];

        // Get an iterator of all the hash keys we want to read
        hashKeys = ((RedisLastAccessInputSplit) split)
                .getHashKeys().iterator();

        LOG.info("Connecting to " + host);
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean nextHashKey = false;
        do {
            // if this is the first call or the iterator does not have a
            // next
            if (hashIterator == null || !hashIterator.hasNext()) {
                // if we have reached the end of our hash keys, return
                // false
                if (!hashKeys.hasNext()) {
                    // ultimate end condition, return false
                    return false;
                } else {
                    // Otherwise, connect to Redis and get all
                    // the name/value pairs for this hash key
                    Jedis jedis = new Jedis(host);
                    jedis.connect();
                    String strKey = hashKeys.next();
                    currentHashMonth = MONTH_FROM_STRING.get(strKey);
                    hashIterator = jedis.hgetAll(strKey).entrySet().iterator();
                    jedis.disconnect();
                }
            }

            // If the key/value map still has values
            if (hashIterator.hasNext()) {
                // Get the current entry and set
                // the Text objects to the entry
                currentEntry = hashIterator.next();
                key.setLastAccessMonth(currentHashMonth);
                key.setField(currentEntry.getKey());
                value.set(currentEntry.getValue());

                // A pair was found, so stop looping over hash keys
                // (guards against an empty hash being followed by a
                // non-empty one)
                nextHashKey = false;
            } else {
                nextHashKey = true;
            }
        } while (nextHashKey);

        return true;
    }

    ...
}
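The omitted methods are standard RecordReader plumbing. A minimal sketch of what they might look like inside this class, assuming they mirror the earlier RedisHashRecordReader, is:

public RedisKey getCurrentKey() throws IOException, InterruptedException {
    return key;
}

public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
}

public float getProgress() throws IOException, InterruptedException {
    // Fraction of key/value pairs read so far; processedKVs and totalKVs
    // would need to be updated as hashes are read for this to be meaningful
    return totalKVs == 0 ? 0.0f : processedKVs / totalKVs;
}

public void close() throws IOException {
    // Connections are opened and closed per hash in nextKeyValue,
    // so there is nothing left to clean up here
}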
The driver code reads the requested last-access months from the command line and hands them to the input format. This configuration parameter is used by the input format to determine which Redis instances to read from, rather than reading from every Redis instance. It also sets the output directory for the job. Again, it uses the identity mapper rather than performing any analysis on the data retrieved.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    String lastAccessMonths = args[0];
    Path outputDir = new Path(args[1]);

    Job job = new Job(conf, "Redis Input");
    job.setJarByClass(PartitionPruningInputDriver.class);

    // Use the identity mapper
    job.setNumReduceTasks(0);

    job.setInputFormatClass(RedisLastAccessInputFormat.class);
    RedisLastAccessInputFormat.setRedisLastAccessMonths(job, lastAccessMonths);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, outputDir);

    job.setOutputKeyClass(RedisKey.class);
    job.setOutputValueClass(Text.class);

    System.exit(job.waitForCompletion(true) ? 0 : 2);
}
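As with the output job, submission is unremarkable; the only job-specific arguments are the comma-separated list of month codes and the output directory. The jar name and output path below are only placeholders:

hadoop jar mrdp.jar PartitionPruningInputDriver JAN,FEB,MAR,NOV /output/redis-pruned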