Pipeline aggregations

The last type of aggregation we will discuss is pipeline aggregations. So far we've learned about metrics aggregations, which return metrics, and bucket aggregations, which return buckets; both work on the documents returned by the query. Pipeline aggregations are different: they work on the output of other aggregations and their metrics, enabling functionality such as moving-average calculations (https://en.wikipedia.org/wiki/Moving_average).

Note

Remember that pipeline aggregations were introduced in Elasticsearch 2.0 and are considered experimental. This means that the API can change in the future, breaking backward compatibility.

Available types

There are two types of pipeline aggregation. The so-called parent aggregations work on the output of their parent aggregation and are able to produce new buckets or new metrics to add to existing buckets. The second type, sibling aggregations, produce new aggregations at the same level as the sibling aggregation they reference.

Referencing other aggregations

Because of their nature, pipeline aggregations need to be able to access the results of other aggregations. We do that via the buckets_path property, which is defined using a special format with a few keywords that tell Elasticsearch exactly which aggregation and metric we are interested in. The > character separates aggregations and the . character separates an aggregation from its metric. For example, my_sum.sum means that we take the sum metric of an aggregation called my_sum. Another example is popular_tags>my_sum.sum, which means that we are interested in the sum metric of a sub-aggregation called my_sum nested inside the popular_tags aggregation. In addition, we can use a special path called _count, which calculates the pipeline aggregation on document counts instead of a specified metric.
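To make the path format concrete, here is a small, illustrative Python sketch (our own code, not Elasticsearch's actual parser) that decomposes a buckets_path string into its aggregation chain and final metric:

```python
# Illustrative sketch only: decompose a buckets_path string the way the
# format is described above. ">" separates nested aggregations and "."
# separates the final aggregation from its metric.
def parse_buckets_path(path):
    *outer, last = path.split(">")
    if "." in last:
        agg, metric = last.rsplit(".", 1)
    else:
        agg, metric = last, None  # e.g. the special _count path has no metric part
    return outer + [agg], metric

print(parse_buckets_path("popular_tags>my_sum.sum"))
# → (['popular_tags', 'my_sum'], 'sum')
```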

Gaps in the data

Our data can contain gaps – places where the data doesn't exist. For such cases, we can specify the gap_policy property and set it to skip or insert_zeros. The skip value tells Elasticsearch to ignore the missing data and continue from the next available value, while insert_zeros replaces missing values with zero.
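The difference between the two policies can be sketched in a few lines of Python (our own simulation, not Elasticsearch internals):

```python
# Simulate the two gap policies on a metric series with a missing value (None).
def apply_gap_policy(values, policy):
    if policy == "skip":
        return [v for v in values if v is not None]       # ignore the gap entirely
    if policy == "insert_zeros":
        return [0.0 if v is None else v for v in values]  # treat the gap as zero
    raise ValueError("unknown gap_policy: " + policy)

series = [3.0, None, 5.0]
print(apply_gap_policy(series, "skip"))          # [3.0, 5.0]
print(apply_gap_policy(series, "insert_zeros"))  # [3.0, 0.0, 5.0]
```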

Pipeline aggregation types

Most of the aggregations we will show in this section are very similar to the ones we've already seen in the sections about metrics and bucket aggregations, so we won't discuss them in depth. There are also new, pipeline-specific aggregations that we want to talk about in a little more detail.

Min, max, sum, and average bucket aggregations

The min_bucket, max_bucket, sum_bucket, and avg_bucket aggregations are sibling aggregations, similar in what they return to the min, max, sum, and avg aggregations. However, instead of working on the data returned by the query, they work on the results of the other aggregations.

To show you a simple example of how these aggregations work, let's calculate the sum over all the buckets returned by another aggregation. The query that does that looks as follows:

{
 "aggs" : {
  "periods_histogram" : {
   "histogram" : {
    "field" : "year",
    "interval" : 100
   },
   "aggs" : {
    "copies_per_100_years" : {
     "sum" : {
      "field" : "copies"
     }
    }
   }
  },
  "sum_copies" : {
   "sum_bucket" : {
    "buckets_path" : "periods_histogram>copies_per_100_years"
   }
  }
 }
}

As you can see, we used the histogram aggregation and we included a nested aggregation that calculates the sum of the copies field. Our sum_bucket sibling aggregation is used outside the main aggregation and refers to it using the buckets_path property. It tells Elasticsearch that we are interested in summing the values of metrics returned by the copies_per_100_years aggregation. The result returned by Elasticsearch for this query looks as follows:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "periods_histogram" : {
      "buckets" : [ {
        "key" : 1800,
        "doc_count" : 1,
        "copies_per_100_years" : {
          "value" : 0.0
        }
      }, {
        "key" : 1900,
        "doc_count" : 3,
        "copies_per_100_years" : {
          "value" : 7.0
        }
      } ]
    },
    "sum_copies" : {
      "value" : 7.0
    }
  }
}

As you can see, Elasticsearch added another entry to the results, called sum_copies, which holds the value we were interested in.
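What sum_bucket (and its min, max, and avg siblings) computes can be reproduced client-side from the per-bucket metric values; this Python sketch mirrors the response above:

```python
# Per-bucket metric values taken from the response above.
buckets = [
    {"key": 1800, "copies_per_100_years": 0.0},
    {"key": 1900, "copies_per_100_years": 7.0},
]
values = [b["copies_per_100_years"] for b in buckets]

sum_copies = sum(values)                # what sum_bucket returns -> 7.0
min_copies = min(values)                # min_bucket -> 0.0
max_copies = max(values)                # max_bucket -> 7.0
avg_copies = sum(values) / len(values)  # avg_bucket -> 3.5
print(sum_copies, min_copies, max_copies, avg_copies)
```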

Cumulative sum aggregation

The cumulative_sum aggregation is a parent pipeline aggregation that allows us to calculate the cumulative sum of a metric in a histogram or date_histogram aggregation. A simple example of the aggregation looks as follows:

{
 "aggs" : {
  "periods_histogram" : {
   "histogram" : {
    "field" : "year",
    "interval" : 100
   },
   "aggs" : {
    "copies_per_100_years" : {
     "sum" : {
      "field" : "copies"
     }
    },
    "cumulative_copies_sum" : {
     "cumulative_sum" : {
      "buckets_path" : "copies_per_100_years"
     }
    }
   }
  }
 }
}

Because this aggregation is a parent pipeline aggregation, it is defined in the sub aggregations. The returned result looks as follows:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "periods_histogram" : {
      "buckets" : [ {
        "key" : 1800,
        "doc_count" : 1,
        "copies_per_100_years" : {
          "value" : 0.0
        },
        "cumulative_copies_sum" : {
          "value" : 0.0
        }
      }, {
        "key" : 1900,
        "doc_count" : 3,
        "copies_per_100_years" : {
          "value" : 7.0
        },
        "cumulative_copies_sum" : {
          "value" : 7.0
        }
      } ]
    }
  }
}

The first cumulative_copies_sum value is 0 because the sum in the first bucket is 0. The second is the sum of all previous buckets plus the current one, which is 7. Each following bucket will hold the sum of all previous buckets and itself.
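The calculation itself is just a running total over the per-bucket metric values. Assuming a hypothetical third bucket with a sum of 2, a quick Python sketch gives:

```python
from itertools import accumulate

# copies_per_100_years values per bucket; 0.0 and 7.0 come from the response
# above, while 2.0 is a hypothetical third bucket for illustration.
per_bucket = [0.0, 7.0, 2.0]
cumulative = list(accumulate(per_bucket))
print(cumulative)  # [0.0, 7.0, 9.0]
```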

Bucket selector aggregation

The bucket_selector aggregation is a parent pipeline aggregation. It allows us to use a script to decide whether a bucket should be retained in the parent multi-bucket aggregation. For example, to keep only the buckets that have more than one copy per period, we can run the following query (it needs the script.inline property to be set to on in the elasticsearch.yml file):

{
 "aggs" : {
  "periods_histogram" : {
   "histogram" : {
    "field" : "year",
    "interval" : 100
   },
   "aggs" : {
    "copies_per_100_years" : {
     "sum" : {
      "field" : "copies"
     }
    },
    "remove_empty_buckets" : {
     "bucket_selector" : {
      "buckets_path" : {
       "sum_copies" : "copies_per_100_years"
      },
      "script" : "sum_copies > 1"
     }
    }
   }
  }
 }
}

There are two important things here. The first is the buckets_path property, which differs from what we've used so far: it is now an object whose key is the name used to reference the value in the script and whose value is the path to the aggregation metric. The second is the script property, which defines the script that decides whether the processed bucket should be retained. The results returned by Elasticsearch in this case are as follows:

{
  "took" : 330,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "periods_histogram" : {
      "buckets" : [ {
        "key" : 1900,
        "doc_count" : 3,
        "copies_per_100_years" : {
          "value" : 7.0
        }
      } ]
    }
  }
}

As we can see, the bucket with the copies_per_100_years value equal to 0 has been removed.
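Conceptually, bucket_selector behaves like a filter over the bucket list; the following Python sketch (our simulation of the response above, not Elasticsearch code) keeps only the buckets whose metric satisfies the script condition:

```python
# Buckets with their copies_per_100_years metric, as in the responses above.
buckets = [
    {"key": 1800, "copies_per_100_years": 0.0},
    {"key": 1900, "copies_per_100_years": 7.0},
]

# Client-side equivalent of the "sum_copies > 1" script condition.
kept = [b for b in buckets if b["copies_per_100_years"] > 1]
print([b["key"] for b in kept])  # [1900]
```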

Bucket script aggregation

The bucket_script aggregation is a parent pipeline aggregation that allows us to define multiple bucket paths and use them inside a script. The referenced metrics must be numeric and the script must return a numeric value as well. An example of using this aggregation follows (the query needs the script.inline property to be set to on in the elasticsearch.yml file):

{
 "aggs" : {
  "periods_histogram" : {
   "histogram" : {
    "field" : "year",
    "interval" : 100
   },
   "aggs" : {
    "copies_per_100_years" : {
     "sum" : {
      "field" : "copies"
     }
    },
    "stats_per_100_years" : {
     "stats" : {
      "field" : "copies"
     }
    },
    "example_bucket_script" : {
     "bucket_script" : {
      "buckets_path" : {
       "sum_copies" : "copies_per_100_years",
       "count" : "stats_per_100_years.count"
      },
      "script" : "sum_copies / count * 1000"
     }
    }
   }
  }
 }
}

There are two things to note here. The first is that we've defined two entries in the buckets_path property, which the bucket_script aggregation allows. Each entry is a key-value pair: the key is the name under which the value is available in the script, and the value is the path to the aggregation metric we are interested in. The second is, of course, the script property, which defines the script that returns the value.

The returned results for the preceding query are as follows:

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "periods_histogram" : {
      "buckets" : [ {
        "key" : 1800,
        "doc_count" : 1,
        "copies_per_100_years" : {
          "value" : 0.0
        },
        "stats_per_100_years" : {
          "count" : 1,
          "min" : 0.0,
          "max" : 0.0,
          "avg" : 0.0,
          "sum" : 0.0
        },
        "example_bucket_script" : {
          "value" : 0.0
        }
      }, {
        "key" : 1900,
        "doc_count" : 3,
        "copies_per_100_years" : {
          "value" : 7.0
        },
        "stats_per_100_years" : {
          "count" : 3,
          "min" : 0.0,
          "max" : 6.0,
          "avg" : 2.3333333333333335,
          "sum" : 7.0
        },
        "example_bucket_script" : {
          "value" : 2333.3333333333335
        }
      } ]
    }
  }
}
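We can verify the example_bucket_script value of the second bucket by repeating the script's arithmetic in Python:

```python
# Values from the 1900 bucket in the response above.
sum_copies = 7.0   # copies_per_100_years
count = 3          # stats_per_100_years.count

# Same arithmetic as the "sum_copies / count * 1000" script.
value = sum_copies / count * 1000
print(value)  # 2333.3333333333335
```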

Serial differencing aggregation

The serial_diff aggregation is a parent pipeline aggregation that implements a technique in which values in time series data (such as a histogram or date histogram) are subtracted from themselves at different time lags. This lets us plot the change between time periods instead of the absolute value. For example, the population of a city grows over time; if we use the serial differencing aggregation with a period of one day, we can see the daily growth.

To calculate the serial_diff aggregation, we need a parent histogram or date_histogram aggregation, and we need to provide buckets_path, which points to the metric we are interested in, and lag (a positive, non-zero integer), which tells Elasticsearch how many buckets back to look for the value to subtract from the current one. We can omit lag, in which case Elasticsearch sets it to 1.

Let's now look at a simple query that uses the discussed aggregation:

{
 "aggs" : {
  "periods_histogram" : {
   "histogram" : {
    "field" : "year",
    "interval" : 100
   },
   "aggs" : {
    "copies_per_100_years" : {
     "sum" : {
      "field" : "copies"
     }
    },
    "first_difference" : {
     "serial_diff" : {
      "buckets_path" : "copies_per_100_years",
      "lag" : 1
     }
    }
   }
  }
 }
}

The response to the preceding query looks as follows:

{
  "took" : 68,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "periods_histogram" : {
      "buckets" : [ {
        "key" : 1800,
        "doc_count" : 1,
        "copies_per_100_years" : {
          "value" : 0.0
        }
      }, {
        "key" : 1900,
        "doc_count" : 3,
        "copies_per_100_years" : {
          "value" : 7.0
        },
        "first_difference" : {
          "value" : 7.0
        }
      } ]
    }
  }
}

As you can see, the second bucket contains our aggregation (every bucket after it will contain it as well). The calculated value is 7 because the current copies_per_100_years value is 7 and the previous one is 0; subtracting 0 from 7 gives us 7.
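The computation is easy to sketch in Python. Assuming a hypothetical third bucket with a value of 3, a serial difference looks like this:

```python
# Lag-N serial difference: subtract the value N buckets back from the current
# value; the first N buckets have nothing to subtract from.
def serial_diff(values, lag=1):
    return [None] * lag + [values[i] - values[i - lag]
                           for i in range(lag, len(values))]

# 0.0 and 7.0 come from the response above; 3.0 is a hypothetical third bucket.
print(serial_diff([0.0, 7.0, 3.0]))         # [None, 7.0, -4.0]
print(serial_diff([0.0, 7.0, 3.0], lag=2))  # [None, None, 3.0]
```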

Derivative aggregation

The derivative aggregation is another example of a parent pipeline aggregation. As its name suggests, it calculates a derivative (https://en.wikipedia.org/wiki/Derivative) of a given metric in a histogram or date histogram. The only thing we need to provide is buckets_path, which points to the metric we are interested in. An example query using this aggregation looks as follows:

{
 "aggs" : {
  "periods_histogram" : {
   "histogram" : {
    "field" : "year",
    "interval" : 100
   },
   "aggs" : {
    "copies_per_100_years" : {
     "sum" : {
      "field" : "copies"
     }
    },
    "derivative_example" : {
     "derivative" : {
      "buckets_path" : "copies_per_100_years"
     }
    }
   }
  }
 }
}
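With no extra settings, the derivative of each bucket is simply the difference between its metric value and the previous bucket's value (the first bucket gets none). A Python sketch of the idea, ignoring the aggregation's optional unit normalization:

```python
# Simple first-order derivative over bucket values: current minus previous.
# (Our illustration; it ignores optional unit normalization.)
def derivative(values):
    return [None] + [curr - prev for prev, curr in zip(values, values[1:])]

# 0.0 and 7.0 come from our data; 3.0 is a hypothetical third bucket.
print(derivative([0.0, 7.0, 3.0]))  # [None, 7.0, -4.0]
```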

Moving avg aggregation

The last pipeline aggregation that we want to discuss is moving_avg. It calculates the moving average metric (https://en.wikipedia.org/wiki/Moving_average) over the buckets of its parent aggregation (yes, this is a parent pipeline aggregation). Similar to the previously discussed aggregations, it needs to run inside a parent histogram or date histogram aggregation.

When calculating the moving average, Elasticsearch takes a window (specified by the window property, 5 by default), calculates the average of the buckets inside the window, moves the window one bucket further, and repeats. Of course, we also need to provide buckets_path, which points to the metric that the moving average should be calculated on.
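The default simple model can be sketched in a few lines of Python: average the values currently inside the window, then slide the window forward (our illustration, not Elasticsearch's implementation):

```python
# Simple moving average: average of up to `window` most recent values,
# computed for each bucket in turn.
def moving_avg(values, window=5):
    result = []
    for i in range(len(values)):
        win = values[max(0, i - window + 1): i + 1]
        result.append(sum(win) / len(win))
    return result

print(moving_avg([1.0, 2.0, 3.0, 4.0], window=2))  # [1.0, 1.5, 2.5, 3.5]
```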

An example of using this aggregation looks as follows:

{
 "aggs" : {
  "periods_histogram" : {
   "histogram" : {
    "field" : "year",
    "interval" : 10
   },
   "aggs" : {
    "copies_per_10_years" : {
     "sum" : {
      "field" : "copies"
     }
    },
    "moving_avg_example" : {
     "moving_avg" : {
      "buckets_path" : "copies_per_10_years"
     }
    }
   }
  }
 }
}

We will omit including the response for the preceding query as it is quite large.

Predicting future buckets

A very nice thing about the moving average aggregation is that it supports predictions: it can attempt to extrapolate the data it has and create future buckets. To make the aggregation predict buckets, we just add the predict property to any moving average aggregation and set it to the number of predictions we want to get. For example, to add five predictions to the preceding query, we change it to look as follows:

{
 "aggs" : {
  "periods_histogram" : {
   "histogram" : {
    "field" : "year",
    "interval" : 10
   }, 
   "aggs" : {
    "copies_per_10_years" : {
     "sum" : {
      "field" : "copies"
     }
    },
    "moving_avg_example" : {
     "moving_avg" : {
      "buckets_path" : "copies_per_10_years",
      "predict" : 5
     }
    }
   }
  }
 }
}

If you look at the results and compare the response returned for the previous query with the one with predictions, you will notice that the last bucket in the previous query ends on the key property equal to 1960, while the query with predictions ends on the key property equal to 2010, which is exactly what we wanted to achieve.

The models

By default, Elasticsearch uses the simplest model for calculating the moving average aggregation, but we can control that by specifying the model property. This property holds the name of the model and a settings object, which we can use to provide model properties.

The possible models are: simple, linear, ewma, holt, and holt_winters. Discussing each of the models in detail is beyond the scope of the book, so if you are interested in details about the different models, refer to the official Elasticsearch documentation regarding the moving averages aggregation available at https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-pipeline-movavg-aggregation.html.

An example query using a different model looks as follows:

{
 "aggs" : {
  "periods_histogram" : {
   "histogram" : {
    "field" : "year",
    "interval" : 10
   },
   "aggs" : {
    "copies_per_10_years" : {
     "sum" : {
      "field" : "copies"
     }
    },
    "moving_avg_example" : {
     "moving_avg" : {
      "buckets_path" : "copies_per_10_years",
      "model" : "holt",
      "settings" : {
       "alpha" : 0.6,
       "beta" : 0.4
      }
     }
    }
   }
  }
 }
}