Chapter 7. Complex Event Processing

This chapter introduces some state-of-the-art technologies like Event Stream Processing (ESP) and Complex Event Processing (CEP) and their applications in BPMs. We will look at an OSWorkflow function provider that interfaces with the Esper CEP engine and allows monitoring of real-time process information and events. This chapter assumes basic knowledge of SQL and the relational data model concepts.

Complex Event Processing (CEP)

CEP is a relatively new technology to process events and discover complex patterns inside streams of events. CEP engines are also known as Event Stream Processing (ESP) engines. Events can be anything that happens outside or inside your application. These events contain data about the business situations that occurred and information about the data (also known as metadata). A sequence of events from the same source is called an event stream.

By processing the event streams with business-defined patterns, you can detect and react to business situations in real time. For example, you can monitor a financial stock index in real time. If it reaches certain values within an hour, you can react to this by selling a percentage of the stock. The threshold values are defined in a CEP pattern. This pattern tells the CEP engine how to detect and react to this event.

Patterns and Data

CEP technologies use patterns to match events inside a stream: the patterns are registered first, and the engine then watches the stream for matches. The event stream flows through a pattern chain, and when a pattern matches an event, the engine is notified. This flow of information is the opposite of traditional data handling, such as SQL databases, in which the data is stored first and the patterns (SQL queries) are then run over it to select the relevant data. This flow is better understood visually.

The following figure displays the event stream as an ongoing flow through which the patterns match the relevant events.

[Figure: Patterns and Data]

It's common to confuse CEP technology with rule engine technology, such as the one we saw in Chapter 5. This is because patterns are like rules, in the sense that both have a condition and a consequence.

The difference between the two technologies lies in the temporal component of CEP. This allows the patterns to use time as another dimension. CEP technology enables patterns such as: "select event A if the previous event was B". This would be awkward and difficult to implement in a rule engine. Also, a CEP engine is optimized for real-time or near real-time processing of high volumes of event data.
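The temporal idea can be illustrated without any CEP library. The following sketch is plain Java, not Esper code; the FollowedByDetector class and the string event types are hypothetical names chosen for this illustration. It remembers only the previous event and records a match whenever an A arrives directly after a B.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: detects "event A whose previous event was B". */
final class FollowedByDetector {
    private String previousType;                              // type of the last event seen, null at start
    private final List<Integer> matches = new ArrayList<>();  // stream positions of matching A events
    private int position;                                     // position of the next event to arrive

    /** Feeds one event into the detector, in arrival order. */
    void onEvent(String type) {
        if ("A".equals(type) && "B".equals(previousType)) {
            matches.add(position);
        }
        previousType = type;
        position++;
    }

    /** Positions (starting at 0) of every A that directly followed a B. */
    List<Integer> matches() {
        return matches;
    }

    public static void main(String[] args) {
        FollowedByDetector detector = new FollowedByDetector();
        for (String type : new String[] {"A", "B", "A", "A", "B", "A"}) {
            detector.onEvent(type);
        }
        System.out.println(detector.matches());
    }
}
```

The detector needs state that survives from one event to the next, which is exactly what a stateless condition-and-consequence rule lacks.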

CEP in BPM

A BPM is an ideal place to implement a CEP engine.

Business events exist naturally in a continuous stream. They happen all the time in the real world, and to respond to them in real time we must have real-time processing capabilities.

A BPMS naturally identifies its events and reacts to them as part of the workflow: if a sale is closed, the workflow engine finishes that sale's workflow instance. So, the event input is provided by all BPM suites. The CEP engine takes advantage of this event input and gives the BPMS pattern matching and real-time processing capabilities.

A CEP engine is a new but very useful component of a BPMS and it enables the BPMS to react in real time to relevant business events. Additionally, it provides real-time Business Activity Monitoring to important business situations and especially to business exceptions.

In addition to BPM event processing, real-world uses of CEP include fraud detection, automatic stock trading, SLA and KPI monitoring, and sensor applications (RFID, etc.).

What is Esper?

Esper is an open-source Java CEP engine. Like OSWorkflow, Esper needs to be embedded inside your application; it doesn't run in standalone mode. Esper is optimized for processing very large volumes of events in real time.

The Esper Architecture

This section describes the three most important concepts in Esper architecture: events, patterns, and listeners.

Esper, like every CEP engine, analyzes an event stream and detects relevant events by matching them against user-defined patterns. For Esper, these events are regular JavaBeans and the patterns are represented in EQL. EQL is a special language designed for pattern matching, and its syntax is very similar to that of SQL.

The patterns in EQL are registered in the Esper engine and when an event arrives, it is checked against all the active patterns. What happens when one or more patterns match the event? The Esper engine notifies the listeners registered on each matching pattern.

This mechanism is analogous to the observer design pattern. This enables a nice decoupling between the event and the event listeners. Each pattern can have many listeners. The listeners are free to do what they want when notified of an event occurrence—they can print the event information, send an email, etc.—as listeners are made of regular Java code. The following figure displays these interactions in an ordered fashion for better understanding.
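To make the observer relationship concrete, here is a minimal sketch in plain Java. It is not the Esper API: MiniEngine, Statement, and Listener are hypothetical names, and a java.util.function.Predicate stands in for an EQL pattern. The sketch only shows the shape of the interaction, in which every incoming event is tested against all registered patterns and each match is forwarded to that pattern's listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical miniature of the events/patterns/listeners triangle. Not the Esper API. */
final class MiniEngine {
    /** Callback invoked with each matching event. */
    interface Listener {
        void update(Object event);
    }

    /** A registered pattern together with the listeners attached to it. */
    static final class Statement {
        private final Predicate<Object> pattern;
        private final List<Listener> listeners = new ArrayList<>();

        Statement(Predicate<Object> pattern) {
            this.pattern = pattern;
        }

        void addListener(Listener listener) {
            listeners.add(listener);
        }
    }

    private final List<Statement> statements = new ArrayList<>();

    /** Registers a pattern; it is active for every event sent afterwards. */
    Statement createPattern(Predicate<Object> pattern) {
        Statement statement = new Statement(pattern);
        statements.add(statement);
        return statement;
    }

    /** Checks the event against every registered pattern and notifies the listeners of each match. */
    void sendEvent(Object event) {
        for (Statement statement : statements) {
            if (statement.pattern.test(event)) {
                for (Listener listener : statement.listeners) {
                    listener.update(event);
                }
            }
        }
    }
}
```

The code that sends events never refers to a listener, and a listener never refers to the sender, which is the decoupling described above.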

[Figure: The Esper Architecture]

Downloading and Installing Esper

You can download Esper from its web page located at http://esper.codehaus.org/ and can obtain help and support from the user mailing list. To install Esper, all you need to do is download the Esper distribution and unpack it.

Note

Esper needs JDK 5 or a higher version to run.

Hello World from Esper

To create our Hello World example we need three things—an event class, a pattern to match events, and a listener to notify when the pattern matches. These basic building blocks are depicted in the following figure:

[Figure: Hello World from Esper]

The figure shows these interactions in a numbered sequence. We will first create our event class. Remember that Esper sees events as common POJOs with JavaBean-like properties. This is a very non-intrusive approach; you don't need to couple your application to any Esper API. Additionally, Esper also supports maps and XML elements as events.

A very simple event class is shown in the following code snippet:

package packtpub.osw.cep;
/**
* Sample event class for CEP.
*/
public class Deposit
{
private float amount;
private String customerId;
/**
* @return the customerId
*/
public String getCustomerId()
{
return customerId;
}
/**
* @param customerId the customerId to set
*/
public void setCustomerId(String customerId)
{
this.customerId = customerId;
}
/**
* @return the amount
*/
public float getAmount()
{
return amount;
}
/**
* @param amount the amount to set
*/
public void setAmount(float amount)
{
this.amount = amount;
}
}

The Java class shown in this code has two JavaBean properties, namely amount and customerId, and the two respective pairs of JavaBean getters and setters. These properties are handled by Esper using Java's reflection. Next, we need the pattern that the engine will match with the event data. The pattern is just a string containing the following EQL:

every A=packtpub.osw.cep.Deposit

This pattern is very simple. It tells the engine to notify our listener for every arriving event of type Deposit. The every keyword ensures that Esper notifies the listener every time and not just the first time. No other criterion needs to be matched. The A= token pair is a variable assignment and packtpub.osw.cep.Deposit is our JavaBean class name.

Following the pattern, we need the listener. The listener for this example only prints the event information when notified. Listeners need to implement the net.esper.client.UpdateListener interface as shown in the PrintListener class:

package packtpub.osw.cep;
import java.util.Arrays;
import net.esper.client.UpdateListener;
import net.esper.event.EventBean;
/**
* Simple listener that prints the event to console.
*/
public class PrintListener implements UpdateListener
{
public void update(EventBean[] newEvent, EventBean[] oldEvent)
{
System.out.println(Arrays.toString(newEvent));
}
}

The code shows the PrintListener class that implements the UpdateListener interface provided by Esper. This listener interface has only one method, the update() method, which is invoked when a pattern matches an event occurrence.

This method receives two EventBean arrays. The first is an array of the new events and the second is an array of old events, which have already been notified to this listener. Esper keeps events in memory in what is known as an event window, and the old events array gives you access to events from that window.

The EventBean class is a wrapper class provided by Esper for all types of events.

Finally, to execute the example we need a class with a main method, to set up the Esper engine and send it some events. The following code shows this class:

package packtpub.osw.cep;
import net.esper.client.EPAdministrator;
import net.esper.client.EPRuntime;
import net.esper.client.EPServiceProvider;
import net.esper.client.EPServiceProviderManager;
import net.esper.client.EPStatement;
/**
* CEP Example test class.
*/
public class CEPTest
{
public static void main(String[] args)
{
EPServiceProvider epService =
EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();
EPStatement pattern = admin.createPattern(
"every A=packtpub.osw.cep.Deposit");
PrintListener listener = new PrintListener();
pattern.addListener(listener);
Deposit d = new Deposit();
d.setAmount(100);
d.setCustomerId("123");
EPRuntime runtime = epService.getEPRuntime();
runtime.sendEvent(d);
}
}

The class shown in the code snippet consists of only a main method with all the code needed to set up Esper and a Deposit JavaBean and send it to the engine.

First, it generates an EPServiceProvider, which is an administrative Esper object. From this provider, we get the EPAdministrator and the EPRuntime. The first is responsible for creating patterns and EQL queries, and the second is a gateway to the engine, for passing events to it.

The EPAdministrator.createPattern(String pattern) method generates an EPStatement from the pattern passed as its argument. EPStatement is the common type for EQL queries and pattern expressions. The registered pattern becomes active right away.

Then our PrintListener is instantiated and associated to the pattern with the EPStatement.addListener(UpdateListener listener) method. The listener becomes active right away. As soon as the pattern matches an event, the update() method of the listener class will be called.

Finally, this class generates a Deposit instance and sends it to the engine via the sendEvent() method of the EPRuntime object. Remember that our Deposit object is a POJO that also serves as an event for Esper. The pattern is matched every time an object of the class Deposit arrives, so the PrintListener update() method is invoked, printing the event data to the console.

This is a very simple example, but it serves as a basic building block for any use of Esper inside your applications.

Pattern Expressions and EQL

Esper distinguishes between pattern expressions and EQL queries. Both are enclosed in what Esper calls a statement.

Pattern expressions begin with the keyword every; that is, they are activated every time the expression matches an event. EQL queries begin with the select or insert keywords.

The EPAdministrator.createPattern(String pattern) and EPAdministrator.createEQL(String query) methods both return an EPStatement, the common type for pattern expressions and EQL queries.

Push and Pull Mode

The Esper engine supports two usage modes: the push mode and the pull mode. The push mode is like the example we just implemented. The engine notifies (pushes to) the listener when an event matching the pattern arrives.

On the other hand, the pull mode queries the engine any time (not only at the time an event arrives) to get the relevant events matching a pattern or query. This mode works by first registering the pull pattern or query in the engine. The pull mode is useful for periodically polling the engine's data and when we are not interested in every pattern match event. The following figure illustrates this mechanism:

[Figure: Push and Pull Mode]

In the figure you can see the Query Puller as the element that retrieves the event data from the EQL Query. Esper allows for both push and pull usage modes with EQL and pattern expressions.

Every EPStatement has the iterator() method. This method returns a java.util.Iterator to be able to visit the pattern or EQL data. This is basically the way the pull mode is implemented in Esper.

Let's modify the example a bit to see how the pull mode works. We'll use the pull mode to ask the engine for the output data of the pattern expression.

The EQL pattern in the following example is the same as before but our test class needs a few changes:

package packtpub.osw.cep;
import java.util.Iterator;
import net.esper.client.*;
import net.esper.event.EventBean;
/**
* CEP Pull model example.
*/
public class CepCountTest
{
public static void main(String[] args)
{
EPServiceProvider epService =
EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();
EPStatement pattern = admin.createPattern(
"every A=packtpub.osw.cep.Deposit");
Deposit d = new Deposit();
d.setAmount(100);
d.setCustomerId("123");
EPRuntime runtime = epService.getEPRuntime();
runtime.sendEvent(d);
Iterator<EventBean> it = pattern.iterator();
System.out.println(it.next().get("A"));
}
}

This time there's no listener registered for the pattern. Instead, the EPStatement.iterator() method is invoked, and the Esper API returns a java.util.Iterator over the pulled pattern event data.

When you run this code, it prints the Deposit event that the pattern bound to the tag A (through the bean's toString() method), not a count of events.
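The difference between the two modes can be sketched in plain Java. This is an illustration only and not how Esper is implemented: PushPullStatement is a hypothetical class, and keeping only the most recent match for pull access is an assumption of this sketch. Push means the engine calls the application; pull means the application asks the statement for its current result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical statement that supports both push (listeners) and pull (iterator). */
final class PushPullStatement<E> {
    private final List<Consumer<E>> listeners = new ArrayList<>();
    private E lastMatch; // most recent match, kept so that it can be pulled later (sketch assumption)

    void addListener(Consumer<E> listener) {
        listeners.add(listener);
    }

    /** Called by the (imaginary) engine whenever the pattern matches an event. */
    void onMatch(E event) {
        lastMatch = event;
        for (Consumer<E> listener : listeners) {
            listener.accept(event); // push: the engine calls the application
        }
    }

    /** Pull: the application asks for the current result whenever it wants. */
    Iterator<E> iterator() {
        if (lastMatch == null) {
            return Collections.<E>emptyIterator();
        }
        return Collections.singletonList(lastMatch).iterator();
    }
}
```

A poller that wakes up periodically would call iterator() and ignore the matches that happened in between, which is the use case the pull mode targets.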

EQL Queries

In the earlier examples, Esper matched the event that arrived with pattern expressions. These expressions are suited for inspection and detection of events; if we want to benefit fully from the CEP technology we must be able to make queries and manipulate the event streams. Esper uses its own proprietary EQL language to both query and handle event streams.

The EQL language is very similar to the SQL standard, especially in the use of the select and insert statements. But contrary to SQL, the EQL from clause applies to event streams and not to relational tables. EQL shares several characteristics with SQL, such as aggregate functions, aliases, where conditions, order by clauses, etc.

Like the pattern expression, EQL statements, when matched by the engine, call the respective UpdateListeners with the event data extracted from the event stream.

For example, the following pattern expression sends an event named A to the UpdateListener:

every A=packtpub.osw.cep.Deposit

This event named A is of type Deposit. When working with EQL queries, the same behavior applies:

select * from packtpub.osw.cep.Deposit

This query feeds the whole bean to the UpdateListener. The select clause is mandatory in an EQL query. The * symbol after the select keyword tells Esper to send every property of the bean to the listener. The from clause indicates the event stream from which to get the event data.

This is a very powerful feature. You can join events from different event streams. It is very useful for the BPM engine, as you can have every business process in its own stream, and join them when necessary for complex computations.

Remember that the EPAdministrator uses distinct methods for creating pattern expressions (the createPattern() method) and EQL queries (the createEQL() method). Both methods receive a string parameter containing the pattern expression or the EQL query.

You can ask the engine for more specific information, as shown in the following EQL query:

select customerId from packtpub.osw.cep.Deposit

This query selects only the customerId JavaBean property of the Deposit beans from the Deposit event stream.

What happens if you want to select some but not all events from the event stream? The familiar SQL where operator comes into play. The where clause is optional in EQL queries. The following query restricts the events of type Deposit to those that have amounts over 1,000 units.

select customerId from packtpub.osw.cep.Deposit where amount > 1000

Esper has a handful of operators available in the where clause as shown in the following table:

Operator       Description
=              Equals
<              Less than
>              Greater than
>=             Greater than or equal to
<=             Less than or equal to
!=             Not equal to
<>             Not equal to (distinct from)
is null        The preceding expression is null
is not null    The preceding expression is not null
in             Restricts the value to one of those listed inside the clause
between        Restricts the value to the range between two values
like           Restricts the value to those matching the like expression
regexp         Restricts the value to those matching the given regular expression

EQL also has the alias capability of SQL. For example, the customerId query shown earlier can be rewritten like this:

select customerId as cId from packtpub.osw.cep.Deposit as d

In this EQL query, the customerId property is renamed to cId and the Deposit event stream is given the alias d. The alias feature is useful for simplifying and shortening queries, making them more readable.

Group By

EQL also supports the group by operator. It has the same behavior as that of its SQL counterpart—it groups a column's similar result data into one record:

select customerId, sum(amount) from packtpub.osw.cep.Deposit group by customerId

This EQL query groups the event data by the customer identification number. The listener receives the identification and the sum of all the amounts of each customer. The output of our PrintListener is as follows:

…
--------------------------
sum(amount):200.0
customerId:1
--------------------------
sum(amount):200.0
customerId:2
--------------------------
sum(amount):200.0
customerId:3
--------------------------
sum(amount):200.0
customerId:4
--------------------------
sum(amount):200.0
customerId:5
--------------------------
sum(amount):200.0
customerId:6
--------------------------
sum(amount):200.0
customerId:7
--------------------------
sum(amount):200.0
customerId:8
--------------------------
sum(amount):200.0
customerId:9
--------------------------
…

This query doesn't act over the whole event stream. On the contrary, it groups only one event's data! This is because, as each event arrives, the EQL query is executed upon it. Thus the result is the grouping of just one record, and not of all the records that have arrived so far.

To group a set of events the way SQL does, you must specify an event output rate. EQL defines the output rate with a clause that has no SQL counterpart: the output clause.

The following EQL notifies the listener after every two events, thus generating the desired effect:

select customerId, sum(amount) from packtpub.osw.cep.Deposit group by customerId output every 2 events

The EQL's output every 2 events clause controls the rate of information sent to the listener. The output is what you may expect:

…
sum(amount):100.0
customerId:1
sum(amount):100.0
customerId:2
sum(amount):100.0
customerId:3
sum(amount):100.0
customerId:4
sum(amount):100.0
customerId:5
sum(amount):100.0
customerId:6
sum(amount):100.0
customerId:7
sum(amount):100.0
customerId:8

The PrintListener's output shows the grouping of two events each time (this output is rendered with two rounds of identical events).
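The rate-limiting part of this behavior can be sketched independently of Esper. The OutputEveryN class below is a hypothetical illustration, not Esper code: it buffers results and hands them to a listener only after every N results, which is the idea behind output every 2 events. It deliberately leaves the grouping and summing to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of output rate limiting: deliver results to the listener every N events. */
final class OutputEveryN<T> {
    private final int n;
    private final Consumer<List<T>> listener;
    private List<T> buffer = new ArrayList<>(); // results collected since the last delivery

    OutputEveryN(int n, Consumer<List<T>> listener) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
        this.listener = listener;
    }

    /** Buffers one result; the listener is called once the buffer holds n results. */
    void onResult(T result) {
        buffer.add(result);
        if (buffer.size() == n) {
            List<T> batch = buffer;
            buffer = new ArrayList<>(); // start a fresh buffer before handing the batch over
            listener.accept(batch);
        }
    }
}
```

With n set to 2 and five results sent, the listener is called twice and the fifth result stays buffered until a sixth arrives.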

Time and Event Windows

Esper supports time and event windows for EQL queries and pattern expressions. This enables a very flexible handling of events and supports advanced use cases for CEP. Esper restricts events with time delimiters or event number delimiters.

For example, you can specify an EQL query to have effect over the last four events regardless of the time they arrived. We have modified the previous query:

select customerId, amount from packtpub.osw.cep.Deposit.win:length(4)

The window is defined after the dot (.) that follows the event stream name. The win:length(4) suffix tells Esper to keep a window of the last four events, over which the EQL takes effect. Besides an event window, you can use a time window.

For example, the following EQL acts on the last four minutes of events regardless of the number of events:

select customerId, amount from packtpub.osw.cep.Deposit.win:time(4 minutes)

Both time and event-count windows support a batch mode, in which the events arrive at the listener in a batch. The previous two examples would look as follows:

select customerId, amount from packtpub.osw.cep.Deposit.win:length_batch(4)
select customerId, amount from packtpub.osw.cep.Deposit.win:time_batch(4 minutes)

The event window takes a number of events as its parameter and the time window takes a time period. Examples of time period parameters are as follows:

15 seconds
90 minutes 30 seconds
1 day 2 hours 20 minutes 15 seconds 110 milliseconds

These examples are self-descriptive. The syntax of a time period is as follows:

time-period : [day-part] [hour-part] [minute-part] [seconds-part] [milliseconds-part]
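As a rough illustration of how such a time period maps to a duration, the following plain-Java sketch converts the textual form into milliseconds. TimePeriodParser is a hypothetical helper, not part of Esper. It assumes only the unit words used in the examples above (singular or plural) and, unlike the grammar, it does not enforce the order of the parts.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper that converts "1 day 2 hours ..." into milliseconds. Not Esper code. */
final class TimePeriodParser {
    private static final Map<String, Long> UNIT_MILLIS = new HashMap<>();
    static {
        UNIT_MILLIS.put("day", 86_400_000L);
        UNIT_MILLIS.put("hour", 3_600_000L);
        UNIT_MILLIS.put("minute", 60_000L);
        UNIT_MILLIS.put("second", 1_000L);
        UNIT_MILLIS.put("millisecond", 1L);
    }

    /** Sums every "number unit" pair of the period, for example "90 minutes 30 seconds". */
    static long toMillis(String period) {
        String[] tokens = period.trim().split("\\s+");
        if (tokens.length % 2 != 0) {
            throw new IllegalArgumentException("Expected 'number unit' pairs: " + period);
        }
        long total = 0;
        for (int i = 0; i < tokens.length; i += 2) {
            long amount = Long.parseLong(tokens[i]);
            String unit = tokens[i + 1].toLowerCase(Locale.ROOT);
            if (unit.endsWith("s")) {
                unit = unit.substring(0, unit.length() - 1); // accept the plural form
            }
            Long factor = UNIT_MILLIS.get(unit);
            if (factor == null) {
                throw new IllegalArgumentException("Unknown time unit: " + tokens[i + 1]);
            }
            total += amount * factor;
        }
        return total;
    }
}
```

For instance, TimePeriodParser.toMillis("90 minutes 30 seconds") adds 5,400,000 and 30,000 milliseconds.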

The time and event windows are very useful tools in CEP solutions.
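The two kinds of sliding window can be sketched in plain Java as follows. This is an illustration of the concept, not Esper's implementation: SlidingWindows, LengthWindow, and TimeWindow are hypothetical names, timestamps are passed in explicitly so that the behavior is deterministic, and treating an event that is exactly as old as the window span as expired is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sliding windows, in the spirit of win:length(n) and win:time(period). */
final class SlidingWindows {
    /** Keeps only the last `size` events. */
    static final class LengthWindow<T> {
        private final int size;
        private final Deque<T> events = new ArrayDeque<>();

        LengthWindow(int size) {
            this.size = size;
        }

        void add(T event) {
            events.addLast(event);
            if (events.size() > size) {
                events.removeFirst(); // the oldest event leaves the window
            }
        }

        List<T> contents() {
            return new ArrayList<>(events);
        }
    }

    /** Keeps only the events that arrived less than `spanMillis` ago. */
    static final class TimeWindow<T> {
        private final long spanMillis;
        private final Deque<Long> arrivals = new ArrayDeque<>(); // arrival times, oldest first
        private final Deque<T> events = new ArrayDeque<>();      // events, in the same order

        TimeWindow(long spanMillis) {
            this.spanMillis = spanMillis;
        }

        void add(long nowMillis, T event) {
            arrivals.addLast(nowMillis);
            events.addLast(event);
            expire(nowMillis);
        }

        /** Drops every event whose age at nowMillis is at least spanMillis. */
        void expire(long nowMillis) {
            while (!arrivals.isEmpty() && nowMillis - arrivals.peekFirst() >= spanMillis) {
                arrivals.removeFirst();
                events.removeFirst();
            }
        }

        int size() {
            return events.size();
        }
    }
}
```

A batch variant would differ only in when results are released: it would collect events until the window is full or the period has elapsed and then deliver them together.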

Aggregate Functions

Just like SQL, Esper provides aggregate functions to summarize and perform calculations on the event stream data. The aggregate functions are listed in the following table.

Aggregate function name    Description
sum(expression)            Returns the sum of the values of the expression
avg(expression)            Returns the average of the values in the expression
count(expression)          Returns the number of non-null values in the expression
count(*)                   Returns the number of events
max(expression)            Returns the highest value in the expression
min(expression)            Returns the lowest value in the expression
median(expression)         Returns the median value in the expression
stddev(expression)         Returns the standard deviation of the values in the expression
avedev(expression)         Returns the mean deviation of the values in the expression

All aggregate functions listed in this table except count(*) support the distinct modifier. This changes the behavior in some cases. Consult the Esper documentation for more details on the aggregate functions.
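For readers less familiar with the statistical functions in the table, the following plain-Java sketch shows what median, stddev, and avedev compute over a set of amounts. AggregateSketch is a hypothetical class and not Esper code. In particular, using the sample standard deviation (dividing by n - 1) is an assumption of this sketch; check the Esper documentation for the exact definition the engine uses.

```java
import java.util.Arrays;

/** Hypothetical reference implementations of three aggregates. Not Esper code. */
final class AggregateSketch {
    static double avg(double[] values) {
        return Arrays.stream(values).average().orElse(Double.NaN);
    }

    /** Middle value of the sorted data, or the mean of the two middle values. */
    static double median(double[] values) {
        if (values.length == 0) {
            return Double.NaN;
        }
        double[] sorted = values.clone(); // do not reorder the caller's array
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    /** Mean deviation: the average absolute distance from the mean. */
    static double avedev(double[] values) {
        double mean = avg(values);
        return Arrays.stream(values).map(v -> Math.abs(v - mean)).average().orElse(Double.NaN);
    }

    /** Sample standard deviation (divides by n - 1); an assumption of this sketch. */
    static double stddev(double[] values) {
        if (values.length < 2) {
            return Double.NaN;
        }
        double mean = avg(values);
        double sumOfSquares = Arrays.stream(values).map(v -> (v - mean) * (v - mean)).sum();
        return Math.sqrt(sumOfSquares / (values.length - 1));
    }
}
```

For the amounts 100, 200, 300, and 400, the mean and the median are both 250 and the mean deviation is 100.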

This section finishes the EQL and pattern expression overview. For a more in-depth reference to EQL or pattern expressions refer to the Esper official documentation.

Integration with OSWorkflow

This section discusses the Esper integration with OSWorkflow to process business event data and react with user-defined patterns.

Esper Function Provider

To enable Esper functionality inside an OSWorkflow definition, we must create a way to emit events from OSWorkflow into the Esper engine.

OSWorkflow can provide this extension via a custom FunctionProvider. This FunctionProvider will emit events into the engine. The code for the class is as follows:

package packtpub.osw.cep;
import java.util.Map;
import net.esper.client.*;
import com.opensymphony.module.propertyset.PropertySet;
import com.opensymphony.util.BeanUtils;
import com.opensymphony.workflow.FunctionProvider;
import com.opensymphony.workflow.WorkflowException;
/**
* Function provider that sends an event to the esper engine.
*/
public class PushEventFunctionProvider implements FunctionProvider
{
public void execute(Map transientVars, Map args, PropertySet ps)
throws WorkflowException
{
// the event POJO class name to be instantiated by reflection
String eventClassName = (String) args.get("event.class.name");
Object event;
try
{
event = Class.forName(eventClassName).newInstance();
} catch (Exception e)
{
throw new WorkflowException(e);
}
// Invokes the setters of the event with the argument map data.
// If a setter name matches a map key, it is called with the
// key's value as a parameter. For example, if the argument Map
// contains a key "name" with value "john" and the event bean
// has a method called setName, it is invoked by reflection in
// the following way: setName("john").
// See the javadocs for more detail.
BeanUtils.setValues(event, args, null);
// gets the default provider and the default runtime for this
// JVM.
EPServiceProvider epService = EPServiceProviderManager
.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();
runtime.sendEvent(event);
}
}

This code creates an event of the type named in the event.class.name argument and fills its properties from the matching keys of the args map, whose values can come from transientVars through variable interpolation, before sending it to the engine. For example, if the bean being instantiated has a JavaBean property named amount and the args map has a key with the same name, the key's value (if the type matches) is set into the amount property of the JavaBean.
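The reflection step can be pictured with a small stand-alone sketch. SetterFiller is a hypothetical class written for this illustration; it is not the OSWorkflow BeanUtils implementation, and it performs no type conversion. For every map key it looks for a public one-argument setter with the matching JavaBean name and invokes it with the key's value.

```java
import java.lang.reflect.Method;
import java.util.Map;

/** Hypothetical stand-in for the bean-filling step; not the OSWorkflow BeanUtils code. */
final class SetterFiller {
    /**
     * Calls setXxx(value) for every map key "xxx" that has a matching public
     * one-argument setter, and returns the number of setters invoked.
     */
    static int fill(Object bean, Map<String, ?> values) {
        int applied = 0;
        for (Method method : bean.getClass().getMethods()) {
            String name = method.getName();
            if (!name.startsWith("set") || name.length() == 3 || method.getParameterCount() != 1) {
                continue;
            }
            // Derive the property name from the setter name: setCustomerId -> customerId
            String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            if (values.containsKey(property)) {
                try {
                    method.invoke(bean, values.get(property)); // wrapper values are unboxed by invoke()
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Could not call " + name, e);
                }
                applied++;
            }
        }
        return applied;
    }
}
```

Keys without a matching setter, such as class.name and event.class.name in the workflow arguments, are simply ignored by this approach.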

As a regular FunctionProvider it can be placed in pre and post functions, in a step or an action. A snippet of the XML workflow definition is as follows:

<post-functions>
  <function type="class">
    <arg name="class.name">packtpub.osw.cep.PushEventFunctionProvider</arg>
    <arg name="event.class.name">packtpub.osw.cep.Deposit</arg>
    <arg name="amount">${amount}</arg>
    <arg name="customerId">${customerId}</arg>
  </function>
</post-functions>

The post-functions element of the snippet has only one function, the PushEventFunctionProvider defined in the class.name argument as usual. The amount and customerId arguments take their values from the transientVars map through variable interpolation (${amount} and ${customerId}) and end up in the properties of the JavaBean event named by event.class.name, here packtpub.osw.cep.Deposit.

Remember that the FunctionProvider arguments are subject to variable interpolation. Finally, a code fragment for testing the functionality of the FunctionProvider is as follows:

Map inputs = new HashMap();
inputs.put("customerId", "123");
inputs.put("amount", new Float(10000f));
long workflowId = workflow.initialize("cep", 100,
Collections.EMPTY_MAP);
workflow.doAction(workflowId, 1, inputs);

This fragment fills an inputs map with the customerId and amount values, and then creates a new workflow of type cep and executes action number 1 sending the inputs map along. This example assumes you have already set up the Esper engine elsewhere in the code, as well as the patterns and corresponding listeners.

Real-World Examples

This section shows examples of CEP within a BPM environment.

Event-based Mail Alerts

After integration with OSWorkflow, the work that is left to implement the CEP solution over a BPMS is to generate the appropriate patterns and listeners. For example, with a SendMailListener we can implement an event-based alert. Additionally, if the pattern is an EQL query, we can use it to mail process performance indicators to a manager in a real-time fashion.

package packtpub.osw.cep;
import java.util.Properties;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import net.esper.client.UpdateListener;
import net.esper.event.EventBean;
/**
* This listener sends an email with the current data
* on each invocation.
*/
public class SendMailListener implements UpdateListener
{
private String host;
private String to;
private String from;
private String subject;
private String bodyPrefix;
/**
* Constructor.
* @param host
* @param to
* @param from
* @param subject
* @param bodyPrefix
*/
public SendMailListener(String host, String to, String from, String
subject, String bodyPrefix)
{
super();
this.host = host;
this.to = to;
this.from = from;
this.subject = subject;
this.bodyPrefix = bodyPrefix;
}
public void update(EventBean[] newData, EventBean[] oldData)
{
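// Arrays.toString() renders the events; concatenating the array
// itself would only print its identity.
sendMail(host, to, from, subject,
bodyPrefix + "\n" + java.util.Arrays.toString(newData));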
}
private void sendMail(String host, String to, String from, String
subject, String body)
{
Properties config = new Properties();
config.put("mail.host", host);
Session session = Session.getDefaultInstance(config);
MimeMessage message = new MimeMessage(session);
try
{
message.addRecipient(Message.RecipientType.TO, new
InternetAddress(to));
message.addFrom(new InternetAddress[]
{ new InternetAddress(from) });
message.setSubject(subject);
message.setText(body);
Transport.send(message);
} catch (Exception e)
{
e.printStackTrace();
}
}
}

Example 1—Measuring SLA through KPIs

This example uses Esper as a performance monitor component of a BPMS solution. The aggregate functions such as count, avg, and sum within Esper EQL queries allow a business manager to monitor the Service Level Agreement (SLA) of a business process. In this example, we make use of the SendMailListener features.

An SLA in BPMS is defined in terms of the following metrics or KPIs:

  • Number of new process instances: Arrival rate per minute

  • Completion time of process instances: Completion rate per minute

  • Number of errors in process instances: Failure rate per minute

These metrics are sent every minute to a manager by mail. You can easily change this setting by modifying the time. The SLA information is better understood visually, for which a charting feature will be implemented.

Brief Overview of the Example

This example has the following parts:

  • Previous section's FunctionProvider

  • EQL queries, custom Listener, and SendMailListener

  • Charting component

This example uses the FunctionProvider defined in the last section. This FunctionProvider will send events to the Esper engine during the creation, completion, and error steps of a business process. The events will be sent to the registered listeners. There is one event class for each type of event:

  1. The ProcessCreationEvent class

  2. The ProcessCompletionEvent class

  3. The ProcessFailureEvent class

There is an EQL query for each type of event, calculating the statistics needed for the KPI values. The same listener is registered for all three queries. This listener passes the values to the Charting component, which displays them to the user.

The Charting component uses the open-source charting library JFreeChart to graph the KPIs.

Detailed Explanation

We will use a very basic sample workflow definition in the example. This definition is the one used previously in the Hello World section of this chapter. We have modified it to send more events to the engine during its lifespan and to use the event types we talked about in the previous section. The XML definition is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE workflow PUBLIC "-//OpenSymphony Group//DTD OSWorkflow 2.8//EN" "http://www.opensymphony.com/osworkflow/workflow_2_8.dtd">
<workflow>
<initial-actions>
<action id="100" name="Start Workflow">
<post-functions>
<function type="class">
<arg name="class.name">
packtpub.osw.cep.PushEventFunctionProvider
</arg>
<arg name="event.class.name">
packtpub.osw.cep.ex1.ProcessCreationEvent
</arg>
</function>
</post-functions>
<results>
<unconditional-result old-status="Finished" status="Underway"
step="1" />
</results>
</action>
</initial-actions>
<steps>
<step id="1" name="Sending event">
<actions>
<action id="1" name="Finish First Draft">
<results>
<unconditional-result old-status="Finished"
step="2" status="Underway">
<post-functions>
<function type="class">
<arg name="class.name">
packtpub.osw.cep.PushEventFunctionProvider
</arg>
<arg name="event.class.name">
packtpub.osw.cep.ex1.ProcessFailureEvent
</arg>
</function>
</post-functions>
</unconditional-result>
</results>
</action>
</actions>
</step>
<step id="2" name="Edit Doc">
<post-functions>
<function type="class">
<arg name="class.name">
packtpub.osw.cep.PushEventFunctionProvider
</arg>
<arg name="event.class.name">
packtpub.osw.cep.ex1.ProcessCompletionEvent
</arg>
</function>
</post-functions>
</step>
</steps>
</workflow>

This definition uses the FunctionProvider to send events to Esper as soon as it is created (ProcessCreationEvent), a sample ProcessFailureEvent on the completion of the first step, and a ProcessCompletionEvent when it finishes. This allows us to test all three events.

The EQL queries that make up the KPIs are as follows:

select 'creation' as name,count(*) as value from packtpub.osw.cep.ex1.ProcessCreationEvent.win:time(1 minutes)
select 'completion' as name,count(*) as value from packtpub.osw.cep.ex1.ProcessCompletionEvent.win:time(1 minutes)
select 'failure' as name,count(*) as value from packtpub.osw.cep.ex1.ProcessFailureEvent.win:time(1 minutes)
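
To make the meaning of count(*) over win:time(1 minutes) more concrete, the following plain-Java sketch imitates a sliding time window without using Esper at all. SlidingWindowCounter is a hypothetical helper written only for illustration; it is not part of Esper or of the book's code, and Esper's exact handling of the window boundary may differ from the choice made here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical helper (not part of Esper): counts the events seen during the
 * last windowMillis milliseconds, similar in spirit to count(*) over a
 * time window.
 */
class SlidingWindowCounter {
    private final long windowMillis;
    // Arrival times of the events currently inside the window, oldest first.
    private final Deque<Long> timestamps = new ArrayDeque<Long>();

    SlidingWindowCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records one event arriving at nowMillis and returns the updated count. */
    long onEvent(long nowMillis) {
        timestamps.addLast(nowMillis);
        return count(nowMillis);
    }

    /** Drops the events that have left the window, then returns the count. */
    long count(long nowMillis) {
        // Assumption: an event expires once it is windowMillis old or older.
        while (!timestamps.isEmpty()
                && nowMillis - timestamps.peekFirst() >= windowMillis) {
            timestamps.removeFirst();
        }
        return timestamps.size();
    }
}
```

The count rises when an event arrives and falls again when an old event leaves the window, so the value changes even while no new events are being sent.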

The three EQL queries are essentially the same: each one counts the events of a single type that arrived within a sliding one-minute time window. Finally, the GraphListener code is as follows:

package packtpub.osw.cep.ex1;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import net.esper.client.UpdateListener;
import net.esper.event.EventBean;
/**
* Listener that stores each KPI value it receives in a shared list
* (graphValues) so that the charting code can plot it.
*/
public class GraphListener implements UpdateListener
{
public static List graphValues = new ArrayList(50);
public void update(EventBean[] newEvent, EventBean[] oldEvent)
{
for (int i = 0; i < newEvent.length; i++)
{
EventBean bean = newEvent[i];
Map eventMap = (Map)bean.getUnderlying();
GraphData gd = new GraphData(new Date(),
(Long)eventMap.get("value"),
(String)eventMap.get("name"));
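// Guard both the size check and the add with the same lock,
// and keep at most 50 values in the list.
synchronized (graphValues)
{
if(graphValues.size() >= 50)
{
System.out.println("removing");
graphValues.remove(0);
}
graphValues.add(gd);
}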
}
}
}
class GraphData
{
private Date timestamp;
private double value;
private String category;
/**
* Constructor.
* @param timestamp
* @param value
* @param category
*/
GraphData(Date timestamp, double value, String category)
{
super();
this.timestamp = timestamp;
this.value = value;
this.category = category;
}
/**
* @return the category
*/
public String getCategory()
{
return category;
}
/**
* @return the timestamp
*/
public Date getTimestamp()
{
return timestamp;
}
/**
* @return the value
*/
public double getValue()
{
return value;
}
}
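
The listener above shares graphValues between the Esper callback and the charting code. One way to keep that sharing contained is a small synchronized buffer that holds a fixed number of recent items and hands out copies for reading. The following is a minimal sketch of that idea; BoundedHistory is a hypothetical class introduced here for illustration and does not appear in the book's example code.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * Hypothetical helper: a thread-safe buffer that keeps only the most
 * recent 'capacity' items.
 */
class BoundedHistory<T> {
    private final int capacity;
    private final LinkedList<T> items = new LinkedList<T>();

    BoundedHistory(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Adds an item, discarding the oldest one if the buffer is full. */
    synchronized void add(T item) {
        if (items.size() >= capacity) {
            items.removeFirst();
        }
        items.addLast(item);
    }

    /** Returns a copy, so callers can iterate without holding the lock. */
    synchronized List<T> snapshot() {
        return new ArrayList<T>(items);
    }
}
```

With a buffer like this, a listener would call add() for each new value and the charting code would iterate over snapshot(), so neither side needs its own synchronized block.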

This GraphListener code is invoked when the pattern is matched. It communicates with the charting component through the use of a shared data structure called graphValues. The main section of the code is as follows:

package packtpub.osw.cep.ex1;
import java.awt.Rectangle;
import java.awt.image.BufferedImage;
import java.util.*;
import javax.swing.*;
import net.esper.client.*;
import org.jfree.chart.*;
import org.jfree.data.time.*;
import com.opensymphony.workflow.Workflow;
import com.opensymphony.workflow.basic.BasicWorkflow;
import com.opensymphony.workflow.config.*;
import packtpub.osw.cep.CEPWorkflowTest;
/**
* CEP Example1
*/
public class Example1
{
public static void main(String[] args)
{
setupEngine();
Workflow workflow = new BasicWorkflow("test");
Configuration config = new DefaultConfiguration();
workflow.setConfiguration(config);
try
{
while(true)
{
long workflowId = workflow.initialize("cep-example1", 100,
Collections.EMPTY_MAP);
workflow.doAction(workflowId, 1, Collections.EMPTY_MAP);
graph();
}
} catch (Exception e)
{
e.printStackTrace();
}
}
private static void setupEngine()
{
EPServiceProvider epService = EPServiceProviderManager
.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();
//creation rate
EPStatement eql = admin.createEQL("select 'creation' as name,"
+ "count(*) as value from "
+ "packtpub.osw.cep.ex1.ProcessCreationEvent"
+ ".win:time(1 minutes)");
//completion rate
EPStatement eql2 = admin.createEQL("select 'completion' as name,"
+ "count(*) as value from "
+ "packtpub.osw.cep.ex1.ProcessCompletionEvent"
+ ".win:time(1 minutes)");
//failure rate
EPStatement eql3 = admin.createEQL("select 'failure' as name,"
+ "count(*) as value from "
+ "packtpub.osw.cep.ex1.ProcessFailureEvent"
+ ".win:time(1 minutes)");
GraphListener pl = new GraphListener();
eql.addListener(pl);
eql2.addListener(pl);
eql3.addListener(pl);
}
private static JFrame jf;
private static JLabel lblChart;
private static void graph()
{
if (jf == null)
{
setupGraphicSubsystem();
}
TimeSeriesCollection categoryDataset = setupChartData();
JFreeChart chart = ChartFactory.createTimeSeriesChart(
"Event Chart", // Title
"KPIs", // X-Axis label
"Number of Event", // Y-Axis label
categoryDataset, // Dataset
true, // Show legend
true, true);
BufferedImage image = chart.createBufferedImage(500, 300);
lblChart.setIcon(new ImageIcon(image));
}
private static void setupGraphicSubsystem()
{
jf = new JFrame();
jf.setBounds(new Rectangle(600, 400));
lblChart = new JLabel();
jf.add(lblChart);
jf.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
jf.setVisible(true);
}
private static TimeSeriesCollection setupChartData()
{
TimeSeriesCollection categoryDataset = new
TimeSeriesCollection();
TimeSeries tsc = new TimeSeries("Creation", Second.class);
TimeSeries tsf = new TimeSeries("Failure", Second.class);
TimeSeries tsp = new TimeSeries("Completion", Second.class);
categoryDataset.addSeries(tsp);
categoryDataset.addSeries(tsc);
categoryDataset.addSeries(tsf);
// Hold the lock on the shared list while iterating, because the
// listener may modify it from another thread.
synchronized (GraphListener.graphValues)
{
for (Iterator iter = GraphListener.graphValues.iterator();
iter.hasNext();)
{
GraphData data = (GraphData) iter.next();
if (data.getCategory().equalsIgnoreCase("Creation"))
{
tsc.addOrUpdate(new Second(data.getTimestamp()),
data.getValue());
}
if (data.getCategory().equalsIgnoreCase("Failure"))
{
tsf.addOrUpdate(new Second(data.getTimestamp()),
data.getValue());
}
if (data.getCategory().equalsIgnoreCase("Completion"))
{
tsp.addOrUpdate(new Second(data.getTimestamp()),
data.getValue());
}
}
}
return categoryDataset;
}
}

The main code first sets up the engine and then creates new workflow instances in a loop until the JVM exits. Every time a new workflow instance is created, the chart is redrawn with the information from the last 50 events.

This code uses the open-source charting library JFreeChart to create a TimeSeries graph, shown in the following screenshot:

Real-World Examples

The graph plots the event counts reported by the three queries over time, at one-second resolution (the graph scales automatically).

This example explained the integration between a BPMS and a CEP engine; it can be extended to handle more complex queries. For more information on JFreeChart refer to its website at http://www.jfree.org/jfreechart/.

Summary

This chapter covered Esper, a CEP implementation, and showed how integrating it with a BPMS enables real-time processing of business process events. The Esper distribution comes with several other examples that you can use to evaluate the potential of this new technology.

In the next chapter we'll introduce Pentaho, a very powerful open-source reporting solution, and then we will integrate it with OSWorkflow to generate information charts and reports based on the business process data.
