• 検索結果がありません。

Spark と大規模データ処理 - NAISTビッグデータアナリティクス 第2回

N/A
N/A
Protected

Academic year: 2021

シェア "Spark と大規模データ処理 - NAISTビッグデータアナリティクス 第2回"

Copied!
39
0
0

読み込み中.... (全文を見る)

全文

(1)

Spark と大規模データ処理

NAIST ビッグデータアナリティクス 第 2 回

鈴木 優

(2)

本日の内容

ビッグデータとは Hadoop から Spark まで Apache Spark の使い方

(3)

ビッグデータとは

大きなデータとは(ガードナーによる定義 (three V)) 量が多い (Volume) 地球上に存在するデータ量は 2011 年には 1.8ZB,2020 年に は 35ZB に達する見込み 1ZB (1 ゼッタバイト)= 1021B (バイト)= 1,000 EB(エ クサバイト) = 1,000,000 PB = 1,000,000,000 TB = 1,000,000,000,000 GB 種類が多い (Variety) Word, Excel などの文書,電子メール,ソーシャルメディアの データ,センサーデータ 頻度が多い (Velocity) 商品の購買履歴データ - クレジットカード,銀行の預貯金 センサーデータ - 気象データ,河川の水位観測,気温 3 / 39

(4)

ビッグデータとは今までと何が違うのか

何も違わない(バズワード)という批判 基盤となる技術は既存のものと同じ データ工学,情報検索,データマイニング,機械学習など 色々なデータを混ぜ合わせ一つの知見を得たい 大量の埋もれているデータから今まで知られていなかった 情報を得たい

(5)

データベース

Relational Database Management System(RDBMS) Oracle, MySQL, PostgreSQL, MicrosoftSQL, ...

SQL という標準形式でどのシステムでも問合せできる 異なるデータベースを使ってもシステムを変更する必要が 無い データを表形式で格納,抽出 複雑な問合せが可能 一つのシステムで動く = 負荷分散できない 処理は必ず成功 or 失敗する = 中途半端に処理が終わらない 5 / 39

(6)

データベースの ACID 特性

データベースは,以下の四つの性質を満たしている必要が ある. Atomicity (原子性): トランザクション処理はすべて実行 されるか,全く実行されない状態かのいずれかで終わる Consistency(一貫性): トランザクション処理の前後で データに矛盾を生じない Isolation(独立性): 他のトランザクション処理から影響 されない Durability(耐久性): 障害が発生しても更新結果は保持さ れる これらを全部合わせて「整合性 (Consistency) 」と呼ぶ

(7)

CAP 定理

以下の三つを同時に保証することは不可能 データの整合性 (Consistency) データの可用性 (Availability) データの分散化 (Partition-tolerance) RDB では,整合性を保証した結果,分散化すると速度が低 下する

KVS (Key Value Store)の出現

(8)

KVS の出現

整合性は犠牲,可用性や分散性を向上という発想 データは表形式ではなく, “Key” と “Value” の組 検索方法はシステム依存 SQL は使えない = 独自の問合せ言語 NoSQL と呼ばれることも Not Only SQL の略 NO SQL (SQL は不要)という印象をもつ人も ドキュメント指向のものが多い Value の部分に,大量のデータを入れることも

(9)

KVS の歴史

Google が BigTable を開発 (2004-2005)

数百台から数千台のサーバでペタバイト級のデータを扱う Google 内のサービスで活用 (Google Reader, Google Map, Blogger, ...)

ソースや実行環境は公開されていない Google 以外のサーバでは使えない クローンの出現

BigTable の論文を基に様々な実装を

Apache HBase - Hadoop 上の KVS 実装 Apache Cassandra

MongoDB

Neo4j - グラフを対象にした KVS ... などなど

(10)

最近の傾向

Standalone で SQL を使いたい: 商用なら Oracle,特に何 もなければ MySQL 地理データを使いたい: PostgreSQL プロトタイプで簡単に: SQLite NoSQL なら: MongoDB,Cassandra など 何も考えずに: ファイル

