wtorek, 28 września 2010

Hadoop instalacja, część II.

Do tej pory pracowaliśmy na Hadoopie w wersie Pseudo-Distributed, czyli nie rozproszonej. W tym poście postaram się opisać konfiguracje klastra złożonego z dwóch maszyn:
  1. Master (ip 192.xxx.xx.101) - będzie pełnił rolę NameNode, oraz JobTrackera. Jako że nasz klaster składa się zaledwie z dwóch maszyn, uruchomimy na nim DataNode oraz TaskTracker.
  2. Slave (ip 192.xxx.xx.102) - uruchomimy na nim DataNode oraz TaskNode.
Dla zapewnienia bezpieczeństwa oraz ułatwienia konfiguracji na obu maszynach, dodajemy grupę oraz użytkownika:
sudo addgroup hadoop
sudo adduser --ingroup hadoop hadoop
.

Logujemy się na nowo stworzonego użytkownika. Instalujemy oraz testujemy Hadoopa na obu maszynach, tak jak to pisałem w części pierwszej tego artykułu.

Możemy dla wygody ustawić na obydwóch maszynach:
etc/hosts
192.xxx.xx.101 master
192.xxx.xx.102 slave

Ponieważ Hadoop do komunikacji między Nodami używa ssh do poprawnego działania, użytkownik zalogowany na maszynę Master musi mieć możliwość zalogowania się przez ssh bez podawania hasła na maszyne Slave. W tym celu kopiujemy klucz z Mastera z katalogu <HOME>/.ssh/nazwa_klucza.pub do <HOME>/.ssh/authorized_keys na Slavie.

Przechodzimy do konfiguracji Hadoopa.
Najpierw konfigurujemy maszynę Master:
W pliku conf/masters dodajemy namiar na maszynę gdzie uruchomiony będzie NameNode oraz JobTracker:
master

Wpliku conf/slaves ustawiamy maszyny gdzie znajdują się DataNode oraz TaskNode:
master
slave

Na wszystkich maszynach modyfikujemy konfiguracje w następujący sposób:
W pliku conf/core-site.xml:
<property>
<name>fs.default.name</name>
<value>hdfs://master:9000</value>
</property>

W pliku conf/mapred-site.xml:
<property>
<name>mapred.job.tracker</name>
<value>master:9001</value>
</property> 
W pliku conf/hdfs-site.xml:
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
Warto ustawić dodatkowe parametry, choć są one opcjonalne:
W pliku conf/mapred-site.xml:
mapred.local.dir - lista lokalnych katalogow dla tymczasowych danych - jeśli na maszynie mamy kilka dysków możemy je tutaj podać poprawi to czas operacji I/O.
mapred.map.tasks - maksymalna ilosc maperów - dokumentacja mówi, że najlepszym ustawieniem jest 10x ilość maszyn Slave.
mapred.reduce.tasks - maksymalna liczba reducerów - dokumentacja mówi, że najlepszym ustawieniem jest  2x ilość rdzeni na wszystkich maszynach Slave.

Uruchamiamy klaster:
Najpierw formatujemy NameNode:
bin/hadoop namenode -format
Uruchamiamy HDFS:
bin/start-dfs.sh

Uruchamiamy MapReduce:
bin/start-mapred.sh

Jeśli wszystko pójdzie po naszej myśli, na maszynie Master oraz Slave w katalogu <HADOOP_HOME>/logs pojawią się logi mówiące o poprawnym uruchomieniu poszczególnych elementów klastra.

środa, 1 września 2010

Pig, nie taki znowu leniwy zwierzak...


Tak wiem, tym razem powinienem opisać konfiguracje klastra Hadoop, ale nie mogłem się oprzeć pokusie zabawy dzieckiem korporacji Yahoo. Pig, bo o nim mowa, to platforma oraz język wysokiego poziomu do pisania Jobów MapReduce. Wyczytałem na jednej z marketingowych prezentacji, że używając Piga wystarczy 5% kodu, oraz 5% czasu, który musielibyśmy poświęcić, aby osiągnąć ten sam efekt w Javie. Brzmi nieźle.

Tradycyjnie zaczynamy od instalacji.

Pig pobieramy z tego adresu.
Rozpakowujemy paczkę. 
Sprawdzamy czy działa poleceniem:
bin/pig -help

Pig może działać w dwóch trybach:
1. Lokalnym - nie potrzebuje uruchomionego Hadoopa
bin/pig -x local
2. MapReduce - używa uruchomionego klastra:
W pliku <pig_home>/bin/pig.sh dodajemy zmienne środowiskowe:
PIG_PATH=<pig_home>
PIG_CLASSPATH=$PIG_PATH/pig-0.7.0-core.jar:<hadoop_home>/conf
PIG_HADOOP_VERSION=0.20.2
Uruchamiamy poleceniem:
bin/pig
lub
bin/pig -x mapreduce

To co właśnie widzimy, to konsola, grunt umożliwiająca pisanie skryptów. Kolejną możliwością jest uruchamianie skryptów z pliku:
bin/pig skrypt.pig
Możemy również wywołać skrypt z kodu Javowego.

