Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.vertigo.user;
- import java.util.Collections;
- import java.util.Map;
- import java.util.concurrent.atomic.AtomicReference;
- import com.amazonaws.auth.AWSStaticCredentialsProvider;
- import com.amazonaws.auth.BasicAWSCredentials;
- import com.amazonaws.regions.Regions;
- import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
- import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
- import com.amazonaws.services.dynamodbv2.model.AttributeValue;
- import com.amazonaws.services.dynamodbv2.model.ScanRequest;
- import com.amazonaws.services.dynamodbv2.model.ScanResult;
- import io.reactivex.BackpressureStrategy;
- import io.reactivex.Flowable;
- import lombok.val;
- import org.apache.commons.lang3.tuple.Pair;
- public class DynaRun {
- public static void main(String[] args) {
- val awsCredentials = new BasicAWSCredentials(
- "access-key",
- "secret-key"
- );
- val client = AmazonDynamoDBClientBuilder
- .standard()
- .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
- .withRegion(Regions.US_WEST_2)
- .build();
- val disposable = Flowable.range(0, Integer.MAX_VALUE)
- .zipWith(flowableScan(client, "load-post", 1000), Pair::of)
- // .map(ScanResult::getItems)
- .subscribe(pair ->
- System.out.println(
- "page: " + pair.getLeft() +
- ", items: " + pair.getRight().getCount()
- )
- );
- disposable.dispose(); // not needed in this case, but in general we might want to do it
- // other example: the flow of batches of keys
- // Flowable<Set<String>> itemsFlow =
- // flowableScan(client, "staging-post", 1000)
- // .flatMapIterable(ScanResult::getItems)
- // .map(Map::keySet);
- }
- public static Flowable<ScanResult> flowableScan(
- AmazonDynamoDB client, String table, int batchSize
- ) {
- val ref = new AtomicReference<Map<String, AttributeValue>>();
- return Flowable.create(e -> {
- if (e.isCancelled()) {
- return;
- }
- try {
- do {
- ScanRequest scanRequest = new ScanRequest()
- .withTableName(table)
- .withAttributesToGet("postid", "body")
- .withLimit(batchSize)
- .withExclusiveStartKey(ref.get());
- val scanResult = client.scan(scanRequest);
- // NOTE: the last page returns getLastEvaluatedKey == null
- ref.set(scanResult.getLastEvaluatedKey());
- e.onNext(scanResult);
- } while (ref.get() != null && !ref.get().isEmpty());
- } catch (Exception ex) {
- e.onError(ex);
- }
- }, BackpressureStrategy.BUFFER);
- }
- }
- // pom file
<!-- AWS SDK for Java v1: DynamoDB client (core is listed explicitly to pin its version) -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-core</artifactId>
    <version>1.11.421</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-dynamodb</artifactId>
    <version>1.11.421</version>
</dependency>
<!-- RxJava 2: Flowable -->
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.2</version>
</dependency>
<!-- Required by the code's use of org.apache.commons.lang3.tuple.Pair -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.8.1</version>
</dependency>
<!-- Required by the code's use of lombok.val; compile-time only -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.4</version>
    <scope>provided</scope>
</dependency>
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement