Twitter Streaming APIでかき集めたツイートをSparkで分析してみました。
Hadoop版はこちら→Hadoop Streamingでアイドルツイート分析
Hive版はこちら→Hiveでツイート分析
環境
- OS: Mac OS X(El Capitan)
- Python: 2.7.11
- Hadoop: 2.7.1
インストール
こんな感じでPrebuild for Hadoop 2.6 or laterをダウンロード、インストールします。$ cd /usr/local
$ wget http://ftp.jaist.ac.jp/pub/apache/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz
$ tar xvf spark-1.6.1-bin-hadoop2.6.tgz
$ mv spark-1.6.1-bin-hadoop2.6 spark
手動でダウンロードする場合は、Sparkのダウンロードサイトに行って、4のDownload Sparkをクリックした先にダウンロードリンクが表示されます。
設定
S3を利用するので{SPARK_PATH}/conf/spark-defaults.confに以下の2行を追加spark.driver.extraClassPath {HADOOP_PATH}/libexec/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar:
{HADOOP_PATH}/libexec/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar:
{HADOOP_PATH}/libexec/share/hadoop/tools/lib/guava-11.0.2.jar
spark.executor.extraClassPath {HADOOP_PATH}/libexec/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar:
{HADOOP_PATH}/libexec/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar:
{HADOOP_PATH}/libexec/share/hadoop/tools/lib/guava-11.0.2.jar
本当はワイルドカードでクラスパスを指定したかったんですが方法が見つからず…
起動
AWSのAPI用クレデンシャルを環境変数をセットしてからpysparkを起動します。$ export AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}
$ export AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}
$ {SPARK_PATH}/bin/pyspark
以下のようなコードを打ち込むとHDFS内のデータに対して処理を行えます。gzで圧縮されている入力ファイルは1行1JSONのツイートデータが格納されています。
import json
sc.textFile("hdfs://localhost:9000/input/20160324_*_twitter-worker.gz")
txtFile.filter(lambda line: "西野七瀬" in json.loads(line)['text'].encode('utf-8')).count()
S3に対して行いたい場合は”hdfs://…“を”s3n://…“や”s3a://…“にすればOKです。Class xxx not foundと言われる場合はクラスパスの指定が問題なので、spark-defaults.confなどの設定を見直してください。
なおRDDをキャッシュする場合は以下のようにします。
cachedRDD = sc.textFile("s3n://path/to/file.gz").cache()
cachedRDD.count()
cachedRDD.count() #2回目はキャッシュしているので高速
データがメモリに乗り切ってない場合は以下の様なワーニングが発生します。
WARN MemoryStore: Not enough space to cache rdd_10_0 in memory! (computed 495.5 MB so far)
この場合はメモリ量を調整すればOK。spark-defaults.confだとこんな感じ。
spark.executor.memory 5g