Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (revision 1631345) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (working copy) @@ -35,6 +35,8 @@ import org.apache.jackrabbit.aws.ext.S3Constants; import org.apache.jackrabbit.aws.ext.Utils; +import org.apache.jackrabbit.core.data.AsyncTouchCallback; +import org.apache.jackrabbit.core.data.AsyncTouchResult; import org.apache.jackrabbit.core.data.AsyncUploadCallback; import org.apache.jackrabbit.core.data.AsyncUploadResult; import org.apache.jackrabbit.core.data.Backend; @@ -59,8 +61,11 @@ import com.amazonaws.services.s3.model.Region; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.transfer.Copy; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.Upload; +import com.amazonaws.services.s3.transfer.Transfer.TransferState; +import com.amazonaws.services.s3.transfer.model.CopyResult; /** * A data store backend that stores data on Amazon S3. @@ -290,7 +295,8 @@ CopyObjectRequest copReq = new CopyObjectRequest(bucket, key, bucket, key); copReq.setNewObjectMetadata(objectMetaData); - s3service.copyObject(copReq); + Copy copy = tmx.copy(copReq); + copy.waitForCopyResult(); LOG.debug("[{}] touched took [{}] ms. ", identifier, (System.currentTimeMillis() - start)); } @@ -319,8 +325,79 @@ retVal, (System.currentTimeMillis() - start) }); return retVal; } + + @Override + public void touchAsync(final DataIdentifier identifier, + final long minModifiedDate, final AsyncTouchCallback callback) + throws DataStoreException { + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + if (callback == null) { + throw new IllegalArgumentException( + "callback parameter cannot be null in touchAsync"); + } + Thread.currentThread().setContextClassLoader( + getClass().getClassLoader()); + asyncWriteExecuter.execute(new Runnable() { + @Override + public void run() { + try { + touch(identifier, minModifiedDate); + callback.onSuccess(new AsyncTouchResult(identifier)); + } catch (DataStoreException e) { + AsyncTouchResult result = new AsyncTouchResult( + identifier); + result.setException(e); + callback.onFailure(result); + } + } + }); + } catch (Exception e) { + callback.onAbort(new AsyncTouchResult(identifier)); + throw new DataStoreException("Cannot touch the record " + + identifier.toString(), e); + } finally { + if (contextClassLoader != null) { + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + } + + } + @Override + public void touch(DataIdentifier identifier, long minModifiedDate) + throws DataStoreException { + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final long start = System.currentTimeMillis(); + final String key = getKeyName(identifier); + if (minModifiedDate > 0 + && minModifiedDate > getLastModified(identifier)) { + CopyObjectRequest copReq = new CopyObjectRequest(bucket, key, + bucket, key); + copReq.setNewObjectMetadata(new ObjectMetadata()); + Copy copy = tmx.copy(copReq); + copy.waitForCompletion(); + LOG.debug("[{}] touched. time taken [{}] ms ", new Object[] { + identifier, (System.currentTimeMillis() - start) }); + } else { + LOG.debug("[{}] touch not required. time taken [{}] ms ", + new Object[] { identifier, + (System.currentTimeMillis() - start) }); + } + + } catch (Exception e) { + throw new DataStoreException("Error occured in touching key [" + + identifier.toString() + "]", e); + } finally { + if (contextClassLoader != null) { + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + } + } + + @Override public InputStream read(DataIdentifier identifier) throws DataStoreException { long start = System.currentTimeMillis(); @@ -471,8 +548,16 @@ getIdentifierName(s3ObjSumm.getKey())); long lastModified = s3ObjSumm.getLastModified().getTime(); LOG.debug("Identifier [{}]'s lastModified = [{}]", identifier, lastModified); - if (!store.isInUse(identifier) && lastModified < min) { - LOG.debug("add id [{}] to delete lists", s3ObjSumm.getKey()); + if (lastModified < min + && store.confirmDelete(identifier) + // confirm once more that record's lastModified < min + // order is important here + && s3service.getObjectMetadata(bucket, + s3ObjSumm.getKey()).getLastModified().getTime() < min) { + + + LOG.debug("add id [{}] to delete lists", + s3ObjSumm.getKey()); deleteList.add(new DeleteObjectsRequest.KeyVersion( s3ObjSumm.getKey())); deleteIdSet.add(identifier); @@ -513,10 +598,10 @@ @Override public void close() { // backend is closing. abort all mulitpart uploads from start. + asyncWriteExecuter.shutdownNow(); tmx.abortMultipartUploads(bucket, startTime); tmx.shutdownNow(); s3service.shutdown(); - asyncWriteExecuter.shutdownNow(); LOG.info("S3Backend closed."); } @@ -567,10 +652,20 @@ CopyObjectRequest copReq = new CopyObjectRequest(bucket, key, bucket, key); copReq.setNewObjectMetadata(objectMetaData); - s3service.copyObject(copReq); - LOG.debug("lastModified of [{}] updated successfully.", identifier); - if (callback != null) { - callback.onSuccess(new AsyncUploadResult(identifier, file)); + Copy copy = tmx.copy(copReq); + try { + copy.waitForCopyResult(); + LOG.debug("lastModified of [{}] updated successfully.", identifier); + if (callback != null) { + callback.onSuccess(new AsyncUploadResult(identifier, file)); + } + }catch (Exception e2) { + AsyncUploadResult asyncUpRes= new AsyncUploadResult(identifier, file); + asyncUpRes.setException(e2); + if (callback != null) { + callback.onAbort(asyncUpRes); + } + throw new DataStoreException("Could not upload " + key, e2); } } @@ -594,10 +689,12 @@ identifier, file)); } } - } catch (Exception e2) { - if (!asyncUpload) { - callback.onAbort(new AsyncUploadResult(identifier, file)); - } + } catch (Exception e2 ) { + AsyncUploadResult asyncUpRes= new AsyncUploadResult(identifier, file); + asyncUpRes.setException(e2); + if (callback != null) { + callback.onAbort(asyncUpRes); + } throw new DataStoreException("Could not upload " + key, e2); } } @@ -721,6 +818,7 @@ } return key.substring(0, 4) + key.substring(5); } + /** * The class renames object key in S3 in a thread. @@ -737,8 +835,15 @@ String newS3Key = convertKey(oldKey); CopyObjectRequest copReq = new CopyObjectRequest(bucket, oldKey, bucket, newS3Key); - s3service.copyObject(copReq); - LOG.debug("[{}] renamed to [{}] ", oldKey, newS3Key); + Copy copy = tmx.copy(copReq); + try { + copy.waitForCopyResult(); + LOG.debug("[{}] renamed to [{}] ", oldKey, newS3Key); + } catch (InterruptedException ie) { + LOG.error(" Exception in renaming [{}] to [{}] ", + new Object[] { ie, oldKey, newS3Key }); + } + } finally { if (contextClassLoader != null) { Thread.currentThread().setContextClassLoader( @@ -797,7 +902,7 @@ } } } - + /** * This class implements {@link Runnable} interface to upload {@link File} * to S3 asynchronously. @@ -828,4 +933,6 @@ } } + + } Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DSAsyncTouch.java =================================================================== --- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DSAsyncTouch.java (revision 0) +++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DSAsyncTouch.java (working copy) @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.aws.ext.ds; + +import javax.jcr.RepositoryException; + +import org.apache.jackrabbit.core.data.CachingDataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test {@link CachingDataStore} with + * {@link CachingDataStore#setTouchAsync(boolean)with set to true. It requires + * to pass aws config file via system property. For e.g. + * -Dconfig=/opt/cq/aws.properties. Sample aws properties located at + * src/test/resources/aws.properties + */ +public class TestS3DsAsyncTouch extends TestS3Ds { + + protected static final Logger LOG = LoggerFactory.getLogger(TestS3DsAsyncTouch.class); + + public TestS3DsAsyncTouch() { + config = System.getProperty(CONFIG); + memoryBackend = false; + noCache = false; + } + + protected CachingDataStore createDataStore() throws RepositoryException { + ds = new S3TestDataStore(String.valueOf(randomGen.nextInt(9999)) + "-test"); + ds.setConfig(config); + ds.init(dataStoreDir); + ds.setTouchAsync(true); + ds.updateModifiedDateOnAccess(System.currentTimeMillis()+ 50* 1000); + return ds; + } +} Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DSWithSmallCache.java =================================================================== --- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DSWithSmallCache.java (revision 0) +++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DSWithSmallCache.java (working copy) @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.aws.ext.ds; + +import javax.jcr.RepositoryException; + +import org.apache.jackrabbit.core.data.CachingDataStore; +import org.apache.jackrabbit.core.data.LocalCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test {@link CachingDataStore} with S3Backend and with very small size (@link + * {@link LocalCache}. It requires to pass aws config file via system property. + * For e.g. -Dconfig=/opt/cq/aws.properties. Sample aws properties located at + * src/test/resources/aws.properties + */ +public class TestS3DsWithSmallCache extends TestS3Ds { + + protected static final Logger LOG = LoggerFactory.getLogger(TestS3DsWithSmallCache.class); + + public TestS3DsWithSmallCache() { + config = System.getProperty(CONFIG); + memoryBackend = false; + noCache = false; + } + + protected CachingDataStore createDataStore() throws RepositoryException { + ds = new S3TestDataStore(String.valueOf(randomGen.nextInt(9999)) + "-test"); + ds.setConfig(config); + ds.setCacheSize(dataLength * 10); + ds.setCachePurgeTrigFactor(0.5d); + ds.setCachePurgeResizeFactor(0.4d); + ds.init(dataStoreDir); + return ds; + } +} Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java =================================================================== --- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java (revision 1631345) +++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java (working copy) @@ -22,7 +22,9 @@ import junit.framework.TestSuite; import org.apache.jackrabbit.aws.ext.ds.TestS3Ds; +import org.apache.jackrabbit.aws.ext.ds.TestS3DsAsyncTouch; import org.apache.jackrabbit.aws.ext.ds.TestS3DsCacheOff; +import org.apache.jackrabbit.aws.ext.ds.TestS3DsWithSmallCache; import org.apache.jackrabbit.core.data.TestCaseBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +44,13 @@ */ public static Test suite() { TestSuite suite = new TestSuite("S3 tests"); + System.setProperty(TestCaseBase.CONFIG, "C:/sourceCodeGit/granite-modules/temp-crx-ext-s3/crx-ext-s3/src/test/resources/aws.properties"); String config = System.getProperty(TestCaseBase.CONFIG); LOG.info("config= " + config); if (config != null && !"".equals(config.trim())) { suite.addTestSuite(TestS3Ds.class); + suite.addTestSuite(TestS3DsAsyncTouch.class); + suite.addTestSuite(TestS3DsWithSmallCache.class); suite.addTestSuite(TestS3DsCacheOff.class); } return suite; Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncTouchCallback.java =================================================================== --- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncTouchCallback.java (revision 0) +++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncTouchCallback.java (working copy) @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.core.data; +/** + * This interface defines callback methods to reflect the status of asynchronous + * touch. + */ +public interface AsyncTouchCallback { + + + /** + * Callback method for successful asynchronous touch. + */ + public void onSuccess(AsyncTouchResult result); + + /** + * Callback method for failed asynchronous touch. + */ + public void onFailure(AsyncTouchResult result); + + /** + * Callback method for aborted asynchronous touch. + */ + public void onAbort(AsyncTouchResult result); + +} Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncTouchResult.java =================================================================== --- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncTouchResult.java (revision 0) +++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncTouchResult.java (working copy) @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.data; + +/** + * + * The class holds the result of asynchronous touch to {@link Backend} + */ +public class AsyncTouchResult { + /** + * {@link DataIdentifier} on which asynchronous touch is initiated. + */ + private final DataIdentifier identifier; + /** + * Any {@link Exception} which is raised in asynchronously touch. + */ + private Exception exception; + + public AsyncTouchResult(DataIdentifier identifier) { + super(); + this.identifier = identifier; + } + + public DataIdentifier getIdentifier() { + return identifier; + } + + public Exception getException() { + return exception; + } + + public void setException(Exception exception) { + this.exception = exception; + } + +} Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java =================================================================== --- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java (revision 1631345) +++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java (working copy) @@ -131,6 +131,35 @@ * @throws DataStoreException */ boolean exists(DataIdentifier identifier) throws DataStoreException; + + /** + * Update the lastModified of record if it's lastModified < minModifiedDate. + * + * @param identifier + * @param minModifiedDate + * @throws DataStoreException + */ + void touch(final DataIdentifier identifier, long minModifiedDate) + throws DataStoreException; + + /** + * Update the lastModified of record if it's lastModified < minModifiedDate + * asynchronously. Result of update is passed using appropriate + * {@link AsyncTouchCallback} methods. If identifier's lastModified > + * minModified {@link AsyncTouchCallback#onAbort(AsyncTouchResult)} is + * called. Any exception is communicated through + * {@link AsyncTouchCallback#onFailure(AsyncTouchResult)} . On successful + * update of lastModified, + * {@link AsyncTouchCallback#onSuccess(AsyncTouchResult)(AsyncTouchResult)} + * is invoked. + * + * @param identifiera + * @param minModifiedDate + * @param callback + * @throws DataStoreException + */ + void touchAsync(final DataIdentifier identifier, long minModifiedDate, + final AsyncTouchCallback callback) throws DataStoreException; /** * Close backend and release resources like database connection if any. Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java =================================================================== --- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (revision 1631345) +++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (working copy) @@ -75,10 +75,11 @@ * <param name="{@link #setConcurrentUploadsThreads(int) concurrentUploadsThreads}" value="10"/> * <param name="{@link #setAsyncUploadLimit(int) asyncUploadLimit}" value="100"/> * <param name="{@link #setUploadRetries(int) uploadRetries}" value="3"/> + * <param name="{@link #setTouchAsync(boolean) touchAsync}" value="false"/> * </DataStore> */ public abstract class CachingDataStore extends AbstractDataStore implements - MultiDataStoreAware, AsyncUploadCallback { + MultiDataStoreAware, AsyncUploadCallback, AsyncTouchCallback { /** * Logger instance. @@ -112,6 +113,12 @@ * is not required to be persisted. */ protected final Map uploadRetryMap = new ConcurrentHashMap(5); + + /** + * In memory map to hold in-progress asychronous touch. Once touch is + * successful corresponding entry is flused from the map. + */ + protected final Map asyncTouchCache = new ConcurrentHashMap(5); protected Backend backend; @@ -127,6 +134,11 @@ private File tmpDir; private String secret; + + /** + * Flag to indicate if lastModified is updated asynchronously. + */ + private boolean touchAsync = false; /** * The optional backend configuration. @@ -385,54 +397,53 @@ public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException { String fileName = getFileName(identifier); - boolean touch = minModifiedDate > 0 ? true : false; - synchronized (this) { - try { - if (asyncWriteCache.hasEntry(fileName, touch)) { - usesIdentifier(identifier); - return new CachingDataRecord(this, identifier); - } else if (cache.getFileIfStored(fileName) != null) { - if (touch) { - backend.exists(identifier, touch); - } - usesIdentifier(identifier); - return new CachingDataRecord(this, identifier); - } else if (backend.exists(identifier, touch)) { - usesIdentifier(identifier); - return new CachingDataRecord(this, identifier); - } + try { + if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) { + LOG.debug("[{}] record retrieved from asyncUploadmap", + identifier); + usesIdentifier(identifier); + return new CachingDataRecord(this, identifier); + } else if (cache.getFileIfStored(fileName) != null + || backend.exists(identifier)) { + LOG.debug("[{}] record retrieved from local cache or backend", + identifier); + touchInternal(identifier); + usesIdentifier(identifier); + return new CachingDataRecord(this, identifier); + } - } catch (IOException ioe) { - throw new DataStoreException("error in getting record [" - + identifier + "]", ioe); - } + } catch (IOException ioe) { + throw new DataStoreException("error in getting record [" + + identifier + "]", ioe); } throw new DataStoreException("Record not found: " + identifier); } - + /** * Get a data record for the given identifier or null it data record doesn't * exist in {@link Backend} * - * @param identifier - * identifier of record. + * @param identifier identifier of record. * @return the {@link CachingDataRecord} or null. */ @Override public DataRecord getRecordIfStored(DataIdentifier identifier) throws DataStoreException { String fileName = getFileName(identifier); - boolean touch = minModifiedDate > 0 ? true : false; - synchronized (this) { - try { - if (asyncWriteCache.hasEntry(fileName, touch) - || backend.exists(identifier, touch)) { - usesIdentifier(identifier); - return new CachingDataRecord(this, identifier); - } - } catch (IOException ioe) { - throw new DataStoreException(ioe); + try { + if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) { + LOG.debug("[{}] record retrieved from asyncuploadmap", + identifier); + usesIdentifier(identifier); + return new CachingDataRecord(this, identifier); + } else if (backend.exists(identifier)) { + LOG.debug("[{}] record retrieved from backend", identifier); + touchInternal(identifier); + usesIdentifier(identifier); + return new CachingDataRecord(this, identifier); } + } catch (IOException ioe) { + throw new DataStoreException(ioe); } return null; } @@ -534,14 +545,20 @@ long lastModified = asyncWriteCache.getLastModified(fileName); if (lastModified != 0) { LOG.debug( - "identifier [{}]'s lastModified retrireved from AsyncUploadCache ", - identifier); + "identifier [{}], lastModified=[{}] retrireved from AsyncUploadCache ", + identifier, lastModified); + } else if (asyncTouchCache.get(identifier) != null) { + lastModified = asyncTouchCache.get(identifier); + LOG.debug( + "identifier [{}], lastModified=[{}] retrireved from asyncTouchCache ", + identifier, lastModified); } else { - lastModified = backend.getLastModified(identifier); + lastModified = backend.getLastModified(identifier); + LOG.debug( + "identifier [{}], lastModified=[{}] retrireved from backend ", + identifier, lastModified); } - LOG.debug("identifier= [{}], lastModified=[{}]", identifier, - lastModified); return lastModified; } @@ -555,23 +572,8 @@ if (length != null) { return length.longValue(); } else { - InputStream in = null; - InputStream cachedStream = null; - try { - in = backend.read(identifier); - cachedStream = cache.store(fileName, in); - } catch (IOException e) { - throw new DataStoreException("IO Exception: " + identifier, e); - } finally { - IOUtils.closeQuietly(in); - IOUtils.closeQuietly(cachedStream); - } - length = cache.getFileLength(fileName); - if (length != null) { - return length.longValue(); - } + return backend.getLength(identifier); } - return backend.getLength(identifier); } @Override @@ -600,6 +602,10 @@ if (cachedResult.doRequiresDelete()) { // added record already marked for delete deleteRecord(identifier); + } else { + // async upload took lot of time. + // getRecord to touch if required. + getRecord(identifier); } } catch (IOException ie) { LOG.warn("Cannot remove pending file upload. Dataidentifer [ " @@ -662,6 +668,8 @@ File file = result.getFile(); String fileName = getFileName(identifier); try { + // remove from failed upload map if any. + uploadRetryMap.remove(identifier); asyncWriteCache.remove(fileName); LOG.info( "Async Upload Aborted. Dataidentifer [{}], file [{}] removed from AsyncCache.", @@ -672,8 +680,91 @@ } } + + @Override + public void onSuccess(AsyncTouchResult result) { + asyncTouchCache.remove(result.getIdentifier()); + LOG.debug(" Async Touch succeed. Removed [{}] from asyncTouchCache", + result.getIdentifier()); + } + + @Override + public void onFailure(AsyncTouchResult result) { + LOG.warn(" Async Touch failed. Not removing [{}] from asyncTouchCache", + result.getIdentifier()); + if (result.getException() != null) { + LOG.debug(" Async Touch failed. exception", result.getException()); + } + } + + @Override + public void onAbort(AsyncTouchResult result) { + asyncTouchCache.remove(result.getIdentifier()); + LOG.debug(" Async Touch aborted. Removed [{}] from asyncTouchCache", + result.getIdentifier()); + } + /** + * Method to confirm that identifier can be deleted from {@link Backend} + * + * @param identifier + * @return + */ + public boolean confirmDelete(DataIdentifier identifier) { + if (isInUse(identifier)) { + LOG.debug("identifier [{}] is inUse confirmDelete= false ", + identifier); + return false; + } + + String fileName = getFileName(identifier); + long lastModified = asyncWriteCache.getLastModified(fileName); + if (lastModified != 0) { + LOG.debug( + "identifier [{}] is asyncWriteCache map confirmDelete= false ", + identifier); + return false; + + } + if (asyncTouchCache.get(identifier) != null) { + LOG.debug( + "identifier [{}] is asyncTouchCache confirmDelete = false ", + identifier); + return false; + } + + return true; + } + + /** + * Internal method to touch identifier in @link {@link Backend}. if + * {@link #touchAsync}, the record is updated asynchronously. + * + * @param identifier + * @throws DataStoreException + */ + private void touchInternal(DataIdentifier identifier) + throws DataStoreException { + + if (touchAsync) { + Long lastModified = asyncTouchCache.put(identifier, + System.currentTimeMillis()); + + if (lastModified == null) { + LOG.debug("Async touching [{}] ", identifier); + backend.touchAsync(identifier, minModifiedDate, this); + } else { + LOG.debug( "Touched in asyncTouchMap [{}]", identifier); + } + + } else { + backend.touch(identifier, minModifiedDate); + } + } + + + /** * Returns a unique temporary file to be used for creating a new data * record. */ @@ -972,7 +1063,13 @@ public void setUploadRetries(int uploadRetries) { this.uploadRetries = uploadRetries; } + + + public void setTouchAsync(boolean touchAsync) { + this.touchAsync = touchAsync; + } + public Backend getBackend() { return backend; } Index: jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java =================================================================== --- jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java (revision 1631345) +++ jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java (working copy) @@ -43,12 +43,15 @@ private HashMap data = new HashMap(); private HashMap timeMap = new HashMap(); + + private CachingDataStore store; @Override public void init(CachingDataStore store, String homeDir, String config) throws DataStoreException { // ignore log("init"); + this.store = store; } @Override @@ -110,7 +113,8 @@ for (Map.Entry entry : timeMap.entrySet()) { DataIdentifier identifier = entry.getKey(); long timestamp = entry.getValue(); - if (timestamp < min) { + if (timestamp < min && !store.isInUse(identifier) + && store.confirmDelete(identifier)) { tobeDeleted.add(identifier); } } @@ -140,7 +144,19 @@ } return retVal; } + + @Override + public void touch(DataIdentifier identifier, long minModifiedDate) { + timeMap.put(identifier, System.currentTimeMillis()); + } + @Override + public void touchAsync(DataIdentifier identifier, long minModifiedDate, + AsyncTouchCallback callback) { + timeMap.put(identifier, System.currentTimeMillis()); + callback.onSuccess(new AsyncTouchResult(identifier)); + } + private void write(final DataIdentifier identifier, final File file, final boolean async, final AsyncUploadCallback callback) throws DataStoreException { Index: jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestCaseBase.java =================================================================== --- jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestCaseBase.java (revision 1631345) +++ jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestCaseBase.java (working copy) @@ -81,7 +81,7 @@ /** * length of record to be added */ - private int dataLength = 123456; + protected int dataLength = 123456; /** * datastore directory path @@ -154,7 +154,7 @@ LOG.error("error:", e); } } - + /** * Testcase to validate {@link DataStore#getAllIdentifiers()} API. */ @@ -253,10 +253,10 @@ try { long start = System.currentTimeMillis(); LOG.info("Testcase: " + this.getClass().getName() - + "#test, testDir=" + dataStoreDir); + + "#testSingleThread, testDir=" + dataStoreDir); doTestSingleThread(); LOG.info("Testcase: " + this.getClass().getName() - + "#test finished, time taken = [" + + "#testSingleThread finished, time taken = [" + (System.currentTimeMillis() - start) + "]ms"); } catch (Exception e) { LOG.error("error:", e); @@ -446,7 +446,7 @@ DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data)); // sleep for some time to ensure that async upload completes in backend. - sleep(6000); + sleep(10000); long updateTime = System.currentTimeMillis(); ds.updateModifiedDateOnAccess(updateTime); @@ -574,7 +574,7 @@ ArrayList list = new ArrayList(); HashMap map = new HashMap(); for (int i = 0; i < 10; i++) { - int size = 1000000 - (i * 100); + int size = 100000 - (i * 100); RandomInputStream in = new RandomInputStream(size + offset, size); DataRecord rec = ds.addRecord(in); list.add(rec);