• 検索結果がありません。

3.4 A Generate-Test-Aggregate Parallel Programming Pattern on MapReduce . 32

3.4.2 Solving Problems with GTA

We will use the Knapsack problem and its variants as examples to show how to use the GTA library. In each example, we show how to choose proper generators, testers, and aggregators to resolve the problem. Usually, one GTA program can be written in only a few lines of Scala code, by making use of the GTA components provided in the library.

The 0-1 Knapsack problem First, recall the 0-1 Knapsack problem in the beginning of Section 3.4. We can apply the GTA algorithm that first generates all possible candidates, then filters them using the predicate of weight limitation, at last computes the total value of every remained candidate and chooses the one that has the maximum total value. This problem can be programmed by usingallSelects, maxTotalValue(a modifiedmaxSum for Knapsack problem,) and WeightLimit, as shown in Listing 3.8. The final output of this program is the maximum total value of all the candidates. The Listing 3.9 shows an-other GTA-Knapsack program that gives the best selection (the one with maximum total value under the weight limitation). In this program we use a selectiveAgg in the GTA

3.4 A Generate-Test-Aggregate Parallel Programming Pattern on MapReduce 41

Listing 3.9 A GTA program sloving the 0-1 Knapsack problem, which gives the best selection

1 ... /* omitted */

2 val bagAgg = new BagAggregator[KnapsackItem]

3 val selectiveAgg = new SelectiveAggregator(maxTotalVal, bagAgg)

4 /* GTA expression */

5 val gta = generate(allSelects) .

6 filter (withLimit_100) .

7 aggregate (selectiveAgg)

8 println( "The best selection is " +

9 gta.postProcess(input.map(gta.f).reduce(gta.combine))

10 )

Listing 3.10 Extended 0-1 Knapsack Problem

1 ...

2 val allSelects =new AllSelects[KnapsackItem]

3 val withLimit_100 =new WeightLimit(100)

4 val lessThan_10_items =LengthLimit[KnapsackItem](9)

5 object maxTotalVal extends MaxSum[KnapsackItem] {

6 def f(a: KnapsackItem): Int = a.value

7 }

8 /* define a GTA */

9 val gta = generate(allSelects) .

10 filter (withLimit_100) .

11 /* add a new filter */

12 filter (lessThan_10_items) .

13 aggregate (maxTotalValue)

14 println( /* x is input, postProcess returns the result*/

15 gta.postProcess(x.map(gta.f).reduce(gta.combine))

16 )

expression, which aggregates the selections to one (not just computes the total value). The implementation of SelectiveAggregator aggregator will be explained later in Section 6.3.

At a glance, the cost of the GTA algorithm is exponential in the number of items because it seems that an exponential number of candidates are generated. However, the GTA library optimizes the naive process and the real cost is just linear with the number of items (and quadratic with respect to the capacity of the knapsack).

Variants of the Knapsack problem A more complex example, themulti-constraint Knap-sack problem, is shown in Listing 3.10. Here, new constraint on the maximum number of

Listing 3.11 A GTA program which computes the Maximum-Segment-Sum problem

1 package Examples

2 import GTAS._

3

4 object userApp extends GTA[Int] with App {

5 /* Spark job configuration */

6 def ctx(contex:SparkContext,input:spark.RDD[Int]) = {

7 /* GTA expression */

8 val gta=generate(allSegments) aggregate (maxSum)

9 /* compute using the Spark-MapReduce API.*/

10 val rst=input.map(gta.f).reduce(gta.combine(_, _))

11 println("rst")

12 }

13 }

items in a knapsackis given: the predicateLengthLimitchecks the length of the given list in a similar way toWeightLimit. Conceptually, an arbitrary number of testers can be used in a GTA expression. More constraints can be introduced, for example, not only check the exact length but also we can check whether the length (or summation) is less/more than a constant valuec, or the length modk is equal to or less/more thanc. Still, we can add an-other constraint onthe minimum number of items in a knapsackto extend the problem more.

In all above examples, we are asked to compute the best solutions (the maximum/minimum one), and we also can find thekth-bestsolution by slightly extending these GTA programs.

Examples ofkth-bestproblems can be found in the package of our Library.

The examples presented below use more complexgenerators,testers, andaggregators.

