Hadoop Streamingを利用してJavaScriptでMap Reduce

2009.08.02 / javascript java

久々のBlog更新、というわけでリハビリがてらJavaScriptで軽く遊んでみたいと思います。

いま、巷で流行ってるMapReduceのオープンソース実装Hadoopは「Hadoop Streaming」という標準入出力でデータのやりとりができる仕組みを使って、 Hadoopの実装言語であるJavaにとらわれず、RubyやPerlなど他の言語でもMap+Reduceの処理ができることが1つのウリになっています。 で、僕たちwebエンジニアはみんなJavaScript大好きなので、「JavaScriptでもMap Reduceやりたい!」という流れになるのは必然です。 そこで、試行錯誤でいろいろ試してみると割とさっくり出来たのでそのメモを残しておきたいと思います。

環境の整備

Mac OSX上のVMWare FusionにCentOSの仮想マシンを2台立ち上げて、環境セットアップしました。以下のような手順で環境整備しました。

仮想HWの設定

仮想マシンのイメージファイルですが、僕は毎回thoughtpoliceのサイトのものを利用しています。ここからCentOS 5.3のイメージを落としてFusionでロード。

1つハマったのが、同じイメージをコピーして複数台起動すると、MACアドレスがかぶって同時に複数台の仮想マシンがNWに接続できない点。なので、仮想マシンを最初に起動させる前にMACアドレスの設定をしておくことをおすすめします。

MACアドレスは、vmxファイルのuuid.bios値をもとに自動生成されるようで、この値を最初から全仮想マシンでバラバラにしておけばOKです。と、いっても有効な値の範囲があるので、最後の1byte値だけズラしておけばOKだと思います。たとえば、上記サイトからイメージを取得した場合、uuid.bios値の最後は「7d」になっているので、僕はこの値を「7c」「7b」なんかにズラしておきました。(Fusion上から正規の方法?で、マシンをクローンする方法がよくわからなかったので、こういう強引な方法を取っています。綺麗な方法をご存知の方いらしたらぜひ教えてください!)

ネットワーク設定

/etc/hostsを設定して2台のマシンの名前解決。細かな話ですけど、ホスト名には「(アンダーバー)」は使えないのに要注意。最初「hdp01」みたいなホスト名にしていて、実行時エラーが出てドはまってたときに全然気づかなかった。

  • 192.168.1.15 hdp-01
  • 192.168.1.16 hdp-02

SSHの設定

マシンはそれぞれ自分自身(localhopst)に対して、パスフレーズなしでSSHでログインできるように公開鍵を設定しておきます。

  • ssh-keygen -t rsa -P “”
  • cat ~/.ssh/idrsa.pub >> ~/.ssh/authorizedkeys
  • chmod 600 ~/.ssh/authorized_keys

また、MasterのマシンからSlaveのマシンに対しても同様の設定をしておきます。今回はMasterはhdp-01, Slaveはhdp-02という設定です。hdp-01上で、次の設定をします。

  • cp ~/.ssh/idrsa.pub ~/.ssh/idrsamaster.pub
  • scp ~/.ssh/idrsamaster.pub hdp-02:/home/katsuma/.ssh/
  • chmod 600 ~/.ssh/authorizedkeys

また、hdp-02上で、次の設定をします。

  • cat ~/.ssh/idrsamaster.pub >> ~/.ssh/authorized_keys

これで、Masterからlocalhost, およびSlaveに対してパスフレーズなしでログインできることを確認しておきます。

セキュリティの設定

後でハマるのが面倒なので、iptablesやSELinuxは切っておきます。必要であれば適宜設定してください。

  • sudo /etc/init.d/iptables off
  • sudo /etc/sysconfig iptables off
  • sudo vi /etc/sysconfig/selinux
    • SELINUX=disabled に変更

Shellの設定

.bashrcなんかに以下の設定をしておくと便利です。僕はzsh派なので、.zshrcに記述しました。

hadoop

alias cdh='cd /home/katsuma/hadoop/latest/' alias dls='/home/katsuma/hadoop/latest/bin/hadoop dfs -ls' # ls alias drm='/home/katsuma/hadoop/latest/bin/hadoop dfs -rm' # rm alias dcat='/home/katsuma/hadoop/latest/bin/hadoop dfs -cat' # cat alias drmr='/home/katsuma/hadoop/latest/bin/hadoop dfs -rmr' # rm -r alias dmkdir='/home/katsuma/hadoop/latest/bin/hadoop dfs -mkdir' # mkdir alias dput='/home/katsuma/hadoop/latest/bin/hadoop dfs -put' # HDFS に転送 alias dget='/home/katsuma/hadoop/latest/bin/hadoop dfs -get' # HDFS から転送 alias dcpfl='/home/katsuma/hadoop/latest/bin/hadoop dfs -copyFromLocal' alias dcptl='/home/katsuma/hadoop/latest/bin/hadoop dfs -copyToLocal'

