Usage example of Cascading Tube

EXAMPLE I
Step 0 is pure cascading flow example Cascading.

Step 0.

Short description of method: it takes logs from one source and produces tree different reports

public FlowDef flow(Tap pv, Tap trans, Tap transToFirstSource, Tap transToLastSource, Tap transDistributedAmongSources) {
// agregation
Pipe transPipe = new Each("trans", new Fields("t_quantity", "t_itemCost"), new ExpressionFunction(new Fields("t_income"),
"(double) t_quantity * (double) t_itemCost", Double.class), Fields.ALL);
transPipe = new AggregateBy(transPipe, new Fields("t_userId", "t_transactionId"), new FirstBy("t_createTime"),
new SumBy(new Fields("t_quantity"), new Fields("t_quantity"), Integer.class),
new SumBy(new Fields("t_income"), new Fields("t_income"), Double.class));

// divide
Pipe pvPipe = new Pipe("pv");
pvPipe = new GroupBy(pvPipe, new Fields("p_userId"), new Fields("p_createTime"));
Pipe visitPipe = new Every(pvPipe, new GetVisit(selfReferred()), Fields.RESULTS);

// merge
Pipe crossTransWithVisits = new CoGroup(visitPipe, new Fields("p_userId"), transPipe, new Fields("t_userId"));
crossTransWithVisits = new Each(crossTransWithVisits, new Fields("start", "t_createTime"), new OlderThan());
crossTransWithVisits = new Checkpoint(crossTransWithVisits);

return FlowDef.flowDenew Fields().setName("ConversionAttribution").addSource(pvPipe, pv).addSource(transPipe, trans)
.addTailSink(assignTransactionToFirstVisit(crossTransWithVisits), transToFirstSource)
.addTailSink(assignTransactionToVisit(crossTransWithVisits), transToLastSource)
.addTailSink(distributeTransactionAmongVisits(crossTransWithVisits), transDistributedAmongSources);
}

|
|
|
|
\/

Step 1a.

You can shorten your code by adding helper class with and use “f(_)” instead of “new Fields(_)”:

static public Fields f(String... names) {
return new Fields(names);
}

static public SumBy sum(String field, Class<?> resultType) {
return new SumBy(f(field), f(field), resultType);
}

static public SumBy sum(String field) {
return sum(field, Double.class);
}

static public SumBy sumInt(String field) {
return sum(field, Integer.class);
}

static public CountBy count(String outField) {
return new CountBy(f(outField));
}

replacing

new Fields(_) == > f(_)
new FirstBy(field) == > first(field)
new SumBy(new Fields("t_quantity"), new Fields("t_quantity"), Integer.class) == > sumInt("t_quantity")
new SumBy(new Fields("t_income"), new Fields("t_income"), Double.class) == > sum("t_income")

—– OR
|
|
|
\/

Step 1b.

Or you can try to use Cascading Tube wrapper, that implements builder pattern upon cascading and makes your code shorter, concise, readable and intuitive?

(This method does exactly same thing as that above – it’s only rewritten to “Cascading Tube”)
Add some imports first

import jj.tube.Aggregators._
import jj.tube.Io._
import jj.tube.Tube._
import jj.tube._

and put this method into your class

def flow(pv: Io, trans: Io, transToFirstSource: Io, transToLastSource: Io, transDistributedAmongSources: Io, transTrap: Io, pvTrap: Io) = {
val transPipe = Tube("trans")
.multiply("t_quantity", "t_itemCost", "t_income")
.aggregateBy(("t_userId", "t_transactionId"), first("t_createTime"), sumInt("t_quantity"), sum("t_income"))

val pvPipe = Tube("pv")
.groupBy("p_userId", "p_createTime")
.every(buffer = new GetVisit(selfReferred))

val crossTransWithVisits = Tube("crossVisitAndTrans", pvPipe).coGroup("p_userId", transPipe, "t_userId", new RightJoin)
.discard("p_userId")
.groupBy(("t_userId", "t_transactionId"), "start")
.every(buffer = new FillMisingMatchingVisit)
.filter(("start", "t_createTime"), new OlderThan)
.checkpoint

FlowDef.flowDef().setName("ConversionAttribution").addSource(pvPipe, pv)
.addSource(transPipe, trans)
.addTailSink(assignTransactionToVisit(crossTransWithVisits, FIRST), transToFirstSource)
.addTailSink(assignTransactionToVisit(crossTransWithVisits, MATCHING), transToLastSource)
.addTailSink(distributeTransactionAmongVisits(crossTransWithVisits), transDistributedAmongSources)
.addTrap("trans", transTrap)
.addTrap("pv", pvTrap)
}

