RHadoop is a collection of three R packages that provide large-scale data operations within an R environment. It was developed by Revolution Analytics, the leading commercial provider of software based on R. RHadoop is available as three main R packages: rhdfs, rmr, and rhbase. Each of them offers different Hadoop features.
rhdfs is an R interface that provides HDFS usability from the R console. As Hadoop MapReduce programs write their output to HDFS, it is very easy to access that output by calling the rhdfs methods. The R programmer can easily perform read and write operations on distributed data files. Basically, the rhdfs package calls the HDFS API in the backend to operate on data sources stored on HDFS.

rmr is an R interface that provides the Hadoop MapReduce facility inside the R environment. The R programmer only needs to divide the application logic into the map and reduce phases and submit it with the rmr methods. After that, rmr calls the Hadoop streaming MapReduce API with several job parameters, such as input directory, output directory, mapper, and reducer, to perform the R MapReduce job over the Hadoop cluster.

rhbase is an R interface for operating on the Hadoop HBase data source, stored on the distributed network, via a Thrift server. The rhbase package is designed with several methods for initialization, read/write, and table manipulation operations.

It is not necessary to install all three RHadoop packages to run Hadoop MapReduce operations with R and Hadoop. If we have stored our input data source in HBase, we need to install rhbase; otherwise we require the rhdfs and rmr packages. As Hadoop is most popular for its two main features, Hadoop MapReduce and HDFS, both of these features will be used within the R console with the help of the RHadoop rhdfs and rmr packages. These packages are enough to run Hadoop MapReduce from R. Basically, rhdfs provides HDFS data operations while rmr provides MapReduce execution operations.
RHadoop also includes another package called quickcheck, which is designed for debugging MapReduce jobs defined with the rmr package.
In the next section, we will see their architectural relationships as well as their installation steps.
Since Hadoop is highly popular because of HDFS and MapReduce, Revolution Analytics has developed separate R packages, namely rhdfs, rmr, and rhbase. The architecture of RHadoop is shown in the following diagram:
In this section, we will learn some installation tricks for the three RHadoop packages including their prerequisites.
RHadoop is already tested with several Hadoop distributions provided by Cloudera, Hortonworks, and MapR.
We can install them by executing the following R command in the R console:
install.packages(c('rJava', 'RJSONIO', 'itertools', 'digest', 'Rcpp', 'httr', 'functional', 'devtools', 'plyr', 'reshape2'))
## Setting HADOOP_CMD
Sys.setenv(HADOOP_CMD="/usr/local/hadoop/bin/hadoop")

## Setting up HADOOP_STREAMING
Sys.setenv(HADOOP_STREAMING="/usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar")
Alternatively, we can also set these environment variables from the command line as follows:
export HADOOP_CMD=/usr/local/hadoop
export HADOOP_STREAMING=/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar
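As a quick sanity check, a minimal sketch using only base R can confirm from the R console that both variables are set and point to existing files before any rmr job is submitted (the error messages are our own illustrative wording):

```r
# Check that the Hadoop environment variables are set;
# rmr jobs will fail to submit without them.
cmd <- Sys.getenv("HADOOP_CMD")
streaming <- Sys.getenv("HADOOP_STREAMING")

if (cmd == "" || !file.exists(cmd))
  stop("HADOOP_CMD is unset or does not point to the hadoop binary")
if (streaming == "" || !file.exists(streaming))
  stop("HADOOP_STREAMING is unset or does not point to the streaming jar")

message("Hadoop environment looks OK")
```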
Next, download the RHadoop packages (rhdfs, rmr, and rhbase):

rmr: [rmr-2.2.2.tar.gz]
rhdfs: [rhdfs-1.6.0.tar.gz]
rhbase: [rhbase-1.2.0.tar.gz]

To install rmr we use:
R CMD INSTALL rmr-2.2.2.tar.gz

To install rhdfs we use:
R CMD INSTALL rhdfs-1.6.0.tar.gz

To install rhbase we use:
R CMD INSTALL rhbase-1.2.0.tar.gz
Once we complete the installation of RHadoop, we can test the setup by running a MapReduce job with the rmr2 and rhdfs libraries, as in the following RHadoop sample program:
## loading the libraries
library('rhdfs')
library('rmr2')

## initializing the RHadoop
hdfs.init()

# defining the input data
small.ints = to.dfs(1:10)

## Defining the MapReduce job
mapreduce(
  # defining input parameters as small.ints hdfs object,
  # map parameter as function to calculate the min and max
  # for generated random deviates.
  input = small.ints,
  map = function(k, v) {
    lapply(seq_along(v), function(r){
      x <- runif(v[[r]])
      keyval(r, c(max(x), min(x)))
    })
  })
After entering these lines, pressing Ctrl + Enter will execute this MapReduce program. If it succeeds, the last line will appear as shown in the following screenshot. The characters of that last line indicate the output location of the MapReduce job on HDFS.
To read the result of the executed MapReduce job, copy the output location, as provided in the last line, and pass it to the from.dfs() function of rmr2.
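As a sketch of this step (the path below is a placeholder; substitute the output location printed at the end of your own run), reading the result back and unpacking the key/value pairs could look like this:

```r
library('rmr2')

# Hypothetical output path; use the location your own job reports.
result <- from.dfs("/tmp/RtmpXXXXXX/file_output")

# from.dfs() returns a key/value structure; keys() and values()
# extract the two sides so they can be inspected as a data frame.
df <- data.frame(key = keys(result), val = values(result))
print(df)
```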
The first column of the previous output indicates the max value, and the second one the min value.
MapReduce problem definition: This RHadoop MapReduce program is defined for identifying the frequency of all the words that are present in the provided input text files.
Also, note that this is the same MapReduce problem as we learned in the previous section about RHIPE in Chapter 2, Writing Hadoop MapReduce Programs.
wordcount = function(input, output = NULL, pattern = " "){
Map phase: This map function will read the text file line by line and split it by spaces. The map phase will assign 1 as the value to every word caught by the mapper.
wc.map = function(., lines) {
  keyval(
    unlist(
      strsplit(
        x = lines,
        split = pattern)),
    1)}
Reduce phase: The reduce phase will calculate the total frequency of all the words by summing the counts of the words with the same key.
wc.reduce = function(word, counts) {
  keyval(word, sum(counts))}
Defining the MapReduce job: After defining the word count mapper and reducer, we need to create the driver method that starts the execution of MapReduce.
# To execute the defined Mapper and Reducer functions
# by specifying the input, output, map, reduce and
# input.format as parameters.
# Syntax:
# mapreduce(input, output, input.format, map, reduce, combine)
mapreduce(input = input,
          output = output,
          input.format = "text",
          map = wc.map,
          reduce = wc.reduce,
          combine = T)}
Executing the MapReduce job: We will execute the RHadoop MapReduce job by passing the input data location as a parameter to the wordcount function.
wordcount('/RHadoop/1/')
Exploring the wordcount output:
from.dfs("/tmp/RtmpRMIXzb/file2bda5e10e25f")
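To inspect the word counts more conveniently in R, a small sketch (reusing the output location reported above; yours will differ) can convert the key/value pairs into a data frame sorted by frequency:

```r
library('rmr2')

# Output location is the one printed by the job; substitute your own.
out <- from.dfs("/tmp/RtmpRMIXzb/file2bda5e10e25f")

# Build a word/count data frame from the key/value pairs.
counts <- data.frame(word  = keys(out),
                     count = values(out),
                     stringsAsFactors = FALSE)

# Show the most frequent words first.
head(counts[order(-counts$count), ])
```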
RHadoop has three different packages, for HDFS, MapReduce, and HBase operations respectively, to perform operations over the data.
Here we will see how to use the rmr and rhdfs package functions:
The categorized functions are:
To retrieve the hdfs configuration defaults, refer to the following screenshot:
hdfs.put: This is used to copy files from the local filesystem to the HDFS filesystem.
hdfs.put('/usr/local/hadoop/README.txt','/RHadoop/1/')
hdfs.copy: This is used to copy files from one HDFS directory to another.
hdfs.copy('/RHadoop/1/','/RHadoop/2/')
hdfs.move: This is used to move a file from one HDFS directory to another HDFS directory.
hdfs.move('/RHadoop/1/README.txt','/RHadoop/2/')
hdfs.rename: This is used to rename a file stored on HDFS from R.
hdfs.rename('/RHadoop/README.txt','/RHadoop/README1.txt')
hdfs.delete: This is used to delete an HDFS file or directory from R.
hdfs.delete("/RHadoop")
hdfs.rm: This is also used to delete an HDFS file or directory from R.
hdfs.rm("/RHadoop")
hdfs.chmod: This is used to change the permissions of files.
hdfs.chmod('/RHadoop', permissions='777')
hdfs.file: This is used to initialize a file to be used for read/write operations.
f = hdfs.file("/RHadoop/2/README.txt","r",buffersize=104857600)
hdfs.write: This is used to write to a file stored on HDFS via streaming.
f = hdfs.file("/RHadoop/2/README.txt","w",buffersize=104857600)
hdfs.write(object,f,hsync=FALSE)
hdfs.close: This is used to close the stream when a file operation is complete. It will close the stream and will not allow further file operations.
hdfs.close(f)
hdfs.read: This is used to read from binary files in the HDFS directory. This will use the stream for the deserialization of the data.
f = hdfs.file("/RHadoop/2/README.txt","r",buffersize=104857600)
m = hdfs.read(f)
c = rawToChar(m)
print(c)
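Putting these file operations together, a hedged end-to-end sketch (paths illustrative, a running HDFS required; the round-trip via serialize/unserialize is an assumption about how hdfs.write stores non-raw R objects) writes an R object to HDFS and reads it back:

```r
library('rhdfs')
hdfs.init()

# Open a file on HDFS for writing, write an R object to it,
# and close the stream.
f <- hdfs.file('/RHadoop/demo.bin', 'w')
hdfs.write(c('hello', 'hdfs'), f)
hdfs.close(f)

# Re-open the same file for reading; hdfs.read() returns raw bytes,
# which we deserialize back into the original object.
f <- hdfs.file('/RHadoop/demo.bin', 'r')
raw.bytes <- hdfs.read(f)
obj <- unserialize(raw.bytes)
hdfs.close(f)

print(obj)
```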