In previous chapters, we focused on how data can be indexed and searched in Elasticsearch. We looked at index mappings and the importance of defining correct mappings for downstream use cases such as computing aggregations, running alerting rules, and using machine learning features.
In this chapter, we look at how ETL tools such as Logstash can be used to extract data from a range of source systems (such as Syslog streams, CSV files, message-streaming platforms, or Beats agents), and transform events to their desired format before loading them into Elasticsearch. Upon completion of this chapter, you will be able to use Logstash to process and ingest a variety of data sources into Elasticsearch.
In this chapter, we will specifically focus on the following:
To follow along with the sample Logstash configurations for real-world data-processing use cases, you will need access to a virtual machine with Logstash installed or the ability to run Logstash on your host machine, as shown in Chapter 2, Installing and Running the Elastic Stack.
The code for this chapter can be found on the GitHub repository for the book: https://github.com/PacktPublishing/Getting-Started-with-Elastic-Stack-8.0/tree/main/Chapter7.
In Chapter 6, Collecting and Shipping Data with Beats, we explored how a key characteristic of modern IT environments is the concept of valuable data being generated in multiple parts of the technology stack. While Beats go a long way in collecting this data to send to Elasticsearch, a key challenge is transforming data to make it useful for search and analysis.
Logstash is a flexible Extract, Transform, Load (ETL) tool designed to solve this problem. While Logstash has no real dependency on Elasticsearch or Beats and can be used for any generic ETL use case, it plays a key role as part of the Elastic Stack.
Logstash is generally used in two main ways as part of the Elastic Stack:
Logstash can act as the receiver for data from sources such as Beats agents or Syslog streams. It can also listen for data over HTTP for any compatible source system to send events through.
Logstash can extract data from source systems such as Kafka (or other event streams), tail log files on the Logstash host, poll an HTTP API to retrieve data, or read files on object stores such as Amazon S3 or Google Cloud Storage (GCS).
Once data has been ingested, Logstash can perform operations such as parsing out important fields, transforming nonstandard data formats/codecs into JSON, enriching events with contextual information, sanitizing field values, and dropping unnecessary event types. Both approaches heavily leverage the versatility and flexibility that Logstash provides in dealing with a multitude of different source systems. The two models are not mutually exclusive and can be used in a hybrid fashion if required.
Logstash is designed to run as a standalone component that loads data into Elasticsearch (among other destination systems). Logstash is a plugin-based component, meaning it is highly extensible in the source and destination systems it supports and the transformations it can perform.
It is worth understanding the following concepts in relation to Logstash:
Logstash works by running one or more Logstash pipelines as part of a Logstash instance to process ETL workloads.
Logstash is not a clustered component and cannot be made aware of other Logstash instances. Multiple Logstash instances can be used for high availability and scaling requirements by load-balancing data across instances.
Logstash pipelines can use either an in-memory queue or a disk-based (persistent) queue. A persistent queue provides short-term buffering of events from inputs before they are processed, improving the resiliency of ingestion pipelines at a slight performance cost.
Settings and configuration options for Logstash are defined in the logstash.yml configuration file.
A full list of supported settings can be found in the reference guide: https://www.elastic.co/guide/en/logstash/8.0/logstash-settings-file.html
The following settings are worth noting when setting up your Logstash instance:
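As an illustrative example (the values shown here are assumptions for a typical single-node setup rather than recommendations), a minimal logstash.yml might look as follows:
# a descriptive name for this Logstash instance
node.name: logstash-node-1
# where Logstash stores internal data, including persistent queues
path.data: /var/lib/logstash
# number of worker threads used to run the filter and output stages
pipeline.workers: 2
# use the disk-based persistent queue described earlier
queue.type: persisted
# automatically reload pipeline definitions when they change on disk
config.reload.automatic: true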
Next, we will look at running your first Logstash pipeline.
It is easy to get up and running with Logstash. In this section, we will look at a simple Logstash pipeline that takes a string from standard input and outputs the text as part of a JSON event.
Important Note
If Logstash is installed using a package manager, the examples in this chapter need to be run as the root user when run interactively. The logstash user is used when run as a systemd service.
Follow these steps to run your first Logstash pipeline: Create a pipeline definition file named logstash-pipeline.conf (the filename used in the run command below). The input block configures the pipeline to accept events from the standard input interface (stdin):
input {
  stdin {}
}
The filter block of the pipeline is where event processing happens. This can include parsing, transformations, enrichment, and sanitization tasks. In this pipeline, a field called description is added to the event with the "First pipeline!" value. Logstash will add this field to every event coming through the input:
filter {
  mutate {
    add_field => { "description" => "First pipeline!" }
  }
}
Finally, the pipeline is configured to send the event to the standard output interface, so the events are visible to the terminal running Logstash:
output {
  stdout {}
}
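Putting the three blocks together, the complete logstash-pipeline.conf file looks as follows:
input {
  stdin {}
}

filter {
  mutate {
    add_field => { "description" => "First pipeline!" }
  }
}

output {
  stdout {}
}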
Run the pipeline, passing a string to Logstash via standard input:
echo -e 'Hello world!' | /usr/share/logstash/bin/logstash -f logstash-pipeline.conf
You should see the following output from Logstash. As expected, the message field contains the text input via standard input, while the description field contains the static text added in the filter block. Logstash also adds some additional fields: @timestamp, @version, and host, indicating the event-processing time, the event format version, and the host Logstash runs on, respectively. Some of these additional fields can be removed if desired, as shown in later sections of this chapter:
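While the exact timestamp and hostname will differ on your machine, the output should resemble the following illustrative rendering (the stdout plugin uses the rubydebug codec by default):
{
          "host" => "logstash-demo",
      "@version" => "1",
    "@timestamp" => 2022-01-01T00:00:00.000Z,
       "message" => "Hello world!",
   "description" => "First pipeline!"
}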
Next, run the pipeline again with an input string containing the \n escape sequence:
echo -e 'Hello world x1\nHello world x2' | /usr/share/logstash/bin/logstash -f logstash-pipeline.conf
The \n character indicates a new line in the input string, and each line is interpreted as a separate message by Logstash. You should see two events produced by Logstash, as shown in the following screenshot:
Now that we understand the very basics of stashing your first event using Logstash, let's look at some more complex scenarios that you might encounter in the real world.
This section will explore a number of real-world ETL scenarios and the corresponding Logstash pipelines that can be used to implement them.
The full datasets used and the pipeline configuration files can be found in the code repository for the book.
Comma-Separated Value (CSV) files are a commonly used file format and can be easily generated by a range of source systems and tools. We will explore how a dataset containing taxi trip details from the city of Chicago can be parsed and loaded into Elasticsearch for analysis.
Navigate to Chapter7/processing-csv-files in the code repository and explore the chicago-taxi-data.csv file. The first row contains header information, indicating what information each column contains. The following screenshot is an extract of some of the key fields in the file:
Follow these instructions to process the CSV dataset: Create a pipeline definition file named logstash-taxi.conf (used in the run command at the end of this section), starting with the following input block:
input {
  file {
    path => "/<your_path_to>/processing-csv-files/chicago-taxi-data.csv"
    start_position => "beginning"
    sincedb_clean_after => "1 s"
  }
}
The pipeline accepts input from the file plugin, used to read the CSV file on disk. Replace the value of the path with the absolute path to the chicago-taxi-data.csv file on your host. The start_position parameter tells the file input plugin to read the file from the beginning when Logstash runs, while the sincedb_clean_after parameter tells Logstash to forget the current position of the file input after 1 second of inactivity.
Important Note
We use the sincedb_clean_after parameter here to be able to replay the CSV file repeatedly for demonstration purposes. You would generally remove this parameter in a production setting, where you do not want to replay the same data through Logstash.
More details and configuration options for the file input plugin for Logstash can be found in the reference guide: https://www.elastic.co/guide/en/logstash/8.0/plugins-inputs-file.html.
Before processing, a sample event looks as follows:
The CSV filter plugin is used to parse out the data into JSON. The skip_header parameter tells the plugin to skip the first line in the file, and the columns parameter defines the CSV column names in the order that they appear in the file. The resulting data from the plugin will be in JSON format:
csv {
  skip_header => true
  columns => ["unique_key","taxi_id","trip_start_timestamp","trip_end_timestamp","trip_seconds","trip_miles","pickup_census_tract","dropoff_census_tract","pickup_community_area","dropoff_community_area","fare","tips","tolls","extras","trip_total","payment_type","company","pickup_latitude","pickup_longitude","pickup_location","dropoff_latitude","dropoff_longitude","dropoff_location"]
}
Now that the event is mostly in JSON, let's look at how some of the fields can be cleaned up for analysis in Elasticsearch.
The pickup_location and dropoff_location fields contain coordinates as Well-Known Text (WKT) POINT strings. The dissect filter extracts the coordinate values from these strings:
dissect {
  mapping => {
    "pickup_location" => "POINT (%{pickup_location} %{+pickup_location})"
    "dropoff_location" => "POINT (%{dropoff_location} %{+dropoff_location})"
  }
}
The dissect filter supports field notation to control how values are extracted into the event. The + character is used to append to an existing field in the event. More information on the dissect filter can be found in the reference guide: https://www.elastic.co/guide/en/logstash/8.0/plugins-filters-dissect.html.
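As a hypothetical illustration (the coordinates below are made up), the dissect mapping transforms a WKT value as follows:
# before dissect:  "pickup_location" => "POINT (-87.633308 41.899602)"
# after dissect:   "pickup_location" => "-87.633308 41.899602"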
Next, a mutate filter replaces the space between the extracted coordinate values with a comma and removes fields that are not needed downstream:
mutate {
  gsub => [
    "pickup_location", " ", ",",
    "dropoff_location", " ", ","
  ]
  remove_field => ["host", "@version", "message", "path"]
}
The date filter parses the trip start and end times, replacing the original string values with proper timestamp fields:
date {
  match => [ "trip_start_timestamp", "yyyy-MM-dd HH:mm:ss zzz"]
  target => "trip_start_timestamp"
}
date {
  match => [ "trip_end_timestamp", "yyyy-MM-dd HH:mm:ss zzz"]
  target => "trip_end_timestamp"
}
The event should now look as follows:
Finally, the output block sends events to both the standard output interface and your Elasticsearch cluster. Replace the hosts, user, and password values with the details of your cluster:
output {
  stdout {}
  elasticsearch {
    hosts => "http://<your_es_cluster>"
    user => "<your_es_username>"
    password => "<your_password>"
  }
}
Run the pipeline as follows:
/usr/share/logstash/bin/logstash -f logstash-taxi.conf
Taxi events should be sent to both the terminal and your Elasticsearch cluster as configured in the output block.
Next, we will look at using Logstash to parse Syslog data.
Syslog is a standard for network-based logging, used in a wide range of Linux-based systems, applications, and appliances. Syslog messages follow a well-defined specification to structure logs. The following pipeline shows how a raw Syslog message can be passed into Logstash for parsing into JSON.
Navigate to Chapter7/parsing-syslog in the code repository and explore the linux-system.log file. The following screenshot shows the first few lines in the file:
The following code blocks describe the pipeline definition for this scenario:
Important Note
Input for this scenario is obtained via the standard input interface (stdin). It is convenient to use stdin when testing different pipeline definitions during the development phase. Production setups should use a file or network-based input for resiliency.
The Grok filter plugin extracts structured fields from unstructured log data using named, regular-expression-based patterns. Further information on the Grok plugin is available in the reference guide: https://www.elastic.co/guide/en/logstash/8.0/plugins-filters-grok.html. The following grok filter parses the Syslog messages in this dataset:
grok {
  match => { "message" => "%{SYSLOGTIMESTAMP:event.start} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\[%{POSINT:process.pid}\])?: %{GREEDYDATA:event.message}" }
}
The extracted events look as follows:
While we have successfully extracted fields such as event.start, process.name, and host.hostname, the event.message field still contains some useful data that should be extracted for downstream usage. Also note that the Syslog data contains logs from multiple processes, each with its own log format.
A conditional applies process-specific parsing to events from the sshd(pam_unix) process: a dissect filter separates the outcome from the rest of the message, and a kv filter extracts the remaining key=value pairs under the sshd field:
if [process.name] == "sshd(pam_unix)" {
  dissect {
    mapping => {
      "event.message" => "%{event.outcome}; %{event.message}"
    }
  }
  kv {
    source => "event.message"
    target => "sshd"
    # strict whitespace handling helps correctly parse keys with empty values (for example, logname=)
    whitespace => "strict"
  }
}
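For illustration, consider a typical sshd(pam_unix) authentication failure line (the hostname and IP address here are hypothetical):
Feb  4 09:15:01 web-01 sshd(pam_unix)[2544]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=203.0.113.7 user=root
The dissect filter would place authentication failure into event.outcome, and the kv filter would parse the remaining key=value pairs into fields such as sshd.rhost and sshd.user.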
The extracted event looks as follows:
The pipeline definition file contains a few other conditionals to handle different processes writing to Syslog, using filter plugins that we have already described. Once the data is ready, the output plugin can be used to load it into your destination system as required.
Run the pipeline, passing the log file in via standard input:
/usr/share/logstash/bin/logstash -f logstash-syslog.conf < linux-system.log
Next, we will look at enriching events with additional context using Logstash.
A key use case in ETL workloads is enriching events with contextual information to make sense of the data in downstream consumption. Data enrichment is useful to tie in data from disparate systems in the context of your event (such as enriching a username with detailed information on the user obtained from a directory system) or to make inferences of a given field to make analysis more straightforward (such as identifying the geographic location tied to a public IP address):
Navigate to Chapter7/enrich-events in the code repository and explore the firewall.log file. The file is already in JSON format, so parsing will not be necessary for this pipeline. The following code blocks describe the pipeline in more detail.
input {
  stdin {
    # decode incoming events as JSON
    codec => "json"
  }
}
The event looks as follows:
Now that we have the original event extracted into JSON, let's look at some enrichment options. The geoip filter plugin looks up the public IP address in the destination.ip field and adds the resulting geolocation details to the destination.geo field:
geoip {
  source => "destination.ip"
  target => "destination.geo"
}
Important Note
The geo IP lookup process will only work on public IP addresses and not on private IP values such as source.ip.
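Depending on the GeoIP database in use, the enriched event gains sub-fields under destination.geo. An illustrative (hypothetical) result might look as follows:
"destination.geo" => {
    "country_name" => "United States",
       "city_name" => "Chicago",
        "location" => { "lat" => 41.8781, "lon" => -87.6298 }
}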
The translate filter plugin enriches events using a local dictionary file. Here, the user.name value is looked up in user-lookup.json and the matching entry is written to the user field, with a fallback value of user_not_found when no match is found:
translate {
  dictionary_path => "/<path_to>/enrich-events/user-lookup.json"
  destination => "user"
  field => "user.name"
  fallback => "user_not_found"
}
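The dictionary file maps lookup keys (usernames, in this case) to the values to be written to the destination field. The real file is included in the code repository; a minimal hypothetical equivalent, assuming each username maps to an object of user details, might look as follows:
{
  "bob": { "full_name": "Bob Smith", "department": "Finance" },
  "alice": { "full_name": "Alice Jones", "department": "Engineering" }
}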
The resulting event should look as follows:
Finally, a conditional flags events where traffic matching a threat filter was allowed through, adding static threat_match fields for downstream analysis:
if [event.action] == "threat_filter" and [event.outcome] == "allowed" {
  mutate {
    add_field => {
      "threat_match.outcome" => true
      "threat_match.severity" => 2
      "threat_match.rule_name" => "FW_ALLOWED_THREAT"
    }
  }
}
The resulting event where a potential threat may have been allowed should look as follows:
Run the pipeline, passing the firewall.log file in via standard input:
/usr/share/logstash/bin/logstash -f logstash-firewall-enrich.conf < firewall.log
Next, we will look at how event streams can be aggregated on Logstash before being ingested into Elasticsearch.
Log volumes can be very high in certain scenarios, making it costly and resource-intensive to collect, transform, and store every individual event for analysis. In such cases, it can be sufficient to collect an aggregated view of the incoming events instead of every single event. This scenario will look at how the network activity of a given user can be summarized over a time window prior to ingestion into Elasticsearch:
As before, input is obtained via the standard input interface, with events decoded as JSON:
input {
  stdin {
    codec => "json"
  }
}
The mutate filter ensures that the client.bytes and server.bytes values are integers so that they can be summed later in the pipeline:
mutate {
  convert => {
    "client.bytes" => "integer"
    "server.bytes" => "integer"
  }
}
The aggregate filter plugin is used to summarize data across events. The filter groups events by the user.name value, as defined by the task_id parameter, while the code block contains Ruby code that calculates running totals of the client.bytes and server.bytes values:
aggregate {
  task_id => "%{user.name}"
  code => "
    # define and set total_client_bytes to zero if it does not exist
    map['total_client_bytes'] ||= 0;
    map['total_client_bytes'] += event.get('client.bytes');
    # define and set total_server_bytes to zero if it does not exist
    map['total_server_bytes'] ||= 0;
    map['total_server_bytes'] += event.get('server.bytes');
  "
  # push the aggregated map as a new event when the timeout expires
  push_map_as_event_on_timeout => true
  # store the task ID (the username) on the pushed event
  timeout_task_id_field => "user.name"
  # seconds of inactivity before a task times out
  timeout => 5
  # set a summary flag on the pushed event
  timeout_code => "event.set('client_bytes_summary', event.get('total_client_bytes') > 1)"
}
More details on the aggregate filter plugin can be found in the reference guide: https://www.elastic.co/guide/en/logstash/8.0/plugins-filters-aggregate.html. Note that the aggregate filter works correctly only when Logstash runs with a single pipeline worker (pipeline.workers: 1, or -w 1 on the command line), so that all events for a given task pass through the same worker thread.
Since only the aggregated summaries are needed, the original firewall events are dropped:
if [event.action] == "threat_filter" {
  drop {}
}
The resulting events should look as follows:
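With hypothetical values, an aggregated summary event pushed after the timeout might look as follows:
{
               "user.name" => "bob",
      "total_client_bytes" => 11264,
      "total_server_bytes" => 186368,
    "client_bytes_summary" => true,
              "@timestamp" => 2022-01-01T00:00:05.000Z
}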
As expected, the original events are not sent to the output.
Run the pipeline as follows:
/usr/share/logstash/bin/logstash -f logstash-firewall-agg.conf
Once the Logstash process starts up, copy and paste the contents of the firewall.log file into your console and press the Enter key. After the timeout configured in the aggregate plugin elapses, the aggregated results will appear on your console.
In environments where data is generated on multiple source systems, an agent-based data collection mechanism is a common architectural pattern for data ingestion. As described in Chapter 6, Collecting and Shipping Data with Beats, Filebeat can be used to collect and send log files to Elasticsearch or Logstash if further processing is required.
Navigate to Chapter7/logs-filebeat-logstash in the code repository and explore the access.log file. The following screenshot shows the first few lines in the file containing standard Apache access logs:
This example looks at using Filebeat to collect data from a given log file and send it to Logstash for parsing. Logstash can then be used to load the data into the destination system. First, Filebeat is configured to read the access log and send events to Logstash (filebeat.yml):
filebeat.inputs:
# read the Apache access log from disk
- type: log
  paths:
    - /.../path_to/logs-filebeat-logstash/access.log

# send the collected events to Logstash rather than directly to Elasticsearch
output.logstash:
  hosts: ["127.0.0.1:5044"]
Remember to configure the path value to the absolute filesystem path for the access.log file.
The Logstash pipeline (logstash-filebeat.conf) uses the beats input plugin to listen for incoming connections from Beats agents:
input {
  beats {
    port => 5044
  }
}
The filter block parses the Apache access logs using the built-in COMBINEDAPACHELOG grok pattern and removes fields that are not needed downstream:
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  mutate {
    remove_field => ["host", "@version", "message"]
  }
}
Once parsed, the event should look as follows:
To see this pipeline in action, first start the Logstash pipeline:
/usr/share/logstash/bin/logstash -f logstash-filebeat.conf
Then, in a separate terminal, start Filebeat to begin shipping the log file:
/usr/share/filebeat/filebeat -c filebeat.yml
This concludes the various real-world ETL scenarios using Logstash.
In this chapter, we looked at using Logstash to extract, transform, and load data into Elasticsearch.
First, we looked at the internal workings of Logstash and familiarized ourselves with Logstash pipelines. Next, we explored common and generally useful Logstash input, filter, and output plugins. Finally, we ran through various real-world ETL scenarios and put together corresponding ETL pipelines to process and transform data before loading it into Elasticsearch.
In the next chapter, we look at how Kibana allows for the analysis and visualization of data and makes it possible for users to build and consume solutions on the Elastic Stack.