Which one do you like to read more?

Posted

And so many ideas. I have found an interesting reading nathanmarz.com. I was just seeking some hadoop related solution and found that guy. Looks interesting, so I will follow his twitts for a while.

  • I use feedly instead of google reader, I hate it on my mobile when it comes to run it first time after upgrade, it always asks me to login to google reader, I don’t know how this will end
  • I am almost ready with scala course from coursera, even though I don’t feel comfortable with scala. I tried to modify playframework scala todolist and failed with that “syntactic sugar”. [my scala play todolist]
  • As we are adopting Cascading into our project (We believe it will replace apache pig for our mapreduce computations) I finished “learn.cascading” — (link in cascading documentation page) there are given test in junit and you are asked to write an implementation for it — exactly as in groovy-koans. [Cascading isn’t as intuitive as pig but running it’s “ultrafast” tests -comparing to pigunit- makes it worth spending time on it. We use cloudera distro of hadoop and in latest CDH4.2.1 there is still pig in version 0.10, maybe pig current apache stable version 0.11 tests run faster?]
  • This week I’ve started to use idea ide, I was a big fun of netbeans working in Orange, I am fun of sts eclipse working in allegro (except the moments its window simply disappears in my mint nadia 14 – I hope it was missing repo or plugin issue) but the eclipse groovy plugin does not highlight unused imports… and idea does.
  • I wish I know how to “keep fire under motivations”. I wish to run something. I wish to participate in something cool. Of course my work is something that kind but still I feel there is a room for more. I am 30 right now and the most important thing I’ve changed in my point of view is about people. It is very important to have great guys around. The only way to build something working fine is to do  it with a team. Take a look on stendi.pl. I thought I am able to build such a shop myself. But I was wrong. I see a great team working on it. I see testers hard work. And I know now how big cost is to build something shiny.

Posted

Wczoraj byłem z Jarkiem na spotkaniu w Wawie pt. Hadoop in Spotify. Fajny klaster 190 nodów a wkrótce 690 – impressive.
Było kilka rzeczy dla których warto było się tam wybrać m.in projekty-narzędzia z jakich korzystają i z których my moglibyśmy:
1. https://github.com/spotify/snakebite – szybka komunikacja z hdfs
(wzgledem wolnej “hdfs dfs -ls /”)
2. https://github.com/spotify/luigi – scheduler jobow
3. apache sqoop – import/export danych hadoop/db

Posted

Notatki z wykładów

WSZYSTKIE NAGRANE PREZENTACJE MAJĄ BYĆ WKRÓTCE UDOSTĘPNIONE
Sladjy: http://2013.nosql-matters.org/cgn/abstracts/

Strona konferencji: http://2013.nosql-matters.org/cgn/
Moje zdjęcia: https://plus.google.com/photos/104618273358355901903/albums/5876328933446465249?authkey=CN6R9Mu2zY2jcw

1. Keynote Martin Fowler

Świetny wstęp do konferencji. Autor próbował zdefiniować pojęcie nosql, zaprezentował genezę ruchu nosql. Charakterystyka nosql:
non-relational, open-source, cluster-friendly, 21st Century Web, schema-less.
Pokazał główne kategorie baz key-value, document, column i graph datasotre. Pokazał “CAP theorem” twierdzenie CAP – consistency, avaliabilty, partitioning – oraz pokazał, że w jednej chwili można mieć tylko dowolne dwie cechy zachowane (jeśli baza jest rozdystrybuowana czyli P >2 to trzeba wybierać pomiedzy A i C). Reklama książki NoSql Distiled oraz definicja polyglot persistance.
Dokładnie taki sam wstęp tutaj