Spróbujmy napisać skrypt, który zrobi dokładnie to samo, co testowy Job MapReduce, czyli zliczy wystąpienie poszczególnych słów w pliku.
Uruchamiamy konsole grunt:
bin/pig
Załadujemy dane poleceniem:
fs -copyFromLocal conf pig_test_data
Tak wygląda skrypt:
rawData = load 'pig_test_data';
words = foreach rawData generate flatten(TOKENIZE($0)) as word;
filteredWords = filter words by word matches '\\w+';
groupedWords = group filteredWords by word;
countedWords = foreach groupedWords generate COUNT(filteredWords), group;
orderedCountedWords = order countedWords by $0 desc;
store orderedCountedWords into 'pig_test_data_output/char_freq';

Wyniki sprawdzamy poleceniem:
fs -cat pig_test_data_output/char_freq/part-r-00000
Po szczegóły dotyczące składni języka Pig odysłam do naprawdę niezłej dokumentacji.

poniedziałek, 30 sierpnia 2010

Hadoop instalacja, część I.

Platformę Hadoop możemy instalować w trzech trybach:
1. Standalone - nie rozproszona, wszystko działa w jednym procesie Javowym, użyteczna podczas debugowanie Jobów. Brak HDSF-a.
2. Pseudo-Distributed - również nie rozproszona, ale każdy z elementów działa w osobnym wątku,  wszystkie elementy są uruchomione.
3. Fully-Distributed - czyli pełnoprawny klaster.


Dziś postaram się omówić dwa pierwsze tryby.


Aby zainstalować Hadoopa potrzebne nam będą:
1. System operacyjny z rodziny Linux - ja użyłem Ubuntu 9.10 (można uruchomić Hadoopa  pod systemem Windows ale tylko jako wersję deweloperską).
2. Zainstalowana Java 6.
3. Skonfigurowane ssh.
4. Hadoop - ja używam wersji 20.02


Ok, a więc zaczynamy!
Postów na temat jak instalować Ubuntu powstało już naprawdę dużo, dlatego przejdźmy od razu do instalowania Javy, aby to zrobić wykonujemy polecenie:
sudo apt-get install sun-java6-jre sun-java6-plugin sun-java6-fonts


Sprawdzamy czy mamy najnowsze pakiety dla ssh:
sudo apt-get install ssh
sudo apt-get install rsync


Uruchamiamy ssh:
ssh localhost


Jeśli musimy się uwierzytelniać podając hasło, dobrze jest dodać RSA dla naszego użytkownika:
Tworzymy klucz poleceniem:
ssh-keygen -t rsa -P ""
Nastepnie kopiujemy go do odpowiedniego katalogu używając polecenia:
cat /scieżka gdzie zapisalimy klucz/.ssh/nazwa_klucza.pub >> /katalog domowy/.ssh/authorized_keys
Potrzebny będzie restart!
Środowisko mamy gotowe, czas na Hadoopa.
Rozpakowujemy paczkę, w pliku conf/hadoop-env.sh ustawiamy JAVA_HOME. Hadoop w wersji Standalone powinień działać, aby się upewnić uruchamiamy przykład, który zliczy nam słowa z wszystkich plików znajdujących sie w katalogu input:
bin/hadoop jar hadoop-*-examples.jar grep input output '\w+'
Jeśli wszystko poszło dobrze w katalogu output znajdziemy plik z wynikami.


Czas na Pseudo-Distributed.
W pliku conf/core-site.xml podajemy adres węzła HDFS:
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>


Skoro mamy tylko jeden węzeł Hadoopa w pliku conf/hdfs-site.xml ilość replikacji ustawiamy na jeden:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>


W pliku conf/mapred.site.xml wskazujmy gdzie będzie znajdował się węzeł, który będzie odpowiedzialny za uruchamianie jobów:
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
</configuration>


Opcjonalnie warto jeszcze dodać w pliku conf/hdfs-site.xml ścieżki gdzie HDFS ma przetrzymywać swoje dane:
Katalog dla metadanych NameNode:
<property>
<name>dfs.name.dir</name>
<value>/home/user/hadoop/dfs/namenode</value>
</property>
Katalog dla bloków danych:
<property>
<name>dfs.data.dir</name>
<value>/home/user/hadoop/dfs/data</value>
</property>
Gdy wszystko mamy już skonfigurowane formatujemy HDFS-a:
bin/hadoop namenode -format

Uruchamiamy Hadoopa:
bin/start-all.sh
Jeśli wszystko jest w porządku będziemy mieli możliwość obejrzenia dwóch stron:
  • NameNode - http://localhost:50070/
  • JobTracker - http:localhost:50030/
W razie jakichkolwiek problemów bardzo pomocne są logi znajdujące się w katalogu hadoopa.


Uruchamiamy przykład:
Kopiujemy katalog input do HDFS-a:
bin/hadoop fs -put input input
sprawdzamy czy udało się skopiować:
bin/hadoop fs -ls
uruchamiamy ten sam przykład co poprzednio:
bin/hadoop jar hadoop-*-examples.jar grep input output '\w+'