(11)

Key Value Store

JSON 形式を用いられることが多い

JSON 形式

{"Key" : "Value"} {"Key" : {"Key2": "Value2"} }

[{"Key":"Value"}, {"Key2": "Value2"}, ...]

{"ノートパソコン":

{"重量":"10kg", "大きさ":"A4", "バッテリー":"10 時間"} }

(12)

最近のシステム構築に関する動向

一つの大きなシステムよりも複数の小さなシステム 倍の値段のサーバは性能が倍とはならない 性能/価格比が最も良いサーバ = コモディティサーバ サーバの数が多い = 壊れやすい 少しは壊れても問題無いようにシステムを設計する必要あり 一つのポイントが壊れてしまうとシステムが動作しない点 SPOF (Single Point Of Failure) を無くす

(13)

文字のカウント

「台風」は何回出現するか (Java like language)

File file = fopen("file.txt"); //ファイルを開く

int count = 0;     //変数 count を設定 for(String s: file){ if(s.contains("台風")){ //「台風」が入っていたら・・・? count += 1; // count に 1 を足す } } System.out.println("台風は"+count+"回出現しました"); 変数countはこのプログラム内でしか利用できない. もし他のマシンで変数の値が変わってもこの値は変化せず 13 / 39

(14)

このプログラムは何が問題なのか

メモリが足りない 大量のデータに比例するメモリが無ければ,メモリ不足にな り処理続行できない CPU を有効に活用できていない 結局 1 個の CPU だけしか活かせていない データの読み出し速度が遅い ハードディスクからデータを読み出す速度はシーケンシャル で 200MiB/s,ランダムで 380 IOPS (SAS 15000rpm) いくらお金をかけてもこれ以上にはならない

(15)

並列化

複数のマシン間で協調させて動かすのは難しい ファイル番号が偶数ならシステム A,奇数なら B が動作する ようにすると,偶数と奇数が同じ処理数でなければ,有効に 動かすことができない 分け方によっては同じファイルを 2 回処理してしまうことも 最後に集計は 1 個のマシンしか動作させられない メモリはマシンの間で共有できない あるマシンへ変数の値を変えても別のマシンに知らせること は難しい マシン間で影響が無いように問題を分割 15 / 39

(16)

複数マシンの並列化手法の登場

MapReduce の出現 (Google) 一つのタスクを細かな単位 (map) に細分化し処理 map で処理された部分をまとめる (reduce) MapReduce は Google 以外では利用できない 誰でも使える実装として Hadoop が作られる

分散ファイルシステム Google File System の出現 多くのサーバにデータを分散して配置

いくつかディスクやサーバが壊れてもデータは無事 GFS は Google 以外では利用できない

誰でも使える実装として hdfs が作られる Hadoop Distributed File System

(17)

サーバの変化

一つの大きなサーバ 小さなサーバの集合体

(ブレードサーバ)

*1

*1by Robert Kloosterhuis (Wikimedia Commons)

(18)

京のラック 2 一つひとつは小さなコン ピュータ 上の方 12 個のサーバ 中央は電源 下の方 12 個のサーバ 複数のサーバを効率良く組 み合わせられるかが勝負

(19)

Hadoop や Spark に対する誤解

(誤解 1) Hadoop や Spark で書くと処理スピードが速く なる 少量のデータを扱う時は逆に遅くなる データが非常に多いときに処理可能になる (誤解 2) 並列処理のプログラミングが面倒 並列処理を意識したプログラムはほとんどありません 19 / 39

(20)

Hadoop のプログラム  (単語の数え上げ)

import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.* public class WordCount { public static class TokenizerMapper

extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text();

public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }

public static class IntSumReducer

extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context

) throws IOException, InterruptedException { int sum = 0;

for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }

public static void main(String[] args) throws Exception { Configuration conf = new Configuration();

Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }

(21)

Hadoop の問題点

主にバッチ処理

時々刻々と更新されるデータを処理するのは苦手 出力は常にファイル

