Tag: hadoop

  • Cascading Tube

    Usage example of Cascading Tube

    EXAMPLE I
    Step 0 is a pure Cascading flow example.

    Step 0.

    Short description of the method: it takes logs from one source and produces three different reports

    public FlowDef flow(Tap pv, Tap trans, Tap transToFirstSource, Tap transToLastSource, Tap transDistributedAmongSources) {
    // aggregation
    Pipe transPipe = new Each("trans", new Fields("t_quantity", "t_itemCost"), new ExpressionFunction(new Fields("t_income"),
    "(double) t_quantity * (double) t_itemCost", Double.class), Fields.ALL);
    transPipe = new AggregateBy(transPipe, new Fields("t_userId", "t_transactionId"), new FirstBy(new Fields("t_createTime")),
    new SumBy(new Fields("t_quantity"), new Fields("t_quantity"), Integer.class),
    new SumBy(new Fields("t_income"), new Fields("t_income"), Double.class));
    
    // divide
    Pipe pvPipe = new Pipe("pv");
    pvPipe = new GroupBy(pvPipe, new Fields("p_userId"), new Fields("p_createTime"));
    Pipe visitPipe = new Every(pvPipe, new GetVisit(selfReferred()), Fields.RESULTS);
    
    // merge
    Pipe crossTransWithVisits = new CoGroup(visitPipe, new Fields("p_userId"), transPipe, new Fields("t_userId"));
    crossTransWithVisits = new Each(crossTransWithVisits, new Fields("start", "t_createTime"), new OlderThan());
    crossTransWithVisits = new Checkpoint(crossTransWithVisits);
    
    return FlowDef.flowDef().setName("ConversionAttribution").addSource(pvPipe, pv).addSource(transPipe, trans)
    .addTailSink(assignTransactionToFirstVisit(crossTransWithVisits), transToFirstSource)
    .addTailSink(assignTransactionToVisit(crossTransWithVisits), transToLastSource)
    .addTailSink(distributeTransactionAmongVisits(crossTransWithVisits), transDistributedAmongSources);
    }
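
    To make the aggregation step concrete, here is a plain-Java sketch of what it computes, with no Cascading involved. The class name, the Row type and the sample values are made up for illustration; only the t_* field meanings come from the method above. It assumes "first" means the first t_createTime seen for a group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: mimics the "aggregation" part of Step 0 in memory.
class AggregationSketch {
    // A single transaction line, or (after aggregation) the summary of
    // one (t_userId, t_transactionId) group.
    static final class Row {
        final String userId;
        final String transactionId;
        final long createTime;
        final int quantity;
        final double income;

        Row(String userId, String transactionId, long createTime, int quantity, double income) {
            this.userId = userId;
            this.transactionId = transactionId;
            this.createTime = createTime;
            this.quantity = quantity;
            this.income = income;
        }
    }

    // Equivalent of the ExpressionFunction: t_income = t_quantity * t_itemCost.
    static Row line(String userId, String transactionId, long createTime, int quantity, double itemCost) {
        return new Row(userId, transactionId, createTime, quantity, quantity * itemCost);
    }

    // Equivalent of AggregateBy with FirstBy + two SumBy aggregators.
    static List<Row> aggregate(List<Row> lines) {
        Map<String, Row> groups = new LinkedHashMap<>();
        for (Row r : lines) {
            String key = r.userId + "|" + r.transactionId;
            groups.merge(key, r, (old, cur) -> new Row(
                old.userId, old.transactionId,
                old.createTime,               // keep the first t_createTime seen
                old.quantity + cur.quantity,  // sum of t_quantity
                old.income + cur.income));    // sum of t_income
        }
        return new ArrayList<>(groups.values());
    }

    public static void main(String[] args) {
        // Hypothetical sample lines: two for one transaction, one for another.
        List<Row> lines = List.of(
            line("u1", "tx1", 100L, 2, 10.0),
            line("u1", "tx1", 105L, 1, 5.0),
            line("u2", "tx2", 200L, 3, 1.5));
        for (Row r : aggregate(lines)) {
            System.out.println(r.userId + " " + r.transactionId + " " + r.createTime
                + " " + r.quantity + " " + r.income);
        }
    }
}
```

    In the real flow the same grouping runs on the cluster; the sketch only shows which values end up in each aggregated row.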

    |
    |
    |
    |
    \/

    Step 1a.

    You can shorten your code by adding a helper class with static methods and using “f(_)” instead of “new Fields(_)”:

    static public Fields f(String... names) {
    return new Fields(names);
    }
    
    static public SumBy sum(String field, Class<?> resultType) {
    return new SumBy(f(field), f(field), resultType);
    }
    
    static public SumBy sum(String field) {
    return sum(field, Double.class);
    }
    
    static public SumBy sumInt(String field) {
    return sum(field, Integer.class);
    }
    
    static public CountBy count(String outField) {
    return new CountBy(f(outField));
    }
    
    static public FirstBy first(String field) {
    return new FirstBy(f(field));
    }

    replacing:

    new Fields(_) ==> f(_)
    new FirstBy(new Fields(field)) ==> first(field)
    new SumBy(new Fields("t_quantity"), new Fields("t_quantity"), Integer.class) ==> sumInt("t_quantity")
    new SumBy(new Fields("t_income"), new Fields("t_income"), Double.class) ==> sum("t_income")

    ----- OR
    |
    |
    |
    \/

    Step 1b.

    Or you can try the Cascading Tube wrapper, which implements the builder pattern on top of Cascading and makes your code shorter, more readable and more intuitive.

    (This method does exactly the same thing as the one above; it is only rewritten with “Cascading Tube”.)
    Add some imports first:

    import jj.tube.Aggregators._
    import jj.tube.Io._
    import jj.tube.Tube._
    import jj.tube._

    and put this method into your class

    def flow(pv: Io, trans: Io, transToFirstSource: Io, transToLastSource: Io, transDistributedAmongSources: Io, transTrap: Io, pvTrap: Io) = {
    val transPipe = Tube("trans")
    .multiply("t_quantity", "t_itemCost", "t_income")
    .aggregateBy(("t_userId", "t_transactionId"), first("t_createTime"), sumInt("t_quantity"), sum("t_income"))
    
    val pvPipe = Tube("pv")
    .groupBy("p_userId", "p_createTime")
    .every(buffer = new GetVisit(selfReferred))
    
    val crossTransWithVisits = Tube("crossVisitAndTrans", pvPipe).coGroup("p_userId", transPipe, "t_userId", new RightJoin)
    .discard("p_userId")
    .groupBy(("t_userId", "t_transactionId"), "start")
    .every(buffer = new FillMisingMatchingVisit)
    .filter(("start", "t_createTime"), new OlderThan)
    .checkpoint
    
    FlowDef.flowDef().setName("ConversionAttribution").addSource(pvPipe, pv)
    .addSource(transPipe, trans)
    .addTailSink(assignTransactionToVisit(crossTransWithVisits, FIRST), transToFirstSource)
    .addTailSink(assignTransactionToVisit(crossTransWithVisits, MATCHING), transToLastSource)
    .addTailSink(distributeTransactionAmongVisits(crossTransWithVisits), transDistributedAmongSources)
    .addTrap("trans", transTrap)
    .addTrap("pv", pvTrap)
    }

    Which one would you rather read?
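
    For anyone curious why the builder style reads shorter, here is a minimal Java sketch of the idea. MiniTube is a hypothetical stand-in, not the real jj.tube.Tube, and its method set is an assumption chosen to mirror the calls above. It only records the requested operations, but it shows how returning the builder from every method replaces the repeated "pipe = new X(pipe, ...)" assignments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pipe builder; it does not touch Cascading.
final class MiniTube {
    private final String name;
    private final List<String> steps = new ArrayList<>();

    MiniTube(String name) {
        this.name = name;
    }

    // Each operation records itself and returns the builder,
    // which is what makes the calls chainable.
    MiniTube groupBy(String groupField, String sortField) {
        steps.add("groupBy(" + groupField + ", sort=" + sortField + ")");
        return this;
    }

    MiniTube discard(String... fields) {
        steps.add("discard(" + String.join(", ", fields) + ")");
        return this;
    }

    MiniTube checkpoint() {
        steps.add("checkpoint");
        return this;
    }

    // Human-readable summary of the recorded chain.
    String describe() {
        return name + ": " + String.join(" -> ", steps);
    }

    public static void main(String[] args) {
        MiniTube pv = new MiniTube("pv")
            .groupBy("p_userId", "p_createTime")
            .discard("p_userId")
            .checkpoint();
        System.out.println(pv.describe());
    }
}
```

    A real wrapper would build the underlying Cascading pipe in each method instead of appending a string; how Cascading Tube does that internally is not shown here.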

  • so little time…

    And so many ideas. I have found some interesting reading at nathanmarz.com. I was looking for a Hadoop-related solution and came across this guy. It looks interesting, so I will follow his tweets for a while.

    • I use Feedly instead of Google Reader. I hate that on my mobile, the first time I run it after an upgrade, it always asks me to log in to Google Reader. I don’t know how this will end.
    • I am almost done with the Scala course on Coursera, even though I don’t feel comfortable with Scala yet. I tried to modify the Play Framework Scala to-do list sample and failed on all that “syntactic sugar”. [my scala play todolist]
    • As we are adopting Cascading in our project (we believe it will replace Apache Pig for our MapReduce computations), I finished “learn.cascading” (linked from the Cascading documentation page). You are given JUnit tests and asked to write the implementation for them, exactly as in groovy-koans. [Cascading isn’t as intuitive as Pig, but its “ultrafast” tests, compared to PigUnit, make it worth spending time on. We use the Cloudera distribution of Hadoop, and the latest CDH4.2.1 still ships Pig 0.10; maybe the tests run faster on Pig 0.11, the current stable Apache release?]
    • This week I started using the IDEA IDE. I was a big fan of NetBeans when working at Orange, and I am a fan of STS Eclipse working at Allegro (except for the moments when its window simply disappears on my Mint 14 Nadia; I hope that was a missing repo or a plugin issue), but the Eclipse Groovy plugin does not highlight unused imports… and IDEA does.
    • I wish I knew how to “keep the fire burning under my motivation”. I want to run something. I want to take part in something cool. Of course my work is something of that kind, but I still feel there is room for more. I am 30 now, and the most important thing that has changed in my point of view is about people. It is very important to have great people around. The only way to build something that works well is to do it with a team. Take a look at stendi.pl. I thought I could build such a shop myself, but I was wrong. I see a great team working on it. I see the testers’ hard work. And now I know how much it costs to build something shiny.
  • Impressions after Hadoop in Spotify

    Yesterday Jarek and I went to a meetup in Warsaw called Hadoop in Spotify. A nice cluster of 190 nodes, soon to be 690: impressive.
    A few things made it worth going, among them the tool projects they use and that we could use too:
    1. https://github.com/spotify/snakebite – fast communication with HDFS
    (compared to the slow “hdfs dfs -ls /”)
    2. https://github.com/spotify/luigi – job scheduler
    3. apache sqoop – data import/export between Hadoop and databases