This chapter introduces some state-of-the-art technologies like Event Stream Processing (ESP) and Complex Event Processing (CEP) and their applications in BPMs. We will look at an OSWorkflow function provider that interfaces with the Esper CEP engine and allows monitoring of real-time process information and events. This chapter assumes basic knowledge of SQL and the relational data model concepts.
CEP is a relatively new technology to process events and discover complex patterns inside streams of events. CEP engines are also known as Event Stream Processing (ESP) engines. Events can be anything that happens outside or inside your application. These events contain data about the business situations that occurred and information about the data (also known as metadata). A sequence of events from the same source is called an event stream.
By processing the event streams with business-defined patterns, you can detect and react to business situations in real time. For example, you can monitor a financial stock index in real time. If it reaches certain values within an hour, you can react to this by selling a percentage of the stock. The threshold values are defined in a CEP pattern. This pattern tells the CEP engine how to detect and react to this event.
CEP technologies use patterns to match events inside the stream and then watch the stream for matches of those patterns. The event stream flows through a pattern chain. If a pattern matches with an event, the engine is notified. This flow of information is the opposite of traditional data handling, such as SQL databases, in which the data is stored and then the patterns (SQL queries) are run over the data to select the relevant data that matches the pattern. This flow is better understood visually.
The following figure displays the event stream as an ongoing flow through which the patterns match the relevant events.
It's common to confuse CEP technology with rule engine technology, such as we saw in Chapter 5. This is because patterns are like rules, in the sense they both have a condition and a consequence.
The difference between the two technologies lies in the temporal component of CEP. This allows the patterns to use time as another dimension. CEP technology enables patterns such as: "select event A if the previous event was B". This would be awkward and difficult to implement in a rule engine. Also, a CEP engine is optimized for real-time or near real-time processing of high volumes of event data.
A BPM is an ideal place to implement a CEP engine.
Business events exists naturally in a continuous stream; they happen all the time in the real world and to have a real-time response to them, we must have real-time processing capabilities.
A BPMS naturally identifies its events and it reacts in a manner similar to the workflow—if a sale is closed, the workflow engine will finish this sale workflow instance. So, the event input is provided by all BPM suites. The CEP engine, taking advantage of this event input provided by the BPMS, gives them pattern matching and real-time processing capabilities.
A CEP engine is a new but very useful component of a BPMS and it enables the BPMS to react in real time to relevant business events. Additionally, it provides real-time Business Activity Monitoring to important business situations and especially to business exceptions.
In addition to BPM event processing, real-world uses of CEP include fraud detection, automatic stock trading, SLA and KPI monitoring, and sensor applications (RFID, etc.).
Esper is an open-source Java CEP engine. Esper, like OSWorkflow needs to be embedded inside your application, it doesn't run in standalone mode. Esper is optimized for the processing of very large volume of events in real time.
This section describes the three most important concepts in Esper architecture: events, patterns, and listeners.
Esper, like every CEP engine analyzes an event stream and detects relevant events, matching them with user-defined patterns. For Esper, these events are regular JavaBeans and the patterns are represented in EQL. EQL is a special language designed for pattern matching and its syntax is very similar to that of SQL.
The patterns in EQL are registered in the Esper engine and when an event arrives, it is checked against all the active patterns. What happens when one or more patterns match the event? The Esper engine notifies the listeners of the event's matching patterns.
This mechanism is analogous to the observer design pattern. This enables a nice decoupling between the event and the event listeners. Each pattern can have many listeners. The listeners are free to do what they want when notified of an event occurrence—they can print the event information, send an email, etc.—as listeners are made of regular Java code. The following figure displays these interactions in an ordered fashion for better understanding.
You can download Esper from its web page located at http://esper.codehaus.org/ and can obtain help and support from the user mailing list. To install Esper, all you need to do is download the Esper distribution and unpack it.
To create our Hello World example we need three things—an event class, a pattern to match events, and a listener to notify when the pattern matches. These basic building blocks are depicted in the following figure:
The figure shows these interactions in a numbered sequence. We will first create our event class. Remember that Esper sees events as common POJOs with JavaBean-like properties. This is a very non-intrusive approach; you don't need to couple your application to any Esper API. Additionally, Esper also supports maps and XML elements as events.
A very simple event class is shown in the following code snippet:
package packtpub.osw.cep; /** * Sample event class for CEP. */ public class Deposit { private float amount; private String customerId; /** * @return the customerId */ public String getCustomerId() { return customerId; } /** * @param customerId the customerId to set */ public void setCustomerId(String customerId) { this.customerId = customerId; } /** * @return the amount */ public float getAmount() { return amount; } /** * @param amount the amount to set */ public void setAmount(float amount) { this.amount = amount; } }
The Java class shown in this code has two JavaBean properties, namely amount
and customerId
, and the two respective pairs of JavaBean getters and setters. These properties are handled by Esper using Java's reflection. Next, we need the pattern that the engine will match with the event data. The pattern is just a string containing the following EQL:
Every A=packtpub.osw.cep.Deposit
This pattern is very simple. It tells the engine that for every arrived event of type Deposit
, it will notify our listener. The Every
keyword ensures that Esper will notify not just the first time but every time. No other criterion needs to be matched. The A=
token pair is a variable assignment and the packtpub.osw.cep.Deposit
is our JavaBean class name.
Following the pattern, we need the listener. The listener for this example only prints the event information when notified. Listeners need to implement the net.esper.client.UpdateListener
interface as shown in the PrintListener
class:
package packtpub.osw.cep; import java.util.Arrays; import net.esper.client.UpdateListener; import net.esper.event.EventBean; /** * Simple listener that prints the event to console. */ public class PrintListener implements UpdateListener { public void update(EventBean[] newEvent, EventBean[] oldEvent) { System.out.println(Arrays.toString(newEvent)); } }
The code shows the PrintListener
class that implements the UpdateListener
interface provided by Esper. This listener interface has only one method, the update()
method, which is invoked when a pattern matches an event occurrence.
This method receives two EventBean
arrays. The first one is an array of the new events and the other is an array of the events that have already been notified to this listener. Esper stores them in memory, which is known as an event window. You can access this window by using the old events array.
The EventBean
class is a wrapper class provided by Esper for all type of events.
Finally, to execute the example we need a class with a main
method, to set up the Esper engine and send it some events. The following code shows this class:
package packtpub.osw.cep; import net.esper.client.EPAdministrator; import net.esper.client.EPRuntime; import net.esper.client.EPServiceProvider; import net.esper.client.EPServiceProviderManager; import net.esper.client.EPStatement; /** * CEP Example test class. */ public class CEPTest { public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPStatement pattern = admin.createPattern("every A= packtpub.osw.cep.Deposit"); PrintListener listener = new PrintListener(); pattern.addListener(listener); Deposit d = new Deposit(); d.setAmount(100); d.setCustomerId("123"); EPRuntime runtime = epService.getEPRuntime(); runtime.sendEvent(d); } }
The class shown in the code snippet consists of only a main
method with all the code needed to set up Esper and a Deposit
JavaBean and send it to the engine.
First, it generates an EPServiceProvider
, which is an administrative Esper object. From this provider, we get the EPAdministrator
and the EPRuntime
. The first is responsible for creating patterns and EQL queries, and the second is a gateway to the engine, for passing events to it.
The EPAdministrator.createPattern(String pattern)
method generates an EPStatement
with the pattern being passed as argument. The EPStatement
type is a common superclass for EQL and pattern expression. The registered pattern becomes active right away.
Then our PrintListener
is instantiated and associated to the pattern with the EPStatement.addListener(UpdateListener listener)
method. The listener becomes active right away. As soon as the pattern matches an event, the update()
method of the listener class will be called.
Finally, this class generates a Deposit
instance and sends it to the engine via the sendEvent()
method of the EPRuntime
object. Remember, that our Deposit
object is a POJO that also serves as an event for Esper. The pattern is matched every time an object of the class Deposit
arrives, so the PrintListener
update method is invoked printing the event data to the console.
This is a very simple example but serves as a basic block for any use of Esper inside your applications.
Patterns expressions and EQL
Esper makes the distinction between patterns expressions and EQL queries. Both are enclosed in what Esper calls a statement.
Patterns expressions begin with the keyword every
, that is, they are activated every time the expression matches an event. EQL queries begin with select
or insert
keywords.
The EPAdministrator.createPattern(String pattern)
and EPAdministrator.createEQL(String query)
methods both return an instance of an EPStatement
, a superclass for both pattern expressions and EQL queries.
The Esper engine supports two usage modes: the push mode and the pull mode. The push mode is like the example we just implemented. The engine notifies (pushes to) the listener when an event matching the pattern arrives.
On the other hand, the pull mode queries the engine any time (not only at the time an event arrives) to get the relevant events matching a pattern or query. This mode works by first registering the pull pattern or query in the engine. The pull mode is useful for periodically polling the engine's data and when we are not interested in every pattern match event. The following figure illustrates this mechanism:
In the figure you can see the Query Puller as the element that retrieves the event data from the EQL Query. Esper allows for both push and pull usage modes with EQL and pattern expressions.
Every EPStatement
has the iterator()
method. This method returns a java.util.Iterator
to be able to visit the pattern or EQL data. This is basically the way the pull mode is implemented in Esper.
Let's modify the example a bit to see how the pull mode works. We'll use the pull mode to ask the engine for the output data of the pattern expression.
The EQL pattern in the following example is the same as before but our test class needs a few changes:
package packtpub.osw.cep; import java.util.Iterator; import net.esper.client.*; import net.esper.event.EventBean; /** * CEP Pull model example. */ public class CepCountTest { public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPStatement pattern = admin.createPattern("every A= packtpub.osw.cep.Deposit"); Deposit d = new Deposit(); d.setAmount(100); d.setCustomerId("123"); EPRuntime runtime = epService.getEPRuntime(); runtime.sendEvent(d); Iterator<EventBean> it = pattern.iterator(); System.out.println(it.next().get("A")); } }
This time there's no listener registered for the pattern, instead a EPStatement.iterator()
method is invoked, for which Esper API returns a java.util.Iterator
with the pulled pattern event data.
When you run this code, it will print the number of times an event of type Deposit
arrived.
In the earlier examples, Esper matched the event that arrived with pattern expressions. These expressions are suited for inspection and detection of events; if we want to benefit fully from the CEP technology we must be able to make queries and manipulate the event streams. Esper uses its own proprietary EQL language to both query and handle event streams.
The EQL language is very similar to the SQL standard, especially in the use of the select
and insert
statements. But contrary to SQL, the EQL from
clause applies to event streams and not to relational tables. EQL has several shared characteristics with SQL such as aggregated functions, alias, where conditions, order by statements, etc.
Like the pattern expression, EQL statements, when matched by the engine, call the respective UpdateListeners
with the event data extracted from the event stream.
For example, the following pattern expression sends an event named A
to the UpdateListener:
Every A=packtpub.osw.cep.Deposit
This event named A
is of type Deposit
. When working with EQL queries, the same behavior applies:
Select * from packtpub.osw.cep.Deposit
This query feeds the whole bean to the UpdateListener
. The select
clause must be expressed in the EQL Query; it is not optional. The *
symbol after the select
clause tells Esper to send every property of the bean to the listener. The from
indicates the event stream from which to get the event data.
This is a very powerful feature. You can join events from different event streams. It is very useful for the BPM engine, as you can have every business process in its own stream, and join them when necessary for complex computations.
Remember that the EPAdministrator
uses distinct methods for creating pattern expressions (the createPattern()
method) and EQL queries (the createEQL()
method). Both methods receive a string parameter containing the pattern expression or the EQL query.
You can specify the engine to get more detailed information as shown in the following EQL query:
Select customerId from packtpub.osw.cep.Deposit
This query selects only the customerId
JavaBean property of the Deposit
beans from the Deposit
event stream.
What happens if you want to select some but not all events from the event stream? The familiar SQL operator where
enters in action. The where
is an optional clause in EQL queries. The following query restricts the events of type Deposit
, to those that have amounts over 1,000 units.
Select customerId from packtpub.osw.cep.Deposit where amount > 1000
Esper has a handful of operators available in the where
clause as shown in the following table:
Operator |
Description |
---|---|
= |
Equals |
< |
Less than |
> |
Greater than |
>= |
Greater or equal than |
<= |
Less or equal than |
!= |
Not |
< > |
Distinct from |
Is null |
The expression preceded is null |
Is not null |
The expression preceded is not null |
In |
Restricts the range to the values expressed inside the clause |
Between |
Restricts the range between two values |
Like |
Restricts the range to values similar to the like expression |
Regexp |
Restricts the range to values that match the regular expression assigned |
EQL also has the alias capability of SQL. For example the first example can be rewritten like this:
Select customerId as cId from packtpub.osw.cep.Deposit as d
In this EQL query, the customerId
property is renamed to cId
and the Deposit
event stream is named to d
. The alias feature is useful for simplifying and shortening queries, making them more readable.
Group By
EQL also supports the group by
operator. It has the same behavior as that of its SQL counterpart—it groups a column's similar result data into one record:
Select customerId, sum(amount) from packtpub.osw.cep.Deposit group by customerId
This EQL query groups the event data by the customer identification number. The listener receives the identification and the sum of all the amounts of each customer. The output of our PrintListener
is as follows:
… -------------------------- sum(amount):200.0 customerId:1 -------------------------- sum(amount):200.0 customerId:2 -------------------------- sum(amount):200.0 customerId:3 -------------------------- sum(amount):200.0 customerId:4 -------------------------- sum(amount):200.0 customerId:5 -------------------------- sum(amount):200.0 customerId:6 -------------------------- sum(amount):200.0 customerId:7 -------------------------- sum(amount):200.0 customerId:8 -------------------------- sum(amount):200.0 customerId:9 -------------------------- …
This query doesn't act over the whole event stream. On the contrary, it groups only one event's data! This is because, as each event arrives, the EQL query is executed upon it. Thus the result is the grouping of just one record, and not of all the records that have arrived so far.
To group a set of events the way SQL does, you must specify an event output rate. To define event output rate in EQL a special and new operator is needed; the output by
operator serves this purpose.
The following EQL notifies the listener after every two events, thus generating the desired effect:
Select customerId, sum(amount) from packtpub.osw.cep.Deposit group by customerId output every 2 events
The EQL's output every 2 events
clause controls the rate of information sent to the listener. The output is what you may expect:
… sum(amount):100.0 customerId:1 sum(amount):100.0 customerId:2 sum(amount):100.0 customerId:3 sum(amount):100.0 customerId:4 sum(amount):100.0 customerId:5 sum(amount):100.0 customerId:6 sum(amount):100.0 customerId:7 sum(amount):100.0 customerId:8
The PrintListener's
output shows the grouping of two events each time (this output is rendered with two rounds of identical events).
Time and Event Windows
Esper supports time and event windows for EQL queries and pattern expressions. This enables a very flexible handling of events and supports advanced use cases for CEP. Esper restricts events with time delimiters or event number delimiters.
For example, you can specify an EQL query to have effect over the last four events regardless of the time they arrived. We have modified the previous query:
Select customerId, amount from packtpub.osw.cep.Deposit.win:length(4)
The window length is defined after the. next to the event stream name. The
win:length(4)
signals Esper to make a window of four events in which the EQL will take effect. Besides event window, you can use a time window.
For example, the following EQL acts on the last four minutes of events regardless of the number of events:
Select customerId, amount from packtpub.osw.cep.Deposit.win:time (4 minutes)
Both time length and event number restrictions support a batch mode, in which the events arrive in a batch to the listener. The previous two examples would look as follows:
Select customerId, amount from packtpub.osw.cep.Deposit.win:length_batch(4) Select customerId, amount from packtpub.osw.cep.Deposit.win:time_batch(4 minutes)
The event has an element number parameter and the time has a time period parameter. Examples of time period parameters are as follows:
15 seconds 90 minutes 30 seconds 1 day 2 hours 20 minutes 15 seconds 110 milliseconds
These examples are self descriptive. The syntax of time windows is as follows:
time-period : [day-part] [hour-part] [minute-part] [seconds-part] [milliseconds-part]
The time and event windows are very useful tools in CEP solutions.
Aggregate Functions
Just like SQL, Esper provides aggregates functions to summarize and perform calculations on the event stream data. The aggregate functions are listed in the following table.
Aggregate function name |
Description |
---|---|
sum(expression) |
Returns the sum of the values of the expression |
avg(expression) |
Returns the average of the values in the expression |
count(expression) |
Returns the number of non-null values in the expression |
count(*) |
Returns the number of events |
max(expression) |
Returns the highest value in the expression |
min(expression) |
Returns the lowest value in the expression |
median(expression) |
Returns the median value in the expression |
stddev(expression) |
Returns the standard deviation of the values in the expression |
avedev(expression) |
Returns the mean deviation of the values in the expression |
All aggregate functions listed in this table except count(*)
support the distinct
modifier. This changes the behavior in some cases. Consult the Esper documentation for more details on the aggregate functions.
This section finishes the EQL and pattern expression overview. For a more in-depth reference to EQL or pattern expressions refer to the Esper official documentation.
This section discusses the Esper integration with OSWorkflow to process business event data and react with user-defined patterns.
To enable Esper functionality inside an OSWorkflow definition, we must create a way to emit events from OSWorkflow into the Esper engine.
OSWorkflow can provide this extension via a custom FunctionProvider
. This FunctionProvider
will emit events into the engine. The code for the class is as follows:
package packtpub.osw.cep; import java.util.Map; import net.esper.client.*; import com.opensymphony.module.propertyset.PropertySet; import com.opensymphony.util.BeanUtils; import com.opensymphony.workflow.FunctionProvider; import com.opensymphony.workflow.WorkflowException; /** * Function provider that sends an event to the esper engine. */ public class PushEventFunctionProvider implements FunctionProvider { public void execute(Map transientVars, Map args, PropertySet ps) throws WorkflowException { // the event POJO class name to be instantiated by reflection String eventClassName = (String) args.get("event.class.name"); Object event; try { event = Class.forName(eventClassName).newInstance(); } catch (Exception e) { throw new WorkflowException(e); } //invokes the setters of the event with the argument map data. // if a setter name is equals to a map key, it will be called // with the key's value as a parameter. // f.e.: if the arguemnt Map contains a key "name" with value // "john" // and the event bean has a method called setName, it will be // invoked // by reflection in the following way: setName("john"). // See the javadocs for more detail.BeanUtils.setValues(event, // args, null); // gets the default provider and the default runtime for this // JVM. EPServiceProvider epService = EPServiceProviderManager .getDefaultProvider(); EPRuntime runtime = epService.getEPRuntime(); runtime.sendEvent(event); } }
This code creates an event of the type passed by the argument to the FunctionProvider
and fills its properties with similar matching keys in the transientVars
map to send it to the engine. For example, if the bean being instantiated has a long
JavaBean property named amount
and the transientVars
map has a similarly named key, the contents (if the type matches) of the key's value are set into the amount
property of the JavaBean.
As a regular FunctionProvider
it can be placed in pre and post functions, in a step or an action. A snippet of the XML workflow definition is as follows:
<post-functions> <function type="class"> <arg name="class.name">packtpub.osw.cep.PushEventFunctionProvider </arg> <arg name="event.class.name">packtpub.osw.cep.Deposit </arg> <arg name="amount">${amount} </arg> <arg name="customerId">${customerId} </arg> </function> </post-functions>
The post-functions
element of the snippet has only one function, the PushEventFunctionProvider
defined in the class.name
argument as usual. The amount
and customerId
argument names are passed to the FunctionProvider
via the transientVars
map. Thus ending up in the event.class.name
(the JavaBean event) specified as packtpub.osw.cep.Deposit
.
Remember that the FunctionProvider
arguments are subject to variable interpolation. Finally, a code fragment for testing the functionality of the FunctionProvider
is as follows:
Map inputs = new HashMap(); inputs.put("customerId", "123"); inputs.put("amount", new Float(10000f)); long workflowId = workflow.initialize("cep", 100, Collections.EMPTY_MAP); workflow.doAction(workflowId, 1, inputs);
This fragment fills an inputs
map with the customerId
and amount
values, and then creates a new workflow of type cep
and executes action number 1 sending the inputs
map along. This example assumes you have already set up the Esper engine elsewhere in the code, as well as the patterns and corresponding listeners.
This section shows examples of CEP within a BPM environment.
Event-based Mail Alerts
After integration with OSWorkflow, the work that is left to implement the CEP solution over a BPMS is to generate the appropriate patterns and listeners. For example, with a SendMailListener
we can implement an event-based alert. Additionally, if the pattern is an EQL query, we can use it to mail process performance indicators to a manager in a real-time fashion.
package packtpub.osw.cep; import java.util.Properties; import javax.mail.Message; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import net.esper.client.UpdateListener; import net.esper.event.EventBean; /** * This listener send an email with the current data each time * in each invocation. */ public class SendMailListener implements UpdateListener { private String host; private String to; private String from; private String subject; private String bodyPrefix; /** * Constructor. * @param host * @param to * @param from * @param subject * @param bodyPrefix */ public SendMailListener(String host, String to, String from, String subject, String bodyPrefix) { super(); this.host = host; this.to = to; this.from = from; this.subject = subject; this.bodyPrefix = bodyPrefix; } public void update(EventBean[] newData, EventBean[] oldData) { sendMail(host, to, from, subject, bodyPrefix + " " + newData); } private void sendMail(String host, String to, String from, String subject, String body) { Properties config = new Properties(); config.put("mail.host", host); Session session = Session.getDefaultInstance(config); MimeMessage message = new MimeMessage(session); try { message.addRecipient(Message.RecipientType.TO, new InternetAddress(to)); message.addFrom(new InternetAddress[] { new InternetAddress(from) }); message.setSubject(subject); message.setText(body); Transport.send(message); } catch (Exception e) { e.printStackTrace(); } } }
Example 1—Measuring SLA through KPIs
This example uses Esper as a performance monitor component of a BPMS solution. The aggregate functions such as count, avg
, and sum
within Esper EQL queries allow a business manager to monitor the Service Level Agreement (SLA) of a business process. In this example, we make use of the SendMailListener
features.
An SLA in BPMS is defined in terms of the following metrics or KPIs:
Number of new process instances: Arrival rate per minute
Completion time of process instances: Completion rate per minute
Number of errors in process instances: Failure rate per minute
These metrics are sent every minute to a manager by mail. You can easily change this setting by modifying the time. The SLA information is better understood visually, for which a charting feature will be implemented.
Brief Overview of the Example
This example has the following parts:
Previous section's FunctionProvider
EQL queries, custom Listener, and SendMailListener
Charting component
This example uses the FunctionProvider
defined in the last section. This FunctionProvider
will send events to the Esper engine during the creation, completion, and error steps of a business process. The events will be sent to the registered listeners. There is one event class for each type of event:
The ProcessCreationEvent
class
The ProcessCompletionEvent
class
The ProcessErrorEvent
class.
There is an EQL query for each type of event, calculating the statistics needed for the KPI's values. The same listener is registered for these three. This listener passes the values to the Charting
component, for displaying them to the user.
The Charting
component uses the open-source charting library JFreeChart to graph the KPIs.
Detailed Explanation
We will use a very basic sample workflow definition in the example. This definition is the one used previously on the Hello World section of this chapter. We have modified it to send more events to the engine during its lifespan and to use the event type we talked about in the previous section. The XML definition is as following:
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE workflow PUBLIC "-//OpenSymphony Group//DTD OSWorkflow 2.6//EN" "http://www.opensymphony.com/osworkflow/workflow_2_8.dtd"> <workflow> <initial-actions> <action id="100" name="Start Workflow"> <post-functions> <function type="class"> <arg name="class.name"> packtpub.osw.cep.PushEventFunctionProvider </arg> <arg name="event.class.name"> packtpub.osw.cep.ex1.ProcessCreationEvent </arg> </function> </post-functions> <results> <unconditional-result old-status="Finished" status="Underway" step="1" /> </results> </action> </initial-actions> <steps> <step id="1" name="Sending event"> <actions> <action id="1" name="Finish First Draft"> <results> <unconditional-result old-status="Finished" step="2" status="Underway"> <post-functions> <function type="class"> <arg name="class.name"> packtpub.osw.cep.PushEventFunctionProvider </arg> <arg name="event.class.name"> packtpub.osw.cep.ex1.ProcessFailureEvent </arg> </function> </post-functions> </unconditional-result> </results> </action> </actions> </step> <step id="2" name="Edit Doc"> <post-functions> <function type="class"> <arg name="class.name"> packtpub.osw.cep.PushEventFunctionProvider </arg> <arg name="event.class.name"> packtpub.osw.cep.ex1.ProcessCompletionEvent </arg> </function> </post-functions> </step> </steps> </workflow>
This definition uses the FunctionProvider
to send events to Esper as soon as it is created (ProcessCreationEvent
), a sample ProcessFailureEvent
on the completion of the first step, and a ProcessCompletionEvent
when it finishes. This allows us to test all three events.
The EQL queries that make up the KPIs are as follows:
select 'creation' as name,count(*) as value from packtpub.osw.cep.ex1.ProcessCreationEvent.win:time(1 minutes) select 'completion' as name,count(*) as value from packtpub.osw.cep.ex1.ProcessCompletionEvent.win:time(1 minutes) select 'failure' as name,count(*) as value from packtpub.osw.cep.ex1.ProcessFailureEvent.win:time(1 minutes)
The three EQL queries are essentially the same, calculating how many events of each type were processed each minute. Finally, the GraphListener
code is as follows:
package packtpub.osw.cep.ex1; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import net.esper.client.UpdateListener; import net.esper.event.EventBean; /** * Simple listener that prints the event to console. */ public class GraphListener implements UpdateListener { public static List graphValues = new ArrayList(50); public void update(EventBean[] newEvent, EventBean[] oldEvent) { for (int i = 0; i < newEvent.length; i++) { EventBean bean = newEvent[i]; Map eventMap = (Map)bean.getUnderlying(); GraphData gd = new GraphData(new Date(), (Long)eventMap.get("value"), (String)eventMap.get("name")); if(graphValues.size() > 50) { synchronized (graphValues) { System.out.println("removing"); graphValues.remove(0); } } graphValues.add(gd); } } } class GraphData { private Date timestamp; private double value; private String category; /** * Constructor. * @param timestamp * @param value * @param category */ GraphData(Date timestamp, double value, String category) { super(); EsperSLA thru KPIs, measuringthis.timestamp = timestamp; this.value = value; this.category = category; } /** * @return the category */ public String getCategory() { return category; } /** * @return the timestamp */ public Date getTimestamp() { return timestamp; } /** * @return the value */ public double getValue() { return value; } }
This GraphListener
code is invoked when the pattern is matched. It communicates with the charting component through the use of a shared data structure called graphValues
. The main section of the code is as follows:
package packtpub.osw.cep.ex1; import java.awt.Rectangle; import java.awt.image.BufferedImage; import java.util.*; import javax.swing.*; import net.esper.client.*; import org.jfree.chart.*; import org.jfree.data.time.*; import com.opensymphony.workflow.Workflow; import com.opensymphony.workflow.basic.BasicWorkflow; import com.opensymphony.workflow.config.*; import packtpub.osw.cep.CEPWorkflowTest; /** * CEP Example1 */ public class Example1 { public static void main(String[] args) { setupEngine(); Workflow workflow = new BasicWorkflow("test"); Configuration config = new DefaultConfiguration(); workflow.setConfiguration(config); try { while(true) { long workflowId = workflow.initialize("cep-example1", 100, Collections.EMPTY_MAP); workflow.doAction(workflowId, 1, Collections.EMPTY_MAP); graph(); } } catch (Exception e) { e.printStackTrace(); } } private static void setupEngine() { EPServiceProvider epService = EPServiceProviderManager .getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); //creation rate EPStatement eql = admin.createEQL("select 'creation' as name,count(*) as value from packtpub.osw.cep.ex1. ProcessCreationEvent.win:time (1 minutes)"); //completion rate EPStatement eql2 = admin.createEQL("select 'completion' as name,count(*) as value from packtpub.osw.cep.ex1. ProcessCompletionEvent. win:time(1 minutes)"); //failure rate EPStatement eql3 = admin.createEQL("select 'failure' as name,count(*) as value from packtpub.osw.cep.ex1. ProcessFailureEvent.win:time (1 minutes)"); GraphListener pl = new GraphListener(); eql.addListener(pl); eql2.addListener(pl); eql3.addListener(pl); } private static JFrame jf; private static JLabel lblChart; private static void graph() { if (jf == null) { setupGraphicSubsystem(); } TimeSeriesCollection categoryDataset = setupChartData(); JFreeChart chart = ChartFactory.createTimeSeriesChart ("Event Chart", // Title "KPIs", // X-Axis label "Number of Event", // Y-Axis label categoryDataset, // Dataset true // Show legend, true, true); BufferedImage image = chart.createBufferedImage(500, 300); lblChart.setIcon(new ImageIcon(image)); } private static void setupGraphicSubsystem() { jf = new JFrame(); jf.setBounds(new Rectangle(600, 400)); lblChart = new JLabel(); jf.add(lblChart); jf.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); jf.setVisible(true); } private static TimeSeriesCollection setupChartData() { TimeSeriesCollection categoryDataset = new TimeSeriesCollection(); TimeSeries tsc = new TimeSeries("Creation", Second.class); TimeSeries tsf = new TimeSeries("Failure", Second.class); TimeSeries tsp = new TimeSeries("Completion", Second.class); categoryDataset.addSeries(tsp); categoryDataset.addSeries(tsc); categoryDataset.addSeries(tsf); for (Iterator iter = GraphListener.graphValues.iterator(); iter.hasNext();) { GraphData data = (GraphData) iter.next(); if (data.getCategory().equalsIgnoreCase("Creation")) { tsc.addOrUpdate(new Second(data.getTimestamp()), data.getValue()); } if (data.getCategory().equalsIgnoreCase("Failure")) { tsf.addOrUpdate(new Second(data.getTimestamp()), data.getValue()); } if (data.getCategory().equalsIgnoreCase("Completion")) { tsp.addOrUpdate(new Second(data.getTimestamp()), data.getValue()); } } return categoryDataset; } }
The main code first sets up the engine, and then it generates new Workflow
instances until the JVM is closed. Every time a new Workflow
is created, a new graph with information about the last 50 events is displayed.
This code uses the open-source charting library JFreeChart to create a TimeSeries
graph, shown in the following screenshot:
The graph shows the number of events arriving each second (the graph scales automatically).
This example explained the integration between a BPMS and a CEP engine; it can be extended to handle more complex queries. For more information on JFreeChart refer to its website at http://www.jfree.org/jfreechart/.
This chapter covered Esper, a CEP implementation, with BPMS-enabled real‑time processing for business process events. The Esper distribution comes with several other examples to evaluate the potential of this new technology.
In the next chapter we'll introduce Pentaho, a very powerful open-source reporting solution, and then we will integrate it with OSWorkflow to generate information charts and reports based on the business process data.