2016-03-22

Hadoop Streamingでアイドルツイート分析

Fluentd+ElasticSearch+Kibanaでアイドルデータ分析基盤を作ってみたの回で、FluentdとTwitter Streaming APIを使ってS3にツイートデータを保存したので、このデータをHadoopを使って解析してみます。

今回はMeCabを使って形態素解析してワードカウントを取るような教科書的なMapReduceを試してみました。Hadoop Streamingを使ってPythonでMapper、Reducerを書いていきます。

環境

Hadoopのインストール&設定

以下のURLを参考にインストール&設定すればOK。 Macなら brew install hadoopで一発。 インストールしたら、~/.bashrcか~/.zshrcにこんな感じで設定しておきます。環境変数HADOOP_CLASSPATHに$HADOOP_HOME/libexec/share/hadoop/tools/lib/*を入れないと"ls: No FileSystem for scheme: s3n hadoop"とエラーが発生するので注意。 ```bash export HADOOP_VER=2.7.1 export HADOOP_HOME=/usr/local/Cellar/hadoop/$HADOOP_VER export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:"$HADOOP_HOME/libexec/share/hadoop/tools/lib/*" ``` Macで擬似分散モードを試すときは[システム環境設定]>[共有]でリモートログインを許可しておく。

Hadoop StreamingのMapper、Reducer

Mapperは対象のデータを標準入力で受け取り、Key/Valueを標準出力に吐き出すようなコード、ReducerはMapperの標準出力を入力としてKey/Valueを集計するようなコードをそれぞれ書けばOKです。今回はMapperでMeCabを使って形態素解析をするため、MeCabのインストールを行います。MeCabのダウンロードはこちら。 ```bash $ tar xvf mecab-0.996.tar.gz $ cd mecab-0.996 $ ./configure $ make $ sudo make install ``` 辞書もインストール ```bash $ tar xvf mecab-ipadic-2.7.0-20070801.tar.gz $ cd mecab-ipadic-2.7.0-20070801 $ ./configure --with-charset=utf8 $ make $ sudo make install ``` Pythonバインディングのインストール ```bash $ tar xvf mecab-python-0.996.tar.gz $ cd mecab-python-0.996 $ python setup.py build $ sudo python setup.py install ``` ユーザ辞書の元CSVデータ作成 ```text 乃木坂,1288,1288,*,名詞,固有名詞,*,*,*,*,乃木坂,ノギザカ,ノギザカ 欅坂,1288,1288,*,名詞,固有名詞,*,*,*,*,欅坂,ケヤキザカ,ケヤキザカ ... ``` ユーザ辞書を作成 ```bash $ /usr/local/libexec/mecab/mecab-dict-index -d/usr/local/lib/mecab/dic/ipadic \ -u /path/to/file/user.dic -f utf-8 -t utf-8 /path/to/file/user_dic.csv ```

MeCabにユーザ辞書を設定(/usr/local/lib/mecab/dic/ipadic/dicrc)

```text userdic = /path/to/file/user.dic ``` Mapperの実装 ```python #!/usr/bin/env python # -*- coding: utf-8 -*- import sys import json import MeCab import re regex = re.compile(r'[ぁ-ん。、!?!-/:-@≠\[-`{-~a-zA-Z\d_]+') mt = MeCab.Tagger("-Ochasen") for line in sys.stdin: obj = json.loads(line) text = obj['text'] # RTを含むツイートは除く if text.find('RT') > -1: continue m = mt.parseToNode(text.encode('utf-8')) while m: # 一文字、空白な品詞は無視。正規表現によるフィルタリング内容は要検討。 if m.surface == '' or len(m.surface) == 1 or regex.match(m.surface): m = m.next continue print("{0}\t1".format(m.surface)) m = m.next ``` Reducerの実装 ```python #!/usr/bin/env python # -*- coding: utf-8 -*- import sys word_map = {} for line in sys.stdin: word, count = line.strip().split('\t') word_map[word] = word_map[word] + int(count) if word in word_map else int(count) for k, v in sorted(word_map.items(), key=lambda x:x[1], reverse=True): print("{0}\t{1}".format(k, v)) ``` Mapperでツイート内容を形態素解析かけるときは、encodeしないと以下のようなエラーが発生するので注意 ```text Traceback (most recent call last): File "/path/to/hadoop_mapper.py", line 12, in m = mt.parseToNode(obj['text']) File "/usr/local/lib/python2.7/site-packages/MeCab.py", line 282, in parseToNode def parseToNode(self, *args): return _MeCab.Tagger_parseToNode(self, *args) TypeError: in method 'Tagger_parseToNode', argument 2 of type 'char const *' ```

S3のデータを利用する設定

core-site.xmlの設定はこんな感じで↓

```xml fs.defaultFS hdfs://localhost:9000 fs.s3n.awsAccessKeyId {AWS_ACCESS_KEY_ID} fs.s3n.awsSecretAccessKey {AWS_SECRET_ACCESS_KEY} ``` 以下のコマンドを実行して問題なくS3にアクセスできることを確認します。 ```bash $ hadoop fs -ls s3n://{BUCKET_NAME} ```

Hadoopの起動

$ hadoop jar ${HADOOP_HOME}/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
 -input s3n://{BUCKET_NAME}/20160319_*_twitter.gz \
 -output /output -mapper ./hadoop_mapper.py -reducer ./hadoop_reducer.py

結果の確認は以下のコマンドで

$ hadoop fs -cat /output/part-00000 | less

3/19のツイート収集結果はこんな感じ

乃木坂:2565     
西野七瀬:532    
白石麻衣:436    
欅坂:426        
橋本奈々未:369  
生駒里奈:356    
希望:327        
齋藤飛鳥:302    
永島聖羅:286    
生田絵梨花:274  
高山一実:257    
生:251  
写真:241        
紅白:239        
衣装:237        
提供:236        
松村沙友理:213  
人:210  
日:187  
衛藤美彩:180    
中:167  
北野日奈子:150  
深川麻衣:147

正規表現のフィルタリングをキツ目に設定しているのでメンバーが良い感じに出力できています。卒業コンサートがあった永島聖羅が高め。アンダーでは北野日奈子強いです。

参考URL