Filtering and processing input

Once the input file is configured, we need to filter the input to identify the fields required for our analysis and process them accordingly.

A filter plugin performs intermediary processing on the input event. Filters can also be applied conditionally, based on the values of certain fields.
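For instance, a minimal sketch of a conditional filter might look like the following. The field name type and the value "stock" are purely illustrative and are not part of our example file:

filter {
  # Apply the csv filter only to events whose hypothetical 'type' field is "stock"
  if [type] == "stock" {
    csv {
      separator => ","
    }
  }
}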

Since our input file is a CSV file, we will use the csv filter. The csv filter takes an event field that contains CSV-formatted data, parses it, and stores the values as individual fields. It can also parse data with separators other than commas. A typical csv filter looks as follows:

filter {
    csv {
        columns => # array of column names (optional)
        separator => # string (optional), default: ","
    }
}

The columns attribute takes the names of the fields in our CSV file and is optional. If it is not specified, the columns are named column1, column2, and so on by default.

The separator attribute defines which character separates the columns in the file. The default is a comma, but any other separator can be used.
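For example, a minimal sketch for a semicolon-separated file (a hypothetical variation of our input, not the actual example file) would set the separator explicitly:

csv {
    # Parse fields separated by semicolons instead of the default comma
    separator => ";"
}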

In our example, we can specify a simple csv filter as follows:

filter {
    csv {
        columns => ["date_of_record","open","high","low","close","volume","adj_close"]
        separator => ","
    }
}

Here, we specified the column names as defined in our CSV file and explicitly set the separator to a comma, even though it is the default, just to make the configuration clear.

Now we are done with the csv filter configuration, but we still need some intermediary processing to associate specific data types with our columns.

First of all, we need to specify which column represents the date field so that it can be explicitly indexed as the date type and used to filter based on dates. Logstash has a specific filter called date for this purpose. A typical date filter looks as follows:

filter {
  date {
    match => # array (optional), default: []
    target => # string (optional), default: "@timestamp"
    timezone => # string (optional)
  }
}

Here, in the match attribute, we define an array in the [field, formats] format; that is, a field name followed by a set of time formats that can apply to that field. For example, if our log file has dates in multiple formats, we can use the following code:

match => [ "date_field", "MMM dd YYY HH:mm:ss",
          "MMM  d YYY HH:mm:ss",  "MMddYYYY","ISO8601" ]

Note

Date formats in Logstash: The allowed date formats are those of the Joda-Time DateTimeFormat library:

http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html

As per our date format, our date filter will be as follows:

date {
  match => ["date_of_record", "yyyy-MM-dd"]
  target => "@timestamp"
}

The target attribute defines where to map the matched timestamp. By default, it maps to @timestamp (the field that represents the timestamp of the event, which defaults to the time when the event was captured). In our case, since we are processing historical data, we don't want the capture time in @timestamp but rather the date of the record, so we map our date field to @timestamp. Defining this is not mandatory because @timestamp is the default target, but it is recommended for clarity.

After setting the data type of the date field, the next operation is updating the data types of the fields we need for numeric comparisons or operations. By default, every parsed value is a string. We will convert the price fields to floating-point numbers and the volume field to an integer so that we can perform aggregations and comparisons on the data.

We will use the mutate filter to convert fields to specific data types. This filter performs general mutations on fields, including changing data types, renaming, replacing, and removing fields. It can also merge two fields, perform uppercase and lowercase conversions, split and strip fields, and so on (a short example of some of these operations follows the skeleton below).

A typical mutate filter looks like this:

filter {
  mutate {
    convert => # hash of field and data type (optional)
    join => # hash of fields to be joined (optional)
    lowercase => # array of fields to be converted (optional)
    merge => # hash of fields to be merged (optional)
    rename => # hash of original and renamed fields (optional)
    replace => # hash of fields to be replaced (optional)
    split => # hash of fields to be split (optional)
    strip => # array of fields (optional)
    uppercase => # array of fields (optional)
  }
}
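For instance, a minimal sketch of renaming and lowercasing fields, using hypothetical field names that are not part of our example file, might look like this:

mutate {
  # Rename a hypothetical field 'sym' to 'symbol'
  rename => ["sym", "symbol"]
  # Convert the values of the 'symbol' field to lowercase
  lowercase => ["symbol"]
}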

Let's see what our mutate filter looks like:

mutate {
  convert => ["open","float"]
  convert => ["high","float"]
  convert => ["low","float"]
  convert => ["close","float"]
  convert => ["volume","integer"]
  convert => ["adj_close","float"]
}

We are using the convert functionality to convert our price fields to float and the volume field to integer. Valid data types are "integer", "float", and "string".
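Putting the pieces together, the complete filter section for our example, combining the csv, date, and mutate filters shown above, looks like the following sketch:

filter {
  csv {
    columns => ["date_of_record","open","high","low","close","volume","adj_close"]
    separator => ","
  }
  date {
    match => ["date_of_record", "yyyy-MM-dd"]
    target => "@timestamp"
  }
  mutate {
    convert => ["open","float"]
    convert => ["high","float"]
    convert => ["low","float"]
    convert => ["close","float"]
    convert => ["volume","integer"]
    convert => ["adj_close","float"]
  }
}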
