Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3Backend.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3Backend.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3Backend.java (revision ) @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob.datastore; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.DeleteObjectsResult; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.transfer.Upload; +import org.apache.jackrabbit.aws.ext.ds.S3Backend; +import org.apache.jackrabbit.core.data.CachingDataStore; +import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Extension to {@link org.apache.jackrabbit.aws.ext.ds.S3Backend} to support + * {@link org.apache.jackrabbit.oak.plugins.blob.SharedDataStore} + */ +public class SharedS3Backend extends S3Backend { + private static final Logger LOG = LoggerFactory.getLogger(SharedS3Backend.class); + + public static final String META_BUCKET = "oakmetadatabucket"; + + @Override + public void init(CachingDataStore store, String homeDir, String config) + throws DataStoreException { + super.init(store, homeDir, config); + } + + @Override + public void init(CachingDataStore store, String homeDir, Properties prop) + throws DataStoreException { + super.init(store, homeDir, prop); + initMetadataBucket(); + } + + /** + * Create a metadata specific bucket + */ + private void initMetadataBucket() { + if (!s3service.doesBucketExist(META_BUCKET)) { + s3service.createBucket(META_BUCKET, s3Region); + LOG.info("Created metadata bucket [{}] in [{}] ", META_BUCKET, s3Region); + } + } + + /** + * Adds a metadata record with the specified name + * + * @param input the record input stream + * @param name the name + * @throws DataStoreException + */ + public void addMetadataRecord(final InputStream input, final String name) throws DataStoreException { + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + + Upload upload = tmx.upload(new PutObjectRequest(META_BUCKET, name, input, new ObjectMetadata())); + upload.waitForUploadResult(); + } catch (InterruptedException e) { + LOG.error("Error in uploading", e); + throw new DataStoreException("Error in uploading", e); + } finally { + if (contextClassLoader != null) { + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + } + } + + /** + * Gets the metadata of the specified name. + * + * @param name the name of the record + * @return + */ + public DataRecord getMetadataRecord(String name) { + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader( + getClass().getClassLoader()); + ObjectMetadata meta = s3service.getObjectMetadata(META_BUCKET, name); + return new S3DataRecord(s3service, name, + meta.getLastModified().getTime(), meta.getContentLength()); + } finally { + if (contextClassLoader != null) { + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + } + } + + /** + * Gets all the metadata with a specified prefix. + * + * @param prefix the prefix of the records to retrieve + * @return + */ + public List getAllMetadataRecords(String prefix) { + List metadataList = new ArrayList(); + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader( + getClass().getClassLoader()); + ObjectListing prevObjectListing = s3service.listObjects(META_BUCKET); + for (final S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) { + if (s3ObjSumm.getKey().startsWith(prefix)) { + metadataList.add(new S3DataRecord(s3service, s3ObjSumm.getKey(), + s3ObjSumm.getLastModified().getTime(), s3ObjSumm.getSize())); + } + } + } finally { + if (contextClassLoader != null) { + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + } + return metadataList; + } + + /** + * Deletes the metadata record with the specified name + * + * @param name the name of the record + * @return + */ + public boolean deleteMetadataRecord(String name) { + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader( + getClass().getClassLoader()); + s3service.deleteObject(META_BUCKET, name); + } finally { + if (contextClassLoader != null) { + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + } + return true; + } + + /** + * Deletes all the metadata records with the specified prefix. + * + * @param prefix the prefix of the record + */ + public void deleteAllMetadataRecords(String prefix) { + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader( + getClass().getClassLoader()); + + ObjectListing metaList = s3service.listObjects(META_BUCKET); + List deleteList = new ArrayList(); + for (S3ObjectSummary s3ObjSumm : metaList.getObjectSummaries()) { + if (s3ObjSumm.getKey().startsWith(prefix)) { + deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey())); + } + } + if (deleteList.size() > 0) { + DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(META_BUCKET); + delObjsReq.setKeys(deleteList); + DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq); + } + } finally { + if (contextClassLoader != null) { + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + } + } + + /** + * S3DataRecord which lazily retrieves the input stream of the record. + */ + static class S3DataRecord implements DataRecord { + private AmazonS3Client s3service; + private DataIdentifier identifier; + private long length; + private long lastModified; + + public S3DataRecord(AmazonS3Client s3service, String key, long lastModified, long length) { + this.s3service = s3service; + this.identifier = new DataIdentifier(key); + this.lastModified = lastModified; + this.length = length; + } + + @Override + public DataIdentifier getIdentifier() { + return identifier; + } + + @Override + public String getReference() { + return identifier.toString(); + } + + @Override + public long getLength() throws DataStoreException { + return length; + } + + @Override + public InputStream getStream() throws DataStoreException { + return s3service.getObject(META_BUCKET, identifier.toString()).getObjectContent(); + } + + @Override + public long getLastModified() { + return lastModified; + } + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java (revision ) @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob; + +import java.io.InputStream; +import java.util.List; + +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; + +/** + * Interface to be implemented by a shared data store. + */ +public interface SharedDataStore { + /** + * Explicitly identifies the type of the data store + */ + enum Type { + SHARED, DEFAULT + } + + /** + * Adds the root record. + * + * @param stream the stream + * @param name the name of the root record + * @return the data record + * @throws DataStoreException the data store exception + */ + void addMetadataRecord(InputStream stream, String name) + throws DataStoreException; + + DataRecord getMetadataRecord(String name); + + /** + * Gets the all root records. + * + * @return the all root records + */ + List getAllMetadataRecords(String prefix); + + /** + * Deletes the root record represented by the given parameters. + * + * @param name the name of the root record + * @return success/failure + */ + boolean deleteMetadataRecord(String name); + + /** + * Deletes all records matching the given prefix. + * + * @param prefix metadata type identifier + */ + void deleteAllMetadataRecords(String prefix); + + /** + * Gets the type. + * + * @return the type + */ + Type getType(); +} + Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedS3DataStore.java (revision ) @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob.datastore; + +import org.apache.jackrabbit.aws.ext.ds.S3DataStore; +import org.apache.jackrabbit.core.data.Backend; +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; + +import java.io.InputStream; +import java.util.List; +import java.util.Properties; + +/** + * {@link org.apache.jackrabbit.oak.plugins.blob.SharedDataStore} implementation for + * {@link org.apache.jackrabbit.aws.ext.ds.S3DataStore} + */ +public class SharedS3DataStore extends S3DataStore implements SharedDataStore { + private Properties properties; + private SharedS3Backend backend; + + @Override + protected Backend createBackend() { + backend = new SharedS3Backend(); + if(properties != null){ + backend.setProperties(properties); + } + return backend; + } + + @Override + public void addMetadataRecord(InputStream stream, String name) throws DataStoreException { + backend.addMetadataRecord(stream, name); + } + + @Override + public DataRecord getMetadataRecord(String name) { + return backend.getMetadataRecord(name); + } + + @Override + public List getAllMetadataRecords(String prefix) { + return backend.getAllMetadataRecords(prefix); + } + + @Override + public boolean deleteMetadataRecord(String name) { + return backend.deleteMetadataRecord(name); + } + + @Override + public void deleteAllMetadataRecords(String prefix) { + backend.deleteAllMetadataRecords(prefix); + } + + @Override public Type getType() { + return Type.SHARED; + } +}