2. ModeShape 3, red hat
Prezentacja produktu red hat piąty typ baz danych – hierarchiczna – podobna do filesystemu.

3. Big data beyond hadoop. Hadoop with apche camel
talend, polaczenie esb -apache camel – z hadoop
article to read: parallel ETL tools are dead
Ciekawe rozwiązanie ESB do zastosowań przeliczania hadoop.
Slajdy:
Darmowy produk Apache Camel + Apache Hadoop:

4. How to compare nosql dbs
Super prezentacja pokazująca benchmarki poniższych technologi.
aerospike, couchebase(good for fast cache), mongo, cassandra
Raport benchmarku:
Ciekawy key-valute store: http://www.aerospike.com/

5. Query Languages for Document Stores
Jan Steemann
Ponownie Jan z firmy triAGENS, prezentacja to krótkie preview języków zaptań do danych w postaci JSON
język FOX autorstwa w/w firmy producenta arangoDB, oraz JSONiq, przypominające składniowo mix sql i pytan do mongo
avocadoDB: http://www.nosql-cologne.org/aql_martin_schoenert_jan_steemann/page6.html#/6
Być może standart do zapytań baz dokumentowych: http://www.jsoniq.org/

6. Data Structures in Riak
Sean Cribbs
AWESOME
Riak kieruje się zupełnie inną strategią zapisu danych niż cassandra. W sytuacji gdy dwóch klientów próbuje zapisać rekord w klastrze o tym samym id cassandra odrzuca ten ze starszym timestamp i zapisuje najświeższy. Riak uwzględnia oba rekordy.

Teoretyczne wprowadzenie do struktur danych w riaku. Opis modelu matematycznego w/w struktur, logika zbiorów, definicja matematyczna, monodiy. Na końcu świetne demo klastra 5 nodowego, podzielonego na 2 partycje. “Mądre” convergence – zbliżanie wyników po wystąpieniu awari. Demo polegało na wyświetleniu dwóch okien przeglądarek jedno prezentowało jedną partycję klastra drugie drugą.

Zasymulowane use casy gdy partycje traca polaczenie ze soba. Na jednej do struktury – counter – wprowadzono wartość 5 na drugiej -5. Po odzyskaniu połączenia wartości się zsumowały do 0.

W drugim przykładzie na połączonych partycjach ustawiono wartosc 6 na obu. Wyłaczono połaczenie i zwiekszono wartosc do 13 na jednym. Po uruchomieniu polaczenia ponownie na drugiej partycji pojawila sie wartosc 13. Riak nie dystrybuje danych lecz operacje. Tj jesli na jednej partycji wykonano inkrementacje 5 razy riak przekazuje wykonanie tej operacji do innej czesci klastra.

Riak key-value w przypadkach rozdystrybuowanych zastepuje filesystem. Czasy odp 10 ms.

Slajdy: https://speakerdeck.com/basho/data-structures-in-riak
Prezentacja: http://vimeo.com/52414903 (w Koloni był tylko Sean Cribbs)

7. Processing a Trillion Cells per Mouse Click
Alex Hall
REWELACJA!
Columna storage, praca doktorska Alexa http://vldb.org/pvldb/vol5/p1436_alexanderhall_vldb2012.pdf
Wewnętrzne narzedzie w google – powerdrill. Niesamowite osiagi wydajnosci, czasu odpytywania zapytan grupujacych setu ok 500 mb, ktore w bazie trwaly ponad minute do 7 sekund w drill oraz 70 ms w powerdrill.

— co pomoglo, wprowadzenie ids int
— slownikowanie danych, ineksy po intach niesamowicie przyspieszyly wydajnosc zapytania
— kompresja zbioru rozwiazan??

co mozna u nas:
id w tabeli accounts

— w tabeli uri_pv tez id konta
— nawet id daty :P
— zeslownikowanie uri?
— tabela archive

