Dataflow concurrency is a concurrent programming paradigm that has been around for three decades now. What is so exciting about it?
The main idea behind Dataflow concurrency is to restrict each variable to a single assignment: a variable can be assigned a value only once in its lifetime, while the number of reads is unlimited. If a variable has not yet been written, any read operation blocks until the variable is actually written (bound). With this straightforward, single-assignment approach, it is impossible to access an inconsistent value or run into data races. The deterministic nature of Dataflow concurrency ensures that a program will always behave the same way: you can run the same operation 5 or 10 million times and the result will always be identical. Conversely, if an operation deadlocks the first time, it will deadlock every other time you run it. These qualities make it very easy to reason about concurrency, but they come at a price: the code must be deterministic. Random values, the current time, exceptions, and so on are not allowed. The section of code that employs Dataflow concurrency must act as a pure function, with input and output.
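The single-assignment semantics can be sketched in a few lines of Groovy. This is a minimal example, assuming the GPars library is on the classpath (the @Grab coordinates and version shown here are one plausible choice):

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')
import static groovyx.gpars.dataflow.Dataflow.task
import groovyx.gpars.dataflow.DataflowVariable

def answer = new DataflowVariable()

// This read blocks until a value is bound to the variable.
task {
    println "computed: ${answer.val}"
}

answer << 42             // the first (and only) allowed write: the bind
assert answer.val == 42  // reads are unlimited and always see the same value
```

A second write, such as `answer << 43`, would raise an error: once bound, a dataflow variable is immutable.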
Groovy's GPars framework exposes this alternative concurrency model, and in this recipe we are going to explore how to use it to solve the problem of high latency when invoking external systems, as described in the Running tasks in parallel and asynchronously recipe.
To set up this recipe, please refer to the Getting Ready section of the Running tasks in parallel and asynchronously recipe.
Start the dummy web service using groovy app.groovy.
The following steps show how to modify the CriminalService class to leverage Dataflow concurrency.
Create the CriminalServiceWithDataflow class:

```groovy
package org.groovy.cookbook.dataflow

import static groovyx.gpars.dataflow.Dataflow.task

import groovyx.gpars.dataflow.DataflowVariable

class CriminalServiceWithDataflow {

    def baseUrl

    CriminalServiceWithDataflow(String url) {
        baseUrl = url
    }
}
```
Add the fetchData method:

```groovy
def fetchData(String country) {
    println "fetching data for ${country}"
    def jsonResponse = new DataflowVariable()
    task {
        try {
            "${baseUrl}/${country}".toURL().openConnection().with {
                if (responseCode == 200) {
                    jsonResponse << inputStream.text
                } else {
                    jsonResponse << new RuntimeException(
                        'Invalid Response Code from HTTP GET: ' + responseCode)
                }
                disconnect()
            }
        } catch (e) {
            jsonResponse << e
        }
    }
    jsonResponse
}
```
Add the getData method:

```groovy
List getData(List countries) {
    List aggregatedJson = []
    countries.each {
        aggregatedJson << fetchData(it)
    }
    aggregatedJson*.val
}
```
Finally, write a test case for the new class:

```groovy
@Test
void testDataflow() {
    def serviceUrl = 'http://localhost:5050'
    def criminalService =
        new CriminalServiceWithDataflow(serviceUrl)
    def data = criminalService.getData(['germany', 'us', 'canada'])
    assert 3 == data.size()
    data.each {
        try {
            println it
        } catch (e) {
            e.printStackTrace()
        }
    }
}
```
The fetchData function of the CriminalServiceWithDataflow class is where the power of Dataflow concurrency is really visible. The function contains a DataflowVariable named jsonResponse and a task that has the responsibility of populating that variable. The variable can be written only once, through the << operator. The task contains the actual code that accesses the Criminal Service web service, along with some simplistic exception handling. When the value of a DataflowVariable is read, the read blocks until the value is set (using <<). In this way, the time required to collect the data for three countries is equal to the longest single response time.
The getData function fans the HTTP requests out over three threads. Note that the fetchData method is not blocking. The blocking takes place only in the last line of the getData method, when the val method is invoked (and the variable therefore read) on each DataflowVariable containing the HTTP GET response.
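The "pay only for the slowest call" behavior can be demonstrated with simulated latencies instead of real HTTP calls. This is a sketch, assuming GPars is on the classpath; the latency values are made up for illustration:

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')
import static groovyx.gpars.dataflow.Dataflow.task
import groovyx.gpars.dataflow.DataflowVariable

// Simulate three remote calls with different latencies (in ms).
def results = [100, 200, 300].collect { latency ->
    def dv = new DataflowVariable()
    task {
        sleep latency                // stand-in for an HTTP round trip
        dv << "done after ${latency}ms"
    }
    dv                               // returned immediately; no blocking yet
}

def start = System.currentTimeMillis()
def values = results*.val            // blocks here, until the slowest task binds
def elapsed = System.currentTimeMillis() - start

assert values.size() == 3
// elapsed is close to the longest latency (~300ms),
// not the sum of all three (600ms), because the tasks run concurrently
assert elapsed < 600
```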
It's also worth noting how the exception handling is organized. Let's zoom into the code:
```groovy
def jsonResponse = new DataflowVariable()
try {
    ...
} catch (e) {
    jsonResponse << e
}
```
When an exception occurs inside the task, we assign the Exception to the jsonResponse variable of type DataflowVariable. The DataflowVariable class has two methods to access the stored value:

- the val method, which simply returns the Exception
- the get method, which rethrows the Exception, if any

Use val or get depending on your exception handling requirements. You can test how the exception handling works by shutting down the Ratpack server, or by passing invalid countries that yield a 404 response code.
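The difference between the two accessors can be sketched as follows (again a minimal example, assuming GPars on the classpath):

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.dataflow.DataflowVariable

def response = new DataflowVariable()
response << new RuntimeException('HTTP 404')  // bind an Exception as the value

// val hands back the stored object, exception or not
assert response.val instanceof RuntimeException

// get rethrows a bound Throwable instead of returning it
def rethrown = false
try {
    response.get()
} catch (RuntimeException e) {
    rethrown = true
}
assert rethrown
```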
Dataflow concurrency is a very elegant paradigm, and there are more concepts in this model than the ones presented in this recipe. The best way to learn them is to head to the official GPars Dataflow documentation, located at http://www.gpars.org/guide/guide/dataflow.html.