Map → Reduce の先はディスクに保存しなければならない Map → Reduce → Map → Reduce →などの処理を行うとき に,ディスクアクセスが発生し速度が低下

特に機械学習において速度の問題は深刻 同じデータを何度も繰り返し読む場面が多い

(22)

Apache Spark とは

大規模なデータを活用するための高速で総合的なデータプ ラットフォーム

利用しやすい

Scala, Java, Python, R shell が使える

SQL やストリームデータなどを扱うことができる 機械学習フレームワークが既に幾つか実装されている どこででも動く Java JVM が動くこと,相互にネットワーク接続されて入れ ばどこでも動く ただし,動作させるまでに設定が結構大変 NAIST では ITC で既に設定済み.誰でも動作可能

(23)

Apache Spark を使ってみよう

1 bda1node??へログイン ?? は05∼16のうちのどれかを使ってください 2 データの準備 Spark のプログラムから見えるファイルシステム hdfs へ ファイルを移動します 3 プログラムの作成 この授業では python を使いますが,Java/Scala でも作成 可能です 4 プログラムの実行 実際にプログラムを実行し,実行結果を出力します 23 / 39

(24)

ログイン・PySpark を起動・終了

以下,リモート側のプロンプトは %, ローカル側は $で示 します.

pyspark 起動・終了

$ ssh bda1node05.naist.jp (05 の部分を 05 から 18 までの数に変える) % pyspark ← Spark を立ち上げる Python 2.6.6 (r266:84292, Jan 22 2014, 01:49:05) (中略)

Using Python version 2.6.6 SparkContext available as sc.

>>> exit() ← Spark を終わる.Ctl-D でも良い % logout

(25)

データを取ってくる

データ取得

% ls | grep txt

iphone6.ja.txt.xz iphone6s.ja.txt.xz Apple.txt.xz オリンピック.ja.txt.xz 台風.ja.txt.xz // 最後に xz が付いているものは圧縮されている // なので解凍する(ちょっと時間かかります) % xz -d Apple.txt.xz // ファイル確認 % ls |grep txt Apple.txt 25 / 39

(26)

対象データの準備

hdfs へデータを移動

% ls | grep ja.txt iphone6.ja.txt iphone6s.ja.txt オリンピック.ja.txt 台風.ja.txt // 台風.ja.txt を spark で読み書きできるように % hadoop fs -put 台風.ja.txt

// ファイル名一覧 % hadoop fs -ls Found 2 items drwx--- - ysuzuki is-staff 0 2015-10-06 10:14 .Trash -rw-r--r-- 3 ysuzuki is-staff 543921683 2015-10-06 00:04 台風.ja.txt

(27)

プログラミング

文字のカウント

% pyspark

// ファイルの読み込み

>>> tweets = sc.textFile(u’台風.ja.txt’)

// ‘‘奈良’’が入っている行だけを抽出

>>> nara = tweets.filter(lambda s: u’奈良’ in s) // 行をカウント

>>> nara.count() (省略)

15/10/06 15:25:03 INFO DAGScheduler: Job 3 finished: count at <stdin>:1, took 8.239254 s 2640

// 2640 行あった

(28)

RDD   (Resilient Distributed Dataset)

不変 (イミュータブル) で並列実行可能な (分割された) コレ クション 情報が出てくる蛇口 メモリ上で保持される 分割され,サーバ上に分散して配置される 今回はファイル.DB 上のデータやインターネット上の データを扱うことも可能 遅延実行 実際に処理が必要となるまで処理が開始されない 三種類の RDD RDD (一行に一つの値だけが入る RDD)

PairRDD (一行に Key と Value の二つが入っている RDD) SchemaRDD (SparkSQL で扱う RDD)

(29)

プログラミング

” 奈良” が含まれる

% pyspark

// ファイルの読み込み

>>> tweets = sc.textFile(u’台風.ja.txt’)

// ‘‘奈良’’が入っている行だけを抽出

>>> nara = tweets.filter(lambda s: u’奈良’ in s) // 行をカウント

>>> nara.count() (省略)

