• 検索結果がありません。

終了する

ドキュメント内 Amazon Kinesis Data Streams - 開発者ガイド (ページ 42-175)

ステップ 7: 終了する

Kinesis data stream の使用には料金がかかるため、作業が終わったら、ストリームおよび対応する Amazon DynamoDB テーブルは必ず削除してください。レコードを送信したり取得したりしていなくて も、ストリームがアクティブなだけでわずかな料金が発生します。その理由として、アクティブなスト リームでは、受信レコードを継続的に "リッスン" し、レコードを取得するようにリクエストすることにリ ソースが使用されるためです。

ストリームおよびテーブルを削除するには

1. 実行しているプロデューサーおよびコンシューマーをすべてシャットダウンします。

2. https://console.aws.amazon.com/kinesis にある Kinesis コンソールを開きます。

3. このアプリケーション用に作成したストリームを選択します(StockTradeStream)。

4. [ストリームの削除] を選択します。

5. https://console.aws.amazon.com/dynamodb/ にある DynamoDB コンソールを開きます。

6. StockTradesProcessor テーブルを削除します。

概要

ほぼリアルタイムで大量のデータを処理するために、魔法のコードを記述したり、大規模なインフラス トラクチャを開発したりする必要はありません。Kinesis Data Streams を使用すれば、少量のデータを 処理するロジックを記述する (processRecord(Record) を記述するなど) 場合と同じように簡単にス ケールして、大量のストリーミングデータに対応できます。処理のスケール方法を心配する必要はあり ません。Kinesis Data Streams が代わりに処理してくれます。することと言えば、ストリームレコードを Kinesis Data Streams に送信し、受信した新しい各レコードを処理するロジックを記述するだけです。

このアプリケーションについて考えられる拡張機能は、次のとおりです。

すべてのシャードで集計する

現在は、単一のワーカーが単一のシャードから受け取ったデータレコードの集約に基づく統計が取得 されます (複数のワーカーが同時に単一のアプリケーションからシャードを処理することはできませ ん)。 拡張するときに複数のシャードがある場合、すべてのシャードで集計しようと考えるかもしれ ません。そのためには、パイプラインアーキテクチャを用意します。パイプラインアーキテクチャで は、各ワーカーの出力が単一のシャードを持つ別のストリームに供給され、第 1 段階の出力を集計す るワーカーによってそのストリームが処理されます。第 1 段階のデータが制限 (シャードおよび 1 分 間あたり 1 つのサンプル) されるため、シャードごとに処理しやすくなります。

処理の拡張

多数のシャードが含まれるようにストリームを拡張する場合 (多数のプロデューサーがデータを送信 している場合)、処理を拡張するには、より多くのワーカーを追加します。複数のワーカーは Amazon EC2 インスタンスで実行し、Auto Scaling グループを使用できます。

Amazon S3/DynamoDB/Amazon Redshift/Storm へのコネクタを使用する

ストリームは継続的に処理されるため、出力を他の保存先に送信することができます。AWS に は、Kinesis Data Streams を他の AWS のサービスおよびサードパーティー製ツールと統合するため のコネクタが用意されています。

次のステップ

• Kinesis Data Streams API オペレーションの使用方法については、「Amazon Kinesis Data Streams API と AWS SDK for Java を使用したプロデューサーの開発 (p. 100)」、「AWS SDK for Java での Kinesis Data Streams API を使用したコンシューマーの開発 (p. 137)」、および「ストリームの作成と 管理 (p. 39)」を参照してください。

ステップ 7: 終了する

• Kinesis Client Library の詳細については、「Kinesis Client Library 1.x を使用したコンシューマーの開 発 (p. 118)」を参照してください。

• アプリケーションを最適化する方法については、「高度なトピック (p. 160)」を参照してください。

ストリームの作成

ストリームの作成と管理

以下の例では、Amazon Kinesis Data Streams API について説明し、AWS SDK for Java を使用して Kinesis data stream を作成、削除、および操作する方法を示します。

