Usage example of Cascading Tube

Step 0 is a flow example written in pure Cascading.

Step 0.

Short description of the method: it takes logs from one source and produces three different reports.

import cascading.flow.FlowDef;
import cascading.operation.expression.ExpressionFunction;
import cascading.pipe.*;
import cascading.pipe.assembly.AggregateBy;
import cascading.pipe.assembly.FirstBy;
import cascading.pipe.assembly.SumBy;
import cascading.tap.Tap;
import cascading.tuple.Fields;

// GetVisit, OlderThan, selfReferred() and the assign*/distribute* methods are not part of Cascading;
// they belong to the example project and are not shown here.
public FlowDef flow(Tap pv, Tap trans, Tap transToFirstSource, Tap transToLastSource, Tap transDistributedAmongSources) {
    // aggregation: compute income per line, then sum quantity and income per (user, transaction)
    Pipe transPipe = new Each("trans", new Fields("t_quantity", "t_itemCost"), new ExpressionFunction(new Fields("t_income"),
            "(double) t_quantity * (double) t_itemCost", Double.class), Fields.ALL);
    transPipe = new AggregateBy(transPipe, new Fields("t_userId", "t_transactionId"), new FirstBy("t_createTime"),
            new SumBy(new Fields("t_quantity"), new Fields("t_quantity"), Integer.class),
            new SumBy(new Fields("t_income"), new Fields("t_income"), Double.class));

    // divide: group page views by user, sorted by creation time, and split them into visits
    Pipe pvPipe = new Pipe("pv");
    pvPipe = new GroupBy(pvPipe, new Fields("p_userId"), new Fields("p_createTime"));
    Pipe visitPipe = new Every(pvPipe, new GetVisit(selfReferred()), Fields.RESULTS);

    // merge: join visits with transactions of the same user and filter with OlderThan
    Pipe crossTransWithVisits = new CoGroup(visitPipe, new Fields("p_userId"), transPipe, new Fields("t_userId"));
    crossTransWithVisits = new Each(crossTransWithVisits, new Fields("start", "t_createTime"), new OlderThan());
    crossTransWithVisits = new Checkpoint(crossTransWithVisits);

    return FlowDef.flowDef().setName("ConversionAttribution").addSource(pvPipe, pv).addSource(transPipe, trans)
            .addTailSink(assignTransactionToFirstVisit(crossTransWithVisits), transToFirstSource)
            .addTailSink(assignTransactionToVisit(crossTransWithVisits), transToLastSource)
            .addTailSink(distributeTransactionAmongVisits(crossTransWithVisits), transDistributedAmongSources);
}
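
To make the aggregation stage of the flow above concrete, here is a minimal plain-Java sketch of what it computes. It does not use Cascading at all: the class AggregationSketch, its Line and Total types and the sample rows are hypothetical, and the FirstBy on t_createTime is left out for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the "aggregation" stage; no Cascading classes are used.
final class AggregationSketch {

    // One line of the transaction log (hypothetical in-memory form).
    static final class Line {
        final String userId;
        final String transactionId;
        final int quantity;
        final double itemCost;

        Line(String userId, String transactionId, int quantity, double itemCost) {
            this.userId = userId;
            this.transactionId = transactionId;
            this.quantity = quantity;
            this.itemCost = itemCost;
        }
    }

    // Running totals for one (userId, transactionId) group.
    static final class Total {
        int quantity;
        double income;
    }

    static Map<String, Total> aggregate(List<Line> lines) {
        Map<String, Total> totals = new LinkedHashMap<>();
        for (Line line : lines) {
            // Same expression as in the ExpressionFunction: quantity * itemCost.
            double income = (double) line.quantity * line.itemCost;
            String key = line.userId + "/" + line.transactionId;
            Total total = totals.computeIfAbsent(key, k -> new Total());
            total.quantity += line.quantity; // mirrors the SumBy on t_quantity
            total.income += income;          // mirrors the SumBy on t_income
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Line> lines = new ArrayList<>();
        lines.add(new Line("u1", "t1", 2, 1.5));
        lines.add(new Line("u1", "t1", 1, 4.0));
        lines.add(new Line("u2", "t2", 4, 0.25));
        for (Map.Entry<String, Total> e : aggregate(lines).entrySet()) {
            System.out.println(e.getKey() + " quantity=" + e.getValue().quantity
                    + " income=" + e.getValue().income);
        }
    }
}
```

For the three sample rows this prints one line per (user, transaction) pair: u1/t1 with quantity 3 and income 7.0, and u2/t2 with quantity 4 and income 1.0.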


Step 1a.

You can shorten your code by adding a helper class with static methods and using “f(_)” instead of “new Fields(_)”:

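// Shorthand for new Fields(...)
public static Fields f(String... names) {
    return new Fields(names);
}

// Shorthand for new FirstBy(new Fields(field)); used as first(field) in the mapping below
public static FirstBy first(String field) {
    return new FirstBy(f(field));
}

// Sums a field into a field of the same name
public static SumBy sum(String field, Class<?> resultType) {
    return new SumBy(f(field), f(field), resultType);
}

public static SumBy sum(String field) {
    return sum(field, Double.class);
}

public static SumBy sumInt(String field) {
    return sum(field, Integer.class);
}

public static CountBy count(String outField) {
    return new CountBy(f(outField));
}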


new Fields(_) ==> f(_)
new FirstBy(field) ==> first(field)
new SumBy(new Fields("t_quantity"), new Fields("t_quantity"), Integer.class) ==> sumInt("t_quantity")
new SumBy(new Fields("t_income"), new Fields("t_income"), Double.class) ==> sum("t_income")
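
The mapping above can be tried out without the Cascading jar. The following is a sketch only: Fields and SumBy here are reduced stand-ins written for this example (they are not the real cascading.tuple.Fields and cascading.pipe.assembly.SumBy), and Helpers, HelperDemo and describe() are hypothetical names. It shows that the verbose and the shortened form build the same object.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for cascading.tuple.Fields, reduced to a list of field names so the
// example compiles without Cascading. It is NOT the real class.
final class Fields {
    final List<String> names;

    Fields(String... names) {
        this.names = Arrays.asList(names);
    }
}

// Stand-in for cascading.pipe.assembly.SumBy, keeping only the three arguments
// used in Step 0. It is NOT the real class.
final class SumBy {
    final Fields valueField;
    final Fields sumField;
    final Class<?> sumType;

    SumBy(Fields valueField, Fields sumField, Class<?> sumType) {
        this.valueField = valueField;
        this.sumField = sumField;
        this.sumType = sumType;
    }

    // Hypothetical helper for printing; exists only in this sketch.
    String describe() {
        return "SumBy(" + valueField.names + " -> " + sumField.names + ", "
                + sumType.getSimpleName() + ")";
    }
}

// The helper methods from Step 1a, written against the stand-ins.
final class Helpers {
    static Fields f(String... names) {
        return new Fields(names);
    }

    static SumBy sum(String field, Class<?> resultType) {
        return new SumBy(f(field), f(field), resultType);
    }

    static SumBy sum(String field) {
        return sum(field, Double.class);
    }

    static SumBy sumInt(String field) {
        return sum(field, Integer.class);
    }
}

final class HelperDemo {
    public static void main(String[] args) {
        SumBy verbose = new SumBy(new Fields("t_quantity"), new Fields("t_quantity"), Integer.class);
        SumBy concise = Helpers.sumInt("t_quantity");
        System.out.println(verbose.describe()); // SumBy([t_quantity] -> [t_quantity], Integer)
        System.out.println(concise.describe()); // same line as above
        System.out.println(Helpers.sum("t_income").describe()); // SumBy([t_income] -> [t_income], Double)
    }
}
```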

--- OR

Step 1b.

Or you can try the Cascading Tube wrapper, which implements the builder pattern on top of Cascading and makes your code shorter, more readable and more intuitive.

(This method does exactly the same thing as the one above; it is only rewritten with Cascading Tube.)
Add some imports first


and put this method into your class

def flow(pv: Io, trans: Io, transToFirstSource: Io, transToLastSource: Io, transDistributedAmongSources: Io, transTrap: Io, pvTrap: Io) = {
  // aggregation: income per line, then sums per (user, transaction)
  val transPipe = Tube("trans")
    .multiply("t_quantity", "t_itemCost", "t_income")
    .aggregateBy(("t_userId", "t_transactionId"), first("t_createTime"), sumInt("t_quantity"), sum("t_income"))

  // divide: page views grouped by user, sorted by creation time, split into visits
  val pvPipe = Tube("pv")
    .groupBy("p_userId", "p_createTime")
    .every(buffer = new GetVisit(selfReferred))

  // merge: join visits with transactions, then filter with OlderThan
  val crossTransWithVisits = Tube("crossVisitAndTrans", pvPipe).coGroup("p_userId", transPipe, "t_userId", new RightJoin)
    .groupBy(("t_userId", "t_transactionId"), "start")
    .every(buffer = new FillMisingMatchingVisit)
    .filter(("start", "t_createTime"), new OlderThan)

  FlowDef.flowDef().setName("ConversionAttribution").addSource(pvPipe, pv)
    .addSource(transPipe, trans)
    .addTailSink(assignTransactionToVisit(crossTransWithVisits, FIRST), transToFirstSource)
    .addTailSink(assignTransactionToVisit(crossTransWithVisits, MATCHING), transToLastSource)
    .addTailSink(distributeTransactionAmongVisits(crossTransWithVisits), transDistributedAmongSources)
    .addTrap("trans", transTrap)
    .addTrap("pv", pvTrap)
}
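
To illustrate what "builder pattern" means in Step 1b, here is a toy sketch in Java. It is not Cascading Tube's actual implementation: MiniTube and all of its methods are hypothetical, and instead of wrapping a Cascading Pipe it only records a description of each step. The point is the shape of the API: every method returns the builder itself, so the calls chain the way the Tube calls do above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy fluent builder: each call records one step and returns this, so calls chain.
// A real wrapper would presumably build a Cascading pipe assembly instead (assumption).
final class MiniTube {
    private final String name;
    private final List<String> steps = new ArrayList<>();

    MiniTube(String name) {
        this.name = name;
    }

    MiniTube multiply(String left, String right, String out) {
        steps.add("multiply(" + left + " * " + right + " -> " + out + ")");
        return this;
    }

    MiniTube groupBy(String key, String sortField) {
        steps.add("groupBy(" + key + ", sort " + sortField + ")");
        return this;
    }

    MiniTube filter(String field, String predicateName) {
        steps.add("filter(" + field + ", " + predicateName + ")");
        return this;
    }

    // Returns the recorded steps in call order.
    String describe() {
        return name + ": " + String.join(" | ", steps);
    }
}

final class MiniTubeDemo {
    public static void main(String[] args) {
        String description = new MiniTube("trans")
                .multiply("t_quantity", "t_itemCost", "t_income")
                .groupBy("t_userId", "t_createTime")
                .filter("t_createTime", "OlderThan")
                .describe();
        System.out.println(description);
    }
}
```

The design choice is that the builder owns the intermediate state, so the caller never has to reassign a variable after each operation as the Step 0 code does with transPipe and crossTransWithVisits.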

Which one would you rather read?