praca dyplomowa: http://vldb.org/pvldb/vol5/p1436_alexanderhall_vldb2012.pdf
slajdy: http://www.d1-solutions.com/fileadmin/user_upload/d1solutions/Events/praesentationen_common_sense/ah-google-CommonSense-2013.01.pdf

8. 100% Big Data, 0% Hadoop, 0% Java
Pavlo Baron
REWELACJA! python + erlang
Big data without jvm do badania sematycznie pozytywnych i negatywnych tweetow.
Slajdy: http://www.slideshare.net/pavlobaron/100-big-data-0-hadoop-0-java
Prezentacja: http://www.youtube.com/watch?v=Q-Tm-HO7auE

====================================

1. How to survive in a BASE world
Uwe Friedrichsen
REWELACJA
Dzień rozpoczoł się od super wykładu na którym był pokazany przykład dwóch klas implementujących biblioteke: książka i klient. Przykład pokazywął implementację w orm hibernate i pokazywał jakie błedy popełniają programiści implementując ten sam interfejs z nosql, całkowicie tracąć funkcjonalności baz nosql. Oczywiście był również prezentowany prawidłowy model architektury. Gotowe do wykorzystania natychmiast.
slajdy: http://www.slideshare.net/ufried/how-to-survive-in-a-base-world

2. With a hammer in your hand… ElasticSearch
Simon Willnauer
REWELACJA!

Świetne demo elasticSearch – silnik przeszukiwania pełnotekstowego. Świetny prelegen! Jak będę implementował wyszukiwanie to na pewno użyje ElasticSearch (obecna wersja 0.9, pod spodem apache lucene)

3. Triple R – Riak, Redis and RabbitMQ at XING
Sebastian Röbke & Dr. Stefan Kaes

Świetna prezentacja pokazująca migracje technologii w serwisie XING (niemiecki goldenline) do w/w technologii. Motywem była potrzeba zwiększenia wydajności i szybkości serwisu. Stąd kompletna zmiana architektury z relacyjnej na baze key-value i rozdystrybuowany cache.

4. Boon and Bane for Software Architects
Ralf S. Engelschall

Ralf pokazywał jak bardzo może zaboleć korzystanie z polyglot persistance, czyli kilku baz danych w ramach jednego systemu aby pokryć różne funkcjonalności najbardziej odpowiednimi i wydajnymi zastosowaniami.

5. CAP and the Architectural Consequences
Martin Schönert

Prezentacja genezy twierdzenia cap oraz kilku wariantów różnych schematów rozdystrybuowanych systemów. CAP theorem
Teza a w zasadzie udowodnione twierdzenie przez Brewera twierdzi ze gdy mamy baze danych na klastrze (czyli nie single server) to w danej chwili możemy mieć spełnione dwie własności z trzech, spójność danych, dostępność systemu, partitioning cluster(ilość nodów >2).

6. Map/Reduce in Action: Large Scale Reporting Based on Hadoop and Vertica
Dennis Meyer & Uwe Seiler

Prezentacja firmy Adtech zajmującej się wyświetlaniem reklam na stronach www. Stos technologiczny zupełnie jak u nas tylko zamiast mysql – vertica, oraz flume.
Do schedulowania jobow jobs.

====================================

DZIEŃ WARSZTATÓW

Katalog baz nosql: http://www.nosql-database.org/

1. Data modeling
Wstęp do nosql + praktyczne użycie:
memchage, redis, couchdb, mongodb

ArangoDB (comparing to mongoDB)
+joins
+web access (REST/ http)
-sharding
+sql like queries

javascript queries ArangoDB
db._query(“RETURN persons[*].name”).toArray();
db._query(“FOR p IN persons FILTER p.name == ‘jan’ RETURN p.age”).toArray();

JOIN
db._query(“FOR p IN persons FOR p2 IN persons FILTER p.name == ‘jan’ RETURN p.age”).toArray();

+graph db TRAVERSAL function

