Spark と大規模データ処理
NAIST ビッグデータアナリティクス 第 2 回鈴木 優
本日の内容
ビッグデータとは Hadoop から Spark まで Apache Spark の使い方
ビッグデータとは
大きなデータとは(ガードナーによる定義 (three V)) 量が多い (Volume) 地球上に存在するデータ量は 2011 年には 1.8ZB,2020 年に は 35ZB に達する見込み 1ZB (1 ゼッタバイト)= 1021B (バイト)= 1,000 EB(エ クサバイト) = 1,000,000 PB = 1,000,000,000 TB = 1,000,000,000,000 GB 種類が多い (Variety) Word, Excel などの文書,電子メール,ソーシャルメディアの データ,センサーデータ 頻度が多い (Velocity) 商品の購買履歴データ - クレジットカード,銀行の預貯金 センサーデータ - 気象データ,河川の水位観測,気温 3 / 39ビッグデータとは今までと何が違うのか
何も違わない(バズワード)という批判 基盤となる技術は既存のものと同じ データ工学,情報検索,データマイニング,機械学習など 色々なデータを混ぜ合わせ一つの知見を得たい 大量の埋もれているデータから今まで知られていなかった 情報を得たいデータベース
Relational Database Management System(RDBMS) Oracle, MySQL, PostgreSQL, MicrosoftSQL, ...
SQL という標準形式でどのシステムでも問合せできる 異なるデータベースを使ってもシステムを変更する必要が 無い データを表形式で格納,抽出 複雑な問合せが可能 一つのシステムで動く = 負荷分散できない 処理は必ず成功 or 失敗する = 中途半端に処理が終わらない 5 / 39
データベースの ACID 特性
データベースは,以下の四つの性質を満たしている必要が ある. Atomicity (原子性): トランザクション処理はすべて実行 されるか,全く実行されない状態かのいずれかで終わる Consistency(一貫性): トランザクション処理の前後で データに矛盾を生じない Isolation(独立性): 他のトランザクション処理から影響 されない Durability(耐久性): 障害が発生しても更新結果は保持さ れる これらを全部合わせて「整合性 (Consistency) 」と呼ぶCAP 定理
以下の三つを同時に保証することは不可能 データの整合性 (Consistency) データの可用性 (Availability) データの分散化 (Partition-tolerance) RDB では,整合性を保証した結果,分散化すると速度が低 下する→KVS (Key Value Store)の出現
KVS の出現
整合性は犠牲,可用性や分散性を向上という発想 データは表形式ではなく, “Key” と “Value” の組 検索方法はシステム依存 SQL は使えない = 独自の問合せ言語 NoSQL と呼ばれることも Not Only SQL の略 NO SQL (SQL は不要)という印象をもつ人も ドキュメント指向のものが多い Value の部分に,大量のデータを入れることもKVS の歴史
Google が BigTable を開発 (2004-2005)
数百台から数千台のサーバでペタバイト級のデータを扱う Google 内のサービスで活用 (Google Reader, Google Map, Blogger, ...)
ソースや実行環境は公開されていない Google 以外のサーバでは使えない クローンの出現
BigTable の論文を基に様々な実装を
Apache HBase - Hadoop 上の KVS 実装 Apache Cassandra
MongoDB
Neo4j - グラフを対象にした KVS ... などなど
最近の傾向
Standalone で SQL を使いたい: 商用なら Oracle,特に何 もなければ MySQL 地理データを使いたい: PostgreSQL プロトタイプで簡単に: SQLite NoSQL なら: MongoDB,Cassandra など 何も考えずに: ファイルKey Value Store
JSON 形式を用いられることが多いJSON 形式
{"Key" : "Value"} {"Key" : {"Key2": "Value2"} }[{"Key":"Value"}, {"Key2": "Value2"}, ...]
{"ノートパソコン":
{"重量":"10kg", "大きさ":"A4", "バッテリー":"10 時間"} }
最近のシステム構築に関する動向
一つの大きなシステムよりも複数の小さなシステム 倍の値段のサーバは性能が倍とはならない 性能/価格比が最も良いサーバ = コモディティサーバ サーバの数が多い = 壊れやすい 少しは壊れても問題無いようにシステムを設計する必要あり 一つのポイントが壊れてしまうとシステムが動作しない点 SPOF (Single Point Of Failure) を無くす文字のカウント
「台風」は何回出現するか (Java like language)
File file = fopen("file.txt"); //ファイルを開くint count = 0; //変数 count を設定 for(String s: file){ if(s.contains("台風")){ //「台風」が入っていたら・・・? count += 1; // count に 1 を足す } } System.out.println("台風は"+count+"回出現しました"); 変数countはこのプログラム内でしか利用できない. もし他のマシンで変数の値が変わってもこの値は変化せず 13 / 39
このプログラムは何が問題なのか
メモリが足りない 大量のデータに比例するメモリが無ければ,メモリ不足にな り処理続行できない CPU を有効に活用できていない 結局 1 個の CPU だけしか活かせていない データの読み出し速度が遅い ハードディスクからデータを読み出す速度はシーケンシャル で 200MiB/s,ランダムで 380 IOPS (SAS 15000rpm) いくらお金をかけてもこれ以上にはならない並列化
複数のマシン間で協調させて動かすのは難しい ファイル番号が偶数ならシステム A,奇数なら B が動作する ようにすると,偶数と奇数が同じ処理数でなければ,有効に 動かすことができない 分け方によっては同じファイルを 2 回処理してしまうことも 最後に集計は 1 個のマシンしか動作させられない メモリはマシンの間で共有できない あるマシンへ変数の値を変えても別のマシンに知らせること は難しい マシン間で影響が無いように問題を分割 15 / 39複数マシンの並列化手法の登場
MapReduce の出現 (Google) 一つのタスクを細かな単位 (map) に細分化し処理 map で処理された部分をまとめる (reduce) MapReduce は Google 以外では利用できない 誰でも使える実装として Hadoop が作られる分散ファイルシステム Google File System の出現 多くのサーバにデータを分散して配置
いくつかディスクやサーバが壊れてもデータは無事 GFS は Google 以外では利用できない
誰でも使える実装として hdfs が作られる Hadoop Distributed File System
サーバの変化
一つの大きなサーバ 小さなサーバの集合体
(ブレードサーバ)
*1
*1by Robert Kloosterhuis (Wikimedia Commons)
京
京のラック 2 一つひとつは小さなコン ピュータ 上の方 12 個のサーバ 中央は電源 下の方 12 個のサーバ 複数のサーバを効率良く組 み合わせられるかが勝負Hadoop や Spark に対する誤解
(誤解 1) Hadoop や Spark で書くと処理スピードが速く なる 少量のデータを扱う時は逆に遅くなる データが非常に多いときに処理可能になる (誤解 2) 並列処理のプログラミングが面倒 並列処理を意識したプログラムはほとんどありません 19 / 39Hadoop のプログラム (単語の数え上げ)
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.* public class WordCount { public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context
) throws IOException, InterruptedException { int sum = 0;
for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
public static void main(String[] args) throws Exception { Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
Hadoop の問題点
主にバッチ処理
時々刻々と更新されるデータを処理するのは苦手 出力は常にファイル
Map → Reduce の先はディスクに保存しなければならない Map → Reduce → Map → Reduce →などの処理を行うとき に,ディスクアクセスが発生し速度が低下
特に機械学習において速度の問題は深刻 同じデータを何度も繰り返し読む場面が多い
Apache Spark とは
大規模なデータを活用するための高速で総合的なデータプ ラットフォーム
利用しやすい
Scala, Java, Python, R shell が使える
SQL やストリームデータなどを扱うことができる 機械学習フレームワークが既に幾つか実装されている どこででも動く Java JVM が動くこと,相互にネットワーク接続されて入れ ばどこでも動く ただし,動作させるまでに設定が結構大変 NAIST では ITC で既に設定済み.誰でも動作可能
Apache Spark を使ってみよう
1 bda1node??へログイン ?? は05∼16のうちのどれかを使ってください 2 データの準備 Spark のプログラムから見えるファイルシステム hdfs へ ファイルを移動します 3 プログラムの作成 この授業では python を使いますが,Java/Scala でも作成 可能です 4 プログラムの実行 実際にプログラムを実行し,実行結果を出力します 23 / 39ログイン・PySpark を起動・終了
以下,リモート側のプロンプトは %, ローカル側は $で示 します.pyspark 起動・終了
$ ssh bda1node05.naist.jp (05 の部分を 05 から 18 までの数に変える) % pyspark ← Spark を立ち上げる Python 2.6.6 (r266:84292, Jan 22 2014, 01:49:05) (中略)Using Python version 2.6.6 SparkContext available as sc.
>>> exit() ← Spark を終わる.Ctl-D でも良い % logout
データを取ってくる
データ取得
% ls | grep txt
iphone6.ja.txt.xz iphone6s.ja.txt.xz Apple.txt.xz オリンピック.ja.txt.xz 台風.ja.txt.xz // 最後に xz が付いているものは圧縮されている // なので解凍する(ちょっと時間かかります) % xz -d Apple.txt.xz // ファイル確認 % ls |grep txt Apple.txt 25 / 39
対象データの準備
hdfs へデータを移動
% ls | grep ja.txt iphone6.ja.txt iphone6s.ja.txt オリンピック.ja.txt 台風.ja.txt // 台風.ja.txt を spark で読み書きできるように % hadoop fs -put 台風.ja.txt// ファイル名一覧 % hadoop fs -ls Found 2 items drwx--- - ysuzuki is-staff 0 2015-10-06 10:14 .Trash -rw-r--r-- 3 ysuzuki is-staff 543921683 2015-10-06 00:04 台風.ja.txt
プログラミング
文字のカウント
% pyspark
// ファイルの読み込み
>>> tweets = sc.textFile(u’台風.ja.txt’)
// ‘‘奈良’’が入っている行だけを抽出
>>> nara = tweets.filter(lambda s: u’奈良’ in s) // 行をカウント
>>> nara.count() (省略)
15/10/06 15:25:03 INFO DAGScheduler: Job 3 finished: count at <stdin>:1, took 8.239254 s 2640
// 2640 行あった
RDD (Resilient Distributed Dataset)
不変 (イミュータブル) で並列実行可能な (分割された) コレ クション 情報が出てくる蛇口 メモリ上で保持される 分割され,サーバ上に分散して配置される 今回はファイル.DB 上のデータやインターネット上の データを扱うことも可能 遅延実行 実際に処理が必要となるまで処理が開始されない 三種類の RDD RDD (一行に一つの値だけが入る RDD)PairRDD (一行に Key と Value の二つが入っている RDD) SchemaRDD (SparkSQL で扱う RDD)
プログラミング
” 奈良” が含まれる
% pyspark
// ファイルの読み込み
>>> tweets = sc.textFile(u’台風.ja.txt’)
// ‘‘奈良’’が入っている行だけを抽出
>>> nara = tweets.filter(lambda s: u’奈良’ in s) // 行をカウント
>>> nara.count() (省略)
15/10/06 15:25:03 INFO DAGScheduler: Job 3 finished: count at <stdin>:1, took 8.239254 s
2640
// 2640 行あった
プログラミング
複数台のサーバを使う
// マシンを二つ利用する (--master local[2]) % pyspark --master local\[2\]
// 同じ処理を行う
15/10/06 15:25:03 INFO DAGScheduler: Job 3 finished: count at <stdin>:1, took 8.166796 s
2640
// 8.23秒かかっていたのが 8.16秒になった
ファイルに保存されているプログラムを実行
wordcount.py (ホームページに掲載しています)
from pyspark import SparkContext sc = SparkContext(appName="WordCount")
// RDD 作成
text = sc.textFile("Apple.txt")
// tab で区切り 左から 3 個目だけをとってくる
counts = text.flatMap(lambda line: line.split(’\t’)[2]. // スペースで区切る
split(’ ’) if len(line.split(’\t’)) == 4 else ’’)\
// (単語,1) というペアを作成
.map(lambda word: (word,1))\
// 同じ単語を集約,(単語,a) と(単語,b) が来たら (単語, a+b)を 返す
ファイルに保存されているプログラムを実行
wordcount.py (続き)
// ペア (単語, x ) を (x, 単語) へ変換 output = counts.map(lambda x:(x[1],x[0])).\ // Key で順序付け,10個取り出す sortByKey(ascending=False).take(10) // 出力for (count, word) in output:
print("%s: %i" % (word.encode(’utf_8’),count))
// Spark を停止
sc.stop()
結果を出力
実行
// データを投入 (英語の Twitter データ) % hadoop fs -put Apple.txt
// タスクを投入 % spark-submit wordcount.py // 結果が出てくる RT: 1057117 Apple: 788480 the: 640973 to: 416088 : 396710 -: 382765 // RT という単語が一番多い.次は Apple 33 / 39
Spark での基本的な処理
二つの処理方法を組み合わせる Transformations ある RDD を別の RDD へ変換する Transformations に属する処理が呼び出されても処理は行 われない Actions データの処理・保存・出力 Action に属する処理が呼び出され他時処理は行われるTransformations
filter: 条件を満たす要素だけを抽出 map: 要素を別の要素へ変換(元の要素数と変換後の要素 数は同じ) flatMap: 要素を別の要素へ変換(元の要素数と変換ごの 要素数が違う) reduceByKey: キーごとに要素を演算 sample: ランダムに要素を抽出 など,多数 35 / 39Action
collect: 全要素を配列に入れて返す count: 要素の数を返す
take: 先頭 k 個の要素を返す など,多数
RDD を二つ組み合わせ
join: 同じキーのものを合わせる subtract: 差集合を作る
など