Aggregation

An aggregation operation applies logic to the rows of a DataSet that have been grouped with groupBy(). groupBy() takes the index of the tuple field used as the grouping key, and the aggregation operation (such as sum(), min(), or max()) takes the index of the field to aggregate.
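The index semantics can be illustrated with a minimal sketch that uses plain Scala collections instead of Flink. It computes what groupBy(0).sum(1) produces for a collection of (Description, Quantity) tuples: field 0 (_1) is the grouping key and field 1 (_2) is the value being summed. The object name and the "item-A"/"item-B" sample tuples are hypothetical. Flink evaluates the same logic in parallel over a DataSet, so the order of its output groups may differ.

```scala
object GroupBySumSketch {
  // Hypothetical sample data: (Description, Quantity) tuples.
  val rows: Seq[(String, Int)] = Seq(
    ("item-A", 6),
    ("item-B", 4),
    ("item-A", 2),
    ("item-B", -1)
  )

  // Plain-Scala equivalent of groupBy(0).sum(1):
  // group on tuple field 0 (_1), then sum tuple field 1 (_2) within each group.
  def groupBySum(data: Seq[(String, Int)]): Map[String, Int] =
    data.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).sum }

  def main(args: Array[String]): Unit =
    // Sorted by key only to make the printed order deterministic.
    groupBySum(rows).toSeq.sortBy(_._1).foreach(println) // prints (item-A,8) then (item-B,3)
}
```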

The following command groups the records by Description, sums the Quantity values for each Description, and prints the first 10 results:

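// Field 2 of each CSV line is Description, field 3 is Quantity
dataSet.map(x => (x.split(",")(2), x.split(",")(3).toInt))
  .groupBy(0)  // group on tuple field 0 (Description)
  .sum(1)      // sum tuple field 1 (Quantity) per Description
  .first(10)
  .print()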

This prints the first 10 (Description, total Quantity) pairs:

(,-2117)
(*Boombox Ipod Classic,1)
(*USB Office Mirror Ball,2)
(10 COLOUR SPACEBOY PEN,823)
(12 COLOURED PARTY BALLOONS,102)
(12 DAISY PEGS IN WOOD BOX,62)
(12 EGG HOUSE PAINTED WOOD,16)
(12 IVORY ROSE PEG PLACE SETTINGS,80)
(12 MESSAGE CARDS WITH ENVELOPES,238)
(12 PENCIL SMALL TUBE WOODLAND,444)

The following command performs the same grouping and summing, and then applies max(1) to the summed results:

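// Field 2 of each CSV line is Description, field 3 is Quantity
dataSet.map(x => (x.split(",")(2), x.split(",")(3).toInt))
  .groupBy(0)  // group on tuple field 0 (Description)
  .sum(1)      // sum tuple field 1 (Quantity) per Description
  .max(1)      // maximum of field 1 over all summed tuples
  .first(10)
  .print()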

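This prints a single tuple. Note that max() only guarantees the value of the aggregated field: 8189 is the largest summed Quantity, but the Description in the same tuple is not guaranteed to be the one with that total. To retrieve the complete record with the largest value, use maxBy(1) instead. The output of the command above is: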

(reverse 21/5/10 adjustment,8189)

The following command performs the same grouping and summing, and then applies min(1) to the summed results:

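// Field 2 of each CSV line is Description, field 3 is Quantity
dataSet.map(x => (x.split(",")(2), x.split(",")(3).toInt))
  .groupBy(0)  // group on tuple field 0 (Description)
  .sum(1)      // sum tuple field 1 (Quantity) per Description
  .min(1)      // minimum of field 1 over all summed tuples
  .first(10)
  .print()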

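This prints a single tuple. As with max(), only the aggregated field is reliable: -7005 is the smallest summed Quantity, but the Description is not guaranteed to belong to it, which is why it is the same Description that the max() example returned. Use minBy(1) to retrieve the complete record. The output of the command above is: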

(reverse 21/5/10 adjustment,-7005)
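The distinction between aggregating a field and selecting a whole record can be sketched with plain Scala collections. Flink's DataSet API provides maxBy(...) and minBy(...) for selecting the complete tuple with the extreme value. In the commands above, writing .maxBy(1) or .minBy(1) in place of .max(1) or .min(1) should therefore return a Description that actually carries the extreme total. The sketch below does not call Flink. It only mimics the two behaviours on hypothetical summed pairs, using Scala's own collection maxBy/minBy. The object name, method names, and sample values are illustrative assumptions.

```scala
object SelectBySketch {
  // Hypothetical summed results: (Description, total Quantity).
  val sums: Seq[(String, Int)] = Seq(("item-A", 8), ("item-B", -5), ("item-C", 3))

  // Record selection, comparable in intent to Flink's maxBy(1) / minBy(1):
  // the whole tuple holding the extreme field-1 value is returned.
  def topByQuantity(data: Seq[(String, Int)]): (String, Int) = data.maxBy(_._2)
  def bottomByQuantity(data: Seq[(String, Int)]): (String, Int) = data.minBy(_._2)

  // Field aggregation, comparable in intent to max(1) / min(1):
  // only the extreme value itself is computed, not the record it came from.
  def maxQuantity(data: Seq[(String, Int)]): Int = data.map(_._2).max
  def minQuantity(data: Seq[(String, Int)]): Int = data.map(_._2).min

  def main(args: Array[String]): Unit = {
    println(topByQuantity(sums))    // (item-A,8)
    println(bottomByQuantity(sums)) // (item-B,-5)
    println(maxQuantity(sums))      // 8
    println(minQuantity(sums))      // -5
  }
}
```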

In Flink's Java DataSet class, sum() is defined as a thin wrapper around a private aggregate() helper:

// private helper that allows to set a different call location name
private AggregateOperator<T> aggregate(Aggregations agg, int field, String callLocationName) {
    return new AggregateOperator<T>(this, agg, field, callLocationName);
}

/**
 * Syntactic sugar for aggregate (SUM, field).
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the summed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> sum(int field) {
    return this.aggregate(Aggregations.SUM, field, Utils.getCallLocationName());
}
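The design shown here makes sum(field) shorthand for a general aggregate call with Aggregations.SUM. On a grouped DataSet in Flink's Scala API, the long form would be written as .aggregate(Aggregations.SUM, 1), with Aggregations imported from org.apache.flink.api.java.aggregation. The following is a simplified, hypothetical Scala model of that delegation on in-memory tuples. The Agg type, its cases, and the aggregate function are illustrative names, not Flink classes. Unlike Flink, the model hard-codes field 0 as the key and field 1 as the aggregated value.

```scala
object AggregateSugarSketch {
  // Hypothetical stand-in for an aggregation kind (SUM, MIN, MAX).
  sealed trait Agg
  case object Sum extends Agg
  case object Min extends Agg
  case object Max extends Agg

  // General entry point: group on field 0 and apply `agg` to field 1 of each group.
  def aggregate(data: Seq[(String, Int)], agg: Agg): Map[String, Int] =
    data.groupBy(_._1).map { case (key, group) =>
      val values = group.map(_._2)
      val result = agg match {
        case Sum => values.sum
        case Min => values.min
        case Max => values.max
      }
      key -> result
    }

  // Syntactic sugar: each shortcut just forwards to aggregate with a fixed kind,
  // mirroring how sum(field) forwards to aggregate(SUM, field).
  def sum(data: Seq[(String, Int)]): Map[String, Int] = aggregate(data, Sum)
  def min(data: Seq[(String, Int)]): Map[String, Int] = aggregate(data, Min)
  def max(data: Seq[(String, Int)]): Map[String, Int] = aggregate(data, Max)

  def main(args: Array[String]): Unit = {
    val rows = Seq(("item-A", 6), ("item-B", 4), ("item-A", 2), ("item-B", -1))
    println(sum(rows))
    println(max(rows))
  }
}
```

Keeping a single general entry point means new aggregation kinds only have to be handled in one place, while the shortcuts stay one-liners.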