We can combine searches, filters, bucket aggregations, and metric aggregations to build increasingly complex analyses. Until now, we have seen single levels of aggregation; however, as explained in the aggregation syntax section earlier, an aggregation can contain multiple levels of aggregations within it. Metric aggregations, though, cannot contain further aggregations inside themselves. Also, when you run an aggregation without specifying a query, it is executed against all the documents of the index (and document type, if specified), just as in a match_all query context; you can, however, combine any type of Elasticsearch query with an aggregation. Let's see how to do this with the Python and Java clients.
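To illustrate that last point, here is a minimal sketch of a request body that pairs a non-trivial query (a bool query with a filter) with a terms aggregation. The field user.verified is a hypothetical filter field chosen for illustration; the other field names follow the tweet examples used in this section:

```python
# A sketch showing that any query type can accompany an aggregation.
# "user.verified" is an illustrative field, not taken from the examples below.
query = {
    "query": {
        "bool": {
            "must": [{"match": {"text": "crime"}}],
            "filter": [{"term": {"user.verified": True}}]
        }
    },
    "aggs": {
        "top_users": {
            "terms": {"field": "user.screen_name", "size": 5}
        }
    },
    "size": 0  # return only aggregation results, no search hits
}
```

The aggregation then runs only over the documents matched by the bool query, rather than over the whole index.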
Python example
query = {
    "query": {
        "match": {
            "text": "crime"
        }
    },
    "aggs": {
        "hourly_timeline": {
            "date_histogram": {
                "field": "created_at",
                "interval": "hour"
            },
            "aggs": {
                "top_hashtags": {
                    "terms": {
                        "field": "entities.hashtags.text",
                        "size": 1
                    },
                    "aggs": {
                        "top_users": {
                            "terms": {
                                "field": "user.screen_name",
                                "size": 1
                            },
                            "aggs": {
                                "average_tweets": {
                                    "avg": {
                                        "field": "user.statuses_count"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    },
    "size": 0
}
res = es.search(index='twitter', doc_type='tweets', body=query)
for timeline_bucket in res['aggregations']['hourly_timeline']['buckets']:
    print('time range', timeline_bucket['key_as_string'])
    print('tweet count', timeline_bucket['doc_count'])
    for hashtag_bucket in timeline_bucket['top_hashtags']['buckets']:
        print('hashtag key', hashtag_bucket['key'])
        print('hashtag count', hashtag_bucket['doc_count'])
        for user_bucket in hashtag_bucket['top_users']['buckets']:
            print('screen_name', user_bucket['key'])
            print('count', user_bucket['doc_count'])
            print('average tweets', user_bucket['average_tweets']['value'])
You will find output similar to the following:
time range 2015-10-14T10:00:00.000Z
tweet count 1563
hashtag key crime
hashtag count 42
screen_name andresenior
count 2
average tweets 9239.0
............
Understanding the response in the context of our search for the term crime in a text field:

[Figure: the nested daywise_timeline bucket structure of the aggregation response]

Java example
Writing multilevel aggregation queries (as we just saw) in Java seems quite complex, but once you learn the basics of structuring aggregations, it becomes fun.
Let's see how we write the previous query in Java:
Building the query using QueryBuilder:
QueryBuilder query = QueryBuilders.matchQuery("text", "crime");
Building the aggregation:
The syntax for a multilevel aggregation in Java is as follows:
AggregationBuilders
    .aggType("aggs_name")
    //aggregation_definition
    .subAggregation(AggregationBuilders
        .aggType("aggs_name")
        //aggregation_definition
        .subAggregation(AggregationBuilders
            .aggType("aggs_name")
            //aggregation_definition
            ...
You can relate the preceding syntax with the aggregation syntax you learned in the beginning of this chapter.
The exact aggregation for our Python example will be as follows:
AggregationBuilder aggregation = AggregationBuilders
    .dateHistogram("hourly_timeline")
    .field("created_at")
    .interval(DateHistogramInterval.HOUR)
    .subAggregation(AggregationBuilders
        .terms("top_hashtags")
        .field("entities.hashtags.text")
        .size(1)
        .subAggregation(AggregationBuilders
            .terms("top_users")
            .field("user.screen_name")
            .size(1)
            .subAggregation(AggregationBuilders
                .avg("average_status_count")
                .field("user.statuses_count"))));
Let's execute the request by combining the query and aggregation we have built:
SearchResponse response = client.prepareSearch(indexName)
    .setTypes(docType)
    .setQuery(query)
    .addAggregation(aggregation)
    .setSize(0)
    .execute().actionGet();
Parsing multilevel aggregation responses:
Since multilevel aggregations are nested inside each other, you need to iterate accordingly to parse each level of aggregation response in loops.
The response for our request can be parsed with the following code:
//Get the first level of aggregation data
Histogram agg = response.getAggregations().get("hourly_timeline");
//For each entry of the hourly histogram
for (Histogram.Bucket entry : agg.getBuckets()) {
    DateTime key = (DateTime) entry.getKey();
    String keyAsString = entry.getKeyAsString();
    long docCount = entry.getDocCount();
    System.out.println(keyAsString);
    System.out.println(docCount);

    //Get the second level of aggregation data
    Terms topHashtags = entry.getAggregations().get("top_hashtags");
    //For each entry of the top hashtags
    for (Terms.Bucket hashTagEntry : topHashtags.getBuckets()) {
        String hashtag = hashTagEntry.getKey().toString();
        long hashtagCount = hashTagEntry.getDocCount();
        System.out.println(hashtag);
        System.out.println(hashtagCount);

        //Get the third level of aggregation data
        Terms topUsers = hashTagEntry.getAggregations().get("top_users");
        //For each entry of the top users
        for (Terms.Bucket usersEntry : topUsers.getBuckets()) {
            String screenName = usersEntry.getKey().toString();
            long userCount = usersEntry.getDocCount();
            System.out.println(screenName);
            System.out.println(userCount);

            //Get the fourth level of aggregation data
            Avg averageStatusCount = usersEntry.getAggregations()
                .get("average_status_count");
            double avg = averageStatusCount.getValue();
            System.out.println(avg);
        }
    }
}
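The same level-by-level iteration can also be written generically. The following is a sketch (not part of the Elasticsearch client; the function name walk_buckets is our own) of a recursive Python helper that walks nested bucket aggregations in a raw response dictionary, given the aggregation names level by level:

```python
def walk_buckets(agg, names, path=()):
    """Recursively yield (key_path, doc_count) for nested bucket aggregations.

    `names` lists the aggregation names level by level, for example
    ["hourly_timeline", "top_hashtags", "top_users"].
    """
    name, *rest = names
    for bucket in agg[name]["buckets"]:
        # Prefer the human-readable key when present (e.g. date histograms)
        key = bucket.get("key_as_string", bucket["key"])
        if rest:
            yield from walk_buckets(bucket, rest, path + (key,))
        else:
            yield path + (key,), bucket["doc_count"]

# A miniature response shaped like the one in this section
sample = {
    "hourly_timeline": {"buckets": [
        {"key_as_string": "2015-10-14T10:00:00.000Z", "key": 1444816800000,
         "doc_count": 1563,
         "top_hashtags": {"buckets": [
             {"key": "crime", "doc_count": 42}
         ]}}
    ]}
}
rows = list(walk_buckets(sample, ["hourly_timeline", "top_hashtags"]))
# rows == [(("2015-10-14T10:00:00.000Z", "crime"), 42)]
```

A generic walker like this avoids hand-writing one loop per nesting level, at the cost of losing access to metric sub-aggregations, which you would still read from each innermost bucket directly.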
As you have seen, building these kinds of aggregations to drill down into data sets for complex analytics can be fun. However, keep in mind the memory pressure that Elasticsearch bears while performing these complex calculations. The next section covers how to avoid these memory implications.