2016-04-12

Spark/PySparkでツイート分析してみた

Twitter Streaming APIでかき集めたツイートをSparkで分析してみました。

Hadoop版はこちら→Hadoop Streamingでアイドルツイート分析

Hive版はこちら→Hiveでツイート分析

環境

インストール

こんな感じでPrebuild for Hadoop 2.6 or laterをダウンロード、インストールします。

$ cd /usr/local
$ wget http://ftp.jaist.ac.jp/pub/apache/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz
$ tar xvf spark-1.6.1-bin-hadoop2.6.tgz
$ mv spark-1.6.1-bin-hadoop2.6 spark

手動でダウンロードする場合は、Sparkのダウンロードサイトに行って、4のDownload Sparkをクリックした先にダウンロードリンクが表示されます。

download-spark

設定

S3を利用するので{SPARK_PATH}/conf/spark-defaults.confに以下の2行を追加

spark.driver.extraClassPath         {HADOOP_PATH}/libexec/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar:
{HADOOP_PATH}/libexec/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar:
{HADOOP_PATH}/libexec/share/hadoop/tools/lib/guava-11.0.2.jar
spark.executor.extraClassPath       {HADOOP_PATH}/libexec/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar:
{HADOOP_PATH}/libexec/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar:
{HADOOP_PATH}/libexec/share/hadoop/tools/lib/guava-11.0.2.jar

本当はワイルドカードでクラスパスを指定したかったんですが方法が見つからず…

起動

AWSのAPI用クレデンシャルを環境変数をセットしてからpysparkを起動します。

$ export AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}
$ export AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}
$ {SPARK_PATH}/bin/pyspark

以下のようなコードを打ち込むとHDFS内のデータに対して処理を行えます。gzで圧縮されている入力ファイルは1行1JSONのツイートデータが格納されています。

import json

sc.textFile("hdfs://localhost:9000/input/20160324_*_twitter-worker.gz")
txtFile.filter(lambda line: "西野七瀬" in json.loads(line)['text'].encode('utf-8')).count()

S3に対して行いたい場合は”hdfs://…“を”s3n://…“や”s3a://…“にすればOKです。Class xxx not foundと言われる場合はクラスパスの指定が問題なので、spark-defaults.confなどの設定を見直してください。

なおRDDをキャッシュする場合は以下のようにします。

cachedRDD = sc.textFile("s3n://path/to/file.gz").cache()
cachedRDD.count() 
cachedRDD.count() #2回目はキャッシュしているので高速

データがメモリに乗り切ってない場合は以下の様なワーニングが発生します。

WARN MemoryStore: Not enough space to cache rdd_10_0 in memory! (computed 495.5 MB so far)

この場合はメモリ量を調整すればOK。spark-defaults.confだとこんな感じ。

spark.executor.memory 5g

参考URL

このエントリーをはてなブックマークに追加