Introducing RHadoop

RHadoop is a collection of three R packages that bring large-scale data operations to the R environment. It was developed by Revolution Analytics, the leading commercial provider of R-based software. The three main packages are rhdfs, rmr, and rhbase, and each of them exposes a different Hadoop feature.

  • rhdfs is an R interface that exposes HDFS from the R console. Since Hadoop MapReduce programs write their output to HDFS, that output is easy to access by calling the rhdfs methods, and the R programmer can perform read and write operations on distributed data files. Under the hood, rhdfs calls the HDFS API to operate on data sources stored on HDFS.
  • rmr is an R interface that provides Hadoop MapReduce functionality inside the R environment. The R programmer only needs to divide the application logic into map and reduce phases and submit it with the rmr methods. rmr then calls the Hadoop streaming MapReduce API, with job parameters such as the input directory, output directory, mapper, and reducer, to run the R MapReduce job over the Hadoop cluster.
  • rhbase is an R interface for operating on Hadoop HBase data sources, stored on the distributed network, via a Thrift server. The rhbase package provides methods for initialization, read/write, and table manipulation operations.

It is not necessary to install all three RHadoop packages to run Hadoop MapReduce operations with R and Hadoop. If our input data is stored in HBase, we need rhbase; otherwise, the rhdfs and rmr packages are sufficient. Since Hadoop is most popular for its two main features, MapReduce and HDFS, both are made available within the R console through the rhdfs and rmr packages: rhdfs provides HDFS data operations, while rmr provides MapReduce execution operations. These two packages are enough to run Hadoop MapReduce from R.

RHadoop also includes another package called quickcheck, which is designed for testing and debugging MapReduce jobs written with the rmr package.

In the next section, we will see their architectural relationships as well as their installation steps.

Understanding the architecture of RHadoop

Since Hadoop is highly popular because of HDFS and MapReduce, Revolution Analytics has developed separate R packages, namely, rhdfs, rmr, and rhbase. The architecture of RHadoop is shown in the following diagram:

RHadoop Ecosystem

Installing RHadoop

In this section, we will walk through the installation of the three RHadoop packages, including their prerequisites.

  • R and Hadoop installation: As we are going to use an R and Hadoop integrated environment, we need both Hadoop and R installed on our machine. If you haven't installed them yet, see Chapter 1, Getting Ready to Use R and Hadoop. When the data grows, the cluster is scaled by adding nodes, so RHadoop can be installed on top of either a single-node or a multinode Hadoop installation, depending on the size of our data.

    RHadoop is already tested with several Hadoop distributions provided by Cloudera, Hortonworks, and MapR.

  • Installing the R packages: Several R packages need to be installed to help connect R with Hadoop. The list of packages is as follows:
    • rJava
    • RJSONIO
    • itertools
    • digest
    • Rcpp
    • httr
    • functional
    • devtools
    • plyr
    • reshape2

    We can install them by running the following command in the R console:

    install.packages(c('rJava', 'RJSONIO', 'itertools', 'digest', 'Rcpp', 'httr', 'functional', 'devtools', 'plyr', 'reshape2'))
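
    If some of these packages are already present, a small guard avoids reinstalling them. The following is an optional sketch using base R only; it assumes the default library paths:

    ## optional: install only the packages that are missing
    pkgs <- c('rJava', 'RJSONIO', 'itertools', 'digest', 'Rcpp',
              'httr', 'functional', 'devtools', 'plyr', 'reshape2')
    missing <- pkgs[!(pkgs %in% rownames(installed.packages()))]
    if (length(missing) > 0) install.packages(missing)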
    
  • Setting environment variables: We can set this via the R console using the following code:
    ## Setting HADOOP_CMD
    Sys.setenv(HADOOP_CMD="/usr/local/hadoop/bin/hadoop")
    
    ## Setting up HADOOP_STREAMING
    Sys.setenv(HADOOP_STREAMING="/usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar")
    

    Alternatively, we can set these environment variables from the command line (before starting R) as follows:

    export HADOOP_CMD=/usr/local/hadoop/bin/hadoop
    export HADOOP_STREAMING=/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar
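
    To confirm that these variables are visible to the R session, we can read them back with Sys.getenv(); the paths returned are whatever was set above and will differ on other setups:

    ## checking the environment variables from the R console
    Sys.getenv("HADOOP_CMD")
    Sys.getenv("HADOOP_STREAMING")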
    
  • Installing RHadoop [rhdfs, rmr, rhbase]
    1. Download the RHadoop packages from the GitHub repository of Revolution Analytics: https://github.com/RevolutionAnalytics/RHadoop.
      • rmr: [rmr-2.2.2.tar.gz]
      • rhdfs: [rhdfs-1.6.0.tar.gz]
      • rhbase: [rhbase-1.2.0.tar.gz]
    2. Installing packages.
      • For rmr we use:
        R CMD INSTALL rmr-2.2.2.tar.gz
        
      • For rhdfs we use:
        R CMD INSTALL rhdfs-1.6.0.tar.gz
        
      • For rhbase we use:
        R CMD INSTALL rhbase-1.2.0.tar.gz
        

      Tip

      To install rhbase, we need to have HBase and Zookeeper installed on our Hadoop cluster.
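
      As a rough sketch only (the host, port, and exact rhbase calls may vary between versions and clusters), connecting to HBase from R typically requires the Thrift gateway to be running, after which rhbase can be initialized:

      ## on the HBase node, start the Thrift gateway (shell):
      ## hbase thrift start

      ## then, from R (illustrative only; host and port are placeholders)
      library(rhbase)
      hb.init(host = "localhost", port = 9090)
      hb.list.tables()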

Understanding RHadoop examples

Once we have completed the installation of RHadoop, we can test the setup by running a sample MapReduce job that uses the rmr2 and rhdfs libraries, as follows:

## loading the libraries
library('rhdfs')
library('rmr2')

## initializing the RHadoop
hdfs.init()

## defining the input data
small.ints = to.dfs(1:10)

## Defining the MapReduce job
mapreduce(
  # input parameter: the small.ints HDFS object; the map function computes
  # the max and min of the uniform random deviates generated for each value
  input = small.ints,
  map = function(k, v)
  {
    lapply(seq_along(v), function(r){
      x <- runif(v[[r]])
      keyval(r, c(max(x), min(x)))
    })
  })

After typing these lines, press Ctrl + Enter to execute this MapReduce program. If the job succeeds, the last line of the console output reports the HDFS location where the MapReduce job has written its output.

To read the result of the executed MapReduce job, copy the output location provided in that last line and pass it to the from.dfs() function of rmr2.

In the output of from.dfs(), the first column of the values holds the maximum and the second column the minimum of the generated random deviates.
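
Rather than copying the path by hand, the object returned by mapreduce() can be captured and passed straight to from.dfs(). The following is a minimal sketch of that pattern, reusing the small.ints object created above (the map function here is just an illustrative squaring step):

## capturing the output location returned by mapreduce()
out = mapreduce(input = small.ints,
                map = function(k, v) keyval(v, v^2))

## reading the result back without copying the temporary path manually
result = from.dfs(out)
print(result)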

Word count

MapReduce problem definition: This RHadoop MapReduce program identifies the frequency of every word present in the provided input text files.

Also, note that this is the same MapReduce problem as we learned in the previous section about RHIPE in Chapter 2, Writing Hadoop MapReduce Programs.

wordcount = function(input, output = NULL, pattern = " "){

Map phase: The map function reads the text file line by line and splits each line on the supplied pattern (a space by default). The map phase then assigns the value 1 to every word emitted by the mapper; for example, the line "hadoop is fast is" produces the pairs (hadoop, 1), (is, 1), (fast, 1), and (is, 1).

wc.map = function(., lines) {
  keyval(
    unlist(
      strsplit(
        x = lines,
        split = pattern)),
    1)}

Reduce phase: The reduce phase calculates the total frequency of each word by summing the counts of all values that share the same key; for example, (is, 1) and (is, 1) become (is, 2).

wc.reduce = function(word, counts ) {
  keyval(word, sum(counts))}

Defining the MapReduce job: After defining the word count mapper and reducer, we need to create the driver method that starts the execution of MapReduce.

# To execute the defined Mapper and Reducer functions
# by specifying the input, output, map, reduce and input.format as parameters.

# Syntax:
# mapreduce(input, output, input.format, map,reduce,
# combine)

mapreduce(input = input,
  output = output,
  input.format = "text",
  map = wc.map,
  reduce = wc.reduce,
  combine = T)}

Executing the MapReduce job: We will execute the RHadoop MapReduce job by passing the input data location as a parameter for the wordcount function.

wordcount('/RHadoop/1/')

Exploring the wordcount output:

from.dfs("/tmp/RtmpRMIXzb/file2bda5e10e25f")

Understanding the RHadoop function reference

RHadoop has three different packages, covering HDFS, MapReduce, and HBase operations respectively, for performing operations over the data.

Here we will see how to use the rmr and rhdfs package functions:

The hdfs package

The categorized functions are:

  • Initialization
    • hdfs.init: This is used to initialize the rhdfs package. Its syntax is hdfs.init().
    • hdfs.defaults: This is used to retrieve and set the rhdfs defaults. Its syntax is hdfs.defaults().

    To retrieve the hdfs configuration defaults, call hdfs.defaults() without any arguments.
  • File manipulation
    • hdfs.put: This is used to copy files from the local filesystem to the HDFS filesystem.
      hdfs.put('/usr/local/hadoop/README.txt','/RHadoop/1/')
      
    • hdfs.copy: This is used to copy files from one HDFS location to another.
      hdfs.copy('/RHadoop/1/','/RHadoop/2/')
      
    • hdfs.move: This is used to move a file from one HDFS directory to another HDFS directory.
      hdfs.move('/RHadoop/1/README.txt','/RHadoop/2/')
      
    • hdfs.rename: This is used to rename a file stored on HDFS from R.
      hdfs.rename('/RHadoop/README.txt','/RHadoop/README1.txt')
      
    • hdfs.delete: This is used to delete the HDFS file or directory from R.
      hdfs.delete("/RHadoop")
      
    • hdfs.rm: This is used to delete the HDFS file or directory from R.
      hdfs.rm("/RHadoop")
      
    • hdfs.chmod: This is used to change the permissions of files or directories on HDFS.
      hdfs.chmod('/RHadoop', permissions= '777')
      
  • File read/write:
    • hdfs.file: This is used to initialize a file on HDFS for read/write operations.
      f = hdfs.file("/RHadoop/2/README.txt","r",buffersize=104857600)
      
    • hdfs.write: This is used to write an R object into a file stored on HDFS via streaming.
      f = hdfs.file("/RHadoop/2/README.txt","w",buffersize=104857600)
      hdfs.write(object,f,hsync=FALSE)
      
    • hdfs.close: This is used to close the stream when a file operation is complete. It will close the stream and will not allow further file operations.
      hdfs.close(f)
      
    • hdfs.read: This is used to read from binary files on the HDFS directory. This will use the stream for the deserialization of the data. A combined sketch using several of these functions follows this list.
      f = hdfs.file("/RHadoop/2/README.txt","r",buffersize=104857600)
      m = hdfs.read(f)
      c = rawToChar(m)
      print(c)

  • Directory operation:
    • hdfs.dircreate or hdfs.mkdir: Both of these functions are used to create a directory on the HDFS filesystem.
      hdfs.mkdir("/RHadoop/2/")
      
    • hdfs.rm, hdfs.rmr, or hdfs.delete: These are used to delete directories or files from HDFS.
      hdfs.rm("/RHadoop/2/")
      
  • Utility:
    • hdfs.ls: This is used to list the contents of a directory on HDFS.
      hdfs.ls('/')
      
    • hdfs.file.info: This is used to get meta information about the file stored at HDFS.
      hdfs.file.info("/RHadoop")
      
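Putting several of these rhdfs functions together, the following is a minimal round-trip sketch; the local and HDFS paths are the illustrative ones used above and will differ on other setups:

## copy a local file to HDFS, read it back as text, and clean up
hdfs.init()
hdfs.mkdir("/RHadoop/2/")
hdfs.put('/usr/local/hadoop/README.txt', '/RHadoop/2/')

f = hdfs.file("/RHadoop/2/README.txt", "r", buffersize = 104857600)
m = hdfs.read(f)
print(rawToChar(m))
hdfs.close(f)

## remove the directory once we are done
hdfs.rm("/RHadoop/2/")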

The rmr package

The categories of the functions are as follows:

  • For storing and retrieving data:
    • to.dfs: This is used to write R objects to the HDFS filesystem.

      small.ints = to.dfs(1:10)

    • from.dfs: This is used to read back R objects that are stored on the HDFS filesystem in a binary encoded format.

      from.dfs('/tmp/RtmpRMIXzb/file2bda3fa07850')

  • For MapReduce:
    • mapreduce: This is used for defining and executing the MapReduce job.

      mapreduce(input, output, map, reduce, combine, input.format, output.format, verbose)

    • keyval: This is used to create and extract key-value pairs.

      keyval(key, val)
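
      As a tiny sketch of how keyval objects behave (assuming an rmr2 session is loaded), the helpers keys() and values() extract the two sides of a pair:

      ## a key-value object holding three pairs
      kv = keyval(1:3, c(10, 20, 30))

      keys(kv)    # returns 1 2 3
      values(kv)  # returns 10 20 30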
