• 検索結果がありません。

プロデューサーを実装する

ドキュメント内 Amazon Kinesis Data Streams - 開発者ガイド (ページ 35-38)

• Apache Commons Lang

• Apache Commons Logging

• Guava (Java 用の Google コアライブラリ)

• Jackson Annotations

• Jackson Core

• Jackson Databind

• Jackson Dataformat: CBOR

• Joda Time

4. IDE によっては、プロジェクトが自動的にビルドされる場合があります。自動的にビルドされない場 合は、IDE に適切なステップを使用してプロジェクトをビルドします。

上記のステップが正常に完了したら、次のセクション (the section called “ステップ 4: プロデューサーを実 装する” (p. 30)) に進みます。ビルドのいずれかの段階でエラーが発生した場合は、先に進む前に、原因 を調査の上、解決してください。

次のステップ

(p. 30)

ステップ 4: プロデューサーを実装する

チュートリアル: Kinesis Data Streams を使用した株式データのリアルタイム分析 (p. 24) のアプリケー ションでは、株式市場取引をモニタリングする実際のシナリオが使用されます。次の原理によって、この シナリオをプロデューサーおよびサポートコード構造にマッピングすることができます。

ソースコードを参照し、次の情報を確認してください。

StockTrade クラス

株式取引は、StockTrade クラスのインスタンスによって個別に表されます。このインスタンスに は、ティッカーシンボル、株価、株数、取引のタイプ (買いまたは売り)、取引を一意に識別する ID な どの属性が含まれます。このクラスは、既に実装されています。

ストリームレコード

ストリームとは、一連のレコードのことです。レコードとは、JSON 形式による連続する StockTrade インスタンスの 1 つを表しています。(例:

{

"tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }

StockTradeGenerator クラス

StockTradeGenerator には、呼び出されるたびにランダムに生成された新しい株式取引を返 す、getRandomTrade() と呼ばれるメソッドが含まれています。このクラスは、既に実装されてい ます。

StockTradesWriter クラス

プロデューサーの main メソッドである StockTradesWriter は、継続的にランダム取引を取得 し、以下のタスクを実行してそれらを Kinesis Data Streams に送信します。

ステップ 4: プロデューサーを実装する

1. ストリーム名とリージョン名を入力として読み取ります。

2. AmazonKinesisClientBuilder を作成します。

3. クライアントビルダーを使用してリージョン、認証情報、およびクライアント構成を設定します。

4. クライアントビルダーを使用して AmazonKinesis クライアントを構成します。

5. ストリームが存在し、アクティブであることを確認します (そうでない場合は、エラーで終了しま す)。

6. 連続ループで、StockTradeGenerator.getRandomTrade() メソッドに続き

sendStockTrade メソッドを呼び出して、100 ミリ秒ごとに取引をストリームに送信します。

sendStockTrade クラスの StockTradesWriter メソッドには次のコードがあります。

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {

byte[] bytes = trade.toJsonAsBytes();

// The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.

if (bytes == null) {

LOG.warn("Could not get JSON bytes for stock trade");

return;

}

LOG.info("Putting trade: " + trade.toString());

PutRecordRequest putRecord = new PutRecordRequest();

putRecord.setStreamName(streamName);

// We use the ticker symbol as the partition key, explained in the Supplemental Information section below.

putRecord.setPartitionKey(trade.getTickerSymbol());

putRecord.setData(ByteBuffer.wrap(bytes));

try {

kinesisClient.putRecord(putRecord);

} catch (AmazonClientException ex) {

LOG.warn("Error sending record to Amazon Kinesis.", ex);

} }

次のコードの詳細を参照してください。

• PutRecord API はバイト配列を想定するため、trade を JSON 形式に変換する必要があります。

この操作は、次の 1 行のコードによって行われます。

byte[] bytes = trade.toJsonAsBytes();

• 取引を送信する前に、新しい PutRecordRequest インスタンス (この場合、putRecord と呼ばれ る) を作成する必要があります。

PutRecordRequest putRecord = new PutRecordRequest();

各 PutRecord の呼び出しには、ストリーム名、パーティションキー、およびデータ BLOB が必要 です。次のコードによって、putRecord メソッドを使用して、これらのフィールドを setXxxx() オブジェクトに追加します。

putRecord.setStreamName(streamName);

putRecord.setPartitionKey(trade.getTickerSymbol());

putRecord.setData(ByteBuffer.wrap(bytes));

この例では、株式チケットをパーティションキーとして使用することで、レコードを特定のシャー ドにマッピングしています。実際には、レコードがストリーム全体に均等に分散するように、

ステップ 4: プロデューサーを実装する

シャード 1 つあたりに数百個または数千個のパーティションキーを用意する必要があります。スト リームにデータを追加する方法の詳細については、「ストリームへのデータの追加 (p. 101)」を参 照してください。

次に、putRecord をクライアントに送信 (put オペレーション) することができます。

kinesisClient.putRecord(putRecord);

• エラーチェックとログ記録は、いつでも追加して損はありません。次のコードによって、エラー状 態を記録します。

if (bytes == null) {

LOG.warn("Could not get JSON bytes for stock trade");

return;

}

put オペレーションの前後に try/catch ブロックを追加します。

try {

kinesisClient.putRecord(putRecord);

} catch (AmazonClientException ex) {

LOG.warn("Error sending record to Amazon Kinesis.", ex);

}

これは、ネットワークエラーや、ストリームがスループット限界を超えて抑制されたために Kinesis Data Streams put オペレーションが失敗することがあるためです。データが失われることがないよ うに、単純な再試行として使用するなど、put オペレーションの再試行ポリシーを慎重に検討する ことをお勧めします。

• ステータスのログ記録は有益ですが、オプションです。

LOG.info("Putting trade: " + trade.toString());

ここに示されているプロデューサーでは、Kinesis Data Streams API のシングルレコード機能 (PutRecord) が使用されています。実際には、個々のプロデューサーで大量のレコードが生成される 場合があります。その場合、PutRecords のマルチレコード機能を使用して、レコードのバッチを一 度に送信する方が効率的です。詳細については、「ストリームへのデータの追加 (p. 101)」を参照し てください。

プロデューサーを実行するには

1. 前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアが ファイル ~/.aws/credentials に保存されていることを確認します。

2. 次の引数を指定して StockTradeWriter クラスを実行します。

StockTradeStream us-west-2

us-west-2 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指 定する必要があります。

次のような出力が表示されます。

Feb 16, 2015 3:53:00 PM

com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18

ドキュメント内 Amazon Kinesis Data Streams - 開発者ガイド (ページ 35-38)

関連したドキュメント