Once we have configured the input file, we need to filter the input so that we can identify the fields we need and process them for the required analysis.
A filter plugin performs intermediary processing on an input event. We can apply a filter conditionally based on certain fields.
Since our input file is a CSV file, we will use the csv filter. The csv filter takes an event field that contains CSV-formatted data, parses it, and stores it as individual fields. It can also parse data with any separator other than commas. A typical csv filter is as follows:

filter {
  csv {
    columns => # Array of column names (optional)
    separator => # String (optional); default: ","
  }
}
The columns attribute takes the names of the fields in our CSV file and is optional. By default, the columns are named column1, column2, and so on.
The separator attribute defines which character separates the columns in the file. The default is a comma, but it can be any other character.
In our example, we can specify a simple csv filter as follows:
filter {
  csv {
    columns => ["date_of_record","open","high","low","close","volume","adj_close"]
    separator => ","
  }
}
Here, we specified the column names as defined in our CSV file, and explicitly defined the separator as a comma just to make it clear.
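For quick experimentation, the csv filter can be tried in a minimal end-to-end pipeline that reads the file and prints each parsed event to the console. This is a sketch: the file path is an assumption, and in practice the input section would match the one configured earlier.

```
input {
  file {
    path => "/path/to/table.csv"     # hypothetical path; use your actual CSV location
    start_position => "beginning"    # read the file from the start, not just new lines
  }
}
filter {
  csv {
    columns => ["date_of_record","open","high","low","close","volume","adj_close"]
    separator => ","
  }
}
output {
  stdout { codec => rubydebug }      # print each event with its parsed fields
}
```

Running Logstash with this configuration lets you verify that each CSV row is split into the named fields before adding the remaining filters.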
Now we are done with the csv filter configuration, but we still need to do some intermediary processing on the columns to associate specific data types with them.
First of all, we need to specify which column represents the date field so that it can be explicitly indexed as the date type and used to filter based on date. Logstash has a specific filter called date for this purpose. A typical date filter looks as follows:
filter {
  date {
    match => # array (optional), default: []
    target => # string (optional), default: "@timestamp"
    timezone => # string (optional)
  }
}
Here, in the match attribute, we define an array in the [field, formats] format; that is, a field name followed by a set of time formats that can be applied to that field. For example, if our log file has dates in multiple formats, we can use the following code:
match => [ "date_field", "MMM dd YYY HH:mm:ss", "MMM d YYY HH:mm:ss", "MMddYYYY", "ISO8601" ]
Date formats in Logstash: The date formats allowed are those of the JodaTime DateTimeFormat library: http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html
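To make the patterns in the match array above concrete, here are example date strings (illustrative values, not taken from our data) that each pattern would parse:

```
# Pattern                    Example string it parses
# "MMM dd YYY HH:mm:ss"  ->  "Jun 01 2014 12:05:30"
# "MMM d YYY HH:mm:ss"   ->  "Jun 1 2014 12:05:30"
# "MMddYYYY"             ->  "06012014"
# "ISO8601"              ->  "2014-06-01T12:05:30Z"
```

Note that ISO8601 is a special value understood by the date filter rather than a JodaTime pattern; it matches standard ISO 8601 timestamps.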
As per our date format, our date filter will be as follows:
date {
  match => ["date_of_record", "yyyy-MM-dd"]
  target => "@timestamp"
}
The target attribute defines where to map the matching timestamp. By default, it maps to @timestamp (the field that represents the timestamp of the event, which defaults to the time when the event was captured). In our case, since we are taking historic data, we don't want the capture time in @timestamp, but the date of the record, so we map our date field to @timestamp. Defining this is not mandatory, since @timestamp is the default target, but it is recommended.
After updating the data type of the date field, the next operation is to update the data types of the fields we need for numeric comparisons or operations. By default, every parsed value is of the string data type. We will convert these fields to numeric types so that we can perform aggregations and comparisons on the data.
We will use the mutate filter to convert fields to specific data types. This filter performs general mutations on fields, including modifying data types, renaming, replacing, and removing fields. It can also merge two fields, perform uppercase and lowercase conversions, split and strip fields, and so on.
A typical mutate filter looks like this:
filter {
  mutate {
    convert => # hash of field and data type (optional)
    join => # hash of fields to be joined (optional)
    lowercase => # array of fields to be converted (optional)
    merge => # hash of fields to be merged (optional)
    rename => # hash of original and renamed fields (optional)
    replace => # hash of fields to be replaced (optional)
    split => # hash of fields to be split (optional)
    strip => # array of fields (optional)
    uppercase => # array of fields (optional)
  }
}
Let's see what our mutate filter looks like:
mutate {
  convert => ["open","float"]
  convert => ["high","float"]
  convert => ["low","float"]
  convert => ["close","float"]
  convert => ["volume","integer"]
  convert => ["adj_close","float"]
}
We are using the convert functionality to convert the price fields to float and the volume field to integer. Valid data types are "integer", "float", and "string".
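Putting all three pieces together, the complete filter section of our Logstash configuration, combining the csv, date, and mutate filters defined above, looks like this:

```
filter {
  # Parse each CSV row into named fields
  csv {
    columns => ["date_of_record","open","high","low","close","volume","adj_close"]
    separator => ","
  }
  # Use the record's own date as the event timestamp
  date {
    match => ["date_of_record", "yyyy-MM-dd"]
    target => "@timestamp"
  }
  # Convert price and volume fields from strings to numeric types
  mutate {
    convert => ["open","float"]
    convert => ["high","float"]
    convert => ["low","float"]
    convert => ["close","float"]
    convert => ["volume","integer"]
    convert => ["adj_close","float"]
  }
}
```

Filters are applied in the order they appear, so the csv filter must come first to create the fields that the date and mutate filters operate on.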