Dynamodb-table-activity
DynamoDB-テーブルアクティビティ
DynamoDBストリームを使用すると、テーブルアイテムの変更を追跡して対応できます。 この機能を使用して、ソース間で情報を更新することで変更に対応するアプリケーションを作成します。 大規模なマルチユーザーシステムの何千人ものユーザーのデータを同期します。 これを使用して、更新に関する通知をユーザーに送信します。 そのアプリケーションは、多様で実質的です。 DynamoDBストリームは、この機能を実現するために使用されるメインツールとして機能します。
ストリームは、テーブル内のアイテムの変更を含む時間順シーケンスをキャプチャします。 このデータは最大24時間保持されます。 アプリケーションはそれらを使用して、ほぼリアルタイムで元のアイテムと変更されたアイテムを表示します。
テーブルで有効なストリームは、すべての変更をキャプチャします。 CRUD操作では、DynamoDBは変更されたアイテムのプライマリキー属性を持つストリームレコードを作成します。 前後の画像などの追加情報のストリームを構成できます。
ストリームには2つの保証があります-
- 各レコードはストリームに1回表示され、
- 各アイテムを変更すると、変更と同じ順序のストリームレコードが生成されます。
すべてのストリームはリアルタイムで処理されるため、アプリケーションの関連機能にそれらを使用できます。
ストリームの管理
テーブルの作成時に、ストリームを有効にできます。 既存のテーブルでは、ストリームを無効にしたり、設定を変更したりできます。 ストリームは非同期操作の機能を提供するため、テーブルのパフォーマンスに影響はありません。
シンプルなストリーム管理にAWSマネジメントコンソールを利用します。 まず、コンソールに移動し、 Tables を選択します。 [概要]タブで、[ストリームの管理]を選択します。 ウィンドウ内で、テーブルデータの変更時にストリームに追加される情報を選択します。 すべての設定を入力したら、*有効*を選択します。
既存のストリームを無効にする場合は、*ストリームの管理*を選択してから、*無効*を選択します。
API CreateTableおよびUpdateTableを使用して、ストリームを有効化または変更することもできます。 パラメーターStreamSpecificationを使用して、ストリームを構成します。 StreamEnabledはステータスを指定します。つまり、有効な場合はtrue、無効な場合はfalseを意味します。
StreamViewTypeは、ストリームに追加される情報、KEYS_ONLY、NEW_IMAGE、OLD_IMAGE、およびNEW_AND_OLD_IMAGESを指定します。
ストリームリーディング
エンドポイントに接続し、APIリクエストを行うことにより、ストリームを読み取り、処理します。 各ストリームはストリームレコードで構成され、各レコードはストリームを所有する単一の変更として存在します。 ストリームレコードには、発行順序を示すシーケンス番号が含まれます。 レコードは、シャードとも呼ばれるグループに属します。 シャードは、複数のレコードのコンテナーとして機能し、レコードへのアクセスとトラバースに必要な情報も保持します。 24時間後、レコードは自動的に削除されます。
これらのシャードは、必要に応じて生成および削除され、長続きしません。 また、通常は書き込みアクティビティの急増に応じて、複数の新しいシャードに自動的に分割されます。 ストリームを無効にすると、シャードを閉じます。 シャード間の階層関係により、アプリケーションは正しい処理順序のために親シャードに優先順位を付ける必要があります。 Kinesisアダプターを使用して、これを自動的に行うことができます。
_ 注-変更のない操作では、ストリームレコードは書き込まれません。
レコードにアクセスして処理するには、次のタスクを実行する必要があります-
- ターゲットストリームのARNを決定します。
- ターゲットレコードを保持するストリームのシャードを決定します。 *シャードにアクセスして、目的のレコードを取得します。
_* 注*-シャードを一度に読み取る最大2つのプロセスが必要です。 2プロセスを超える場合、ソースを調整できます。
利用可能なストリームAPIアクションは次のとおりです。
- ListStreams
- DescribeStream
- GetShardIterator
- GetRecords
あなたは、ストリームの読み取りの次の例を確認することができます-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;
public class StreamsExample {
private static AmazonDynamoDBClient dynamoDBClient =
new AmazonDynamoDBClient(new ProfileCredentialsProvider());
private static AmazonDynamoDBStreamsClient streamsClient =
new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());
public static void main(String args[]) {
dynamoDBClient.setEndpoint("InsertDbEndpointHere");
streamsClient.setEndpoint("InsertStreamEndpointHere");
//table creation
String tableName = "MyTestingTable";
ArrayList<AttributeDefinition> attributeDefinitions =
new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition()
.withAttributeName("ID")
.withAttributeType("N"));
ArrayList<KeySchemaElement> keySchema = new
ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement()
.withAttributeName("ID")
.withKeyType(KeyType.HASH)); //Partition key
StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);
CreateTableRequest createTableRequest = new CreateTableRequest()
.withTableName(tableName)
.withKeySchema(keySchema)
.withAttributeDefinitions(attributeDefinitions)
.withProvisionedThroughput(new ProvisionedThroughput()
.withReadCapacityUnits(1L)
.withWriteCapacityUnits(1L))
.withStreamSpecification(streamSpecification);
System.out.println("Executing CreateTable for " + tableName);
dynamoDBClient.createTable(createTableRequest);
System.out.println("Creating " + tableName);
try {
Tables.awaitTableToBecomeActive(dynamoDBClient, tableName);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Get the table's stream settings
DescribeTableResult describeTableResult =
dynamoDBClient.describeTable(tableName);
String myStreamArn = describeTableResult.getTable().getLatestStreamArn();
StreamSpecification myStreamSpec =
describeTableResult.getTable().getStreamSpecification();
System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled());
System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());
//Add an item
int numChanges = 0;
System.out.println("Making some changes to table data");
Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
item.put("ID", new AttributeValue().withN("222"));
item.put("Alert", new AttributeValue().withS("item!"));
dynamoDBClient.putItem(tableName, item);
numChanges++;
//Update the item
Map<String, AttributeValue> key = new HashMap<String, AttributeValue>();
key.put("ID", new AttributeValue().withN("222"));
Map<String, AttributeValueUpdate> attributeUpdates =
new HashMap<String, AttributeValueUpdate>();
attributeUpdates.put("Alert", new AttributeValueUpdate()
.withAction(AttributeAction.PUT)
.withValue(new AttributeValue().withS("modified item")));
dynamoDBClient.updateItem(tableName, key, attributeUpdates);
numChanges++;
//Delete the item
dynamoDBClient.deleteItem(tableName, key);
numChanges++;
//Get stream shards
DescribeStreamResult describeStreamResult =
streamsClient.describeStream(new DescribeStreamRequest()
.withStreamArn(myStreamArn));
String streamArn =
describeStreamResult.getStreamDescription().getStreamArn();
List<Shard> shards =
describeStreamResult.getStreamDescription().getShards();
//Process shards
for (Shard shard : shards) {
String shardId = shard.getShardId();
System.out.println("Processing " + shardId + " in "+ streamArn);
//Get shard iterator
GetShardIteratorRequest getShardIteratorRequest = new
GetShardIteratorRequest()
.withStreamArn(myStreamArn)
.withShardId(shardId)
.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
GetShardIteratorResult getShardIteratorResult =
streamsClient.getShardIterator(getShardIteratorRequest);
String nextItr = getShardIteratorResult.getShardIterator();
while (nextItr != null && numChanges > 0) {
//Read data records with iterator
GetRecordsResult getRecordsResult =
streamsClient.getRecords(new GetRecordsRequest().
withShardIterator(nextItr));
List<Record> records = getRecordsResult.getRecords();
System.out.println("Pulling records...");
for (Record record : records) {
System.out.println(record);
numChanges--;
}
nextItr = getRecordsResult.getNextShardIterator();
}
}
}
}