この章で紹介する Java サンプルコードは、基本的な Kinesis Data Streams API オペレーションを実行す る方法を示しており、オペレーションタイプ別に論理的に分割されています。これらのサンプルは、すべ ての例外を確認しているわけではなく、すべてのセキュリティやパフォーマンスの側面を考慮しているわ けでもない点で、本稼働環境に使用できるコードを表すものではありません。また、他のプログラミング 言語を使用して Kinesis Data Streams API を呼び出すこともできます。すべての利用可能な AWS SDK の 詳細については、「アマゾン ウェブ サービスを使用した開発の開始」を参照してください。

トピック

• ストリームの作成 (p. 39)

• ストリームのリスト (p. 41)

• シャードの一覧表示 (p. 42)

• ストリームからシャードを取得する (p. 43)

• ストリームを削除する (p. 43)

• ストリームをリシャーディングする (p. 44)

• データ保持期間の変更 (p. 49)

• Amazon Kinesis Data Streams でのストリームのタグ付け (p. 49)

• Amazon Kinesis Data Streams のストリームのモニタリング (p. 52)

• Amazon Kinesis Data Streams による IAM リソースに対するアクセスの制御 (p. 78)

• サーバー側の暗号化の使用 (p. 82)

• Amazon Kinesis Data Streams とインターフェイス VPC エンドポイントの使用 (p. 87)

• コンソールを使用した Kinesis データストリームの管理 (p. 88)

ストリームの作成

次の手順に従って Kinesis data stream を作成します 。

Kinesis Data Streams クライアントの構築

Kinesis data stream を使用する前に、クライアントオブジェクトを構築する必要があります。次の Java コードは、クライアントビルダーをインスタンス化し、それを使用してリージョン、認証情報、およびク ライアント設定を指定します。次に、クライアントオブジェクトを構築します。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();

clientBuilder.setRegion(regionName);

clientBuilder.setCredentials(credentialsProvider);

clientBuilder.setClientConfiguration(config);

AmazonKinesis client = clientBuilder.build();

ストリームを作成する

詳細については、AWS General Reference の「Kinesis Data Streams のリージョンとエンドポイント」を 参照してください。

ストリームを作成する

Kinesis Data Streams クライアントを作成したら、使用するストリームを作成できます。この作業 は、Kinesis Data Streams コンソールまたはプログラムから実行できます。プログラムでストリームを作 成するには、CreateStreamRequest オブジェクトをインスタンス化し、ストリームの名前とストリー ムが使用するシャードの数を指定します。

CreateStreamRequest createStreamRequest = new CreateStreamRequest();

createStreamRequest.setStreamName( myStreamName );

createStreamRequest.setShardCount( myStreamSize );

ストリーム名はストリームを識別するために使用されます。この名前のスコープは、アプリケーションが 使用する AWS アカウントに限定されます。また、リージョンにも限定されます。つまり、2 つの異なる AWS アカウント内の 2 つのストリームを同じ名前にすることができ、同じ AWS アカウントで 2 つの異 なるリージョン内の 2 つのストリームを同じ名前にすることができますが、同じアカウントで、同じリー ジョン内の 2 つのストリームを同じ名前にすることはできません。

ストリームのスループットはシャードの数によって決まります。プロビジョンドスループットを高くする ほど、必要になるシャードの数は増えます。シャードが増えると、ストリームに対して請求される AWS のコストも増えます。アプリケーションに適切なシャードの数の計算の詳細については、「Kinesis Data Stream の初期サイズを決定する (p. 5)」を参照してください。

createStreamRequest オブジェクトを設定した後、クライアントの createStream メソッドを呼 び出すことで、ストリームを作成します。createStream の呼び出し後、ストリームに対してさらに オペレーションを実行するには、ストリームが ACTIVE 状態になるまで待機します。ストリームの状 態を確認するには、describeStream メソッドを呼び出します。ただし、ストリームが存在しない場 合、describeStream は例外をスローします。そのために、describeStream の呼び出しは try/

