Chapter 13. Unique Uses of Drill

As you have seen, Apache Drill is capable of querying all kinds of data, large and small, in a variety of different systems. This chapter highlights some examples of unique use cases in which Drill has made complex analysis easy. The first example demonstrates how to use Drill’s suite of geometric functions, along with the image metadata format plug-in, to identify photos taken within a geographic region. Next, you will see a case in which writing a format plug-in proved very helpful: working with Excel files. Finally, we cover several use cases in which analysts greatly expanded Drill’s functionality by creating UDFs.

Finding Photos Taken Within a Geographic Region

In Drill 1.14, two features were added that make this use case possible: the ability to analyze Exchangeable Image File (EXIF) metadata,1 and a collection of geographic information system (GIS) functions that, among other things, let you define geographic areas, or polygons, and search within them. Drill’s GIS functionality largely follows the GIS functionality found in PostGIS.2

The first thing you need to do is to extract the fields that contain the latitude and longitude in the EXIF metadata. The example that follows demonstrates how to access the geocoordinates of an image in Drill (note that not all images will contain these fields):

SELECT t.GPS.GPSLatitude AS lat, t.GPS.GPSLongitude AS lon 
FROM dfs.`photo1.jpg` AS t;

The next thing you need to do is convert these coordinates into a point object that you can use as a filter. You can accomplish this by using the ST_Point(long,lat) function, which converts text or fields into a binary geometry that can be used in subsequent GIS functions:

SELECT ST_Point(t.GPS.GPSLongitude, t.GPS.GPSLatitude) AS geom
FROM dfs.`photo1.jpg` AS t;

Now that you know how to generate the points from the images, the final step is to use the ST_Within(point, polygon) function in the WHERE clause to filter out the rows that are not within that polygon. You can also use the ST_DWithin() function to filter by distance from a given point. The next example demonstrates the final query, which scans a directory and returns the metadata for any photos within a given polygon—in this case, San Jose, CA:

SELECT *
FROM dfs.pics.`*.jpg` AS t
WHERE ST_Within(
   ST_Point(t.GPS.GPSLongitude, t.GPS.GPSLatitude), 
   ST_GeomFromText(
    'POLYGON((-121.95 37.28, -121.94 37.35, -121.84 37.35, -121.84 37.28, 
        -121.95 37.28))'
    ) 
);
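
If you prefer to filter by distance from a single point rather than by a polygon, ST_DWithin() can take the place of ST_Within() in the WHERE clause. The following sketch assumes that ST_DWithin() follows the PostGIS signature of two geometries and a distance, and it uses a hypothetical reference point near downtown San Jose; because the coordinates are in degrees, the distance argument is in degrees as well:

SELECT *
FROM dfs.pics.`*.jpg` AS t
WHERE ST_DWithin(
   ST_Point(t.GPS.GPSLongitude, t.GPS.GPSLatitude),
   ST_Point(-121.89, 37.33),
   0.05
);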

Drilling Excel Files

Charles has worked for a management consultancy in the past and has often been asked to help a client with “big data,” which is a US government term for a shared drive full of random Excel files. Although this may seem like a trivial task, analyzing this data can be quite time-consuming, especially if the files themselves are large or there are a great many of them.

Unfortunately, Drill cannot natively query Excel files. Beginning with Office 2007, Microsoft adopted an XML-based format, Office Open XML, designed to let other applications read its data. Luckily, there are open source Java libraries that parse the Office Open XML format, and because Excel files are by nature tabular and self-describing, they lend themselves well to being read by Drill. Therefore, enabling Drill to query Excel files can dramatically reduce the amount of time and effort it takes to analyze this kind of data.

You learned how to write a custom format plug-in in Chapter 12, and this section demonstrates a practical use case for writing one. You can find the complete code for this storage plug-in on GitHub.

The pom.xml File

The first step in writing the Excel plug-in is identifying the dependencies. Essentially, the format plug-in will be a wrapper for the Apache POI libraries, which are a collection of Java libraries that read the Office Open XML files.3 You must include these libraries in the dependencies in your pom.xml file, as shown here:

<dependency>
  <groupId>org.apache.poi</groupId>
  <artifactId>poi</artifactId>
  <version>3.17</version>
</dependency>
<dependency>
  <groupId>org.apache.poi</groupId>
  <artifactId>poi-ooxml</artifactId>
  <version>3.17</version>
</dependency>
<dependency>
  <groupId>org.apache.poi</groupId>
  <artifactId>poi-ooxml-schemas</artifactId>
  <version>3.17</version>
</dependency>
<dependency>
  <groupId>org.apache.poi</groupId>
  <artifactId>poi-scratchpad</artifactId>
  <version>3.17</version>
</dependency>
<dependency>
  <groupId>org.apache.poi</groupId>
  <artifactId>ooxml-schemas</artifactId>
  <version>1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.poi</groupId>
  <artifactId>openxml4j</artifactId>
  <version>1.0-beta</version>
</dependency>

After you have written the pom.xml file, the next step is to write the format plug-in itself, which in this case consists of three classes: the ExcelFormatPlugin class, which extends the EasyFormatPlugin class; the ExcelFormatPluginConfig class, which implements the FormatPluginConfig interface; and finally the ExcelRecordReader class, which extends the AbstractRecordReader class.

Most of the code for ExcelFormatPlugin and ExcelFormatPluginConfig is boilerplate code, with the only real modifications being configuration variables that are passed to the record reader. Because Excel files often have headers, footers, and other nontabular content, it is useful for the user to be able to define a region in the spreadsheet where the data is and ignore the rest. Therefore, the Excel format plug-in has configuration variables indicating the first and last column and row where the data exists.

The Excel Custom Record Reader

The ExcelRecordReader class is where most of the work actually happens. The setup() function, shown in the code that follows, sets up the RecordReader and defines all the variables needed for reading the rest of the data. As with UDFs, the setup() function is executed once; the next() function is then called repeatedly, with each call reading a batch of rows from your data. Because it is possible in Excel to determine both the number of columns in the spreadsheet and the header names, the format plug-in performs both of these operations.

Because this plug-in uses Drill’s Easy format plug-in framework, most of the plumbing is handled for you. The example that follows creates a workbook object that represents the spreadsheet in memory. The first thing that you have to do is get the column headers and data types:

//Get the workbook
this.workbook = new XSSFWorkbook(this.fsStream.getWrappedStream());

//Evaluate formulae
FormulaEvaluator evaluator = workbook.getCreationHelper()
                                     .createFormulaEvaluator();

//Get the sheet
this.sheet = workbook.getSheetAt(0);
this.workbook.setMissingCellPolicy(Row.MissingCellPolicy
                                      .CREATE_NULL_AS_BLANK);

//Get the field names
int columnCount = sheet.getRow(0).getPhysicalNumberOfCells();
this.excelFieldNames = new String[columnCount];
this.rowIterator = sheet.iterator();

if (rowIterator.hasNext()) {
   while (this.lineCount < config.headerRow) {
      Row row = rowIterator.next();
      this.lineCount++;
   }
   Row row = rowIterator.next();
   this.totalColumnCount = row.getLastCellNum();
   //... continues in the full setup() listing that follows
}

In the preceding code, the second statement creates a FormulaEvaluator object. You will use this object when you read in the rows of data to evaluate any cells that contain Excel formulae. The next two statements get the first sheet in the workbook and set the missing-cell policy so that missing cells are returned as blank cells rather than null.

The next step is to get the dimensions of the data and then set up the data types of the columns and their names. The library we are using allows you to determine whether a cell contains a numeric value or a string; after you’ve done that, you can extract the value from the cell. This will become the column name. Here is the full code for the setup() function:

  public void setup(final OperatorContext context, 
                    final OutputMutator output) 
                    throws ExecutionSetupException {
    this.writer = new VectorContainerWriter(output);
    try {
      this.workbook = new XSSFWorkbook(this.fsStream
                                           .getWrappedStream());

      //Evaluate formulae
      FormulaEvaluator evaluator = workbook.getCreationHelper()
                                           .createFormulaEvaluator();
      this.sheet = workbook.getSheetAt(0);
      this.workbook.setMissingCellPolicy(Row
                                            .MissingCellPolicy
                                            .CREATE_NULL_AS_BLANK);

      //Get the field names
      int columnCount = sheet.getRow(0).getPhysicalNumberOfCells();
      this.excelFieldNames = new String[columnCount];
      this.rowIterator = sheet.iterator();

      if (rowIterator.hasNext()) {
        while (this.lineCount < config.headerRow) {
          Row row = rowIterator.next();
          this.lineCount++;
        }
        Row row = rowIterator.next();
        this.totalColumnCount = row.getLastCellNum();

        Iterator<Cell> cellIterator = row.cellIterator();
        int colPosition = 0;
        while (cellIterator.hasNext()) {
          Cell cell = cellIterator.next();
          CellValue cellValue = evaluator.evaluate(cell);
          switch (cellValue.getCellTypeEnum()) {
            case STRING:
              this.excelFieldNames[colPosition] = cell
                               .getStringCellValue()
                               .replaceAll("_", "__")
                               .replace(PARSER_WILDCARD, SAFE_WILDCARD)
                               .replaceAll("\\.", SAFE_SEPARATOR);
              break;
            case NUMERIC:
              this.excelFieldNames[colPosition] = String
                                              .valueOf(cell
                                              .getNumericCellValue());
              break;
          }
          colPosition++;
        }
      }
    } catch (java.io.IOException e) {
      throw UserException.dataReadError(e).build(logger);
    }
  }

The other method that you must implement is the next() method, which reads the rows of data and maps the cells to the appropriate columns, or value vectors. The function that follows contains a fair amount of boilerplate, but its core task is mapping each cell’s value to the appropriate column:

 public int next() {
    this.writer.allocate();
    this.writer.reset();

    //Get the configuration variables  
    int skipRows = config.headerRow;
    int recordCount = 0;
    int sheetCount = workbook.getNumberOfSheets();
    int lastRow = config.lastRow;

    //Create the base MapWriter
    BaseWriter.MapWriter map = this.writer.rootAsMap();
    int colPosition = 0;
    FormulaEvaluator evaluator = workbook.getCreationHelper()
                                         .createFormulaEvaluator();

    try {
      
       //Iterate through the rows
       while (recordCount < lastRow && rowIterator.hasNext()) {
        lineCount++;
        if (recordCount > 0) {
          this.writer.setPosition(recordCount);
          map.start();
        }
        //Get the row 
        Row row = rowIterator.next();
        //Get a cell iterator 
        Iterator<Cell> cellIterator = row.cellIterator();

        colPosition = 0;
        String fieldName;
        if (row.getLastCellNum() < totalColumnCount) {
          logger.warn("Wrong number of columns in row.");
        }
        
        //Iterate through the columns
        for (int cn = 0; cn < totalColumnCount; cn++) {

          Cell cell = row.getCell(cn);

          CellValue cellValue = evaluator.evaluate(cell);
          fieldName = excelFieldNames[colPosition];
          
          //Assign values to appropriate columns
          if (cellValue == null) {
            String fieldValue = "";
            byte[] bytes = fieldValue.getBytes("UTF-8");
            this.buffer.setBytes(0, bytes, 0, bytes.length);
            map.varChar(fieldName).writeVarChar(0, bytes.length, 
                                                   buffer);
          } else {
            switch (cellValue.getCellTypeEnum()) {
              case NUMERIC:
                if (DateUtil.isCellDateFormatted(cell)) {
                  map.timeStamp(fieldName).writeTimeStamp(cell
                                          .getDateCellValue()
                                          .getTime());
                } else {
                  double fieldNumValue = cell.getNumericCellValue();
                  map.float8(fieldName).writeFloat8(fieldNumValue);
                }
                break;
              case STRING:
                String fieldValue = "";
                fieldValue = cellValue.formatAsString();
                if (fieldValue.length() > 1) {
                  fieldValue = fieldValue.substring(1);
                  fieldValue = fieldValue.substring(0, 
                                            fieldValue.length() - 1);
                }

                byte[] bytes = fieldValue.getBytes("UTF-8");
                this.buffer.setBytes(0, bytes, 0, bytes.length);
                map.varChar(fieldName).writeVarChar(0, 
                                               bytes.length, buffer);
                break;
              case FORMULA:
                break;
              case BLANK:
                break;
            }
          }
          colPosition++;
        }
        map.end();
        recordCount++;
      }


      this.writer.setValueCount(recordCount);
      return recordCount;

    } catch (final Exception e) {
      throw UserException.dataReadError(e).build(logger);
    }
  }

Using the Excel Format Plug-in

After you have built and installed the format plug-in, the next step is to set up the format in the appropriate storage plug-in. Here are the default settings:

"excel": {
   "type": "excel",
    "extensions": ["xlsx"],
    "headerRow": 0,
    "lastRow": 1000000,
    "sheetNumber": 0,
    "firstColumn": 0,
    "lastColumn": 0
}

Because Excel files often contain formatting and other nondata cells, it is useful to be able to define a region within the spreadsheet where the data exists; the lastRow, firstColumn, and lastColumn variables enable you to do that. Additionally, the headerRow variable allows you to set the row index of the column headers. Remember that the indexing starts at zero. You can also set sheetNumber to query other sheets within the workbook.

At this point, you are ready to query Excel files as if they were any other data table in Drill! As noted earlier, the Excel plug-in infers data types from the Excel file, so there is no need to use CAST() or the TO_ conversion functions for calculations.
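
For example, with the plug-in configured as shown, a query against a hypothetical spreadsheet named sales.xlsx in a dfs.book workspace (both names are illustrative, not part of the plug-in) might look like the following sketch:

SELECT `region`, SUM(`revenue`) AS total_revenue
FROM dfs.book.`sales.xlsx`
GROUP BY `region`
ORDER BY total_revenue DESC;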

Next, we’ll explore a few practical examples where analysts created UDFs to solve particular problems in Drill. These might serve as an inspiration for your own customization efforts.

Network Packet Analysis (PCAP) with Drill

A core component of any network security program is analyzing raw data that is coming over the wire. This raw network data is captured in a format called Packet Capture (PCAP) or PCAP Next Generation (PCAP-NG) and can be challenging to analyze because it is a binary format. One of the most common tools for analyzing PCAP data is called Wireshark. Even though Wireshark is a capable tool, it is limited in that it can only analyze data that fits in your system’s memory.

Drill can query a PCAP or PCAP-NG file and retrieve fields including the following:

  • Protocol type (TCP/UDP)

  • Source/destination IP address

  • Source/destination port

  • Source/destination MAC address

  • Date and time of packet creation

  • Packet length

  • TCP session number and flags

  • The packet data in binary form

Querying PCAP or PCAP-NG requires no additional configuration settings, so out of the box, your Drill installation can query them both.
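
For example, a first look at a capture file might pull just a few of these fields. The following sketch assumes a workspace named dfs.test and a capture file named capture.pcap; the column names reflect those exposed by Drill’s PCAP reader, but it is worth verifying them with a SELECT * against your own file:

SELECT src_ip, dst_ip, src_port, dst_port, packet_length
FROM dfs.test.`capture.pcap`
LIMIT 10;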

Examples of Queries Using PCAP Data Files

Let’s take the example of finding a SYN scan. If you are not familiar with the technique, a SYN scan, also called a half-open scan, is a stealthy reconnaissance technique intended to identify open ports on a victim machine. A SYN scan works by the attacker first attempting to open a connection on a victim machine by sending a TCP SYN packet. Next, if the port is open, the victim responds with a SYN/ACK packet. If this were a normal connection, the client would respond with an ACK packet and a connection would be opened. However, in this case, because the goal is simply to determine open ports, the attacker does not send the final ACK packet.

Using the example file synscan.pcap,4 if you execute the following query, you will see an example of a SYN scan in progress. First, let’s see if we can find examples of sessions with more than one packet. By aggregating on the TCP session ID and counting the number of packets, we can find these suspicious connections. The following query illustrates how to do that:

SELECT tcp_session, COUNT(*) AS packet_count
FROM dfs.test.`synscan.pcap`
GROUP BY tcp_session HAVING COUNT(*) > 1

Figure 13-1 shows the results of this query.

Figure 13-1. SYN scan query results

Next, we could run queries on each of the session IDs, but let’s take a look at the first one, containing five packets. This next query retrieves the packets in that session:

SELECT * 
FROM dfs.test.`synscan.pcap`
WHERE tcp_session=6346604732028469374

In Figure 13-2, you can see the results of this query. The important column to note is the tcp_parsed_flags column.

Figure 13-2. SYN scan results (some columns omitted)

In this example, note the exact pattern of a SYN scan: the first packet is a SYN packet followed by a series of SYN/ACK packets with no corresponding ACK packet.

Automating the process using an aggregate function

As you can see in the previous example, it is possible to identify possible SYN scan attacks with a series of SQL queries. However, another way to approach this is to write an aggregate function that returns true or false depending on whether a given session looks like a SYN scan attempt. We have already covered the mechanics of writing UDFs in detail in previous chapters, and the complete code for this UDF is in the book’s GitHub repository.

Here’s the pseudocode for such an aggregate function:

  1. Group by the TCP session ID.

  2. If there is a SYN flag set, increment a counter variable.

  3. If the session has seen the SYN flag and there is an ACK flag set (SYN/ACK), increment the counter.

  4. If the session is never closed (ACK never received), return true; otherwise, return false.

The example that follows shows the heart of the UDF:5

@Override
public void add() {
  if(syn.value == 1 && connectionStatus.value == 0){
    //New connection attempt  SYN received and awaiting SYN/ACK
    connectionStatus.value = 1;
  } else if(connectionStatus.value == 1 && synAck.value == 1){
    //Connection status of 2 indicates SYN/ACK
    //has been received and are awaiting the final ACK
    connectionStatus.value = 2;
  } else if(connectionStatus.value == 2 && syn.value == 0 && ack.value == 1) {
    //ACK received, connection established
    connectionStatus.value = 3;
  }
}

@Override
public void output() {
  if(connectionStatus.value == 2) {
    out.value = 1;
  } else {
    out.value = 0;
  }
}

@Override
public void reset() {
  connectionStatus.value = 0;
}

After you have installed the UDF, you could call the function using the following query:

SELECT tcp_session, 
    is_syn_scan(tcp_session, tcp_flags_syn, tcp_flags_ack)
FROM file.pcap
GROUP BY tcp_session

Alternatively, you could use the function in the WHERE clause to identify session IDs that are possible SYN scan attempts, as in the following query:

SELECT tcp_session
FROM file.pcap
GROUP BY tcp_session 
HAVING is_syn_scan(tcp_session, tcp_flags_syn, tcp_flags_ack)

The results of this query would display the sessions that are possible SYN scans.

Analyzing Twitter Data with Drill

Data from Twitter has proven to be a valuable source for a wide range of analytic questions. Although there are many libraries and sources for Twitter data, in many instances they omit the metadata that is quite valuable for analysis.

To deal with this problem, Bob Rudis of Rapid7 wrote a collection of UDFs to extract metadata from tweets. These UDFs include the following:

tw_parse_tweet(VARCHAR)

Parses the tweet text and returns a map column with the following named values:

weightedLength

(INT) Indicates the overall length of the tweet with code points weighted per the ranges defined in the configuration file.

permillage

(INT) Indicates the proportion (per thousand) of the weighted length in comparison to the max weighted length. A value >1,000 indicates input text that is longer than the allowable maximum.

isValid

(BOOLEAN) Indicates whether input text length corresponds to a valid result.

display_start/display_end

(INT) Provide indices identifying the inclusive start and exclusive end of the displayable content of the Tweet.

valid_start/valid_end

(INT) Provide indices identifying the inclusive start and exclusive end of the valid content of the Tweet.

tw_extract_hashtags(VARCHAR)

Extracts all hashtags in the tweet text into a list that can be FLATTEN()ed (see the example query after this list).

tw_extract_screennames(VARCHAR)

Extracts all screen names in the tweet text into a list that can be FLATTEN()ed.

tw_extract_urls(VARCHAR)

Extracts all URLs in the tweet text into a list that can be FLATTEN()ed.

tw_extract_reply_screenname()

Extracts the reply screen name (if any) from the tweet text into a VARCHAR.
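
As a quick illustration, the extraction functions pair naturally with FLATTEN() to produce one row per extracted item. The following sketch assumes a hypothetical JSON file of tweets in a dfs.tweets workspace, with the tweet body in a column named text:

SELECT FLATTEN(tw_extract_hashtags(t.`text`)) AS hashtag
FROM dfs.tweets.`tweets.json` AS t;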

This collection of functions is a great example of writing a custom UDF to extract artifacts from data.

Using Drill in a Machine Learning Pipeline

Let’s begin with the bad news: you cannot do machine learning directly in Drill. However, Drill can be a useful component in a machine learning pipeline. Figure 13-3 illustrates the various phases of the machine learning process. The complete code for this example is posted in the UDFs folder of the book’s GitHub repository, under machine_learning.

Figure 13-3. The supervised machine learning pipeline

Making Predictions Within Drill

Even though you can’t train a model in Drill, after you’ve built one, you can use that model to make predictions with data that Drill can query. We demonstrate this by using H2O, a machine learning platform, to build a model and serialize it; then we wrap that model in a custom UDF.

The H2O platform is very useful for developing machine learning models, and one of its key features is that after you have built and trained a model, you can serialize and export it as either a Plain Old Java Object (POJO) or a MOJO (Model Object, Optimized).6

Building and Serializing a Model

The first step is building and serializing a model in H2O. The code that follows (from the H2O documentation) demonstrates how to create and train a model, in this case using sample data, and save it as a MOJO. If you are familiar with machine learning in Python, the general pattern should look familiar to you. H2O also features an R interface, and you can do the same thing in R:

#Example code from 
#http://docs.h2o.ai/h2o/latest-stable/h2o-docs/productionizing.html

import h2o
from h2o.estimators.gbm import H2OGradientBoostingEstimator

h2o.init() 
h2o_df = h2o.load_dataset("prostate.csv") 
h2o_df["CAPSULE"] = h2o_df["CAPSULE"].asfactor() 
model=H2OGradientBoostingEstimator(distribution="bernoulli", 
                    ntrees=100, 
                    max_depth=4, 
                    learn_rate=0.1) 

#Train the model
model.train(y="CAPSULE", 
            x=["AGE","RACE","PSA","GLEASON"], 
            training_frame=h2o_df)

#Save the model as a MOJO
modelfile = model.download_mojo(path="experiment/", 
                            get_genmodel_jar=True)

After you’ve executed this code, you will have a JAR file in a folder called experiment. You will need to copy this as well as the h2o-genmodel.jar file to your Drill site folder, as described in “Building and Installing Your UDF”.

Writing the UDF Wrapper

Now that you have the JAR files for the model, the next step is to write a UDF that wraps the functionality of the model. Because the UDF will be applied to data with the same structure as the data used to train the model, the UDF’s input parameters must mirror that structure.

MOJO Models Accept Only Doubles as Input

When creating a wrapper function for an H2O MOJO, remember that all your features must be Java doubles. Therefore, all your input parameters must be converted to doubles.

Example 13-1 shows the setup() function for the Drill UDF. There are five input parameters that are all Float8Holders and correspond to the structure of the original data. The @Workspace annotation contains the actual model, which we initialize in the setup() function and use in the eval() function.

Example 13-1. Intro and setup() function for H2O wrapper
@FunctionTemplate(name = "binomial_prediction",
    scope = FunctionTemplate.FunctionScope.SIMPLE,
    nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)

public static class Binomial_Prediction implements DrillSimpleFunc {

  @Param
  Float8Holder age;

  @Param
  Float8Holder race;

  @Param
  Float8Holder dcaps;

  @Param
  Float8Holder vol;

  @Param
  Float8Holder gleason;

  @Workspace
  hex.genmodel.easy.EasyPredictModelWrapper model;

  @Output
  BitHolder out;

  @Override
  public void setup() {
    try {
      model = new hex.genmodel.easy.EasyPredictModelWrapper(
          hex.genmodel.MojoModel.load(
              "/Users/cgivre/experiment/GBM_model_python_1527558187517_1.zip"));
    } catch (Exception e){
      //Handle exception
    }
  }

The eval() function in the UDF is really quite simple as well. All it needs to do is map the input values into an H2O RowData object and then call the model’s predictBinomial() function. This model is a classifier, so the BinomialModelPrediction object returned by predictBinomial() also exposes the class probabilities, in addition to the predicted label used here:

public void eval() {
  hex.genmodel.easy.RowData row = new hex.genmodel.easy.RowData();
  row.put("AGE", age.value);
  row.put("RACE", race.value);
  row.put("DCAPS", dcaps.value);
  row.put("VOL", vol.value);
  row.put("GLEASON", gleason.value);
  
  try {
    hex.genmodel.easy.prediction.BinomialModelPrediction p = model
                                                .predictBinomial(row);
    if(p.label.equals("1")) {
      out.value = 1;
    } else {
      out.value = 0;
    }
  } catch(Exception e) {
    //Handle exception
  }
}

You can see that this UDF returns true or false depending on the model’s prediction. H2O has other types of models for regression, clustering, and more; if you use one of those, the prediction functions are slightly different.

Making Predictions Using the UDF

After you have installed the wrapper UDF, you can use it in a query to obtain a list of predictions, as shown here:

SELECT binomial_prediction(`age`,
    `race`,
    `dcaps`,
    `vol`,
    `gleason`) AS prediction
FROM dfs.book.`ml_sample.csvh`
LIMIT 5

This query would yield results like those shown in Table 13-1.

Table 13-1. Machine learning prediction results
Prediction
TRUE
TRUE
FALSE
TRUE
FALSE

Conclusion

In this final chapter, you have seen several examples of how Drill can facilitate a wide variety of data analyses, including network security, image metadata, and even machine learning. By adding some simple UDFs and format plug-ins, it’s possible to extend Drill even further and unlock all kinds of data for rapid ad hoc analysis.

1 EXIF metadata contains information about an image or any digital media. The metadata includes information about the device that captured the media and more.

2 You can find a complete list of Drill’s GIS functions in Appendix A.

3 For more information, check out the complete documentation for the Apache POI libraries.

4 The source of this file is available from Chris Sanders’s website.

5 In this example, there are many ways of parsing the TCP flags; however, this UDF is implemented to maximize readability and hence does not use any techniques that use bit shifting.

6 POJOs and MOJOs are ways of serializing an object for reuse in other programs, similar to Python pickles.
