• 検索結果がありません。

ビッグデータアナリティクス - 第3回: 分散処理とApache Spark

N/A
N/A
Protected

Academic year: 2021

シェア "ビッグデータアナリティクス - 第3回: 分散処理とApache Spark"

Copied!
32
0
0

読み込み中.... (全文を見る)

全文

(1)

ビッグデータアナリティクス

第3回: 分散処理とApache Spark

鈴木 優

(2)

ビッグデータ

大量のデータ 地球上に存在するデータ量は 2011 年には 1.8ZB,2020 年には 35ZB に達する見込み 1ZB = 1021バイト = 1,000,000,000,000 GB 多種のデータ Word,Excel の文書,電子メール,ソーシャルメディア,センサデータ XML,CSV,JSON,text,. . . 高頻度の更新 商品の購買履歴,クレジットカード,銀行の預貯金 センサデータ,気象データ,河川の水位観測

(3)

データを処理するための戦略

高価なサーバを購入する CPU SPECfp 価格  Pentium G3420 77.6 8,946円 Xeon Gold 6128 1,470 22万円 Xeon Platinum 8180 1,770 130万円 サーバ名 メモリ 価格(本体のみ)  PRIMERGY RX1330 64GB 19万円 PRIMERGY RX2530 768GB 45万円 PRIMERGY RX4770 1536GB 180万円 ディスク規格 容量 価格(1TBあたり)  ハードディスク(SATA) 200MB/s 2500円 ハードディスク(SAS) 300MB/s 5.1万円 SSD 600MB/s 10万円 価格を上げてもそれに伴う性能を得ることは難しい 特にディスクの転送速度,ネットワークの速度は上げることが難しい

(4)

一つのサーバで処理を行うことの限界

(5)

データを処理するための戦略

小さなデータを処理するためにはRやExcelで十分 一つのサーバ,一つのメモリで事足りるとき Excel は最大 105 万行くらいしか扱えない 今回のデータは 2650 万行以上あるので扱えない データを読み出すだけで 1 ヶ月かかるようなデータをどのように扱うのかということは, 今後のビッグデータ研究において重要なテーマ 並列計算 コモディティサーバ(値段と性能のバランスが一番良いサーバ)を沢山購入,それぞれの サーバにより並列処理 最近のスーパーコンピュータは全て並列にサーバが繋がっている 一つひとつのサーバの性能は必ずしも高くない 良いところ 値段をかけるだけ得られる性能が増える(スケールする) 問題点 並列計算を行うプログラミングは難しい サーバが増えると故障確率も増える

(6)

Mapreduce

MapReduce Hadoop/Spark における基本的な並列計算の考え方 Map: 一つの処理を細かい単位に分割して処理 Reduce: Map で行った小さな単位の処理を集約 Hadoop/Spark1.x ではこれらの処理を明示的に書く必要があった Spark 2.xになり,これらの処理はバックグラウンドで実行されるように

(7)
(8)

今回のデータ

twitterのデータ(JSON)を用意 /project/bigdata-lab/bda/tweet_20171004.json 150GB (10/15現在) 2017.10.4から2017.10.13までのツイートを収集 twitterで投稿されているツイート全体の1% 特に言語やキーワードなどの指定はしていない 試しにsparkを使わずに,何個のツイートがあるか確認 データの中身を見る

% time cat /project/bigdata-lab/bda/tweet_20171004.json|wc -l 26511626

cat tweet_20171004.json 0.13s user 46.46s system 4% cpu 18:29.32 total wc -l 19.21s user 29.22s system 4% cpu 18:29.32 total

環境によるが,18分から20分ほどかかる.そのほとんどはI/O(データの入出力)に必 要な時間.

(9)

JSON

形式

データを表現する形式として,CSVやXMLと並んでよく使われる形式.元々は JavaScriptで用いられていた形式. データの中身を見る {"id":915179801954025472, "text":"め も\n 紅 い も タ ル ト と 雪 塩 ち ん す こ う","place":null,"lang":"ja"} 次のような形式で書かれている. データの中身を見る {キー:値, キー:値, キー:値, キー:値,...} 入れ子にもできる. データの中身を見る {キー:値, キー:{キー:{キー:値, キー:値,...}, キー:値,...}} 文字列はダブルクォートで囲まれている.値はそのままデータは一つひとつ改行されて いる.

