In this recipe, we will use the Apache Giraph API to implement a distributed breadth-first search to determine if two employees are connected in the company's network via one or more pathways. The code will rely on message passing between employee vertices to determine if a vertex is reachable.
Make sure you have a basic familiarity with Google Pregel/BSP and the Giraph API.
You will need access to a pseudo-distributed Hadoop cluster. The code listed in this recipe disables the split master/worker configuration, which is not ideal in fully-distributed environments. It also assumes familiarity with bash shell scripting.
You will need to load the example dataset gooftech.tsv into an HDFS folder located at /input/gooftech.
You will also need to package this code inside a JAR file to be executed by the Hadoop JAR launcher from the shell. The shell script listed in the recipe will show a template for job submission with the correct classpath dependencies.
Carry out the following steps to perform a breadth-first search in Giraph:
1. Create the class EmployeeRDFTextInputFormat.java. See steps 1 to 3 in the How to do it… section of the Single-source shortest-path with Apache Giraph recipe.
2. Create the class EmployeeBreadthFirstSearch.java:

```java
import org.apache.giraph.graph.*;
import org.apache.giraph.lib.TextVertexOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Start with the specified employee; mark the target if a message is received.
 */
public class EmployeeBreadthFirstSearch implements Tool {

    public static final String NAME = "emp_breadth_search";

    private Configuration conf;
    private static final String SOURCE_ID = "emp_src_id";
    private static final String DEST_ID = "emp_dest_id";

    public EmployeeBreadthFirstSearch(Configuration configuration) {
        conf = configuration;
    }
```
3. The run() method sets up the Giraph job configuration:

```java
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 5) {
            System.err.println(printUsage());
            System.exit(1);
        }
        if (args.length > 5) {
            System.err.println("too many arguments. " +
                "Did you forget to quote the source or destination ID name " +
                "('firstname lastname')");
            System.exit(1);
        }
        String input = args[0];
        String output = args[1];
        String source_id = args[2];
        String dest_id = args[3];
        String zooQuorum = args[4];
        conf.set(SOURCE_ID, source_id);
        conf.set(DEST_ID, dest_id);
        conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
        conf.setBoolean(GiraphJob.USE_SUPERSTEP_COUNTERS, false);
        conf.setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
        GiraphJob job = new GiraphJob(conf, "determine connectivity between " +
            source_id + " and " + dest_id);
        job.setVertexClass(EmployeeSearchVertex.class);
        job.setVertexInputFormatClass(EmployeeRDFTextInputFormat.class);
        job.setVertexOutputFormatClass(BreadthFirstTextOutputFormat.class);
        job.setZooKeeperConfiguration(zooQuorum);
        FileInputFormat.addInputPath(job.getInternalJob(), new Path(input));
        FileOutputFormat.setOutputPath(job.getInternalJob(),
            removeAndSetOutput(output));
        job.setWorkerConfiguration(1, 1, 100.0f);
        if (job.run(true)) {
            long srcCounter = job.getInternalJob().getCounters()
                .getGroup("Search").findCounter("Source Id found").getValue();
            long dstCounter = job.getInternalJob().getCounters()
                .getGroup("Search").findCounter("Dest Id found").getValue();
            if (srcCounter == 0 || dstCounter == 0) {
                System.out.println("Source and/or Dest Id not found in " +
                    "dataset. Check your arguments.");
            }
            return 0;
        } else {
            return 1;
        }
    }
```
4. Add the following helper method and the methods required by the Tool interface:

```java
    private Path removeAndSetOutput(String outputDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(outputDir);
        fs.delete(path, true);
        return path;
    }

    private String printUsage() {
        return "usage: <input> <output> <single quoted source_id> " +
               "<single quoted dest_id> <zookeeper_quorum>";
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }
```
5. The main() method instantiates and submits the job using ToolRunner:

```java
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(
            new EmployeeBreadthFirstSearch(new Configuration()), args));
    }
```
6. The static inner class, EmployeeSearchVertex, lets us define a custom compute method to be used during each superstep:

```java
    public static class EmployeeSearchVertex
            extends EdgeListVertex<Text, IntWritable, NullWritable, IntWritable> {

        private IntWritable msg = new IntWritable(1);

        private boolean isSource() {
            return getId().toString().equals(getConf().get(SOURCE_ID));
        }

        private boolean isDest() {
            return getId().toString().equals(getConf().get(DEST_ID));
        }

        @Override
        public void compute(Iterable<IntWritable> messages) throws IOException {
            if (getSuperstep() == 0) {
                if (isSource()) {
                    getContext().getCounter("Search",
                        "Source Id found").increment(1);
                    sendMessageToAllEdges(msg);
                } else if (isDest()) {
                    getContext().getCounter("Search",
                        "Dest Id found").increment(1);
                }
            }
            boolean connectedToSourceId = false;
            for (IntWritable message : messages) {
                if (isDest()) {
                    setValue(message);
                }
                connectedToSourceId = true;
            }
            if (connectedToSourceId) {
                sendMessageToAllEdges(msg);
            }
            voteToHalt();
        }
    }
```
7. The static inner class, BreadthFirstTextOutputFormat, lets us define a custom OutputFormat. The BreadthFirstTextOutputFormat class lets us output our vertex information as Text key-value pairs back to HDFS:

```java
    public static class BreadthFirstTextOutputFormat extends
            TextVertexOutputFormat<Text, IntWritable, NullWritable> {

        private static class EmployeeRDFVertexWriter
                extends TextVertexWriter<Text, IntWritable, NullWritable> {

            private String sourceId = null;
            private String destId = null;

            public EmployeeRDFVertexWriter(String sourceId, String destId,
                    RecordWriter<Text, Text> lineRecordWriter) {
                super(lineRecordWriter);
                this.sourceId = sourceId;
                this.destId = destId;
            }

            @Override
            public void writeVertex(
                    Vertex<Text, IntWritable, NullWritable, ?> vertex)
                    throws IOException, InterruptedException {
                if (vertex.getId().toString().equals(destId)) {
                    if (vertex.getValue().get() > 0) {
                        getRecordWriter().write(new Text(sourceId +
                            " is connected to " + destId), new Text(""));
                    } else {
                        getRecordWriter().write(new Text(sourceId +
                            " is not connected to " + destId), new Text(""));
                    }
                }
            }
        }

        @Override
        public VertexWriter<Text, IntWritable, NullWritable>
                createVertexWriter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            RecordWriter<Text, Text> recordWriter =
                textOutputFormat.getRecordWriter(context);
            String sourceId = context.getConfiguration().get(SOURCE_ID);
            String destId = context.getConfiguration().get(DEST_ID);
            return new EmployeeRDFVertexWriter(sourceId, destId, recordWriter);
        }
    }
```
8. Create the shell script run_employee_connectivity_search.sh using the commands listed in the following code snippet. Change GIRAPH_PATH to match your local path to the Giraph JAR file, and change JAR_PATH to match the local path to your own custom JAR file compiled from the preceding code:

```shell
GIRAPH_PATH=lib/giraph/giraph-0.2-SNAPSHOT-jar-with-dependencies.jar
HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$GIRAPH_PATH
JAR_PATH=dist/employee_examples.jar
export HADOOP_CLASSPATH
hadoop jar $JAR_PATH emp_breadth_search -libjars $GIRAPH_PATH,$JAR_PATH \
  /input/gooftech /output/gooftech 'Valery Dorado' 'Gertha Linda' \
  localhost:2181
```
9. Run run_employee_connectivity_search.sh. You should see the job submitted to the Hadoop cluster. Upon successful completion, you should see a single part file under /output/gooftech saying Valery Dorado is not connected to Gertha Linda.
10. Open run_employee_connectivity_search.sh. Change the source ID to Shoshana Gatton. Save and close the script.
11. Run run_employee_connectivity_search.sh again. The output should now be Shoshana Gatton is connected to Gertha Linda.
To understand how the custom InputFormat and job setup work, check out the How it works… section of the recipe titled Single-source shortest-path using Apache Giraph. This recipe uses exactly the same input format and the same job setup, except for the following differences:

- The job requires an additional DEST_ID argument to be supplied on the command line.
- The Vertex implementation is EmployeeSearchVertex.
- The OutputFormat subclass is set to the static inner class BreadthFirstTextOutputFormat. This is explained in more detail in the following paragraph.

The compute() function inside EmployeeSearchVertex is where we take advantage of Giraph message passing to determine reachability. At the first superstep, we send a message along each edge from the source vertex. If we find the supplied source and destination IDs among the dataset vertices, we increment counters to let the user know; this helps us quickly spot incorrectly entered command-line arguments for the source/destination vertex IDs. After the first superstep, both of these counters should be set to 1. We define a private constant member variable msg that is set to 1. The actual numeric content of the message is never used, but by keeping the vertex value as IntWritable we can reuse the previously built custom InputFormat, EmployeeRDFTextInputFormat. If during any superstep a vertex receives a message, it forwards that message along each of its edges. If the destination vertex ever receives a message, we set its value to the integer 1 contained in the message. By the end of the job execution, the destination vertex will either hold the value 1, meaning it is connected by one or more edge paths to the source vertex, or its initial value of 0, meaning it never received a message and is not connected.
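The message-forwarding idea can be illustrated outside Giraph. The following framework-free Java sketch (the class name and structure are our own, not part of the recipe's code) simulates the supersteps: the source messages its edges at step 0, and every vertex that receives a message forwards it once. Unlike the vertex code above, it tracks which vertices have already forwarded, so it terminates even on cyclic graphs:

```java
import java.util.*;

public class BfsMessagePassing {

    /**
     * Simulates BSP supersteps over an adjacency-list graph: at superstep 0
     * the source messages its out-edges; any vertex that receives a message
     * forwards it to its own out-edges exactly once.
     */
    public static boolean connected(Map<String, List<String>> edges,
                                    String src, String dst) {
        Set<String> forwarded = new HashSet<>();   // vertices that already sent
        forwarded.add(src);
        // superstep 0: messages sent by the source
        Set<String> inbox = new HashSet<>(edges.getOrDefault(src, List.of()));
        while (!inbox.isEmpty()) {
            if (inbox.contains(dst)) {
                return true;                        // destination got a message
            }
            Set<String> next = new HashSet<>();
            for (String v : inbox) {
                if (forwarded.add(v)) {             // forward only once per vertex
                    next.addAll(edges.getOrDefault(v, List.of()));
                }
            }
            inbox = next;                           // next superstep's messages
        }
        return src.equals(dst);                     // trivially connected to self
    }
}
```

This mirrors the job's logic: the destination is "connected" exactly when a message reaches it, regardless of the message's content.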
We define the static inner class BreadthFirstTextOutputFormat to handle the output formatting. This follows an inheritance/delegation pattern similar to our custom InputFormat defined earlier, except that instead of delegating to a custom RecordReader, we delegate to a custom RecordWriter. When we instantiate our TextVertexWriter subclass, EmployeeRDFVertexWriter, we pass it references to the configured source and destination vertex IDs. The framework then calls the writeVertex() method for each vertex in our dataset. For this job, we are only interested in printing whether or not the source vertex is connected by one or more paths to the destination vertex. If the current vertex being processed is the destination vertex, we print one of two strings. If the vertex value is greater than 0, that destination must have received one or more messages, which is possible only if there exists at least one path of edge communication between the source and destination. Otherwise, if the value of the destination vertex is still 0, we can safely assume that it is not reachable from the source. For just one source-destination pair, as we have in this recipe, we could have placed this business logic directly in the job class and used counters after the execution finished, but this design is more extensible should we want to use this code to query multiple destination-source vertex pairs.
Programs designed using the Hadoop MapReduce API usually require some additional tuning once you begin testing at scale. It is not uncommon to completely re-evaluate a chosen design pattern that simply does not scale. Working with the Giraph API requires the same diligence and patience.
This is not always easy to spot initially. You may have a relatively small graph that behaves very well under a given BSP design, then hit scale and run into all sorts of errors you never planned for. Try to keep your compute() function small to avoid complications and to aid troubleshooting. At the time of this writing, Giraph workers attempt to hold their assigned graph partitions entirely in memory, so minimizing the memory footprint of each vertex is of the utmost importance. Moreover, many people need to tune their message-passing settings using the parameters located at the top of GiraphJob. You can control the number of messaging threads each worker uses to communicate with other workers by setting MSG_NUM_FLUSH_THREADS. By default, Giraph lets each worker open a communication thread to every other worker in the job; for many Hadoop clusters, this is not sustainable. Also consider adjusting the maximum number of messages allowed to be flushed in bulk using MAX_MESSAGES_PER_FLUSH_PUT. The default value of 2000 may not be adequate for your job.
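As a sketch, these parameters would be set on the job Configuration before submission, alongside the other GiraphJob options in run(). The values below are purely illustrative, not recommendations; appropriate settings depend entirely on your cluster:

```java
// Illustrative values only; tune for your cluster and workload.
// Cap the number of messaging threads each worker opens to its peers.
conf.setInt(GiraphJob.MSG_NUM_FLUSH_THREADS, 8);
// Lower the number of messages flushed in a single bulk put.
conf.setInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT, 500);
```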