ABONAMENTE VIDEO REDACȚIA
RO
EN
×
▼ LISTĂ EDIȚII ▼
Numărul 12
Abonament PDF

Hadoop (II)

Radu Vunvulea
Solution Architect
@iQuest
PROGRAMARE

În ultimul număr am descoperit lumea pe care Hadoop o formează și care este secretul prin care putem să stocăm zeci și chiar sute de TB fără nici un fel de probleme. Aceasta se bazează pe un sistem de tip master-slave extrem de simplu, dar care funcționează foarte bine.

Oriunde este nevoie de Hadoop, se impune prezența unui nod numit: NameNode. Acest nod reprezintă nodul de tip master, stocând locația tuturor fișierelor din sistem. El este singurul nod care poate să identifice locația unui fișier pe baza numelui. În jurul acestui nodexistă DataNode-uri, care stochează conținutul fișierelor.

Procesarea datelor

Faima Hadoop-ului vine de la procesarea datelor și extragerea informațiilor de care avem nevoie. În următoarele rânduri vom descrie mecanismul acestei performanțe.

Un prim element este MapReduce. Această paradigmă nu a fost inventată de către Hadoop, dar a ajuns să facă acest lucru cel mai bine. La prima întâlnire cu MapReduce avem senzația că este complicat, dar odată înțeleasă, această paradigmă ne ajută foarte mult.

Totodată, nu încercați să folosiți Hadoop dacă nu ați înțeles MapReduce în totalitate. Fără să înțelegem MapReduce nu putem ști ce anume vrem să cerem de la Hadoop și la ce rezultate să ne așteptăm.

MapReduce și Tupluri

În comparație cu un sistem care stochează datele în tabele, să nu vă așteptați ca Hadoop să fie similar. Acesta știe să lucreze cu date sub forma unui tuplu - o pereche de tip (cheie, valoare). Fiecare task pe care Hadoop trebuie să îl execute are ca input tupluri de tip (cheie, valoare), iar rezultatul pe care îl obținem de la un task are aceeiași formă de tip (cheie, valoare).

Deși pare destul de banal, vom vedea că nu este nevoie de mai mult decât atât pentru a putea obține datele dorite.

Map

Procesul de MapReduce este format din două părți total separate - Map și Reduce. Map se referă la procesul prin care datele pe care dorim să le procesăm sunt convertite la un nou set de date. Datele pe care le obținem după acest pas sunt doar niște date intermediare care în starea în care sunt nu pot fi folosite.

Operația de tip Map nu se execută doar pe un singur nod în cadrul sistemului. Această acțiune va fi executată pe mai multe noduri de tip DataNode. Fiecare DataNode va genera un rezultat intermediar. Din punct de vedere a cantității de date care se generează după procesul de Map, aceasta este mult mai mică în comparație cu datele originale.

Ne putem imagina că după acest pas, Hadoop ne generează un sumar a datelor noastre în funcție de parametri de care noi suntem interesați. Este important să știm că datele intermediare pe care le obținem nu trebuie să aibă același format ca și datele de input.

În momentul de față, cheile ajung să fie partiționate pe baza unei funcții - în general o funcție de hash.

Odată ce procesul s-a finalizat cu succes, Hadoop poate să execute diferite operații peste ele, precum sortarea, separarea sau shuffle - această funcționalitate este destul de nouă, circa un an. Acești pași pregătesc datele intermediare pentru următorul pas. Atenție, acest pas se execută și în cadrul operației de Reduce.

Din punct de vedere al paralelismului, pe fiecare nod unde se execută operația de tip Map, putem să avem de la 10 până la 100-150 de operații. Totul depinde de cât de performante sunt nodurile cu care lucrăm.

Reduce

Odată ce avem datele intermediare, putem să le procesăm pentru a obține datele finale, de care suntem interesați. Până în acest moment puteam executa operațiile de tip Map pe fiecare din nodurile din cluster care conțineau datele noastre. Operația Reduce se va executa doar pe un număr limitat de noduri. Datele sunt partiționate pentru fiecare Reducer în parte.

Dacă operația de Map era formată dintr-un singur pas, operația de Reduce este formată din 3 pași separați

  • Shuffle,
  • Sort,
  • Reduce.

În momentul în care se face shuffle, datele de la nodurile care au fost implicate în procesul de Map sunt trimise la nodurile care urmează să execute următorul pas. Acest lucru se face folosind o conexiune HTTP. Din cauză că suntem într-o rețea privată, nu trebuie să ne facem probleme din punct de vedere al securității.

Toate tuplurile care sunt trimise, sunt apoi sortate pe baza cheii. Acest lucru este necesar deoarece putem avea aceleași chei de la diferite noduri. În general, procesul de sortare se execută simultan cu procesul de shuffle.

Odată ce operația de shuffle s-a finalizat, Hadoop va mai executa încă o sortare. În acest moment putem să controlăm modul în care datele să fie grupate și chiar să facem o sortare intermediară după diferiți parametri. Operația de sortare este o operație care se execută atât pe disk cât și în memorie.