(10)

まずは簡単な統計処理

このデータには,どのような言語で書かれているのか,という情報が書かれている. データの中身を見る {...,"lang":"ja",...} これを集計して,日本語で書かれているツイートは全体の何%なのかを調べたい ”lang”:”ja”が何回出てくるのかを計測(コマンドラインによる方法)

% cat /project/bigdata-lab/bda/tweet_20171004.json | grep \"lang\":\"ja\" |wc -l

(数十分かかる)

これを言語の数だけ繰り返すのは大変

(11)

Spark

のアーキテクチャ

HDFS Hive

Text CSV JSON

Hadoop Spark

Scala, Java, Python, R

(12)

Hadoop

が利用可能なシステムへログイン

HadoopとSparkは,bda1node0x (xは01から18まで)にインストールされており,ど のマシンからでも同じように利用できる.ただし,bda1node01から04までは管理用 サーバのためログインできない.そこで,sshでbda1node05へログインする. ログイン

$ ssh bda1node05

Last login: Mon Oct 16 16:00:40 2017 from mm-dhcp-128-012.naist.jp 2017年 10月 19日 木曜日 12:12:14 JST

[ysuzuki@bda1node05 ~]$

ここにはmandaraのホームディレクトリが見える.

今回利用するデータを準備する.データが大きいので,見るだけ. データを見る

$ cat /project/bigdata-lab/bda/tweet_20171004.json |head [Tue Oct 03 20:40:40 JST 2017]Establishing connection. [Tue Oct 03 20:40:42 JST 2017]Connection established. [Tue Oct 03 20:40:42 JST 2017]Receiving status stream.

{"in_reply_to_status_id_str":null,"in_reply_to_status_id":null, "created_at":"Tue Oct 03 11:40:42 +0000 2017","in_reply_to

(13)

HDFS

へデータを移動

HadoopやSparkは,Linuxが直接利用しているファイル領域へのアクセスができないた め,HDFSと呼ばれるファイル領域へファイルを移動させる必要がある.

HDFSへデータを移動

% hadoop fs -put /project/bigdata-lab/bda/tweet_20171004.json

この作業は,ファイルが更新されたときに限って行えば良い.毎回プログラムを動かすた びに行う必要はない.

HDFS内にファイルがあるかどうかを確認する.

% hadoop fs -ls -h Found 4 items

drwx--- - ysuzuki is-staff 0 2017-10-18 09:00 .Trash drwxr-xr-x - ysuzuki is-staff 0 2017-10-16 18:59 .sparkStaging drwx--- - ysuzuki is-staff 0 2017-01-02 19:32 .staging -rw-r--r-- 3 ysuzuki is-staff 152.1 G 2017-10-16 13:42 tweet_20171004 .json

(14)

Spark

を起動してみよう

SparkはScala,Java,Python,Rなどで利用可能.まずはPython版で,コマンドライ ンで利用したい.

sparkの起動

% pyspark2 --master yarn (複数のサーバで動作させるとき) % pyspark2 (一つのサーバだけで動作させるとき)

% spark2-shell --master yarn (scalaで動作させたいとき)

% pyspark2 --driver-memory 16g --executor-memory 16g --master yarn ( 重 た

い作業のとき) Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ ‘/ __/ ’_/ /__ / .__/\_,_/_/ /_/\_\ version 2.0.0.cloudera2 /_/

Using Python version 2.6.6 (r266:84292, Aug 18 2016 08:36:59) SparkSession available as ’spark’.

>>>

(15)

DataFrame

Sparkではデータの塊(DataFrame)を様々な方法で変換させることによって,必要な情 報を取り出す.変換の方法は主に以下の三種類を使う. 列や行の選択(select,filter,groupBy) 表の結合(join) 入出力(read, show) 詳細はhttps://spark.apache.org/docs/2.0.0/api/python/index.htmlを参照のこと.バー ジョンによって内容が変わるので,他のドキュメントを参照するときには,バージョン番 号に注意.

(16)

データの読み込み

データを読み込む >>> df = spark.read.json("/user/ysuzuki/tweet_20171004.json") dfという変数の中に, tweet_20171004.jsonの内容が入っている.この中に何個の データが含まれているのかを数える. データを数える >>> df.count() 33573798 さきほどのコマンドラインによる方法と比べて,十分速く処理できていることを確認.

(17)

言語ごとに何個のツイートがあるか

日本語のツイートは何個あるか. 言語ごとのデータ >>> df.filter("lang=’ja’").count() 6600510 jaの部分をen やesなどに変更することによって,数を数えることはできる. ところが,これは面倒なので,groupBy関数を使って集約することを考える.

(18)

言語ごとに何個のツイートがあるか

言語ごとのデータ >>> df.groupBy("lang").count().sort("count", ascending=False).show() +----+---+ |lang| count| +----+---+ | en|7057160| | ja|4458359| | es|2002657| | ar|1731097| | und|1447410| | ko|1307743| | pt|1280456| | th| 759078 ... +----+---+

only showing top 20 rows

(19)

二つのモード

あるデータが与えられたとき,1)それがどのようになっているのかが分からないときと, 2)すでにどのような調査を行うべきかが分かっているときがある. インタラクティブモード 今まで行ってきたモード.都度処理されたものに対して処理を適宜追加していく. バッチモード プログラムで全ての動作を記述し Spark に投入する.