2.
Apache Drill hands-on
Niestety projekt nie jest dojrzały. Informacji jest niewiele nawet na stronie projektu. Jak sie okazało implementacja Apache Drill to praca doktorska prowadzącego jest i jest w fazie alpha. Tj. można korzystać, ściągnąc kod, zbudować przykład ale są spore ograniczenia na input oraz język zapytań. (JSON)
https://github.com/apache/incubator-drill/tree/master/sandbox/prototype/exec/ref/src/test/resources
https://cwiki.apache.org/confluence/display/DRILL/Demo+HowTo
https://cwiki.apache.org/confluence/display/DRILL/AOL+Search

Posted

W ubiegłą sobotę 23 marca miały miejsce warsztaty z gita w centrum Warszawy na ulicy Hożej. Warsztaty organizował Wojtek z Maćkiem z Polidei i Łukasz z Pzu i Kamil (jak kogoś pominąłem to sory). Sponsorami spotkania byli Polidea, Touk, Codility, Macoscope z resztę te i inne info można przeczytać na gitkata.pl.

Ponownie (ad coderetreat) fajnie, że impreza zaczęła się o 11. Był czas dla rodziny i dla kolegów na gitkata.pl. Idąc na warsztaty zastanawiałem się czym mnie zaskoczą? Przecież na co dzień pracuję w gicie/ z gitem. Ku mojemu wielkiemu zaskoczeniu to co dotychczas wyprawiałem z gitem to był szczyt góry lodowej. To narzędzie wydaje się tak wszechstronnym kombajnem (chociaż chyba to złe porównanie bo kombajn zasadniczo świadczy jedną usługę) niczym bash się okazał po kilku latach pisania! Już pierwszy warsztat był dla mnie kompletną nowością.

1. Submodules Paweł Szklarz.
W pracy mamy projekt mvn, który też jest podzielony na moduły i też korzysta z różnych bibliotek zewn. Jednak nie mieliśmy dotychczas problemu, że nowa wersja zależności failowała nam build całej aplikacji na bamboo. Paweł mówił o jenkinsie. Może jak wystąpi kiedyś taki problem to będę pamiętał o tym wystąpieniu i skonfigurujemy repo wówczas odpowiednio.

2. Refspec (Mateusz Grzechociński).
Mateusza można słuchać zawsze! Sposób w jaki tłumaczy jest dla mnie wprost super klarowny. Nie muszę się nad niczym zastanawiać po prostu rozumiem o czym mówi. To jest bardzo fajny fjuczer Mateusza. Dlatego bardzo lubie go słychać. A jeśli chodzi o refspeci to to są takie wskaźniki source, destination. Do konfiguracji w git configu. Ale lepiej nie kombinować, chyba że później chcesz tłumaczyć wszystkim w koło co to za magii gitowej użyłeś :)

3. Chciałem sobie pójść na Git rerere (Mateusz Grzechociński) ale jakoś tak mnie Wojtek zagaił, że poszedłem na konkurs :) i świetnie na tym wyszedłem ponieważ otrzymałem nagrodę (godną polecenie książkę o gicie wydanictwa helion) za ukończenie z sukcesem zadania jako pierwszy! Thanks a lot once again!

4. Interactive rebase (Jakub Nabrdalik)

5. Git bisect (Grzegorz Kubiak)

… sory ale post długo leżał u mnie w drafcie a potem… wypadło z pamięci…

Jestem bardzo szczęsliwym posiadaczem książki “Git. Rozproszony system kotroli wersji” Włodzimierza Gajdy, którą wygrałem na gitkata.pl jako, że skończyłem jako pierwszy pierwsze zadanie konkursowe! Dzięki! Rewelacja nagroda! Zaglądam do niej. Fajnie, że czasem się tłumaczy nowoczesne technologie na język polski! BTW. Ciekawe jaki jest ewentualny wolumen odbiorców takiej książki, czy myślisz, że w Polsce 10 tyś ludzi korzysta z gita? A może 50 tys? Wszystkie nowoczesne firmy IT w polsce + studenci informatyki ~~100 tyś??

Posted