From a1c61c3b2bd541efc407d1abd1b5182f9554c379 Mon Sep 17 00:00:00 2001 From: Eric Evans Date: Thu, 24 Oct 2013 15:33:13 -0500 Subject: [PATCH] JCLOUDS-356 multipart uploads with InputStream payloads This changeset introduces an alternative to PayloadSlicer, IterablePayloadSlicer, with a method for returning a Payload iterator. ...swift.blobstore.strategy.internal.SequentialMultipartUploadStrategy has been updated to to use a payload iterator. --- .../SequentialMultipartUploadStrategy.java | 21 +--- .../integration/SwiftBlobIntegrationLiveTest.java | 38 +++++- .../main/java/org/jclouds/io/PayloadSlicer.java | 12 ++ .../org/jclouds/io/internal/BasePayloadSlicer.java | 137 ++++++++++++++++++++- .../jclouds/io/internal/BasePayloadSlicerTest.java | 70 +++++++++++ 5 files changed, 258 insertions(+), 20 deletions(-) create mode 100644 core/src/test/java/org/jclouds/io/internal/BasePayloadSlicerTest.java diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index e050912..1874ccd 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -66,22 +66,13 @@ public String execute(String container, Blob blob) { long chunkSize = algorithm.calculateChunkSize(length); int partCount = algorithm.getParts(); if (partCount > 0) { - int part; - while ((part = algorithm.getNextPart()) <= partCount) { - Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), chunkSize); + for (Payload part : slicer.slice(payload, chunkSize)) { + int partNum = algorithm.getNextPart(); Blob blobPart = blobBuilders.get() - .name(key + PART_SEPARATOR + part) - .payload(chunkedPart) - .contentDisposition(key + PART_SEPARATOR + part).build(); - client.putObject(container, blob2Object.apply(blobPart)); - } - long remaining = algorithm.getRemaining(); - if (remaining > 0) { - Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), remaining); - Blob blobPart = blobBuilders.get() - .name(key + PART_SEPARATOR + part) - .payload(chunkedPart) - .contentDisposition(key + PART_SEPARATOR + part).build(); + .name(key + PART_SEPARATOR + partNum) + .payload(part) + .contentDisposition(key + PART_SEPARATOR + partNum) + .build(); client.putObject(container, blob2Object.apply(blobPart)); } return client.putObjectManifest(container, key); diff --git a/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/integration/SwiftBlobIntegrationLiveTest.java b/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/integration/SwiftBlobIntegrationLiveTest.java index 6331988..6277dc4 100644 --- a/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/integration/SwiftBlobIntegrationLiveTest.java +++ b/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/integration/SwiftBlobIntegrationLiveTest.java @@ -16,12 +16,17 @@ */ package org.jclouds.openstack.swift.blobstore.integration; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.Properties; -import com.google.common.io.ByteStreams; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.integration.internal.BaseBlobIntegrationTest; @@ -34,12 +39,11 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.google.common.hash.Hashing; +import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.common.io.InputSupplier; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertTrue; - /** * * @author James Murty @@ -128,6 +132,32 @@ public void testMultipartChunkedFileStream() throws IOException, InterruptedExce } } + // InputStreamPayloads are handled differently than File; Test InputStreams too + @Test(groups = { "integration", "live" }) + public void testMultipartChunkedInputStream() throws InterruptedException, IOException { + String container = getContainerName(); + try { + BlobStore blobStore = view.getBlobStore(); + + blobStore.createContainerInLocation(null, container); + + File inFile = createFileBiggerThan(PART_SIZE); + File outFile = new File("target/lots-of-const-readback.txt"); + + InputStream contentToUpload = new FileInputStream(inFile); + Blob write = blobStore.blobBuilder("const.txt").payload(contentToUpload).contentLength(inFile.length()).build(); + blobStore.putBlob(container, write, PutOptions.Builder.multipart()); + + Blob read = blobStore.getBlob(container, "const.txt"); + read.getPayload().writeTo(new FileOutputStream(outFile)); + + assertEquals(Files.hash(outFile, Hashing.md5()), Files.hash(inFile, Hashing.md5())); + + } finally { + returnContainer(container); + } + } + @Override protected int getIncorrectContentMD5StatusCode() { return 422; diff --git a/core/src/main/java/org/jclouds/io/PayloadSlicer.java b/core/src/main/java/org/jclouds/io/PayloadSlicer.java index a6d807e..753a407 100644 --- a/core/src/main/java/org/jclouds/io/PayloadSlicer.java +++ b/core/src/main/java/org/jclouds/io/PayloadSlicer.java @@ -41,4 +41,16 @@ * if offset or length are negative */ Payload slice(Payload input, long offset, long length); + + /** + * Returns an {@link Iterable} of {@link Payload} instances that are no larger than + * size bytes in length. + * + * @param input + * the {@link Payload} to be sliced + * @param size + * the maximum size of each slice + * @return an {@link Iterable} of {@link Payload} instances + */ + Iterable slice(Payload input, long size); } diff --git a/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java b/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java index e4d3be9..91df891 100644 --- a/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java +++ b/core/src/main/java/org/jclouds/io/internal/BasePayloadSlicer.java @@ -19,15 +19,24 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import java.io.ByteArrayInputStream; import java.io.File; -import java.io.InputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.NoSuchElementException; import javax.inject.Singleton; +import org.jclouds.io.ContentMetadata; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.io.payloads.BaseMutableContentMetadata; +import org.jclouds.io.payloads.ByteArrayPayload; import org.jclouds.io.payloads.InputStreamPayload; import org.jclouds.io.payloads.InputStreamSupplierPayload; @@ -42,6 +51,79 @@ */ @Singleton public class BasePayloadSlicer implements PayloadSlicer { + + public static class PayloadIterator implements Iterable, Iterator { + + private final InputStream input; + private final ContentMetadata metaData; + private Payload nextPayload; + private final int readLen; + + public PayloadIterator(InputStream input, ContentMetadata meta) { + this.input = checkNotNull(input, "input"); + this.metaData = checkNotNull(meta, "meta"); + this.readLen = checkNotNull(this.metaData.getContentLength(), "content-length").intValue(); + + this.nextPayload = getNextPayload(); + } + + @Override + public boolean hasNext() { + return (nextPayload != null); + } + + @Override + public Payload next() { + Payload payload; + + if (!hasNext()) + throw new NoSuchElementException(); + + payload = nextPayload; + nextPayload = getNextPayload(); + + return payload; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + return this; + } + + private Payload getNextPayload() { + byte[] content = new byte[readLen]; + int read = 0; + + try { + if ((read = input.read(content)) == -1) { + return null; + } + } catch (IOException e) { + throw Throwables.propagate(e); + } + + return createPayload((content.length == read) ? content : Arrays.copyOf(content, read)); + } + + private Payload createPayload(byte[] content) { + Payload payload = null; + + if (content.length > 0) { + payload = new ByteArrayPayload(content); + ContentMetadata cm = metaData.toBuilder().contentLength((long)content.length).contentMD5(null).build(); + payload.setContentMetadata(BaseMutableContentMetadata.fromContentMetadata(cm)); + } + + return payload; + } + + } + /** * {@inheritDoc} */ @@ -105,4 +187,57 @@ protected Payload copyMetadataAndSetLength(Payload input, Payload returnVal, lon return returnVal; } + @Override + public Iterable slice(Payload input, long size) { + checkNotNull(input, "input"); + checkArgument(size >= 0, "size must be non-negative but was: %s", size); + + ContentMetadata meta = BaseMutableContentMetadata.fromContentMetadata(input.getContentMetadata()) + .toBuilder() + .contentLength(size) + .contentMD5(null) + .build(); + Object rawContent = input.getRawContent(); + if (rawContent instanceof File) { + return doSlice((File) rawContent, meta); + } else if (rawContent instanceof String) { + return doSlice((String) rawContent, meta); + } else if (rawContent instanceof byte[]) { + return doSlice((byte[]) rawContent, meta); + } else if (rawContent instanceof InputStream) { + return doSlice((InputStream) rawContent, meta); + } else { + return doSlice(input, meta); + } + + } + + protected Iterable doSlice(Payload input, ContentMetadata meta) { + return doSlice(input.getInput(), meta); + } + + protected Iterable doSlice(String rawContent, ContentMetadata meta) { + try { + return doSlice(rawContent.getBytes("UTF-8"), meta); + } catch (UnsupportedEncodingException e) { + throw Throwables.propagate(e); + } + } + + protected Iterable doSlice(byte[] rawContent, ContentMetadata meta) { + return doSlice(new ByteArrayInputStream(rawContent), meta); + } + + protected Iterable doSlice(File rawContent, ContentMetadata meta) { + try { + return doSlice(new FileInputStream(rawContent), meta); + } catch (FileNotFoundException e) { + throw Throwables.propagate(e); + } + } + + protected Iterable doSlice(InputStream rawContent, ContentMetadata meta) { + return new PayloadIterator(rawContent, meta); + } + } diff --git a/core/src/test/java/org/jclouds/io/internal/BasePayloadSlicerTest.java b/core/src/test/java/org/jclouds/io/internal/BasePayloadSlicerTest.java new file mode 100644 index 0000000..b2d27b0 --- /dev/null +++ b/core/src/test/java/org/jclouds/io/internal/BasePayloadSlicerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.io.internal; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Iterator; + +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.io.payloads.InputStreamPayload; +import org.jclouds.util.Strings2; +import org.testng.annotations.Test; + +import com.google.common.base.Charsets; +import com.google.common.io.ByteStreams; + +@Test +public class BasePayloadSlicerTest { + + @Test + public void testIterableSliceExpectedSingle() throws IOException { + PayloadSlicer slicer = new BasePayloadSlicer(); + String contents = "aaaaaaaaaabbbbbbbbbbccccc"; + Payload payload = new InputStreamPayload(new ByteArrayInputStream(contents.getBytes(Charsets.US_ASCII))); + + Iterator iter = slicer.slice(payload, 25).iterator(); + + assertTrue(iter.hasNext(), "Not enough results"); + assertEquals(new String(ByteStreams.toByteArray(iter.next())), contents); + assertFalse(iter.hasNext()); + + } + + @Test + public void testIterableSliceExpectedMulti() throws IOException { + PayloadSlicer slicer = new BasePayloadSlicer(); + Payload payload = new InputStreamPayload(new ByteArrayInputStream("aaaaaaaaaabbbbbbbbbbccccc".getBytes(Charsets.US_ASCII))); + + Iterator iter = slicer.slice(payload, 10).iterator(); + + assertTrue(iter.hasNext(), "Not enough results"); + assertEquals(Strings2.toStringAndClose(iter.next().getInput()), "aaaaaaaaaa"); + assertTrue(iter.hasNext(), "Not enough results"); + assertEquals(Strings2.toStringAndClose(iter.next().getInput()), "bbbbbbbbbb"); + assertTrue(iter.hasNext(), "Not enough results"); + assertEquals(Strings2.toStringAndClose(iter.next().getInput()), "ccccc"); + assertFalse(iter.hasNext()); + + } + +} -- 1.8.5.1