Javaのインストール

Sunのサイトに行き、 「Java SE Development Kit (JDK)」を選択して、インストーラをダウンロード。その後、以下の手順でインストールできます。

  chmod +x jdk-6u7-linux-i586-rpm.bin
  sudo ./jdk-6u7-linux-i586-rpm.bin
  

Hadoopの整備

Hadoopのインストール

Hadoopは今日時点で最新の0.20.0を利用しました。インストール場所はどこでもいいのですが、僕はホームディレクトリ直下に専用のディレクトリ掘って、いろんなバージョン試せるようにこんな感じで設置してます。ここから最新版をDLが可能です。

  • cd path/to/hadoop-0.20.0.tar.gz
  • tar zxvf hadoop-0.20.0.tar.gz
  • mv hadoop-0.20.0 ~/hadoop/
  • ln -s hadoop-0.20.0 latest

他のバージョンを試したくなったら~/hadoop/以下に展開して、シンボリックリンクを付け直すとOKですね。

Hadoopの設定

特に凝ったことはしていません。全ノードともに同じ設定である必要があるので、Masterで設定しちゃって、それをrsyncで他のSlaveと同期をとるのがよいと思います。

conf/hadoop-env.sh
export JAVA_HOME=/usr/java/latest

conf/core-site.xml

  <configuration>
    <property>
      <name>hadoop.tmp.dir</name>
      <value>/home/${user.name}/hadoop/latest/tmp</value>
    </property>
    <property>
      <name>fs.default.name</name>
      <value>hdfs://hdp-01:9000</value>
    </property>
  </configuration&gt
  

conf/hdfs-site.xml

  <configuration>
    <property>
      <name>dfs.replication</name>
      <value>1</value>
    </property>
  </configuration>
  

conf/mapred-site.xml

  <configuration>
    <property>
      <name>mapred.job.tracker</name>
      <value>hdp-01:9001</value>
    </property>
  </configuration>
  

conf/masters

        hdp-01
  

conf/slaves

        hdp-02
  

ここまで設定できたら、masterのイメージをrsyncしておきましょう。

  cd ~/hadoop/
  rsync -r hadoop-0.20.0/ hdp-02:/home/katsuma/hadoop/hadoop-0.20.0
  

SpiderMonkeyの導入

さて、やっとHadoopの設定が終わったので、次にJavaScriptの処理系の導入です。 Hadoop Streamingは前述の通り、標準入出力の仕掛けを使って実現されているので、さすがにブラウザの処理系をそのまま利用することができません。

そこで、FirefoxのJavaScriptのエンジンであるSpiderMonkeyを利用することにします。 SpiderMonkeyはファイルを入力としても処理できるし、irbのように対話型シェルとしても利用できるJavaScriptの処理系です。また、ブラウザを利用しないので、標準出力関数print、標準入力関数readlineが実装されてあるので、今回はこれを利用すればうまくいきそうです。

では、SpiderMonkeyを各ノードに導入します。これも最新版を導入します。あらかじめ、make, gccあたりをyumで入れておきましょう。

参考:SpiderMonkeyのインストール

  • wget http://ftp.mozilla.org/pub/mozilla.org/js/js-1.8.0-rc1.tar.gz
  • tar zxf js-1.8.0-rc1.tar.gz
  • cd js/src
  • BUILDOPT=1 make -f Makefile.ref
  • sudo install -m 755 LinuxAll_OPT.OBJ/js /usr/local/bin

Map, Reduce処理のJavaScriptを記述する。

では、Map, Reduceそれぞれの処理をJavaScriptで書きます。今回はサンプルとしてよくある、ワードカウントの処理を行ってみます。

map.js

map.jsは標準入力された文章を半角スペースごとに分けて、CSVの形式に整形します。JavaScript1.7相当の機能が利用できるので、Array.prototype.forEachが利用できることもポイントです。

!/usr/local/bin/js

var line=""; while ((line = readline())!= null){ var words = line.split(" "); words.forEach(function(w){ print(w + "," + 1); }); }

reduce.js

reduce.jsは、map処理されたCSV形式の入力に対して、counterオブジェクトで各単語をカウントしていきます。

!/usr/local/bin/js

var counter = {}; var line = ""; while ((line = readline()) != null) { var words = line.split(","); var word = words[0] if(!counter[word]) counter[word] = 1; else counter[word]++; }

for(var k in counter){ print(k + ":" + counter[k]); }

これらのJavaScriptファイルをhadoop/latest/script/あたりに保存しておき、全ノードで同期させておきます。

Hadoopの起動

最初にHDFSをフォーマットしておきます。Masterで次の処理を行います。

  cdh
  ./bin/hadoop namenode -format
  

MasterでHadoopを起動します。

  bin/start-all.sh
  

