Now that we have set up Logstash to read the data from a CSV file, and to parse and convert each field to the data type it needs, we have to put the data into Elasticsearch so that the fields are indexed and can be consumed later through the Kibana interface.
We will use the Logstash elasticsearch output plugin for this.
A typical elasticsearch plugin configuration looks like this:
output {
  elasticsearch {
    action => # string (optional), default: "index"
    cluster => # string (optional)
    host => # string (optional)
    document_id => # string (optional), default: nil
    index => # string (optional), default: "logstash-%{+YYYY.MM.dd}"
    index_type => # string (optional)
    port => # string (optional)
    protocol => # string, one of ["node", "transport", "http"] (optional)
  }
}
action: This specifies what action to perform on incoming documents. The default is "index", and the possible values are "index" or "delete". The "index" value will index a document, and "delete" will delete a document based on its document ID.
cluster: This is the name of the cluster set in Elasticsearch.
host: This is the hostname or IP address of the Elasticsearch host.
document_id: This is the ID of the document in the index; it is useful for deleting or overwriting existing entries.
index: This is the name of the index to which incoming events are written. By default, events are indexed by day, in an index named "logstash-%{+YYYY.MM.dd}".
index_type: This specifies the index type to write events to. It ensures that similar types of events are written to the same index type.
port: This specifies the port to be used for the Elasticsearch service.
protocol: This specifies the protocol used to connect to Elasticsearch. The values are "http", "node", and "transport".

Now, let's take a look at our elasticsearch output configuration:
output {
  elasticsearch {
    host => "localhost"
  }
}
We used the default value for index and most of the other settings.
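To make the default index name concrete, the following is a minimal Python sketch (not Logstash code) of how a pattern such as "logstash-%{+YYYY.MM.dd}" expands into a per-day index name. It assumes that the date part is taken from the event's timestamp rendered in UTC; the function name default_index_name is made up for this illustration.

```python
from datetime import datetime, timezone


def default_index_name(event_timestamp: datetime) -> str:
    """Approximate the expansion of "logstash-%{+YYYY.MM.dd}".

    Assumption: the date comes from the event's timestamp, rendered in UTC.
    event_timestamp must be timezone-aware.
    """
    utc_time = event_timestamp.astimezone(timezone.utc)
    # The pattern YYYY.MM.dd corresponds to strftime's %Y.%m.%d
    return "logstash-" + utc_time.strftime("%Y.%m.%d")
```

For example, default_index_name(datetime(2015, 5, 29, tzinfo=timezone.utc)) gives "logstash-2015.05.29", so events with different dates end up in different daily indexes.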
Now that we have seen how the individual plugins are configured, let's take a look at what the overall Logstash configuration looks like:
input {
  file {
    path => "/opt/logstash/input/GOOG.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["date_of_record","open","high","low","close","volume","adj_close"]
    separator => ","
  }
  date {
    match => ["date_of_record","yyyy-MM-dd"]
  }
  mutate {
    convert => ["open","float"]
    convert => ["high","float"]
    convert => ["low","float"]
    convert => ["close","float"]
    convert => ["volume","integer"]
    convert => ["adj_close","float"]
  }
}
output {
  elasticsearch {
    host => "localhost"
  }
}
We will save this configuration in the Logstash installation folder with the name logstash.conf, and as we saw earlier, we can run Logstash with this configuration using the following command:
$ bin/logstash -f logstash.conf
Logstash will start running with the defined configuration and keep indexing all incoming events into the Elasticsearch indexes. You may see an output similar to this on the console:
May 31, 2015 4:04:54 PM org.elasticsearch.node.internal.InternalNode start
INFO: [logstash-4004-9716] started
Logstash startup completed
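To picture what each indexed event contains, here is a rough Python sketch of what the filter section of the configuration does to a single CSV line: split it into the named columns, parse the date, and convert the numeric fields. This is only an approximation of the csv, date, and mutate filters, not how Logstash implements them; the function name process_line is hypothetical, and the values used with it below are made up rather than real quotes.

```python
import csv
from datetime import datetime

# Column names as listed in the csv filter of the configuration
COLUMNS = ["date_of_record", "open", "high", "low", "close", "volume", "adj_close"]
FLOAT_FIELDS = ["open", "high", "low", "close", "adj_close"]
INTEGER_FIELDS = ["volume"]


def process_line(line: str) -> dict:
    """Approximate the csv, date, and mutate filters for one input line."""
    # csv filter: split on "," and name the columns
    values = next(csv.reader([line], delimiter=","))
    event = dict(zip(COLUMNS, values))

    # date filter: parse date_of_record as yyyy-MM-dd and use it as the
    # event timestamp (time zone handling is left out of this sketch)
    event["@timestamp"] = datetime.strptime(event["date_of_record"], "%Y-%m-%d")

    # mutate filter: convert string fields to numeric types
    for field in FLOAT_FIELDS:
        event[field] = float(event[field])
    for field in INTEGER_FIELDS:
        event[field] = int(event[field])
    return event
```

Calling process_line("2015-05-29,1.5,2.5,0.5,2.0,1000,2.0") returns a dictionary in which open is the float 1.5 and volume is the integer 1000, which is why these fields can later be treated as numbers rather than strings.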
At this point, we can open the Elasticsearch Kopf plugin console to verify that some documents have already been indexed, and we can also query the documents.
As we can see, 129 documents have already been indexed, which confirms that our Logstash configuration worked well.
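The document count can also be checked without a plugin console. The following Python sketch asks Elasticsearch's _count endpoint how many documents match an index pattern. It assumes that Elasticsearch is reachable over HTTP on localhost at the default port 9200 and that the events went to the default logstash-* indexes; the helper names are made up for this example.

```python
import json
from urllib.request import urlopen


def count_url(host: str = "localhost", port: int = 9200, index: str = "logstash-*") -> str:
    """Build the URL of the _count endpoint (host, port, and index are assumptions)."""
    return f"http://{host}:{port}/{index}/_count"


def parse_count(body: str) -> int:
    """Extract the document count from a _count JSON response."""
    return json.loads(body)["count"]


def indexed_documents(host: str = "localhost", port: int = 9200, index: str = "logstash-*") -> int:
    """Query a running Elasticsearch instance for the number of indexed documents."""
    with urlopen(count_url(host, port, index)) as response:
        return parse_count(response.read().decode("utf-8"))
```

With Elasticsearch running, print(indexed_documents()) should report the same number of documents that the console shows.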