We have seen that for any query, Elasticsearch by default returns only the top 10 documents after scoring and sorting them. However, these are not always enough to serve the purpose: a user often needs more data, either to render on a page or to process in the backend. Let's see how we can do this.
In the previous chapters, we discussed how Elasticsearch offers the from and size parameters to be passed with search requests. So, you always have the option to either increase the size parameter to load more results from Elasticsearch, or to send another query with changed from and size values to get more data.
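For example, fetching one page of results at a time could look like the following minimal sketch; it assumes the same es client, source_index, and source_doc_type variables used in the scroll example later in this section:

#A minimal pagination sketch; es, source_index, and source_doc_type are assumed
#to be the same client and index variables used later in this section.
page_size = 10
page = 1   #the second page (pages are zero-based here)
resp = es.search(index=source_index, doc_type=source_doc_type,
                 body={
                     "query": {"match_all": {}},
                     "from": page * page_size,  #offset of the first document to return
                     "size": page_size          #number of documents to return
                 })
print 'returned', len(resp['hits']['hits']), 'documents out of', resp['hits']['total']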
This pagination approach makes sense when you have to fetch a limited number of documents from Elasticsearch. However, it is too costly for deep pages and can kill Elasticsearch if, for example, you send a request with from = 100000 and size = 10 to get the 10 documents that score lower than the 100,000 documents before them in the index.
While working with Elasticsearch, a functionality that is often needed is returning a large set of data, either to process it or to simply re-index it from one index to another. This type of data fetching does not require any document scoring or sorting, and Elasticsearch offers the scan search type to fulfil this requirement.
The scan search type works much like the way you scan a Facebook or Twitter page with your eyes and scroll down to see more content.
Python example:
You can define a query for which you want all the documents to be returned, as follows:
query = {"query":{"match_all":{}}}
Also, you can create a list that will hold the returned documents:
documents = []
Then execute the following request to get the scroll ID from Elasticsearch, which will be used to get the actual data in subsequent requests. The scroll parameter (the scroll timeout) in the following request specifies how long the scroll will be kept open. It can be defined using 100s (100 seconds) or 2m (two minutes):
resp = es.search(index=source_index, doc_type=source_doc_type, body=query, search_type="scan", scroll='100s', size=100)
Once scroll_id is returned by the preceding request, you can use it inside a while loop, which will run until Elasticsearch has returned all the documents for your query:
scroll_count = 0
while True:
    scroll_count += 1
    print 'scrolling for ', str(scroll_count) + ' time'
    #A new scroll ID is generated for each request. The scroll parameter
    #also needs to be set for each request.
    resp = es.scroll(resp['_scroll_id'], scroll='100s')
    if len(resp['hits']['hits']) == 0:
        print 'data re-indexing completed..!!'
        break
    else:
        #Add the documents to the documents list
        documents.extend(resp['hits']['hits'])
        #Send the documents for re-indexing
        perform_bulk_index(destination_index, destination_doc_type, documents)
        #Empty the documents list so that it can hold the next batch of responses
        documents = []
The perform_bulk_index function can be implemented in the same way as we saw for bulk indexing: it takes a set of documents and sends them to Elasticsearch in bulk:
def perform_bulk_index(destination_index, destination_doc_type, documents):
    actions = []
    for document in documents:
        actions.append({
            '_op_type': 'create',
            '_index': destination_index,
            '_type': destination_doc_type,
            '_id': document['_id'],
            '_source': document['_source']
        })
    try:
        #helpers comes from the elasticsearch package: from elasticsearch import helpers
        helpers.bulk(es, actions, request_timeout=100)
    except Exception as e:
        print "bulk index raised exception", str(e)
Java example (using BulkProcessor):
We have already seen how bulk indexing can be done using BulkRequestBuilder. You will now learn how to do bulk indexing using the BulkProcessor class.
As mentioned in the Elasticsearch documentation:
"A bulk processor is a thread safe bulk processing class, allowing you to easily set when to "flush a new bulk request (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk requests allowed to be executed in parallel."
The most important parameters offered by BulkProcessor are those that control when a new bulk request is flushed (the number of actions added, the total size of the requests, and the flush interval) and the number of concurrent bulk requests allowed to execute in parallel. Documents indexed in bulk are still written to the translog to prevent data loss.

Let's import the packages into our code to get data through scan-scroll and bulk processing:
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
The following are the main variables you need to declare to index using a bulk processor:
//The maximum time to keep the scroll context open between two scroll requests (in seconds)
public static final int SCROLL_TIMEOUT_SECONDS = 30;
//Number of documents to be returned per shard; the maximum returned per request is scroll_size * number of shards
public static final int SCROLL_SIZE = 10;
//Sets when to flush a new bulk request based on the number of actions currently added; defaults to 1000
public static final int BULK_ACTIONS_THRESHOLD = 10000;
//Sets the number of concurrent bulk requests allowed to be executed
public static final int BULK_CONCURRENT_REQUESTS = 2;
//Sets the flush interval (specified in seconds)
public static final int BULK_FLUSH_DURATION = 30;
Create an instance of the BulkProcessor class using the previous variables:
BulkProcessor bulkProcessor = BulkProcessor.builder(clientTo, createLoggingBulkProcessorListener())
        .setBulkActions(BULK_ACTIONS_THRESHOLD)
        .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
        .setFlushInterval(createFlushIntervalTime())
        .build();
Getting the data from scan-scroll can be done as follows:
SearchResponse searchResponse = clientFrom.prepareSearch(fromIndex)
        .setTypes(sourceDocType)
        .setQuery(matchAllQuery())
        .setSearchType(SearchType.SCAN)
        .setScroll(createScrollTimeoutValue())
        .setSize(SCROLL_SIZE)
        .execute().actionGet();
This will return a scroll ID, which will be used to scroll the documents and return them for processing:
while (true) {
    searchResponse = clientFrom.prepareSearchScroll(searchResponse.getScrollId())
            .setScroll(createScrollTimeoutValue()).execute().actionGet();
    //Break condition: no hits are returned
    if (searchResponse.getHits().getHits().length == 0) {
        System.out.println("Closing the bulk processor");
        bulkProcessor.close();
        break;
    }
    //Add the documents to the bulk processor; depending on the bulk threshold they will be flushed to ES
    for (SearchHit hit : searchResponse.getHits()) {
        IndexRequest request = new IndexRequest(toIndex, destinationDocType, hit.id());
        request.source(hit.getSource());
        bulkProcessor.add(request);
    }
}
The bulk processor takes a listener, which is called before and after each bulk request is executed, as well as when a bulk request fails. This listener can be defined in the following way:
private BulkProcessor.Listener createLoggingBulkProcessorListener() {
    return new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            System.out.println("Going to execute new bulk composed "+ request.numberOfActions()+" no. of actions");
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            System.out.println("Executed bulk composed "+ request.numberOfActions()+" no. of actions");
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            System.out.println("Error executing bulk "+ failure);
        }
    };
}
You also need to define the following helper functions to create the time values used by the bulk processor and the scroll:
private TimeValue createFlushIntervalTime() {
    return new TimeValue(BULK_FLUSH_DURATION, TimeUnit.SECONDS);
}

private TimeValue createScrollTimeoutValue() {
    return new TimeValue(SCROLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}