sprawdzamy wyniki:
bin/hadoop fs -cat output/*
Aby zatrzymać Hadoopa wywołujemy:
bin/stop-all.sh

czwartek, 12 sierpnia 2010

Jak to robią wielcy...

Ostatnio przeglądając blogosferę natknąłem się na informacje na temat ilości przetwarzanych danych przez portale które można uznać za wielkie. Muszę przyznać, że zrobiło to na mnie spore wrażenie, mianowicie obecnie Facebook posiada 36 petabajtów danych, a do końca roku spodziewa się wzrostu do około 50 petabajtów, Google gromadzi każdego dnia 400 terabajtów logów (dane z 2007 roku). Zacząłem się zastanawiać, jak można sobie poradzić z taką ilością różnych danych, jak je przechowywał, filtrować, oraz nimi zarządzać. Okazało się, że inżynierowie Google wymyślili oraz opublikowali dokumentacje do dwóch technologii które potrafią sobie z tym poradzić, są to:

  • MapReduce
  • GFS (Google File System)
Na postawie tej dokumentacji Yahoo stworzyło, oraz udostępniło na licencji Apache platformę o nazwie Hadoop. Hadoop to zbiór 8 narzędzi, mnie jak na razie interesuje 5 z nich:

1. MapReduce - podstawa całej platformy. Wymyślona przez Google koncepcja jest genialna w swojej prostocie, w dwóch słowach chodzi tutaj o to, aby programista w najprostszej wersji dostarczył dwóch klas: mappera i reducera. Mapper na wejściu otrzymuje pewien obiekt z danymi wejściowymi (najczęściej linie z pliku), a na wyjściu produkuje pary klucz=wartość. Instancji mappera może być oczywiście wiele (Facebook ma klaster Hadoopa złożony z 2200 maszyn) gdy mappery skończą prace Hadoop scala klucze tak aby powstały pary klucz = {lista wartości z wszystkich mapperów}, tak z agregowane dane przekazywane są do reducera (także może ich być wiele), który wykonuję jakieś operacje biznesowe i przekazuje wyniki również w formie klucz=wartość.

2. HDSF (Hadoop Distributed File System) - czyli rozproszony system plików. HDSF ma zapewnić szybki odczyt, zapis, oraz replikacje danych. Jak on to robi? Wyobraźmy sobie klaster złożony z 10 komputerów, jeden pełni role NameNode Server, oznacza to, że on wie gdzie i jakie pliki są zapisane, pozostałe 9 tylko przechowują dane. Gdy wgrywamy plik do HDFS zostaje on podzielony na paczki (zazwyczaj 64, lub 128 mb), paczki są równolegle zapisywane na wszystkich maszynach oraz replikowane na określoną przez nas ilość maszyn. Zmniejsza to znacznie ograniczenie związane prędkością zapisu oraz odczytu dysku ponieważ jednocześnie piszemy na wiele dysków na wielu maszynach. Ma to jednak swoje słabe strony, gdy już raz zapiszemy plik w HDSF nie ma możliwości edytowania jego zawartości.

3. Hive - ludzi od Facebooka zaczął uwierać fakt, że do stworzenia joba, który będzie obrabiał dane używając MapReduce potrzebni są programiści, oraz sporo kodu który muszą napisać. Drugim problemem są analitycy danych, którzy chcą stworzyć sobie na bieżąco jakieś statystyki bez potrzeby zaprzęgania do roboty programistów. W odpowiedzi na te problemy powstał Hive, czyli technologia która potrafi opisać jakiś plik lub katalog HDSF-a schemą (stworzyć tabele podobną do tych znanych z baz danych opisującą jakie dane znajdują się w środku), gdy dane są opisane możemy uruchamiać na nich zapytania łudząco podobne do SQL-a, możliwe są nawet JOIN-y pomiędzy "tabelami", a wyniki możemy zapisać w kolejnej "tabeli" czyli tak naprawdę w pliku znajdującym się w HDSF. A i byłbym zapomniał - oczywiście nasze zapytania SQL są tak naprawdę konwertowane do jobów MapReduce.

4. Pig - Yahoo uznało, że Hive to nie do końca to co jest im potrzebne, dlatego powstał Pig. Jest to język, który umożliwia budowanie jobów MapReduce na znacznie wyższym poziomie abstrakcji niż możliwe jest to np. w Javie. Postawiono tutaj na duże możliwości związane nie tylko z przeszukiwaniem danych, ale również z ich przetwarzaniem, kolejnym bardzo ważnym elementem języka jest łatwość integracji z kodem napisanym w Javie, co pozwala na pisanie specyficznych elementów joba.

5. HBase - czyli implementacja BigTable, jest to nierelacyjna baza danych typu klucz=wartość, która wykorzystuje HDFS oraz łatwo integruje się z pozostałymi technologiami.

Ok, przydługi i trochę teoretyczny wstęp mamy za sobą. W kolejnych postach postaram się opisać w praktyczny sposób każdą z wymienionych technologi. Trzymajcie rękę na pulsie ponieważ w następnym wpisie uruchomimy nasz pierwszy klaster Hadoopa.