(20)

ファイルに書かれたプログラムを実行

count.pyに次のように書く

count.py

from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("App example") \ .config("master", "yarn") \ .getOrCreate() df = spark.read.json("tweet_20171004.json") print df.count() プログラムを実行する. count.pyを実行

% spark2-submit count.py | tee output.txt % cat output.txt

(21)

文字カウント

データを数える

>>> from pyspark.sql.functions import explode,split >>> df.filter("lang = ’en’").select("text").distinct() .select(explode(split(’text’,’ ’))).groupBy("col").count() .sort("count",ascending=False).show() +----+---+ | col| count| +----+---+ | RT|1735023| | the|1218371| | to|1136336| | a| 883388| | I| 737669| ... | be| 248032| | me| 239061| +----+---+

(22)

一番ツイートされているテキストは何なのか

データを数える >>> df.groupBy("text").count().show() +---+---+ | text|count| +---+---+ |Big economic call...| 1| |@TomiLahren reall...| 1| |RT @PossumPastor:...| 2| |RT @dsmesk: ...| 13| countで順番を変える >>> df.groupBy("text").count().sort("count",ascending=False).show() +---+---+ | text|count| +---+---+ |RT @akiko_lawson:...| 4284| |RT @Kaepernick7: ...| 3747| |RT @RodriguezDaGo...| 3419| |RT @TheRealNyha: ...| 3305|

(23)

一番ツイートされているテキストは何なのか

テキストを全部表示させるようにする >>> df.groupBy("text").count().sort("count",ascending=False).show(20,False) +---+---+ |text |count| +---+---+ |RT @akiko_lawson: \新発売記念 #Lコロ無料プレゼント / フォロー&リツイートで、10/15まで 毎日抽選で【1万名様】に、ほくっとおいしい #Lコロ がその場で当たります(^^) 3日目は10/13 10:59まで!#ローソン… |4284 |

|RT @Kaepernick7: I appreciate you @Eminem https://t.co/nwavBwsOkQ |3747 |

どうもローソンのツイートキャンペーンがあったようだ.Eminemへのツイートが多いよ うだ.

(24)

一番ツイートしている人は誰なのか

