ステップ 5: コンシューマーを実装する
• 最後のレポートから 1 分間以上経過した場合は、reportStats() を呼び出して最新の統計を出力 し、次の間隔に新しいレコードのみ含まれるように resetStats() を呼び出して統計を消去しま す。
• 次のレポート時間を設定します。
• 最後のチェックポイントから 1 分間以上経過した場合は、checkpoint() を呼び出します。
• 次のチェックポイント時間を設定します。
このメソッドでは、60 秒間間隔でレポートおよびチェックポイント時間が設定されています。チェッ クポイントの詳細については、「コンシューマーに関する追加情報 (p. 35)」を参照してください。
StockStats クラス
このクラスでは、データを保持し、最も人気のある株式の経時的な統計を示すことができます。この コードは、事前に用意されており、次のメソッドが含まれています。
• addStockTrade(StockTrade): 指定された StockTrade を実行中の統計に取り込みます。
• toString(): 特定の形式の文字列として統計を返します。
このクラスは、各株式の合計取引数と最大取引数を継続的にカウントすることで、最も人気のある株 式を追跡します。これらの数は、株式取引を受け取る度に更新されます。
次のステップに示されているコードを StockTradeRecordProcessor クラスのメソッドに追加します。
コンシューマーを実装するには
1. processRecord メソッドを実装するには、サイズの正しい StockTrade オブジェクトを開始し、そ れにレコードデータを追加します。また、問題が発生した場合に警告がログに記録されるようにしま す。
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
if (trade == null) {
LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: "
+ record.getPartitionKey());
return;
}stockStats.addStockTrade(trade);
2. 簡単な reportStats メソッドを実装します。出力形式は好みに応じて自由に変更することができま す。
System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******
\n" +
stockStats + "\n" +
"****************************************************************
\n");
3. 最後に、新しい resetStats インスタンスを作成する stockStats メソッドを実装します。
stockStats = new StockStats();
コンシューマーを実行するには
1. (p. 30) で記述したプロデューサーを実行し、シミュレートした株式取引レコードをストリームに 取り込みます。
2. 前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアが ファイル ~/.aws/credentials に保存されていることを確認します。
3. 次の引数を指定して StockTradesProcessor クラスを実行します。
ステップ 5: コンシューマーを実装する
StockTradesProcessor StockTradeStream us-west-2
us-west-2 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指 定する必要があります。
1 分後、次のような出力が表示されます。その後、1 分間ごとに出力が更新されます。
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
コンシューマーに関する追加情報
「Kinesis Client Library 1.x を使用したコンシューマーの開発 (p. 118)」などで説明されている Kinesis Client Library のメリットに詳しい方であれば、ここで使用することに疑問を感じるかもしれません。1 つのシャードストリームとそれを処理する 1 つのコンシューマーインスタンスしか使用しない場合で も、KCL を使用して簡単にコンシューマーを実装することができます。プロデューサーセクションとコン シューマーセクションのコードの実装手順を比較すると、コンシューマーの実装の方が比較的に簡単であ ることがわかります。これは、KCL で提供されているサービスが大きく関係しています。
このアプリケーションでは、個別のレコードを処理できるレコードプロセッサクラスの実装に焦点を合わ せてきました。新しいレコードが使用可能になると、KCL がレコードを取得してレコードプロセッサを呼 び出すため、Kinesis Data Streams からレコードを取得する方法を心配しなくて済みます。また、シャー ド数やコンシューマーインスタンス数についても心配しなくて済みます。ストリームがスケールアップさ れても、複数のシャードやコンシューマーインスタンスを処理するためにアプリケーションを書き直す必 要はありません。
チェックポイントとは、ストリームにおける特定のポイントのことで、それまでに消費および処理された データレコードが記録されます。このため、アプリケーションがクラッシュしても、ストリームの始めか らではなく、そのポイントからストリームが読み取られます。チェックポイントやそのさまざまな設計パ ターン、およびベストプラクティスは、この章の範囲外です。ただし、本番環境ではこのような問題に直 面することがあります。
「 (p. 30)」で説明したように、Kinesis Data Streams API の put オペレーションでは パーティション キーを入力として受け取ります。Kinesis Data Streams では、複数のシャード間でレコードを分割する方 法としてパーティションキーを使用します (ストリームに複数のシャードがある場合)。同じパーティショ ンキーは、常に同じシャードにルーティングされます。このため、同じパーティションキーを持つレコー ドはそのコンシューマーにのみ送信され、他のコンシューマーに送信されることはないと仮定して、特 定のシャードを処理するコンシューマーを設計できます。したがって、コンシューマーのワーカーは、必 要なデータが欠落しているかもしれないと心配することなく、同じパーティションキーを持つすべてのレ コードを集計できます。
このアプリケーションでは、コンシューマーによるレコードの処理の負荷は高くないため、1 つのシャー ドを使用して、KCL スレッドと同じスレッドで処理することができます。ただし、実際には、まずシャー ドの数のスケールアップを検討します。レコードの処理が大変になることが予想される場合は、異なる スレッドに処理を切り替えたり、スレッドプールを使用したりする必要があるかもしれません。このよう に、その他のスレッドがレコードを並列処理していても、KCL は新しいレコードを迅速に取得できます。
一般的に、マルチスレッド設計は簡単ではなく高度な技術が必要になるため、シャードの数を増やすこと が最も効果的で簡単な拡張方法です。
次のステップ
ステップ 6: (オプション) コンシューマーを拡張する (p. 36)