Maximum segment sum problem Let us consider the famous maximum segment sum (mssfor short) problem [13, 36, 67, 104, 106, 116]: given a list of integers, find the maxi-mum sum of its all segments (contiguous sublists). This is a simplified problem of finding an optimal period in a history of changing values.

The GTA algorithm for mss is simple. First, choose allSegments as the generator that generates all the segments [67] of the input list. Then choosemaxSum as the aggregator to compute the maximum sum of all segments. Only a few lines of Scala code are needed for this problem. Listing 3.11 shows the GTA program formssproblem.

Themssproblem has many variants. For example, the segment should only contain at most one negative number, or the maximum sum has to be divisible by 3. Similar to extend

Knap-3.4 A Generate-Test-Aggregate Parallel Programming Pattern on MapReduce 43

Listing 3.12 ViterbiTest is the implementation of a tester, which tests if a transition is valid or not

1 abstract class ViterbiTest[E, Mark] extends Predicate[(E,Mark), Mark]

{

2

3 type MarkedTs = (E,Mark)

4 def isTrans(a: Mark): Boolean

5 def postProcess(a: Mark) = isTrans(a)

6 def combine(l: Mark, r: Mark): Mark

7 def f(a: MarkedTs): Option[Mark] = Some(a._2)

8 val id: Mark

9 ...//omit others

10 }

Listing 3.13 MaxProdAggregator is the aggregator for computing the maximum proba-bility

1 abstract class MaxProdAggregator[T] extends Aggregator[T, Double] {

2 def plus(l: Double, r: Double) = l max r

3 def times(l: Double, r: Double) = l * r

4 def f(a: T): Double

5 val id: Double = 1.0

6 val zero: Double = 0.0

7 }

sack problems, more additional predicates can be used to resolve extendedmssproblems.

Viterbi algorithm More complex problems can also be encoded by GTA. The Viterbi algorithm [128] is a dynamic programming algorithm for finding the most likely sequence of hidden states, i.e., the Viterbi path, from a given sequence of observed events. In detail, given a sequence of observed events E = (x1,x2, ...,xn), a set of states S= (z1,z2, ...,zk) in a hidden Markov model [113], probability Pyield(xi|zj)of event xiEbeing caused by statezjS, and probabilityPtrans(zi|zj)of stateziappearing immediately after statezj, the algorithm computes the most likely sequence of(z1,z2, ...,zn)formalized as follows.

arg max

Z∈Sn+1

(

n

i=1

Pyield(xi|zi)Ptrans(zi|zi−1))

In [46], the GTA approach to compute above specification is introduced as follows. First, we remove the indexi1in the specification. To this end, we let the expression range over

Listing 3.14 A GTA program for Viterbi algorithm, which runs on the Spark cluster

1 object ViterbiSpark extends GTA[Action, Tuple2[Action, (Weather, Weather)],Id]{

2 ...

3 val gta = generate(assTransGen) filter (viterbiTest) aggregate(

viterbiAgg)

4 val rst = gta.postProcess(x.map(gta.f(_).get).reduce(gta.

combine(_, _)))

5 println("The result = " + rst)

6 System.exit(0)

7 }

pairs of hidden states in S×S and introduce a predicate trans to restrict the lists of state pairs. Intuitively,trans(p)is true if and only if the given sequencepof state pairs describes consecutive transitions,

((z0,z1),(z1,z2), . . . ,(zn−2,zn−1),(zn−1,zn)), and is false otherwise. By introducing a function,

prob(x,(s,t)) =Pyield(x|t)Ptrans(t|s),

the above expression can be transformed into the following equivalent one:

arg max

p∈(S×S)n trans(p)=True

n

i=1

prob(xi,pi)

Now, we are ready to build a Generate-Test-Aggregate algorithm. Given a set of marks (in

regard to the Viterbi algorithm, the mark is the product setS×S.), the generatorMarkingGenerator associates all possible marks to elements of the given list. For example, givenS={s1,s2}

andx= [x1,x2],MarkingGeneratorcan generate

* [(x1,(s1,s1)),(x2,(s1,s1))], [(x1,(s2,s1)),(x2,(s1,s1))], [(x1,(s1,s2)),(x2,(s1,s1))], ...

[(x1,(s2,s2)),(x2,(s2,s2))] +.

3.5 Related Work 45