diff --git a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java
index 4ac74086a3..e339437550 100644
--- a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java
+++ b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java
@@ -16,11 +16,6 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
-import static org.apache.jackrabbit.oak.segment.aws.DynamoDBClient.TABLE_ATTR_CONTENT;
-
-import java.io.IOException;
-import java.util.Iterator;
-
 import com.amazonaws.services.dynamodbv2.document.Item;
 
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
@@ -29,6 +24,12 @@ import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.jackrabbit.oak.segment.aws.DynamoDBClient.TABLE_ATTR_CONTENT;
+
 public class AwsJournalFile implements JournalFile {
 
     private static final Logger log = LoggerFactory.getLogger(AwsJournalFile.class);
@@ -89,6 +90,11 @@ public class AwsJournalFile implements JournalFile {
         public void writeLine(String line) throws IOException {
             dynamoDBClient.putDocument(fileName, line);
         }
+
+        @Override
+        public void batchWriteLines(List<String> lines) throws IOException {
+            dynamoDBClient.batchPutDocument(fileName, lines);
+        }
     }
 
     private static class AwsFileReader implements JournalFileReader {
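Note on AwsFileWriter.batchWriteLines above: it delegates to DynamoDBClient.batchPutDocument (next file), which groups the lines into BatchWriteItem requests instead of issuing one PutItem per line. A back-of-the-envelope sketch of what that saves, assuming a chunk size of 25 (the documented BatchWriteItem cap; the value of the TABLE_MAX_BATCH_WRITE_SIZE constant is not visible in this patch):

    // Hypothetical request-count comparison; the chunk size 25 is an assumption.
    int lines = 1000;
    int putItemRequests = lines;                 // writeLine() path: one PutItem per line
    int batchWriteRequests = (lines + 24) / 25;  // batchWriteLines() path: ceil(1000 / 25) = 40
    System.out.println(putItemRequests + " requests vs " + batchWriteRequests);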
diff --git a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
index b0488d5435..d37efc24b2 100644
--- a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
+++ b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
@@ -16,18 +16,12 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
 import com.amazonaws.services.dynamodbv2.document.Item;
 import com.amazonaws.services.dynamodbv2.document.ItemCollection;
@@ -46,8 +40,18 @@ import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
 import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
 import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
 import com.amazonaws.services.dynamodbv2.util.TableUtils;
 
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
 public final class DynamoDBClient {
 
     private static final String TABLE_ATTR_TIMESTAMP = "timestamp";
@@ -189,19 +193,62 @@ public final class DynamoDBClient {
         }
     }
 
+    public void batchPutDocument(String fileName, List<String> lines) {
+        List<Item> items = lines.stream()
+                .map(content -> toItem(fileName, content))
+                .collect(Collectors.toList());
+        batchPutDocumentItems(fileName, items);
+    }
+
+    public void batchPutDocumentItems(String fileName, List<Item> items) {
+        items.forEach(item -> item.withString(TABLE_ATTR_FILENAME, fileName));
+        AtomicInteger counter = new AtomicInteger();
+        items.stream()
+                .collect(Collectors.groupingBy(x -> counter.getAndIncrement() / TABLE_MAX_BATCH_WRITE_SIZE))
+                .values()
+                .forEach(chunk -> putDocumentsChunked(chunk));
+    }
+
+    /**
+     * There is a limitation on the request size, see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
+     * Therefore the items need to be provided in chunks by the caller.
+     *
+     * @param items chunk of items
+     */
+    private void putDocumentsChunked(List<Item> items) {
+        // See explanation at https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/batch-operation-document-api-java.html
+        DynamoDB dynamoDB = new DynamoDB(ddb);
+        TableWriteItems table = new TableWriteItems(journalTableName);
+        BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(table.withItemsToPut(items));
+        do {
+            // Check for unprocessed keys which could happen if you exceed
+            // provisioned throughput
+            Map<String, List<WriteRequest>> unprocessedItems = outcome.getUnprocessedItems();
+            if (outcome.getUnprocessedItems().size() > 0) {
+                outcome = dynamoDB.batchWriteItemUnprocessed(unprocessedItems);
+            }
+        } while (outcome.getUnprocessedItems().size() > 0);
+    }
+
     public void putDocument(String fileName, String line) throws IOException {
-        Item item = new Item().with(TABLE_ATTR_TIMESTAMP, new Date().getTime()).with(TABLE_ATTR_FILENAME, fileName)
-                .with(TABLE_ATTR_CONTENT, line);
+        Item item = toItem(fileName, line);
         try {
-            try {
-                // TO DO: why is this needed here
-                Thread.sleep(1L);
-            } catch (InterruptedException e) {
-            }
             journalTable.putItem(item);
         } catch (AmazonServiceException e) {
             throw new IOException(e);
         }
     }
+
+    public Item toItem(String fileName, String line) {
+        // making sure that timestamps are unique by sleeping 1ms
+        try {
+            Thread.sleep(1L);
+        } catch (InterruptedException e) {
+        }
+        return new Item()
+                .with(TABLE_ATTR_TIMESTAMP, new Date().getTime())
+                .with(TABLE_ATTR_FILENAME, fileName)
+                .with(TABLE_ATTR_CONTENT, line);
+    }
+
 }
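Note on putDocumentsChunked above: the do/while loop resubmits unprocessed items immediately, while the BatchWriteItem documentation linked in the Javadoc recommends retrying them with exponential backoff. A minimal sketch of that variant, reusing the ddb and journalTableName fields of this class (the method name and delay values are assumptions, not part of the patch):

    // Sketch only: chunked batch write that backs off between retries.
    private void putDocumentsChunkedWithBackoff(List<Item> items) throws IOException {
        DynamoDB dynamoDB = new DynamoDB(ddb);
        BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(
                new TableWriteItems(journalTableName).withItemsToPut(items));
        long backoffMillis = 50;
        while (!outcome.getUnprocessedItems().isEmpty()) {
            try {
                Thread.sleep(backoffMillis);                    // wait before resubmitting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();             // preserve the interrupt status
                throw new IOException("Interrupted while retrying batch write", e);
            }
            backoffMillis = Math.min(backoffMillis * 2, 5000);  // cap the delay at 5 seconds
            outcome = dynamoDB.batchWriteItemUnprocessed(outcome.getUnprocessedItems());
        }
    }

Also worth noting: Collectors.groupingBy in batchPutDocumentItems collects into a HashMap, so the chunks are not guaranteed to be written in insertion order. That is harmless here, because each item carries its own timestamp sort key, but it is worth keeping in mind if the method is reused.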
diff --git a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
index 8e7737adee..31525e4f14 100644
--- a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
+++ b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
@@ -16,16 +16,6 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
@@ -44,6 +34,14 @@ import org.apache.jackrabbit.oak.commons.Buffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 public final class S3Directory {
 
     private static final Logger log = LoggerFactory.getLogger(AwsContext.class);
@@ -167,7 +165,7 @@ public final class S3Directory {
     public void copyObject(S3Directory from, String fromKey) throws IOException {
         String toKey = rootDirectory + fromKey.substring(from.rootDirectory.length());
         try {
-            s3.copyObject(new CopyObjectRequest(bucketName, fromKey, bucketName, toKey));
+            s3.copyObject(new CopyObjectRequest(from.bucketName, fromKey, bucketName, toKey));
         } catch (AmazonServiceException e) {
             throw new IOException(e);
         }
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
index a6f09dd118..00cb41376a 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
@@ -206,6 +206,13 @@
             }
         }
 
+        @Override
+        public void batchWriteLines(List<String> lines) throws IOException {
+            for (String line : lines) {
+                this.writeLine(line);
+            }
+        }
+
         private void createNextFile(int suffix) throws IOException {
             try {
                 currentBlob = directory.getAppendBlobReference(getJournalFileName(suffix + 1));
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java
index 122e69ca1c..96567d24b9 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java
@@ -26,6 +26,7 @@ import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.List;
 
 import static java.nio.charset.Charset.defaultCharset;
@@ -100,6 +101,13 @@ public class LocalJournalFile implements JournalFile {
             journalFile.getChannel().force(false);
         }
 
+        @Override
+        public void batchWriteLines(List<String> lines) throws IOException {
+            for (String line : lines) {
+                this.writeLine(line);
+            }
+        }
+
         @Override
         public void close() throws IOException {
             journalFile.close();
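Note on the two hunks above: AzureJournalFile and LocalJournalFile implement batchWriteLines with the same line-by-line loop. An alternative worth considering (a sketch, not part of this patch) would be a default method on the JournalFileWriter interface below, so that only implementations with a real batch primitive, such as the DynamoDB-backed one, need to override it:

    // Hypothetical default fallback, declared on the JournalFileWriter interface.
    default void batchWriteLines(List<String> lines) throws IOException {
        for (String line : lines) {
            writeLine(line);
        }
    }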
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/JournalFileWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/JournalFileWriter.java
index 6a3a605113..1d6053f375 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/JournalFileWriter.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/JournalFileWriter.java
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.segment.spi.persistence;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * The {@link JournalFile} writer. It allows to append a record to the journal file
@@ -59,4 +60,20 @@ public interface JournalFileWriter extends Closeable {
      */
     void writeLine(String line) throws IOException;
 
+    /**
+     * Write new lines to the journal file as a batch. This method allows for
+     * optimized batch implementations. The operation should be atomic, i.e.
+     * it shouldn't be possible to open a new reader using
+     * {@link JournalFile#openJournalReader()} in such a way that it has access
+     * to an incomplete record line.
+     * <p>
+     * If this method returns successfully, the lines have been persisted to
+     * non-volatile storage. For instance, the local disk implementation should
+     * call {@code flush()} before returning.
+     *
+     * @param lines the list of journal records to be written
+     * @throws IOException
+     */
+    void batchWriteLines(List<String> lines) throws IOException;
+
 }
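Note: a usage sketch for the new SPI method. It assumes JournalFile#openJournalWriter() and JournalFileReader#readLine() as the counterparts of the openJournalReader() referenced in the Javadoc, plus java.util.Arrays for brevity:

    // Sketch: batch-write a few records, then read the journal back.
    JournalFile journal = ...; // any JournalFile implementation
    try (JournalFileWriter writer = journal.openJournalWriter()) {
        // Per the contract above, a concurrent reader must never see a partial line.
        writer.batchWriteLines(Arrays.asList("record-1", "record-2", "record-3"));
    }
    try (JournalFileReader reader = journal.openJournalReader()) {
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    }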