A Highly Efficient Consolidated Platform
for Stream Processing and Hadoop
Hiroya Matsuura†, Masaru Ganse†, Toyotaro Suzumura†‡
Tokyo Institute of Technology† IBM Research Tokyo‡
Short Summary
• We build and evaluate a consolidated platform
– Maximizes CPU utilization by switching nodes between stream processing and batch processing
• CPU usage rises by 24.37 percentage points
• keeps the latency SLA satisfied
– SLA : Service Level Agreement
– Realizes an efficient dynamic load balancing algorithm
– Our platform can execute existing Hadoop and stream processing programs without any changes
• Extends Hadoop to dynamically adjust the number of physical nodes
Outline
1. Background
2. Integrated Execution Platform
3. Implementation
4. Evaluation
5. Related Work
6. Conclusion
1. Background
Stream Processing
• A new computing paradigm for processing “big data”
in “real time”
– Processes incoming data streams without storing them
• Accesses only data that resides in memory
• No need to store it in secondary storage
• Latency is critical in stream processing
– e.g. algorithmic trading, log analysis, etc.
• Data Stream Management Systems (DSMS)
– Make it easy to program and manage stream jobs
– Borealis, Aurora (MIT), System S (IBM Research), etc.
MapReduce and Apache Hadoop
• A new programming model, proposed by Google, that simplifies distributed parallel processing
– Hadoop, developed at Yahoo!, is one implementation of MapReduce

             Batch processing      Stream processing
Storage      Needs lots of disks   Not needed
Accuracy     Very high             Relatively low
Time         Needs much time       Real-time

=> Both processing paradigms are indispensable, and
we need to run them on “limited” computing resources
Requirements
• We need to run both stream processing
and batch processing on limited resources
– It is difficult to invest in additional computing
resources in a real business environment
– limitations on financial resources, electricity,
space, human resources, and so on
• We need to maximize resource utilization
– Our goal is not to decrease latency but to
increase CPU utilization
2. Integrated Execution
Platform
Basic assumption
• Latency is critical in stream processing
compared with batch processing.
• The incoming data rate for stream processing is
often unstable and unpredictable
– e.g. Twitter traffic is low during the day, high at
night, and can burst in the case of
sudden incidents.
Basic idea
[Figure: IEP architecture. Node 0 (master) runs the System S Master, the switchable Hadoop Master, and the IEP Manager. Each of Nodes 1..N runs a Logger and a switchable System S / Hadoop worker. The IEP optimizes the number of nodes assigned to each process by dynamically switching the job run on every node so as to maximize CPU usage.]
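The switching idea can be sketched as a small control loop decision: in each interval, choose how many nodes the stream side needs to keep up with the predicted input rate and the latency SLA, and release the rest to Hadoop. A minimal sketch (function and parameter names are hypothetical, not the paper's actual algorithm):

```python
import math

def rebalance(total_nodes, stream_nodes, predicted_rate, capacity_per_node,
              latency_ms, sla_ms):
    """Return the new number of nodes to dedicate to stream processing."""
    # Nodes needed so the predicted input rate fits per-node capacity.
    needed = math.ceil(predicted_rate / capacity_per_node)
    if latency_ms > sla_ms:
        # Latency SLA violated: grow the stream pool by at least one node.
        needed = max(needed, stream_nodes + 1)
    # Never exceed the cluster; the leftover nodes run Hadoop tasks.
    return min(max(needed, 1), total_nodes)

# Example: the predicted rate drops, so nodes are released to Hadoop.
print(rebalance(8, 6, predicted_rate=3000, capacity_per_node=1000,
                latency_ms=9.0, sla_ms=20.0))  # -> 3
```

The real platform makes the same trade-off per interval: batch throughput is sacrificed only when the latency SLA would otherwise be at risk.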
Effective Application Scenarios
• Internet shopping site
– a: DoS attack detection, click-stream analysis, word pattern matching
– b: generating a search index, computing collaborative filters
• Financial trading institution
– a: algorithmic trading
– b: analyzing long-running trends
• Telecommunications industry
– a: processing sensor data such as GPS
– b: analyzing customer behavior
a is for stream processing, b is for batch processing. DoS : Denial of Service
3. Implementation
System Overview
• The Integrated Execution
Platform (IEP) is
comprised of 3 units:
– stream processing
platform
– batch processing
platform
– dynamic load
balancing component
[Figure: System overview. A load balancing node hosts the Node Allocator, Input Data Rate Counter, Latency Counter, and Node Controller; it forwards input data plus commands to the computing nodes of the distributed system, which run the data stream processing and batch platforms.]
Implementation on System S
• We use IBM System S as a representative
data stream processing system
– The dynamic load balancing unit and the time series prediction algorithm are built on System S
– System S can import C++ or Java code as user-defined operators and modularize it
• The dynamic load balancing unit has 3 functions:
– gather execution data: latency and data rate
– determine the optimal number of nodes for each process according to the prediction and the latency data
– issue node re-allocation commands
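The first function, gathering execution data, amounts to keeping a sliding window of recent tuples and reporting the data rate and average latency that the allocator decides on. A minimal sketch (class and method names are hypothetical):

```python
from collections import deque

class MetricsWindow:
    """Sliding window of (arrival_time, latency) samples for the allocator."""

    def __init__(self, window_sec=10.0):
        self.window_sec = window_sec
        self.samples = deque()            # (timestamp_sec, latency_ms)

    def record(self, t, latency_ms):
        self.samples.append((t, latency_ms))
        # Evict samples older than the window.
        while self.samples and t - self.samples[0][0] > self.window_sec:
            self.samples.popleft()

    def data_rate(self):
        """Tuples per second observed in the window."""
        return len(self.samples) / self.window_sec

    def avg_latency(self):
        if not self.samples:
            return 0.0
        return sum(lat for _, lat in self.samples) / len(self.samples)

# 100 tuples over 10 seconds, latency alternating 10ms / 11ms.
w = MetricsWindow(window_sec=10.0)
for i in range(100):
    w.record(t=i * 0.1, latency_ms=10.0 + (i % 2))
print(w.data_rate(), w.avg_latency())   # rate 10.0 tuples/s, avg 10.5 ms
```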
Extending Hadoop
• We implement a task switcher on Hadoop
– Hadoop’s existing suspend / resume support is
limited
• it takes much time and cannot be invoked from outside
– we add a TCP port for issuing suspend / resume commands
– we add functions for faster suspend / resume
• suspend : kill all running tasks and stop accepting new tasks
• resume : restart task acceptance
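The added TCP control port can be pictured as a tiny command listener in front of the task switcher. The sketch below is a simplified stand-in, not the paper's Java implementation; the port number, command strings, and class names are all hypothetical:

```python
import socket
import threading

class TaskSwitcher:
    def __init__(self):
        self.accepting = True

    def suspend(self):
        # The real extension also kills all currently running tasks.
        self.accepting = False

    def resume(self):
        self.accepting = True

def serve_one(switcher, ready, port_holder):
    """Accept one connection, apply its command, and acknowledge."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(("127.0.0.1", 0))          # let the OS pick a free port
    srv.listen(1)
    port_holder.append(srv.getsockname()[1])
    ready.set()                         # tell the client the port is open
    conn, _ = srv.accept()
    cmd = conn.recv(64).decode().strip()
    if cmd == "suspend":
        switcher.suspend()
    elif cmd == "resume":
        switcher.resume()
    conn.sendall(b"ok\n")
    conn.close()
    srv.close()

sw = TaskSwitcher()
ready, port_holder = threading.Event(), []
t = threading.Thread(target=serve_one, args=(sw, ready, port_holder))
t.start()
ready.wait()
c = socket.create_connection(("127.0.0.1", port_holder[0]))
c.sendall(b"suspend\n")
reply = c.recv(16).decode().strip()
c.close()
t.join()
print(reply, sw.accepting)   # the switcher stops accepting new tasks
```

Exposing the switch over TCP is what lets the IEP manager on the master node suspend or resume Hadoop workers from outside the Hadoop process.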
Time Series Prediction
• We employ a time series model in our
prediction algorithm to improve the load
balancing system
– SDAR : Sequentially Discounting Auto-
Regression
• one of the change-point detection algorithms
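The idea behind SDAR is to fit an autoregressive model online while exponentially discounting old samples, so the forecast tracks recent traffic. A minimal order-1 sketch under that assumption (this simplifies the full SDAR update, which solves for higher-order AR coefficients; names are illustrative):

```python
class SDAR1:
    """Order-1 sequentially discounting AR model; r is the discount factor."""

    def __init__(self, r=0.05):
        self.r = r
        self.mu = 0.0     # discounted mean
        self.c0 = 1.0     # discounted variance
        self.c1 = 0.0     # discounted lag-1 covariance
        self.prev = 0.0   # last observed value

    def update(self, x):
        r = self.r
        self.mu = (1 - r) * self.mu + r * x
        self.c0 = (1 - r) * self.c0 + r * (x - self.mu) ** 2
        self.c1 = (1 - r) * self.c1 + r * (x - self.mu) * (self.prev - self.mu)
        self.prev = x

    def predict(self):
        # AR(1) forecast: mu + a * (x_t - mu), with a estimated online.
        a = self.c1 / self.c0 if self.c0 else 0.0
        return self.mu + a * (self.prev - self.mu)

m = SDAR1(r=0.1)
for x in [100, 120, 140, 160, 180]:   # steadily rising data rate
    m.update(x)
print(m.predict() > m.mu)   # rising trend: forecast above the running mean
```

Because old samples are discounted, the predictor adapts after a traffic change point instead of averaging it away, which is what the load balancer needs to pre-allocate nodes before a burst.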
4. Evaluation
• We compare our proposal with execution without load balancing
– assuming both bursty and static situations
– measuring latency, CPU usage, and Hadoop job execution time
– input data is a Twitter log from April 1st to 5th, 2010
• gathered through the Twitter Streaming API
– Hadoop generates an inverted index of the 166MB Twitter log; stream processing performs 2000-word pattern matching
• Execution environment
– 8 compute nodes and 2 administrative nodes connected by 1Gbps Ethernet
– admin nodes are used only for sending / receiving data and logging
– 1 compute node runs not only computation jobs but also the IEP system and the Hadoop management daemons
Experimental Data
• The IEP can increase CPU usage while keeping
processing latency low
[Figures: Input Tuple Number vs. elapsed time; Process Number vs. elapsed time]
Allocation   CPU usage   Latency avg   Threshold excess number
w/o IEP      47.77%      10.71ms       121 / 21400
w/ IEP       72.14%      16.38ms       26 / 21400
5. Related Work
• Dynamic load balancing among several Web applications
– a framework to effectively allocate jobs with fewer migrations
– It does not address the SLA, i.e. the processing latency critically required for data stream processing.
• Dynamic load balancing between transaction processing and batch processing
– transaction processing is similar to stream processing in its atomic processes, and it has a latency SLA
– however, transaction processing is essentially finite, while data stream processing involves infinite, continuous
processing of incoming data from various sources
– there are only theoretical results; no implementation or evaluation details of their system are given
6. Conclusion
• We constructed and evaluated an Integrated Execution Platform for stream processing and Hadoop using the proposed algorithm.
– CPU usage increased from 47.77% to 72.14% while a low latency was maintained.
• Future Work
– improve the load balancing algorithm and the time series prediction algorithm
– switch tasks not per node but per CPU core
– evaluate this system with different input data patterns and other applications