Fetching data from other systems: river

In the first chapter, we saw how to create and update indices using the REST API. Apart from searching itself, loading data into ElasticSearch is the main task that must be solved when building a search application. It would be useful to have some infrastructure or plugins that handle the integration of the search engine with various data sources. Although ElasticSearch is a relatively new project, it already addresses this need with a feature called the river.

What we need and what a river is

As you can guess, there are two approaches to getting data into a search system: we can pull the data from the source system, or the source system can push the data into ours. In the first case, we need some kind of service in our ElasticSearch cluster that monitors changes in an external data source, or checks that source periodically. A river is exactly such a service. ElasticSearch manages rivers and makes sure that only a single instance of each one runs across the whole cluster. If a node dies, all rivers running on that node are moved to another node. A particular river instance is identified by its name and type. Several types are available, but apart from the dummy river, which is a simple example river, each of them must be installed as an additional plugin.

Note

Please note that when using a river, you are bound to the performance of a single node, so for heavy processing of documents a standalone indexing application is preferable.

Installing and configuring a river

As we said, there are several rivers already available. Some were created by the ElasticSearch team, and some are available as external projects. You can check the official list at http://www.elasticsearch.org/guide/reference/modules/plugins.html.

In this chapter, we will use the MongoDB river as an example. Let's start by installing this river using the ElasticSearch plugins system. In order to do that, we first run the following command:

bin/plugin -install richardwilly98/elasticsearch-river-mongodb/1.6.1

Note

Please note that this command doesn't work with ElasticSearch versions newer than 0.20.1 because of changes in the binary plugin location. Until this is resolved, you can use the following command instead:

bin/plugin -url https://github.com/downloads/richardwilly98/elasticsearch-river-mongodb/elasticsearch-river-mongodb-1.6.1.zip -install river-mongodb

After restarting our ElasticSearch instance, we are ready to configure our new river.

The configuration of all rivers is stored in the _river index. By default, this index doesn't exist, but we can easily create it just like any other index. In this index, every river instance stores its data. ElasticSearch assumes that each river instance keeps its configuration in a document with the _meta identifier, stored under a type whose name equals the name of the river. In addition, ElasticSearch maintains a document with the _status identifier for every configured river. Let's configure our river and see what this index looks like after this operation.

We've prepared the config.json file, which will be loaded into ElasticSearch. Its contents are as follows:

{
  "type" : "mongodb",
  "mongodb" : {
    "servers" : [
      { "host" : "localhost", "port" : 27017 }
    ],
    "db" : "esbook",
    "collection" : "products"
  },
  "index" : {
    "name" : "esbook"
  }
}

As we can see, there are three keys in this JSON object: type, which tells ElasticSearch which river plugin should be used; mongodb, which holds the river configuration (the available MongoDB servers and the database and collection names); and index, which describes the index where the fetched data should be stored. More options are described in the river documentation, but these are sufficient for our example.
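If you prefer to create the file from the command line, the configuration above can be written out with a simple shell heredoc. This is just a convenience sketch; the host, port, database, and collection values mirror the example and should be adjusted to your environment:

```shell
# Write the river configuration shown above to config.json.
# Adjust host, port, db, and collection to match your MongoDB setup.
cat > config.json <<'EOF'
{
  "type" : "mongodb",
  "mongodb" : {
    "servers" : [
      { "host" : "localhost", "port" : 27017 }
    ],
    "db" : "esbook",
    "collection" : "products"
  },
  "index" : {
    "name" : "esbook"
  }
}
EOF
```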

If you would like to test this example on your own computer, make sure that your MongoDB instance is available and configured as a replica set (for more information, go to http://docs.mongodb.org/manual/tutorial/deploy-replica-set/). This is necessary because the river uses a special system collection, the oplog, to track changes in the database.
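If your MongoDB instance is not running as a replica set yet, a minimal single-node replica set can be set up as follows (a sketch only; rs0 is an arbitrary replica set name and the data path depends on your installation):

```
# Start mongod with a replica set name:
mongod --replSet rs0 --dbpath /data/db

# Then, in the MongoDB console, initiate the (single-node) replica set:
> rs.initiate()
```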

Now let's start our river. As we said before, this means creating a document with the _meta identifier in the _river index, for example with the following command:

curl -XPUT 'localhost:9200/_river/mongolink/_meta' -d @config.json

If everything went well, you should see information about the river creation in the ElasticSearch log. There should also be a new, empty esbook index. Let's check the status of our river:

curl -XGET 'localhost:9200/_river/mongolink/_status?pretty'

We should see something like the following result:

{
  "_index" : "_river",
  "_type" : "mongolink",
  "_id" : "_status",
  "_version" : 1,
  "exists" : true, "_source" : {"ok":true,"node":{"id":"VhN9duujSdOkQdAEClNgpg","name":"Lancer","transport_address":"inet[/192.168.1.101:9300]"}}
}

As you can see, ElasticSearch automatically created the _status document, which tells us on which node our river is running and what its status is. Now, moving on to the most important part of our test, let's open the MongoDB console and create an example document:

PRIMARY> use esbook
switched to db esbook
PRIMARY> db.products.insert({ "name" : "book", "value" : 200 })
PRIMARY>

And now let's query ElasticSearch:

curl 'localhost:9200/esbook/_search?pretty'

And again, if everything was done correctly, we should see something like the following result:

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "esbook",
      "_type" : "esbook",
      "_id" : "5092cc65e629448dce7212d5",
      "_score" : 1.0, "_source" : {"_id":"5092cc65e629448dce7212d5","name":"book","value":200.0}
    } ]
  }
}

As we can see, there is one document in our index, and it contains exactly the same data as the document we created in MongoDB. You can check for yourself that the update and delete operations also work as expected.
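For instance, the following MongoDB console commands (a sketch following the example above) modify and then remove our test document; after each operation, the river should propagate the change, which you can verify by repeating the search query:

```
PRIMARY> db.products.update({ "name" : "book" }, { $set : { "value" : 250 } })
PRIMARY> db.products.remove({ "name" : "book" })
```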

One last thing: we can, of course, perform some housekeeping and stop and remove our configured river using the following command:

curl -XDELETE localhost:9200/_river/mongolink/