Ultima operație care a mai rămas de executat este reduce. În momentul în care această operație se execută, rezultatele finale vor fi scrise pe disk. La acest pas fiecare tuplu este format dintr-o cheie și o colecție de valori. Din această colecție de valori operația de reduce va selecta valoarea finală.

Deși pasul de reduce este extrem de important, pot să existe cazuri când nu dorim să facem acest lucru. În astfel de cazuri, putem să specificăm că rezultatul obținut după Map să fie scris direct pe disk și să fie considerat rezultat final.

JobTracker, TaskTracker

Operația de tip MapReduce implică două tipuri de servicii care poartă numele de JobTracker și TaskTracker. Acestea două sunt într-o relație de tip master-slave, extrem de asemănătoare cu cea pe care am văzut-o la nivelul modului în care datele sunt stocate - NameNode și DataNode.

Scopul principal pe care JobTracker îl are este să facă scheduling și să monitorizeze fiecare acțiune. În cazul în care una din operații nu se termină cu succes, JobTracker va reprograma această operație. JobTracker-ul discută în permanență cu NameNode-ul și are grijă ca operația care trebuie să se execute să fie pe același DataNode sau cel puțin în același rack în care datele pot fi găsite.

TaskTracker-ul este un nod care acceptă operații de tip Map, Reduce sau Suffle. Acesta poate să fie DataNode-ul unde datele sunt stocate, dar acest lucru nu este obligatoriu. Fiecare TaskTracker are un număr limitat de job-uri pe care le poate executa (slot). Din această cauză JobTracker-ul va căuta întotdeauna TaskTracker-ul care are cât mai multe slot-uri libere.

Un lucru destul de interesant care a fost făcut pe TaskTracker este modul în care fiecare job se execută. Acesta se execută într-un proces JVM separat. Prin acest mod în cazul în care apare o eroare, aceasta nu va fi propagată la toate job-urile care rulează pe TaskTracker-ul curent.

Exemplu

Până în acest moment am prezentat din punct de vedere teoretic cum MapReduce funcționează. Vă propun ca în următoarea parte a articolului să analizăm peste un exemplu practic. În acest mod ne va fi mult mai simplu și ușor să înțelegem ce se întâmplă cu adevărat.

Vom porni de la următoarea problemă. Avem sute de fișiere ce conțin date despre numărul de accidente din fiecare județ din România care s-au întâmplat în fiecare lună. Am ajuns la un număr foarte mare deoarece există nenumărate firme de asigurare, iar fiecare firmă de asigurare are una sau mai multe centre regionale. Din această cauză fiecare fișier poate să conțină mai multe date despre același oraș. Un fișier ar putea avea următoarea formă:

Cluj, Ianuarie 2013, 20

Sibiu, Ianuarie 2013, 10

Brașov, Ianuarie 2013, 3

București, Ianuarie 2013, 100

Cluj, Mai 2013, 50

Brașov, Iulie 2013, 18

Se cere să calculăm numărul maxim de accidente care a avut loc în fiecare oraș în decursul unei luni. Această problemă devine destul de greu de rezolvat dacă avem 500 GB de date. În acest caz, Hadoop ne poate ajuta.

Prima operație pe care MapReduce o face este cea de Map. În acest moment din fiecare fișier vom putea obține o colecție de tip (cheie, valoare). Pentru noi cheia va reprezinta orașul, iar valoarea va indica numărul de accidente. Din fiecare fișier dorim să extragem numărul maxim de accidente per oraș. Pentru exemplul dat mai sus rezultatul pe care l-am putea obține ar avea următoarea formă

(Cluj, 50)

(Sibiu, 10)

(Brașov, 18)

(București, 100)

Din alte fișiere vom obține și alte date. Dacă punem toate aceste date împreună (operația de shuffle) am avea următorul rezultat intermediar

(Cluj, 13), (Brașov, 20), (Cluj, 40), (Sibiu, 2), (Cluj, 50), (Sibiu, 10), (Brașov, 18), (București, 100), (Sibiu, 8) …

Peste acest rezultat intermediar putem să aplicăm operația de Reduce, care ar genera rezultatul final - numărul maxim de accidente din fiecare oraș. La final obținem următorul rezultat

(Cluj, 50)

(Brașov, 20)

(Sibiu, 10)

(București, 100)

Concluzie

După cum am putut observa, MapReduce este o operație simplă, care se bazează pe împărțirea unui task în operații cât mai mici care să poată să fie rulate în paralel. Odată ce fiecare operație a fost executată pe o parte din date, rezultatele intermediare obținute sunt aduse în forma finală. Limbajul nativ pentru Hadoop este Java, dar există suport pentru folosirea sa împreună cu alte limbaje precum Python, C# și chiar și PHP.

Conferință

Sponsori

  • ntt data
  • 3PillarGlobal
  • Betfair
  • Telenav
  • Accenture
  • Siemens
  • Bosch
  • FlowTraders
  • MHP
  • Connatix
  • UIPatj
  • MetroSystems
  • MicroFocus
  • Colors in projects