catch ブロックで囲みます。

client.createStream( createStreamRequest );

DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();

describeStreamRequest.setStreamName( myStreamName );

long startTime = System.currentTimeMillis();

long endTime = startTime + ( 10 * 60 * 1000 );

while ( System.currentTimeMillis() < endTime ) { try {

Thread.sleep(20 * 1000);

} catch ( Exception e ) {}

try {

DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest );

String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();

if ( streamStatus.equals( "ACTIVE" ) ) { break;

} //

// sleep for one second //

try {

Thread.sleep( 1000 );

}

catch ( Exception e ) {}

} catch ( ResourceNotFoundException e ) {}

ストリームのリスト

}

if ( System.currentTimeMillis() >= endTime ) {

throw new RuntimeException( "Stream " + myStreamName + " never went active" );

}

ストリームのリスト

前のセクションで説明したように、ストリームのスコープは、Kinesis Data Streams クライアントのイン スタンス化に使用される AWS の認証情報に関連付けられた AWS アカウントに限定されます。また、こ のクライアントに指定されたリージョンにも限定されます。AWS アカウントを使用して多数のストリーム を 1 度にアクティブにできます。ストリームは、Kinesis Data Streams コンソールでリストするか、プロ グラムによってリストすることができます。このセクションのコードでは、AWS アカウントのすべてのス トリームをリスト表示する方法を示します。

ListStreamsRequest listStreamsRequest = new ListStreamsRequest();

listStreamsRequest.setLimit(20);

ListStreamsResult listStreamsResult = client.listStreams(listStreamsRequest);

List<String> streamNames = listStreamsResult.getStreamNames();

このコード例では、最初に ListStreamsRequest の新しいインスタンスを作成し、その setLimit メ ソッドを呼び出して、最大 20 個のストリームが listStreams の呼び出しごとに返されるように指定し ています。setLimit の値を指定しない場合は、アカウント内のストリーム数以下のストリームが Kinesis Data Streams によって返されます。次に、コードはクライアントの listStreamsRequest メソッドに listStreams を渡します。listStreams の戻り値は ListStreamsResult オブジェクトに格納され ます。コードはこのオブジェクトの getStreamNames メソッドを呼び出して、返されたストリームの 名前を streamNames リストに格納します。アカウントとリージョンにこの制限で指定したよりも多く のストリームがある場合でも、Kinesis Data Streams によって返されるストリームの数が指定した制限 に満たないことがあります。確実にすべてのストリームを取得するには、次のコード例で説明している getHasMoreStreams メソッドを使用します。

while (listStreamsResult.getHasMoreStreams()) { if (streamNames.size() > 0) {

listStreamsRequest.setExclusiveStartStreamName(streamNames.get(streamNames.size() 1));

}

listStreamsResult = client.listStreams(listStreamsRequest);

streamNames.addAll(listStreamsResult.getStreamNames());

}

このコードは、getHasMoreStreams の listStreamsRequest メソッドを呼び出して、listStreams の最初の呼び出しで返されたストリームの数よりも多いストリームがあるかどうかを確認します。ある 場合、コードは setExclusiveStartStreamName メソッドを呼び出して、listStreams の前の呼び 出しで返された最後のストリームの名前を指定します。setExclusiveStartStreamName メソッドは listStreams の次の呼び出しをそのストリームの後から開始します。その呼び出しによって返されたス トリーム名のグループが streamNames リストに追加されます。すべてのストリームの名前がリストに収 集されるまで、この処理を続行します。

listStreams で返されるストリームは以下のいずれかの状態になります。

• CREATING

• ACTIVE

• UPDATING

• DELETING

ドキュメント内 Amazon Kinesis Data Streams - 開発者ガイド (ページ 42-175)

関連したドキュメント