15/10/06 15:25:03 INFO DAGScheduler: Job 3 finished: count at <stdin>:1, took 8.239254 s

2640

// 2640 行あった

(30)

プログラミング

複数台のサーバを使う

// マシンを二つ利用する (--master local[2]) % pyspark --master local\[2\]

// 同じ処理を行う

15/10/06 15:25:03 INFO DAGScheduler: Job 3 finished: count at <stdin>:1, took 8.166796 s

2640

// 8.23秒かかっていたのが 8.16秒になった

(31)

ファイルに保存されているプログラムを実行

wordcount.py (ホームページに掲載しています)

from pyspark import SparkContext sc = SparkContext(appName="WordCount")

// RDD 作成

text = sc.textFile("Apple.txt")

// tab で区切り 左から 3 個目だけをとってくる

counts = text.flatMap(lambda line: line.split(’\t’)[2]. // スペースで区切る

split(’ ’) if len(line.split(’\t’)) == 4 else ’’)\

// (単語,1) というペアを作成

.map(lambda word: (word,1))\

// 同じ単語を集約,(単語,a) と(単語,b) が来たら (単語, a+b)を 返す

(32)

ファイルに保存されているプログラムを実行

wordcount.py (続き)

// ペア (単語, x ) を (x, 単語) へ変換 output = counts.map(lambda x:(x[1],x[0])).\ // Key で順序付け,10個取り出す sortByKey(ascending=False).take(10) // 出力

for (count, word) in output:

print("%s: %i" % (word.encode(’utf_8’),count))

// Spark を停止

sc.stop()

(33)

結果を出力

実行

// データを投入 (英語の Twitter データ) % hadoop fs -put Apple.txt

// タスクを投入 % spark-submit wordcount.py // 結果が出てくる RT: 1057117 Apple: 788480 the: 640973 to: 416088 : 396710 -: 382765 // RT という単語が一番多い.次は Apple 33 / 39

(34)

Spark での基本的な処理

二つの処理方法を組み合わせる Transformations ある RDD を別の RDD へ変換する Transformations に属する処理が呼び出されても処理は行 われない Actions データの処理・保存・出力 Action に属する処理が呼び出され他時処理は行われる

(35)

Transformations

filter: 条件を満たす要素だけを抽出 map: 要素を別の要素へ変換(元の要素数と変換後の要素 数は同じ)   flatMap: 要素を別の要素へ変換(元の要素数と変換ごの 要素数が違う) reduceByKey: キーごとに要素を演算 sample: ランダムに要素を抽出 など,多数 35 / 39

(36)

Action

collect: 全要素を配列に入れて返す count: 要素の数を返す

take: 先頭 k 個の要素を返す など,多数

(37)

RDD を二つ組み合わせ

join: 同じキーのものを合わせる subtract: 差集合を作る

など

(38)

練習問題

Twitter データから次のような集計を行いましょう 台風データから “NAIST” という文字列は何回出てくるか 台風データから “生駒” という文字列は何回出てくるか iPhone6s で最もツイートされている色は何色? PS4, XBox. “Good” と言われる確率が高いのはどっち? PS4 のデータで  “XBox” と共に良く使われている単語は何?

(39)

練習用データ

授業のページからダウンロードしてください. Apple (E) GalaxyS6 (E) iPad (E) iPhone6/6s (E/J) iPhone (E) オリンピック (J) PS4 (E) SONY (E) Surface (E) XboxOne (E) Xbox (E) Xperia (E) 台風 (J) ← 一番大きい 39 / 39

参照

関連したドキュメント

With respect to each good of Chapter 50 through 63 of the Harmonized System, in the case where a material of the other Country or a third State which is a member country of the

[r]

[r]

[r]

一 六〇四 ・一五 CC( 第 三類の 非原産 材料を 使用す る場合 には、 当該 非原産 材料の それぞ

[r]

[r]

Description of good(s); HS tariff classification number. 産品ごとの品番(必要に応じ)、包装の記号・番号、包装の個数・種類、品