誰が一番ツイートしているのかを表示させたい. >>> df.groupBy("user.name").count().sort("count",ascending=False) .show(20,False) +----+---+ |name|count| +----+---+ | .|53716| | |17986| | |15129| | -|14615| | ;|11380| | ♡|10926| |ローソン| 9044| これは不正確なデータ.同じ名前だが違う人の場合,重複してカウントされてしまって いる. 解決するために,ユーザIDで集計し直す.

(25)

一番ツイートしている人は誰なのか

まずはユーザIDで集計 >>> a = df.groupBy("user.id").count().sort("count",ascending=False) >>> a.show() +---+---+ | id|count| +---+---+ | 115639376| 9038| | 4823945834| 3736| | 1662830792| 3445| |856385582401966080| 3242| | 2669983818| 3015| |796251890908434432| 2663| | 104120518| 1592|

(26)

一番ツイートしている人は誰なのか

ユーザIDとユーザ名の対応表を作成 >>> b = df.select("user.id","user.name").distinct() >>> b.show() +---+---+ | id| name| +---+---+ |767396295665299456| ひかり| |859974072834310144| [6 ] | | 535819067| Vibes| | 517553112| Liseth Valencia R.|

(27)

一番ツイートしている人は誰なのか

aとbを結合する

>>> c = a.join(b, a.id == b.id,’inner’).select("name","count") >>> c.show() +---+---+ | name|count| +---+---+ | ローソン| 9038| | McDonalds Japan| 3736| | | 3445| | 【公式】きららファンタジア| 3242| | Test Account1| 3015| | ネスカフェ(NESCAFÉ )| 2663| | アクエリアス| 1592| | キングスマン| 1496|

(28)

条件を指定

textに「選挙」という文字が含まれているツイートだけを抽出 >>> df.where(col(’text’).like("%選挙%")).select(’text’).show() +---+ | text| +---+ |選挙で考えてるのは ・政策 ・別の...| |RT @keigomi29: たら...| |RT @shunchoukatsu...| |RT @tkq12: 衆議院選挙の...| |RT @pentabutabu: ...|

(29)

結果を保存する

CSVで保存

>>> c.write.csv("output_csv") >>> exit

$ hadoop fs -get output_csv

ディレクトリoutput_csvの中に全部入っている.これを使ってExcelなどでグラフにす ることが可能.文字コードがutf-8なので,テキストが入っているときには,なんとかし て文字コードを変換しないとExcelで見られない.

(30)
(31)

必須課題

以下の課題,および同様の難易度であると考えられる課題を3個以上行い,実行に利用し たコマンドおよびその結果,考察を述べよ. 「選挙」について書かれているツイートを表示 「選挙」について頻繁に書いているユーザ上位20人を表示 最もフォローされているユーザを表示 最もツイートされている政党名を1日ごとに集計して表示 県によってツイートされている政党名の違い なお,選挙に関係させる必要はないし,日本語に限定する必要も無い. この必須課題については,レポートに含めてください.個別に行ってください.

(32)

発表課題

自由にテーマを考え,Apache Sparkを用いて処理を行った結果を提示してください. データは何を使うか自由ですが,元のデータは少なくとも100GB以上となるように してください. 提出方法などは,吉野先生担当分で指示します. グループワークです.

参照

関連したドキュメント

テキストマイニング は,大量の構 造化されていないテキスト情報を様々な観点から

この節では mKdV 方程式を興味の中心に据えて,mKdV 方程式によって統制されるような平面曲線の連 続朗変形,半離散 mKdV

LLVM から Haskell への変換は、各 LLVM 命令をそれと 同等な処理を行う Haskell のプログラムに変換することに より、実現される。

次に、第 2 部は、スキーマ療法による認知の修正を目指したプログラムとな

ヒュームがこのような表現をとるのは当然の ことながら、「人間は理性によって感情を支配

このエアコンは冷房運転時のドレン(除湿)水を内部で蒸発さ

しかし , 特性関数 を使った証明には複素解析や Fourier 解析の知識が多少必要となってくるため , ここではより初等的な道 具のみで証明を実行できる Stein の方法

口文字」は患者さんと介護者以外に道具など不要。家で も外 出先でもどんなときでも会話をするようにコミュニケー ションを