MapReduce用の適当な入力ファイルを作成します。こんな内容のファイルを$HADOOP_HOME/input/file1に作成します。

  we are the world we change the world
  

このファイルをHDFSに転送します。dputは bin/hadoop dfs -putのエイリアスです。

  dput input/file1 in/count
  

転送されてあるかどうかは、dls(bin/hadoop dfs -ls)で確認できます。

  katsuma@hdp-01 ~/hadoop/latest
  $ dls
  Found 1 items
  drwxr-xr-x   - katsuma supergroup          0 2009-07-31 02:14 /user/katsuma/in

katsuma@hdp-01 ~/hadoop/latest $ dls in Found 1 items drwxr-xr-x - katsuma supergroup 0 2009-07-31 02:56 /user/katsuma/in/count

入力用ファイルの存在が確認できたので、これでやっとMapReduce処理ができます。

  ./bin/hadoop jar ./contrib/streaming/hadoop-0.20.0-streaming.jar -input in/count -output out/count -mapper "js /home/katsuma/hadoop/latest/script/map.js" -reducer "js /home/katsuma/hadoop/latest/script/reduce.js"
  

すると、ゆったりですけど処理が進んでいきます。

  packageJobJar: [/home/katsuma/hadoop/latest/tmp/hadoop-unjar4327209233542314881/] [] /tmp/streamjob4726696067913490494.jar tmpDir=null
  09/07/31 10:45:05 INFO mapred.FileInputFormat: Total input paths to process : 1
  09/07/31 10:45:06 INFO streaming.StreamJob: getLocalDirs(): [/home/katsuma/hadoop/latest/tmp/mapred/local]
  09/07/31 10:45:06 INFO streaming.StreamJob: Running job: job2009073102370013
  09/07/31 10:45:06 INFO streaming.StreamJob: To kill this job, run:
  09/07/31 10:45:06 INFO streaming.StreamJob: /home/katsuma/hadoop/latest/bin/../bin/hadoop job  -Dmapred.job.tracker=hdp-01:9001 -kill job2009073102370013
  09/07/31 10:45:06 INFO streaming.StreamJob: Tracking URL: http://hdp-01:50030/jobdetails.jsp?jobid=job2009073102370013
  09/07/31 10:45:07 INFO streaming.StreamJob:  map 0%  reduce 0%
  09/07/31 10:45:23 INFO streaming.StreamJob:  map 50%  reduce 0%
  09/07/31 10:45:45 INFO streaming.StreamJob:  map 50%  reduce 17%
  09/07/31 10:48:49 INFO streaming.StreamJob:  map 100%  reduce 17%
  09/07/31 10:49:10 INFO streaming.StreamJob:  map 100%  reduce 100%
  09/07/31 10:49:15 INFO streaming.StreamJob: Job complete: job2009073102370013
  09/07/31 10:49:15 INFO streaming.StreamJob: Output: out/count
  

HDFS上のout/count/以下に結果が格納されたファイルができているので確認してみましょう。

  katsuma@hdp-01 ~/hadoop/latest
  $ dls out/count
  Found 2 items
  drwxr-xr-x   - katsuma supergroup          0 2009-07-31 10:45 /user/katsuma/out/count/_logs
  -rw-r--r--   1 katsuma supergroup         43 2009-07-31 10:48 /user/katsuma/out/count/part-00000

katsuma@hdp-01 ~/hadoop/latest $ dcat out/count/part-00000 are:1 change:1 the:2 we:2 world:2

まとめ

SpiderMonkeyのような処理系を用意することで、JavaScriptでもHadoopを使ってMapReduceできることが確認できました。標準入出力さえサポートされてあれば、理屈的にはどんな言語でもMapReduceできるので、MapReduceには興味あるけどJavaということで敬遠していた方は、ぜひいろんな言語で試していただければと思います。

開発のTips

なんだかんだ言って、最初開発にかなり苦労しました。と、いうのもシンタックスエラー以外は、実行しないとどうなるかよくわからないものなので、エラーで落ちたときにどうデバッグすればいいか悩みました。 ただ、よく考えれば、「標準入力→Map処理→Mapの標準出力→Reduceの入力→Reduceの標準出力」という流れになるので、たとえば今回のJavaScript実装の場合、次のように実行することでHadoopを介さなくとも動作確認は可能です。

  cat input/file1 | js script/map.js | js script/reduce.js
  

まずは、このように手元でパイプでつないで結果を調べてみる、というのが手かと思います。実際は手元でうまく動いてもHadoop上でRuntimeエラーが起きる場合も多いので、そのときはlogディレクトリ以下にできるログファイルを調べるのがいいと思います。

また、エラーが発生するとJavaの例外のStackStraceが表示されるので、敬遠せずにそこからHadoopのソースを直接追いかけるのは何だかんだで早い解決法でした。コメントが割と充実しているので、そこからStreaming用のコードのバグを辿るのも難しくはないと思います。