Implementing triggers in MongoDB using oplog

A trigger in a relational database is code that gets invoked when an insert, update, or delete operation is executed on a table in the database. A trigger can be invoked either before or after the operation. Triggers are not implemented in MongoDB out of the box, so if your application needs some sort of notification whenever an insert, update, or delete operation is executed, you are left to manage this yourself. One approach is to have a data access layer in the application that is the only place through which documents are queried, inserted, updated, or deleted. However, there are a few challenges to this. First, you need to explicitly code the logic for this requirement into the application, which may or may not be feasible. If the database is shared and multiple applications access it, things become even more difficult. Second, the access needs to be strictly regulated, and no other source of inserts, updates, and deletes can be permitted.

Alternatively, we need to look at running some sort of logic in a layer close to the database. One way to track all write operations is to use the oplog. Note that read operations cannot be tracked using the oplog. In this recipe, we will write a small Java application that tails the oplog and captures all the insert, update, and delete operations happening on a Mongo instance. Although this program is implemented in Java, the approach works equally well in any other programming language; the crux lies in the logic of the implementation, and the platform can vary. Also, this works only if the mongod instance is started as part of a replica set and not as a standalone instance. Finally, this trigger-like functionality can be invoked only after the operation is performed, not before the data gets inserted, updated, or deleted from the collection.

Getting ready

Refer to the Starting multiple instances as part of a replica set recipe from Chapter 1, Installing and Starting the MongoDB Server, for all the necessary setup for this recipe. If you are interested in more details on Java drivers, refer to the Executing query and insert operations using a Java client and Executing update and delete operations using a Java client recipes in Chapter 3, Programming Language Drivers. The prerequisites of these two recipes are all we need for this recipe.

Refer to the Creating and tailing capped collection cursors in MongoDB recipe in this chapter if you need a refresher on capped collections and tailable cursors. Finally, though not mandatory, the Understanding and analyzing oplogs recipe in Chapter 4, Administration, explains the oplog in depth; this recipe will not repeat that level of detail. Open a shell and connect it to the primary of the replica set.

For this recipe, we will be using the mongo-cookbook-oplogtrigger project. This project is available in the source code bundle downloadable from the book's website. The folder needs to be extracted on the local filesystem. Open a command-line shell and go to the root of the extracted project; this is the directory containing the pom.xml file. We will also need the TriggerOperations.js file, which triggers the operations in the database that we intend to capture.

How to do it…

  1. Open an operating system shell and execute the following command:
    mvn clean compile exec:java -Dexec.mainClass=com.packtpub.mongo.cookbook.OplogTrigger -Dexec.args="test.oplogTriggerTest"
    
  2. With the Java program started, open the mongo shell as follows, with the TriggerOperations.js file present in the current directory and the mongod instance listening on port 27000 as the primary:
    $ mongo --port 27000 TriggerOperations.js --shell
    
  3. Once the shell is connected, execute the following function, which we loaded from the JavaScript file:
    test:PRIMARY> triggerOperations()
    
  4. Observe the output printed out on the console where the Java program com.packtpub.mongo.cookbook.OplogTrigger is being executed using Maven.

How it works…

The functionality we implemented is pretty handy for a lot of use cases. Let us first see what we did at a higher level. The Java program com.packtpub.mongo.cookbook.OplogTrigger acts as a trigger whenever data is inserted, updated, or deleted from a collection in MongoDB. It uses the oplog collection, the backbone of replication in Mongo, to implement this functionality.

The JavaScript function simply acts as a source of data being inserted into, updated in, and deleted from the collection. There is nothing really significant about what this function does: it inserts six documents into a collection, updates one of them, deletes one, inserts four more documents, and finally deletes all of the documents. You may choose to open the TriggerOperations.js file and take a look at how it is implemented. The collection it operates on is in the test database and is called oplogTriggerTest.

When we execute the JavaScript function, we should see something like the following output printed on the console:

[INFO] <<< exec-maven-plugin:1.2.1:java (default-cli) @ mongo-cookbook-oplogtriger <<<
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:java (default-cli) @ mongo-cookbook-oplogtriger ---
Connected successfully..
Starting tailing oplog...
Operation is Insert ObjectId is 5321c4c2357845b165d42a5f
Operation is Insert ObjectId is 5321c4c2357845b165d42a60
Operation is Insert ObjectId is 5321c4c2357845b165d42a61
Operation is Insert ObjectId is 5321c4c2357845b165d42a62
Operation is Insert ObjectId is 5321c4c2357845b165d42a63
Operation is Insert ObjectId is 5321c4c2357845b165d42a64
Operation is Update ObjectId is 5321c4c2357845b165d42a60
Operation is Delete ObjectId is 5321c4c2357845b165d42a61
Operation is Insert ObjectId is 5321c4c2357845b165d42a65
Operation is Insert ObjectId is 5321c4c2357845b165d42a66
Operation is Insert ObjectId is 5321c4c2357845b165d42a67
Operation is Insert ObjectId is 5321c4c2357845b165d42a68
Operation is Delete ObjectId is 5321c4c2357845b165d42a5f
Operation is Delete ObjectId is 5321c4c2357845b165d42a62
Operation is Delete ObjectId is 5321c4c2357845b165d42a63
Operation is Delete ObjectId is 5321c4c2357845b165d42a64
Operation is Delete ObjectId is 5321c4c2357845b165d42a60
Operation is Delete ObjectId is 5321c4c2357845b165d42a65
Operation is Delete ObjectId is 5321c4c2357845b165d42a66
Operation is Delete ObjectId is 5321c4c2357845b165d42a67
Operation is Delete ObjectId is 5321c4c2357845b165d42a68

The Maven build runs continuously and never terminates, as the Java program doesn't terminate. You may hit Ctrl + C to stop the execution.

Let us analyze the Java program, which is where the meat of the content is. The first assumption is that a replica set must be set up for this program to work, as we will be using Mongo's oplog collection. The Java program creates a connection to the primary of the replica set, connects to the local database, and gets the oplog.rs collection. Then, all it does is find the last (or nearly the last) timestamp in the oplog. This is done not just to prevent the whole oplog from being replayed on startup, but also to mark a point near the end of the oplog from which to start tailing. The following is the code that finds this timestamp value:

// Fetch the newest oplog entry by sorting in reverse natural order
DBCursor cursor = collection.find().sort(new BasicDBObject("$natural", -1)).limit(1);
// Current time in seconds, used as a fallback if the oplog is empty
int current = (int) (System.currentTimeMillis() / 1000);
// Return the ts field of the newest entry, or a timestamp for "now"
return cursor.hasNext() ? (BSONTimestamp) cursor.next().get("ts") : new BSONTimestamp(current, 1);

The oplog is sorted in reverse natural order to find the timestamp of the last document in it. As the oplog is a capped collection to which documents are appended in order (FIFO), sorting in descending natural order is equivalent to sorting by the timestamp in descending order.
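
For context, the program needs a handle on the oplog collection before it can run this query. A minimal sketch of obtaining it, using the same legacy Java driver API as the other snippets in this recipe, might look like the following; the localhost host name and port 27000 are assumptions matching the replica set used in this recipe:

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

// Connect to one member of the replica set; the driver discovers the primary
MongoClient client = new MongoClient(new ServerAddress("localhost", 27000));

// The oplog lives in the local database as the capped collection oplog.rs
DB local = client.getDB("local");
DBCollection collection = local.getCollection("oplog.rs");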

Once this timestamp is found, we query the oplog collection as usual, but with two additional options, as follows:

DBCursor cursor = collection.find(QueryBuilder.start("ts")
  .greaterThan(lastreadTimestamp).get())
  .addOption(Bytes.QUERYOPTION_TAILABLE)
  .addOption(Bytes.QUERYOPTION_AWAITDATA);

The query finds all documents with a ts value greater than the given timestamp and adds two options: Bytes.QUERYOPTION_TAILABLE and Bytes.QUERYOPTION_AWAITDATA. The latter option can only be added when the former is set. With these options, instead of returning immediately when the cursor reaches the end of the currently available data, the server waits for a while for more data to arrive. Eventually, when no data arrives, the cursor terminates.

On every iteration, we also store the last seen timestamp. When the cursor closes because no more data is available, we query again to get a new tailable cursor instance, and this new query uses the timestamp stored from the previous iteration, when the last document was seen. This process continues indefinitely, so we effectively tail the collection just as we tail a file in Unix using the tail command.
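
Putting these pieces together, the outer tailing loop might look roughly like the following sketch. This is not the book's actual OplogTrigger code, just an illustration; it assumes imports from com.mongodb and org.bson.types.BSONTimestamp, plus two hypothetical helpers, getLastTimestamp() (like the snippet shown earlier) and handleOperation():

BSONTimestamp lastreadTimestamp = getLastTimestamp(collection);
while (true) {
  // Open a tailable, await-data cursor starting just after the last timestamp seen
  DBCursor cursor = collection.find(QueryBuilder.start("ts")
      .greaterThan(lastreadTimestamp).get())
      .addOption(Bytes.QUERYOPTION_TAILABLE)
      .addOption(Bytes.QUERYOPTION_AWAITDATA);
  while (cursor.hasNext()) {
    DBObject doc = cursor.next();
    // Remember how far we have read so that the next cursor resumes from here
    lastreadTimestamp = (BSONTimestamp) doc.get("ts");
    handleOperation(doc);  // hypothetical callback that inspects the op field
  }
  cursor.close();
  // A short sleep here would avoid hammering the server with back-to-back queries
}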

The oplog document contains a field called op for the operation, whose values are i, u, and d for insert, update, and delete, respectively. The field o contains the inserted or deleted object's ID (_id) in the case of insert and delete; in the case of an update, the field o2 contains the _id of the updated document. All we do is check for these conditions and print out the operation and the _id of the document that was inserted, deleted, or updated.
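
As an illustration of those checks (again a sketch, not the book's exact code), a handleOperation() callback like the one assumed above could dispatch on the op field as follows:

void handleOperation(DBObject doc) {
  String op = (String) doc.get("op");
  if ("i".equals(op)) {
    // For inserts, o holds the full inserted document, including its _id
    System.out.println("Operation is Insert ObjectId is " + ((DBObject) doc.get("o")).get("_id"));
  } else if ("u".equals(op)) {
    // For updates, o2 holds the _id of the updated document
    System.out.println("Operation is Update ObjectId is " + ((DBObject) doc.get("o2")).get("_id"));
  } else if ("d".equals(op)) {
    // For deletes, o holds just the _id of the deleted document
    System.out.println("Operation is Delete ObjectId is " + ((DBObject) doc.get("o")).get("_id"));
  }
}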

Let's look at a few things we need to be careful about. Obviously, deleted documents will no longer be available in the collection, so their _id is not of much use if you intend to query for them. Also, be careful when fetching a document after an update using the _id we get: some other operation later in the oplog might already have performed further updates on the same document while our application's tailable cursor has not yet reached that point. This is common in high-volume systems. We have a similar problem for inserts: the document we query using the given _id might already have been updated or deleted. Applications using this logic to track these operations must be aware of these caveats.

Alternatively, take a look at the rest of the oplog document, which contains more details, such as the document that was inserted or the update statement that was executed. Updates in the oplog collection are idempotent, which means they can be applied any number of times without unintended side effects. For instance, if the actual update was to increment a value by one, the update recorded in the oplog will instead use the $set operator with the final expected value; an update like {$inc: {count: 1}} on a document whose count was 4 is recorded as {$set: {count: 5}}, so the same update can be applied multiple times. The logic you use to process such entries would then have to be more sophisticated.

Also, failovers are not handled here, which would be needed for a production-grade system. Additionally, the infinite loop opens a new cursor as soon as the previous one terminates; a sleep duration could be introduced before the oplog is queried again, to avoid overwhelming the server with queries. Note that the program given here is not production-quality code, but a simple demonstration of the technique that is used by a lot of other systems to get notified about new data insertions, deletions, and updates to collections in MongoDB.

MongoDB didn't have a text search feature until version 2.4, and prior to that, all full-text search was handled using external search engines such as Solr or Elasticsearch. Even now, at the time of writing, though MongoDB's text search feature is production-ready, many would still prefer a dedicated external search indexer, and it won't be surprising if a decision is made to use an external full-text search tool instead of leveraging MongoDB's built-in one. In the case of Elasticsearch, the abstraction that flows data into the indexes is known as a river. The MongoDB river for Elasticsearch, which adds data to the indexes as and when it is added to the collections in Mongo, is built on the same logic as the simple Java program we implemented here.
