A typical messaging requirement is to aggregate multiple related messages for processing within a single BPEL process instance. There are two parts to this recipe; the first is to route related messages through to the same instance of a BPEL process. This can be achieved using a correlation set defined against a common value present in each message.
The second is to determine when we have all the messages that belong to the aggregation. Typically, most use cases fall into two broad patterns:
An example of the first pattern is an order aggregation process, whereby we aggregate the orders we receive for a particular book over a period of time (for example, 24 hours) and then place a single order with the publisher for the total amount of orders received.
This recipe makes use of the new aggregation feature in Oracle SOA Suite 11gR1 Patch Set 5 (11.1.1.6.0), so you need to ensure that you have installed either this or a later release of the Oracle SOA Suite.
Create an SOA application with a project containing an empty composite (named WarehouseService in the following example).
For the purpose of following this example, select the WarehouseService_1.0.wsdl node included with the samples for this chapter.
Ensure Warehouse Service is selected as the Port Type, and click on OK. This will add BPEL Process Warehouse Service to our composite.
In the Name field, enter bpel.config.reenableAggregationOnComplete, and in the Value field, enter true; then, click on OK.
Add a While loop to process our messages. Create an xsd:boolean variable named orderComplete and use an Assign activity to set it to false().
Create a variable named waitUntil, of type xsd:dateTime, and use an Assign activity to set it to the following value:
xp20:add-dayTimeDuration-to-dateTime(xp20:current-dateTime(), 'PT1H')
Set the While loop condition to the following value:
$orderComplete = false()
For Partner Link, select the same partner link used by the initial receive activity (warehouseservice_client, in this example), and select the same operation (submitBookOrder).
Click on the auto-create variable (the plus icon) to launch the Create Variable window and give the variable a meaningful name (for example, nextBookOrder). Next, click on OK.
Add an expression to increment the quantity of the original order by the quantity of the new order; in other words:
origOrder/quantity = origOrder/quantity + nextOrder/quantity
The XPath expression should look something like the following:
$inputVariable.payload/ns2:bookOrder/ns2:quantity + $nextBookOrder.payload/ns2:bookOrder/ns2:quantity
Set the first radio button to Until, and set the second radio button to Expression; then, set the expression value to be $waitUntil.
Use an Assign activity to assign true() to $orderComplete.
This will launch the Create CorrelationSet Property window. Give the property a meaningful name (for example, isbn), and then click on the search icon to launch the Type Chooser window and select the appropriate schema type (for example, xsd:string).
This will open the Property Alias window. In the Type Explorer window, expand the WarehouseService_1.0.wsdl node (under the Project WSDL Files folder). Next, expand the Message Types folder and select Part - payload (under submitBookOrder) as highlighted in the following screenshot:
Enter the XPath query that locates the ISBN within the submitBookOrder message, which is:
/ns1:bookOrder/ns1:isbn
Then click on OK.
This will launch the Create Correlation Set window; give the correlation set a meaningful name (for example, isbnCS).
Add the isbn property created in step 12.
Deploy the WarehouseService composite to the Oracle SOA Suite and use Enterprise Manager to submit multiple submitBookOrder messages.
For book orders that contain the same ISBN, you should see that they are routed through to the same instance of the BPEL process.
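In BPEL source, the aggregation loop built in the preceding steps might look roughly like the following. This is a minimal sketch, not the source that JDeveloper generates: it assumes a BPEL 2.0 process, and it assumes that the mid-process receive and the timeout are modelled as the onMessage and onAlarm branches of a Pick activity. The activity names (AggregateOrders, NextOrderOrTimeout, AddQuantity, MarkComplete) are hypothetical, and the portType attribute and namespace declarations are omitted.

```xml
<!-- Sketch only: activity names are hypothetical; namespace declarations omitted -->
<while name="AggregateOrders">
  <condition>$orderComplete = false()</condition>
  <pick name="NextOrderOrTimeout">
    <!-- A further order with the same ISBN arrives before the deadline -->
    <onMessage partnerLink="warehouseservice_client"
               operation="submitBookOrder"
               variable="nextBookOrder">
      <correlations>
        <correlation set="isbnCS" initiate="no"/>
      </correlations>
      <assign name="AddQuantity">
        <copy>
          <from>$inputVariable.payload/ns2:bookOrder/ns2:quantity + $nextBookOrder.payload/ns2:bookOrder/ns2:quantity</from>
          <to>$inputVariable.payload/ns2:bookOrder/ns2:quantity</to>
        </copy>
      </assign>
    </onMessage>
    <!-- The deadline held in waitUntil has passed, so end the loop -->
    <onAlarm>
      <until>$waitUntil</until>
      <assign name="MarkComplete">
        <copy>
          <from>true()</from>
          <to>$orderComplete</to>
        </copy>
      </assign>
    </onAlarm>
  </pick>
</while>
```

Each pass through the loop either folds one more order into the running total or, once the deadline is reached, sets orderComplete so that the While condition fails and the process can move on.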
Since the 11.1.1.6 release of the Oracle SOA Suite, Oracle BPEL Process Manager has supported a message aggregation feature. When multiple messages are routed to the same process, the first message is routed to create a new instance and subsequent messages can be routed to continue the created instance using a mid-process receive activity.
By default, this feature is disabled. To enable it, we need to set the property bpel.config.reenableAggregationOnComplete to true.
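For orientation, a property set on a BPEL component through JDeveloper is typically stored in composite.xml. The fragment below is a sketch of what that entry might look like; the component name and the src file name are assumptions based on the composite name used in this recipe, and the exact attributes JDeveloper writes may differ.

```xml
<!-- Sketch of a composite.xml fragment; component name and src are assumed -->
<component name="WarehouseService">
  <implementation.bpel src="WarehouseService.bpel"/>
  <!-- Enables routing of subsequent correlated messages to the running instance -->
  <property name="bpel.config.reenableAggregationOnComplete"
            type="xs:string" many="false">true</property>
</component>
```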
Once we have enabled aggregation, we still have to define the key that the BPEL engine should use in order to aggregate messages. For this, we use a correlation set (isbnCS, in the previous example) that consists of one or more properties (isbn in our case). These properties are then mapped, using a property alias, to the corresponding field in the messages that are being aggregated.
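The relationship between the property, the property alias, and the correlation set can be sketched in source form as follows. This assumes BPEL 2.0, where properties and aliases are declared in a WSDL using the varprop namespace (bound here to the vprop prefix). The tns and ns1 prefixes, the message type name, and the query path are assumptions drawn from the names used in this recipe rather than the actual sample files.

```xml
<!-- In the WSDL: the property and the alias that maps it onto the message -->
<!-- vprop = http://docs.oasis-open.org/wsbpel/2.0/varprop -->
<vprop:property name="isbn" type="xsd:string"/>
<vprop:propertyAlias propertyName="tns:isbn"
                     messageType="tns:submitBookOrder"
                     part="payload">
  <!-- Query path is assumed; it locates the ISBN inside the payload -->
  <vprop:query>/ns1:bookOrder/ns1:isbn</vprop:query>
</vprop:propertyAlias>

<!-- In the BPEL process: the correlation set built from that property -->
<correlationSets>
  <correlationSet name="isbnCS" properties="tns:isbn"/>
</correlationSets>
```

A correlation set with several properties would list them space-separated in the properties attribute, each with its own alias.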
The combined value of these properties at runtime should result in a unique value (at least, unique across all instances of the same process) that allows the BPEL engine to route the message to the appropriate instance of a process.
On receipt of the first message in the aggregation, we need to tell the BPEL process to initialize the correlation set, which we did by setting the initiate property to true (in step 19). This then tells the BPEL engine to route all other messages that contain the same value to this instance of the BPEL process.
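In source form, the initial receive that initiates the correlation set might look like the sketch below. It assumes BPEL 2.0, in which the initiate attribute takes the values yes, no, or join; the activity name receiveInput is hypothetical and the portType attribute is omitted.

```xml
<!-- Sketch only: the activity name is hypothetical -->
<receive name="receiveInput"
         partnerLink="warehouseservice_client"
         operation="submitBookOrder"
         variable="inputVariable"
         createInstance="yes">
  <correlations>
    <!-- The first message fixes the ISBN value for this instance -->
    <correlation set="isbnCS" initiate="yes"/>
  </correlations>
</receive>
```

Any later receive or onMessage on the same operation would reference isbnCS with initiate="no", so that it only matches messages carrying the ISBN captured here.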