Index: oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java
===================================================================
--- oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java (revision 0)
+++ oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java (working copy)
@@ -0,0 +1,647 @@
+package org.apache.jackrabbit.oak.commons.sort;
+
+// filename: ExternalSort.java
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Source copied from a publicly available library.
+ * @see <a href="https://code.google.com/p/externalsortinginjava">externalsortinginjava</a>
+ *
+ * Goal: offer a generic external-memory sorting program in Java.
+ *
+ * It must be: hackable (easy to adapt), scalable to large files, and sensibly efficient.
+ *
+ * This software is in the public domain.
+ *
+ * Usage: java org/apache/jackrabbit/oak/commons/sort/ExternalSort somefile.txt out.txt
+ *
+ * You can change the default maximal number of temporary files with the -t flag: java
+ * org/apache/jackrabbit/oak/commons/sort/ExternalSort somefile.txt out.txt -t 3
+ *
+ * For very large files, you might want to use an appropriate flag to allocate more memory to the
+ * Java VM: java -Xms2G org/apache/jackrabbit/oak/commons/sort/ExternalSort somefile.txt out.txt
+ *
+ * By (in alphabetical order) Philippe Beaudoin, Eleftherios Chetzakis, Jon Elsas, Christan Grant,
+ * Daniel Haran, Daniel Lemire, Sugumaran Harikrishnan, Jerry Yang. First published: April 2010,
+ * originally posted at http://lemire.me/blog/archives/2010/04/01/external-memory-sorting-in-java/
+ */
+public class ExternalSort {
+
+    /**
+     * Sorts a file (input) to an output file (output) using default parameters.
+     *
+     * @param input source file
+     * @param output output file
+     */
+    public static void sort(File input, File output) throws IOException {
+        ExternalSort.mergeSortedFiles(ExternalSort.sortInBatch(input), output);
+    }
+
+    static int DEFAULTMAXTEMPFILES = 1024;
+
+    // We divide the file into small blocks. If the blocks are too small, we
+    // create too many temporary files. If they are too big, we use too much memory.
+    public static long estimateBestSizeOfBlocks(File filetobesorted,
+            int maxtmpfiles) {
+        long sizeoffile = filetobesorted.length() * 2;
+        /*
+         * We multiply by two because later on someone insisted on counting the memory usage as 2
+         * bytes per character. By this model, loading a file with 1 character will use 2 bytes.
+         */
+        // We don't want to open up much more than maxtmpfiles temporary files;
+        // better to run out of memory first.
+        long blocksize = sizeoffile / maxtmpfiles
+                + (sizeoffile % maxtmpfiles == 0 ? 0 : 1);
+
+        // On the other hand, we don't want to create many temporary files for
+        // naught. If blocksize is smaller than half the free memory, grow it.
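+        // Worked example (illustrative, not from the original source): a 1 GiB
+        // input counts as 2 GiB under the 2-bytes-per-char model, so with
+        // maxtmpfiles = 1024 each block is ceil(2 GiB / 1024) = 2 MiB, unless
+        // half the free heap is larger, in which case freemem / 2 wins below.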
+        long freemem = Runtime.getRuntime().freeMemory();
+        if (blocksize < freemem / 2) {
+            blocksize = freemem / 2;
+        }
+        return blocksize;
+    }
+
+    /**
+     * This will simply load the file by blocks of lines, then sort them in-memory, and write the
+     * result to temporary files that have to be merged later.
+     *
+     * @param file
+     *            some flat file
+     * @return a list of temporary flat files
+     */
+    public static List<File> sortInBatch(File file)
+            throws IOException {
+        return sortInBatch(file, defaultcomparator, DEFAULTMAXTEMPFILES,
+                Charset.defaultCharset(), null, false);
+    }
+
+    /**
+     * This will simply load the file by blocks of lines, then sort them in-memory, and write the
+     * result to temporary files that have to be merged later.
+     *
+     * @param file
+     *            some flat file
+     * @param cmp
+     *            string comparator
+     * @return a list of temporary flat files
+     */
+    public static List<File> sortInBatch(File file, Comparator<String> cmp)
+            throws IOException {
+        return sortInBatch(file, cmp, DEFAULTMAXTEMPFILES,
+                Charset.defaultCharset(), null, false);
+    }
+
+    /**
+     * This will simply load the file by blocks of lines, then sort them in-memory, and write the
+     * result to temporary files that have to be merged later.
+     *
+     * @param file
+     *            some flat file
+     * @param cmp
+     *            string comparator
+     * @param distinct
+     *            Pass true if duplicate lines should be discarded.
+     * @return a list of temporary flat files
+     */
+    public static List<File> sortInBatch(File file, Comparator<String> cmp,
+            boolean distinct) throws IOException {
+        return sortInBatch(file, cmp, DEFAULTMAXTEMPFILES,
+                Charset.defaultCharset(), null, distinct);
+    }
+
+    /**
+     * This will simply load the file by blocks of lines, then sort them in-memory, and write the
+     * result to temporary files that have to be merged later. You can specify a bound on the number
+     * of temporary files that will be created.
+     *
+     * @param file
+     *            some flat file
+     * @param cmp
+     *            string comparator
+     * @param maxtmpfiles
+     *            maximal number of temporary files
+     * @param cs
+     *            character set to use (can use Charset.defaultCharset())
+     * @param tmpdirectory
+     *            location of the temporary files (set to null for default location)
+     * @param distinct
+     *            Pass true if duplicate lines should be discarded.
+     * @param numHeader
+     *            number of header lines to skip before sorting starts
+     * @param usegzip
+     *            use gzip compression for the temporary files
+     * @return a list of temporary flat files
+     */
+    public static List<File> sortInBatch(File file, Comparator<String> cmp,
+            int maxtmpfiles, Charset cs, File tmpdirectory,
+            boolean distinct, int numHeader, boolean usegzip)
+            throws IOException {
+        List<File> files = new ArrayList<File>();
+        BufferedReader fbr = new BufferedReader(new InputStreamReader(
+                new FileInputStream(file), cs));
+        long blocksize = estimateBestSizeOfBlocks(file, maxtmpfiles);// in bytes
+
+        try {
+            List<String> tmplist = new ArrayList<String>();
+            String line = "";
+            try {
+                int counter = 0;
+                while (line != null) {
+                    long currentblocksize = 0;// in bytes
+                    while ((currentblocksize < blocksize)
+                            && ((line = fbr.readLine()) != null)) {
+                        // as long as you have enough memory
+                        if (counter < numHeader) {
+                            counter++;
+                            continue;
+                        }
+                        tmplist.add(line);
+                        // ram usage estimation, not very accurate, still more
+                        // realistic than the simple 2 * String.length
+                        currentblocksize += StringSizeEstimator
+                                .estimatedSizeOf(line);
+                    }
+                    files.add(sortAndSave(tmplist, cmp, cs,
+                            tmpdirectory, distinct, usegzip));
+                    tmplist.clear();
+                }
+            } catch (EOFException oef) {
+                if (tmplist.size() > 0) {
+                    files.add(sortAndSave(tmplist, cmp, cs,
+                            tmpdirectory, distinct, usegzip));
+                    tmplist.clear();
+                }
+            }
+        } finally {
+            fbr.close();
+        }
+        return files;
+    }
+
+    /**
+     * This will simply load the file by blocks of lines, then sort them in-memory, and write the
+     * result to temporary files that have to be merged later. You can specify a bound on the number
+     * of temporary files that will be created.
+     *
+     * @param file
+     *            some flat file
+     * @param cmp
+     *            string comparator
+     * @param maxtmpfiles
+     *            maximal number of temporary files
+     * @param cs
+     *            character set to use (can use Charset.defaultCharset())
+     * @param tmpdirectory
+     *            location of the temporary files (set to null for default location)
+     * @param distinct
+     *            Pass true if duplicate lines should be discarded.
+     * @return a list of temporary flat files
+     */
+    public static List<File> sortInBatch(File file, Comparator<String> cmp,
+            int maxtmpfiles, Charset cs, File tmpdirectory, boolean distinct)
+            throws IOException {
+        return sortInBatch(file, cmp, maxtmpfiles, cs, tmpdirectory,
+                distinct, 0, false);
+    }
+
+    /**
+     * Sort a list and save it to a temporary file.
+     *
+     * @return the file containing the sorted data
+     * @param tmplist
+     *            data to be sorted
+     * @param cmp
+     *            string comparator
+     * @param cs
+     *            charset to use for output (can use Charset.defaultCharset())
+     * @param tmpdirectory
+     *            location of the temporary files (set to null for default location)
+     * @param distinct
+     *            Pass true if duplicate lines should be discarded.
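+     * @param usegzip
+     *            Pass true if the temporary file should be gzip-compressed.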
+     */
+    public static File sortAndSave(List<String> tmplist,
+            Comparator<String> cmp, Charset cs, File tmpdirectory,
+            boolean distinct, boolean usegzip) throws IOException {
+        Collections.sort(tmplist, cmp);
+        File newtmpfile = File.createTempFile("sortInBatch",
+                "flatfile", tmpdirectory);
+        newtmpfile.deleteOnExit();
+        OutputStream out = new FileOutputStream(newtmpfile);
+        int ZIPBUFFERSIZE = 2048;
+        if (usegzip) {
+            out = new GZIPOutputStream(out, ZIPBUFFERSIZE) {
+                {
+                    def.setLevel(Deflater.BEST_SPEED);
+                }
+            };
+        }
+        BufferedWriter fbw = new BufferedWriter(new OutputStreamWriter(
+                out, cs));
+        String lastLine = null;
+        try {
+            for (String r : tmplist) {
+                // Skip duplicate lines
+                if (!distinct || !r.equals(lastLine)) {
+                    fbw.write(r);
+                    fbw.newLine();
+                    lastLine = r;
+                }
+            }
+        } finally {
+            fbw.close();
+        }
+        return newtmpfile;
+    }
+
+    /**
+     * Sort a list and save it to a temporary file.
+     *
+     * @return the file containing the sorted data
+     * @param tmplist
+     *            data to be sorted
+     * @param cmp
+     *            string comparator
+     * @param cs
+     *            charset to use for output (can use Charset.defaultCharset())
+     * @param tmpdirectory
+     *            location of the temporary files (set to null for default location)
+     */
+    public static File sortAndSave(List<String> tmplist,
+            Comparator<String> cmp, Charset cs, File tmpdirectory)
+            throws IOException {
+        return sortAndSave(tmplist, cmp, cs, tmpdirectory, false, false);
+    }
+
+    /**
+     * This merges a bunch of temporary flat files.
+     *
+     * @param files
+     *            files to be merged
+     * @param outputfile
+     *            output file
+     * @return The number of lines sorted. (P. Beaudoin)
+     */
+    public static int mergeSortedFiles(List<File> files, File outputfile) throws IOException {
+        return mergeSortedFiles(files, outputfile, defaultcomparator,
+                Charset.defaultCharset());
+    }
+
+    /**
+     * This merges a bunch of temporary flat files.
+     *
+     * @param files
+     *            files to be merged
+     * @param outputfile
+     *            output file
+     * @return The number of lines sorted. (P. Beaudoin)
+     */
+    public static int mergeSortedFiles(List<File> files, File outputfile,
+            final Comparator<String> cmp) throws IOException {
+        return mergeSortedFiles(files, outputfile, cmp,
+                Charset.defaultCharset());
+    }
+
+    /**
+     * This merges a bunch of temporary flat files.
+     *
+     * @param files
+     *            files to be merged
+     * @param outputfile
+     *            output file
+     * @return The number of lines sorted. (P. Beaudoin)
+     */
+    public static int mergeSortedFiles(List<File> files, File outputfile,
+            final Comparator<String> cmp, boolean distinct)
+            throws IOException {
+        return mergeSortedFiles(files, outputfile, cmp,
+                Charset.defaultCharset(), distinct);
+    }
+
+    /**
+     * This merges a bunch of temporary flat files.
+     *
+     * @param files
+     *            The {@link List} of sorted {@link File}s to be merged.
+     * @param distinct
+     *            Pass true if duplicate lines should be discarded. (elchetz@gmail.com)
+     * @param outputfile
+     *            The output {@link File} to merge the results to.
+     * @param cmp
+     *            The {@link Comparator} to use to compare {@link String}s.
+     * @param cs
+     *            The {@link Charset} to be used for the byte to character conversion.
+     * @param append
+     *            Pass true if result should append to {@link File} instead of
+     *            overwrite. Default to be false for overloading methods.
+     * @param usegzip
+     *            assumes we used gzip compression for temporary files
+     * @return The number of lines sorted. (P. Beaudoin)
+     * @since v0.1.4
+     */
+    public static int mergeSortedFiles(List<File> files, File outputfile,
+            final Comparator<String> cmp, Charset cs, boolean distinct,
+            boolean append, boolean usegzip) throws IOException {
+        ArrayList<BinaryFileBuffer> bfbs = new ArrayList<BinaryFileBuffer>();
+        for (File f : files) {
+            final int BUFFERSIZE = 2048;
+            InputStream in = new FileInputStream(f);
+            BufferedReader br;
+            if (usegzip) {
+                br = new BufferedReader(new InputStreamReader(
+                        new GZIPInputStream(in, BUFFERSIZE), cs));
+            } else {
+                br = new BufferedReader(new InputStreamReader(in, cs));
+            }
+
+            BinaryFileBuffer bfb = new BinaryFileBuffer(br);
+            bfbs.add(bfb);
+        }
+        BufferedWriter fbw = new BufferedWriter(new OutputStreamWriter(
+                new FileOutputStream(outputfile, append), cs));
+        int rowcounter = merge(fbw, cmp, distinct, bfbs);
+        for (File f : files) {
+            f.delete();
+        }
+        return rowcounter;
+    }
+
+    /**
+     * This merges several BinaryFileBuffer to an output writer.
+     *
+     * @param fbw
+     *            A buffer where we write the data.
+     * @param cmp
+     *            A comparator object that tells us how to sort the lines.
+     * @param distinct
+     *            Pass true if duplicate lines should be discarded. (elchetz@gmail.com)
+     * @param buffers
+     *            Where the data should be read.
+     * @return The number of lines sorted. (P. Beaudoin)
+     */
+    public static int merge(BufferedWriter fbw, final Comparator<String> cmp, boolean distinct,
+            List<BinaryFileBuffer> buffers) throws IOException {
+        PriorityQueue<BinaryFileBuffer> pq = new PriorityQueue<BinaryFileBuffer>(
+                11, new Comparator<BinaryFileBuffer>() {
+                    @Override
+                    public int compare(BinaryFileBuffer i,
+                            BinaryFileBuffer j) {
+                        return cmp.compare(i.peek(), j.peek());
+                    }
+                });
+        for (BinaryFileBuffer bfb : buffers) {
+            if (!bfb.empty()) {
+                pq.add(bfb);
+            }
+        }
+        int rowcounter = 0;
+        String lastLine = null;
+        try {
+            while (pq.size() > 0) {
+                BinaryFileBuffer bfb = pq.poll();
+                String r = bfb.pop();
+                // Skip duplicate lines
+                if (!distinct || !r.equals(lastLine)) {
+                    fbw.write(r);
+                    fbw.newLine();
+                    lastLine = r;
+                }
+                ++rowcounter;
+                if (bfb.empty()) {
+                    bfb.fbr.close();
+                } else {
+                    pq.add(bfb); // add it back
+                }
+            }
+        } finally {
+            fbw.close();
+            for (BinaryFileBuffer bfb : pq) {
+                bfb.close();
+            }
+        }
+        return rowcounter;
+    }
+
+    /**
+     * This merges a bunch of temporary flat files.
+     *
+     * @param files
+     *            The {@link List} of sorted {@link File}s to be merged.
+     * @param distinct
+     *            Pass true if duplicate lines should be discarded. (elchetz@gmail.com)
+     * @param outputfile
+     *            The output {@link File} to merge the results to.
+     * @param cmp
+     *            The {@link Comparator} to use to compare {@link String}s.
+     * @param cs
+     *            The {@link Charset} to be used for the byte to character conversion.
+     * @return The number of lines sorted. (P. Beaudoin)
+     * @since v0.1.2
+     */
+    public static int mergeSortedFiles(List<File> files, File outputfile,
+            final Comparator<String> cmp, Charset cs, boolean distinct)
+            throws IOException {
+        return mergeSortedFiles(files, outputfile, cmp, cs, distinct,
+                false, false);
+    }
+
+    /**
+     * This merges a bunch of temporary flat files.
+     *
+     * @param files
+     *            files to be merged
+     * @param outputfile
+     *            output file
+     * @param cs
+     *            character set to use to load the strings
+     * @return The number of lines sorted. (P. Beaudoin)
+     */
+    public static int mergeSortedFiles(List<File> files, File outputfile,
+            final Comparator<String> cmp, Charset cs) throws IOException {
+        return mergeSortedFiles(files, outputfile, cmp, cs, false);
+    }
+
+    public static void displayUsage() {
+        System.out
+                .println("java org.apache.jackrabbit.oak.commons.sort.ExternalSort inputfile outputfile");
+        System.out.println("Flags are:");
+        System.out.println("-v or --verbose: verbose output");
+        System.out.println("-d or --distinct: prune duplicate lines");
+        System.out
+                .println("-t or --maxtmpfiles (followed by an integer): specify an upper bound on the number of temporary files");
+        System.out
+                .println("-c or --charset (followed by a charset code): specify the character set to use (for sorting)");
+        System.out
+                .println("-z or --gzip: use compression for the temporary files");
+        System.out
+                .println("-H or --header (followed by an integer): ignore the first few lines");
+        System.out
+                .println("-s or --store (followed by a path): where to store the temporary files");
+        System.out.println("-h or --help: display this message");
+    }
+
+    public static void main(String[] args) throws IOException {
+        boolean verbose = false;
+        boolean distinct = false;
+        int maxtmpfiles = DEFAULTMAXTEMPFILES;
+        Charset cs = Charset.defaultCharset();
+        String inputfile = null, outputfile = null;
+        File tempFileStore = null;
+        boolean usegzip = false;
+        int headersize = 0;
+        for (int param = 0; param < args.length; ++param) {
+            if (args[param].equals("-v")
+                    || args[param].equals("--verbose")) {
+                verbose = true;
+            } else if ((args[param].equals("-h") || args[param]
+                    .equals("--help"))) {
+                displayUsage();
+                return;
+            } else if ((args[param].equals("-d") || args[param]
+                    .equals("--distinct"))) {
+                distinct = true;
+            } else if ((args[param].equals("-t") || args[param]
+                    .equals("--maxtmpfiles"))
+                    && args.length > param + 1) {
+                param++;
+                maxtmpfiles = Integer.parseInt(args[param]);
+                if (maxtmpfiles < 0) {
+                    System.err
+                            .println("maxtmpfiles should be positive");
+                }
+            } else if ((args[param].equals("-c") || args[param]
+                    .equals("--charset"))
+                    && args.length > param + 1) {
+                param++;
+                cs = Charset.forName(args[param]);
+            } else if ((args[param].equals("-z") || args[param]
+                    .equals("--gzip"))) {
+                usegzip = true;
+            } else if ((args[param].equals("-H") || args[param]
+                    .equals("--header")) && args.length > param + 1) {
+                param++;
+                headersize = Integer.parseInt(args[param]);
+                if (headersize < 0) {
+                    System.err
+                            .println("headersize should be positive");
+                }
+            } else if ((args[param].equals("-s") || args[param]
+                    .equals("--store")) && args.length > param + 1) {
+                param++;
+                tempFileStore = new File(args[param]);
+            } else {
+                if (inputfile == null) {
+                    inputfile = args[param];
+                } else if (outputfile == null) {
+                    outputfile = args[param];
+                } else {
+                    System.out.println("Unparsed: " + args[param]);
+                }
+            }
+        }
+        if (outputfile == null) {
+            System.out
+                    .println("please provide input and output file names");
+            displayUsage();
+            return;
+        }
+        Comparator<String> comparator = defaultcomparator;
+        List<File> l = sortInBatch(new File(inputfile), comparator,
+                maxtmpfiles, cs, tempFileStore, distinct, headersize,
+                usegzip);
+        if (verbose) {
+            System.out
+                    .println("created " + l.size() + " tmp files");
+        }
+        mergeSortedFiles(l, new File(outputfile), comparator, cs,
+                distinct, false, usegzip);
+    }
+
+    public static Comparator<String> defaultcomparator = new Comparator<String>() {
+        @Override
+        public int compare(String r1, String r2) {
+            return r1.compareTo(r2);
+        }
+    };
+
+}
+
+class BinaryFileBuffer {
+    public BufferedReader fbr;
+    private String cache;
+    private boolean empty;
+
+    public BinaryFileBuffer(BufferedReader r)
+            throws IOException {
+        this.fbr = r;
+        reload();
+    }
+
+    public boolean empty() {
+        return this.empty;
+    }
+
+    private void reload() throws IOException {
+        try {
+            if ((this.cache = this.fbr.readLine()) == null) {
+                this.empty = true;
+                this.cache = null;
+            } else {
+                this.empty = false;
+            }
+        } catch (EOFException oef) {
+            this.empty = true;
+            this.cache = null;
+        }
+    }
+
+    public void close() throws IOException {
+        this.fbr.close();
+    }
+
+    public String peek() {
+        if (empty()) {
+            return null;
+        }
+        return this.cache;
+    }
+
+    public String pop() throws IOException {
+        String answer = peek();
+        reload();
+        return answer;
+    }
+
+}
Index: oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSizeEstimator.java
===================================================================
--- oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSizeEstimator.java (revision 0)
+++ oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSizeEstimator.java (working copy)
@@ -0,0 +1,70 @@
+package org.apache.jackrabbit.oak.commons.sort;
+
+/**
+ * Source copied from a publicly available library.
+ * @see <a href="https://code.google.com/p/externalsortinginjava">externalsortinginjava</a>
+ *
+ * @author Eleftherios Chetzakis
+ */
+public final class StringSizeEstimator {
+
+    private static int OBJ_HEADER;
+    private static int ARR_HEADER;
+    private static int INT_FIELDS = 12;
+    private static int OBJ_REF;
+    private static int OBJ_OVERHEAD;
+    private static boolean IS_64_BIT_JVM;
+
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private StringSizeEstimator() {
+    }
+
+    /**
+     * Class initializations.
+     */
+    static {
+        // By default we assume a 64-bit JVM
+        // (defensive approach, since we will get
+        // larger estimations in case we are not sure)
+        IS_64_BIT_JVM = true;
+        // Check the system property "sun.arch.data.model".
+        // Not very safe, as it might not work for all JVM implementations.
+        // Nevertheless, the worst that can happen is that the JVM is 32-bit
+        // but we assume it's 64-bit, so we will be counting a few extra bytes
+        // per string object. No harm done, since this is just an approximation.
+        String arch = System.getProperty("sun.arch.data.model");
+        if (arch != null) {
+            if (arch.indexOf("32") != -1) {
+                // If it exists and is 32 bit, then we assume a 32-bit JVM
+                IS_64_BIT_JVM = false;
+            }
+        }
+        // The sizes below are a bit rough, as we don't take into account
+        // advanced JVM options such as compressed oops.
+        // However, if our calculation is not accurate it will be a bit over,
+        // so there is no danger of an out of memory error because of this.
+        OBJ_HEADER = IS_64_BIT_JVM ? 16 : 8;
+        ARR_HEADER = IS_64_BIT_JVM ? 24 : 12;
+        OBJ_REF = IS_64_BIT_JVM ? 8 : 4;
+        OBJ_OVERHEAD = OBJ_HEADER + INT_FIELDS + OBJ_REF + ARR_HEADER;
+    }
+
+    /**
+     * Estimates the size of a {@link String} object in bytes.
+     *
+     * @param s The string to estimate memory footprint.
+     * @return The estimated size in bytes.
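+     *
+     *         Worked example (illustrative): on a 64-bit JVM the constant
+     *         overhead is 16 (object header) + 12 (int fields) + 8 (object
+     *         reference) + 24 (array header) = 60 bytes, so a 10-character
+     *         string is estimated at 2 * 10 + 60 = 80 bytes.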
+     */
+    public static long estimatedSizeOf(String s) {
+        return (s.length() * 2) + OBJ_OVERHEAD;
+    }
+
+}
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java (working copy)
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+/**
+ * Interface for a blob garbage collector.
+ */
+public interface BlobGarbageCollector {
+
+    /**
+     * Collects garbage blobs from the blob store used by the associated node store.
+     *
+     * @throws Exception if the collection fails
+     */
+    void collectGarbage() throws Exception;
+}
\ No newline at end of file
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java (revision 1567531)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java (working copy)
@@ -16,29 +16,34 @@
  */
 package org.apache.jackrabbit.oak.plugins.blob.cloud;
 
+import static org.jclouds.blobstore.options.ListContainerOptions.Builder.maxResults;
 import static org.jclouds.blobstore.options.PutOptions.Builder.multipart;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.io.ByteStreams;
+
 import org.apache.jackrabbit.mk.blobs.AbstractBlobStore;
 import org.apache.jackrabbit.mk.util.StringUtils;
 import org.jclouds.ContextBuilder;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
 import org.jclouds.io.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.io.ByteStreams;
-
 /**
  * Implementation of the {@link BlobStore} to store blobs in a cloud blob store.
  *
- * Extends {@link AbstractBlobStore} and breaks the the binary to chunks for
- * easier management.
+ * Extends {@link AbstractBlobStore} and breaks the binary into chunks for easier management.
  */
 public class CloudBlobStore extends AbstractBlobStore {
     /**
@@ -111,9 +116,9 @@
                     .buildView(BlobStoreContext.class);
             context.getBlobStore().createContainerInLocation(null, cloudContainer);
-            LOG.info("Using bucket: " + cloudContainer);
+            LOG.info("Using container: " + cloudContainer);
         } catch (Exception e) {
-            LOG.error("Error creating S3BlobStore : ", e);
+            LOG.error("Error creating CloudBlobStore: ", e);
             throw e;
         }
     }
@@ -212,4 +217,88 @@
     protected boolean isMarkEnabled() {
         return false;
     }
+
+    @Override
+    public Iterator<String> getAllChunkIds(
+            long maxLastModifiedTime) throws Exception {
+        Preconditions.checkNotNull(context);
+
+        final org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
+        return new CloudStoreIterator(blobStore, maxLastModifiedTime);
+    }
+
+    @Override
+    public boolean deleteChunk(String chunkId) throws Exception {
+        Preconditions.checkNotNull(context);
+
+        final org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
+        blobStore.removeBlob(cloudContainer, chunkId);
+
+        return true;
+    }
+
+    class CloudStoreIterator implements Iterator<String> {
+        static final int BATCH = 1000;
+
+        org.jclouds.blobstore.BlobStore store;
+        long maxLastModifiedTime;
+
+        PageSet<? extends StorageMetadata> set;
+        ArrayDeque<String> queue;
+
+        public CloudStoreIterator(org.jclouds.blobstore.BlobStore store,
+                long maxLastModifiedTime) {
+            this.store = store;
+            this.maxLastModifiedTime = maxLastModifiedTime;
+            this.queue = new ArrayDeque<String>(BATCH);
+        }
+
+        @Override
+        public boolean hasNext() {
+            if ((set == null) || (queue == null)) {
+                set = store.list(cloudContainer, maxResults(BATCH));
+                loadElements(set);
+            }
+
+            if (!queue.isEmpty()) {
+                return true;
+            } else if (set.getNextMarker() != null) {
+                set = store.list(cloudContainer,
+                        maxResults(BATCH).afterMarker(set.getNextMarker()));
+                loadElements(set);
+
+                if (!queue.isEmpty()) {
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        private void loadElements(PageSet<? extends StorageMetadata> set) {
+            Iterator<? extends StorageMetadata> iter = set.iterator();
+            while (iter.hasNext()) {
+                StorageMetadata metadata = iter.next();
+                // Queue the chunk only if no cut-off time is set or the chunk
+                // was last modified before the cut-off time; skip it otherwise.
+                if ((maxLastModifiedTime == 0 || maxLastModifiedTime == -1) ||
+                        (metadata.getLastModified().getTime() <= maxLastModifiedTime)) {
+                    queue.add(metadata.getName());
+                }
+            }
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("No more elements");
+            }
+            return queue.poll();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (revision 1567531)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (working copy)
@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.jackrabbit.core.data.CachingDataStore;
@@ -28,6 +30,7 @@
 import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStore;
 import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware; import org.apache.jackrabbit.mk.blobs.BlobStore; import org.apache.jackrabbit.mk.blobs.GarbageCollectableBlobStore; import org.apache.jackrabbit.mk.util.Cache; @@ -42,6 +45,7 @@ import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Lists; /** * A {@link BlobStore} implementation which is a compatibility wrapper for @@ -508,4 +512,50 @@ // no-op return 0; } + + /** + * Ignores the maxLastModifiedTime currently. + */ + @Override + public Iterator getAllChunkIds( + long maxLastModifiedTime) throws Exception { + return new DataStoreIterator(dataStore.getAllIdentifiers()); } + + @Override + public boolean deleteChunk(String blobId) throws Exception { + ((MultiDataStoreAware) dataStore).deleteRecord(new DataIdentifier(blobId)); + return true; + } + + @Override + public Iterator resolveChunks(String blobId) throws IOException { + return Lists.newArrayList(blobId).iterator(); + } + + class DataStoreIterator implements Iterator { + Iterator backingIterator; + + public DataStoreIterator(Iterator backingIterator) { + this.backingIterator = backingIterator; + } + + @Override + public boolean hasNext() { + return backingIterator.hasNext(); + } + + @Override + public String next() { + if (!hasNext()) { + throw new NoSuchElementException("No more elements"); + } + return backingIterator.next().toString(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (working copy) @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob; + +import java.io.File; +import java.io.IOException; +import java.util.Comparator; + +import com.google.common.io.Files; + +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.commons.sort.ExternalSort; + +/** + * Class for keeping the file system state of the garbage collection. + * + * Also, manages any temporary files needed as well as external sorting. 
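+ *
+ * The mark phase writes the marked references file, the blob id retrieval writes the
+ * available references file, and the difference phase writes the gc candidates file;
+ * each is sorted via {@link #sort(File)} so the difference can be computed with a
+ * streaming comparison.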
+ *
+ */
+public class GarbageCollectorFileState {
+
+    private final static String GC_DIR = "gc";
+
+    private final static String MARKED_PREFIX = "marked";
+
+    private final static String AVAIL_PREFIX = "avail";
+
+    private final static String GC_CANDIDATE_PREFIX = "gccand";
+
+    private final static String GC_PREFIX = "gc";
+
+    /** The start time of the garbage collection run. */
+    private long startTime;
+
+    /** The root of the gc file state directory. */
+    private File home;
+
+    /** The marked references. */
+    private File markedRefs;
+
+    /** The available references. */
+    private File availableRefs;
+
+    /** The gc candidates. */
+    private File gcCandidates;
+
+    /** Stores the garbage collection candidates that could not be deleted. */
+    private File garbage;
+
+    /**
+     * Gets the home directory.
+     *
+     * @return the home
+     */
+    protected File getHome() {
+        return home;
+    }
+
+    /**
+     * Gets the file storing the marked references.
+     *
+     * @return the marked references
+     */
+    protected File getMarkedRefs() {
+        return createMarkedRefsFile();
+    }
+
+    /**
+     * Gets the file storing the available references.
+     *
+     * @return the available references
+     */
+    protected File getAvailableRefs() {
+        return createAvailableRefsFile();
+    }
+
+    /**
+     * Gets the file storing the gc candidates.
+     *
+     * @return the gc candidates
+     */
+    protected File getGcCandidates() {
+        return createGcCandidatesFile();
+    }
+
+    /**
+     * Gets the file storing the garbage.
+     *
+     * @return the garbage
+     */
+    protected File getGarbage() {
+        return createGarbageFile();
+    }
+
+    /**
+     * Instantiates a new garbage collector file state.
+     *
+     * @param root
+     *            the root
+     * @throws IOException
+     *             Signals that an I/O exception has occurred.
+     */
+    public GarbageCollectorFileState(String root) throws IOException {
+        init(root);
+    }
+
+    /**
+     * Initialize the state.
+     *
+     * @param root
+     *            the root
+     * @throws IOException
+     *             Signals that an I/O exception has occurred.
+     */
+    private void init(String root) throws IOException {
+        startTime = System.currentTimeMillis();
+
+        home = new File(root, GC_DIR);
+        FileUtils.forceMkdir(home);
+        home.deleteOnExit();
+    }
+
+    /**
+     * Creates the marked references file.
+     *
+     * @return the file
+     */
+    private File createMarkedRefsFile() {
+        if (markedRefs == null) {
+            markedRefs = new File(home,
+                    (MARKED_PREFIX + "-" + startTime));
+            markedRefs.deleteOnExit();
+        }
+        return markedRefs;
+    }
+
+    /**
+     * Creates the available references file.
+     *
+     * @return the file
+     */
+    private File createAvailableRefsFile() {
+        if (availableRefs == null) {
+            availableRefs = new File(home,
+                    (AVAIL_PREFIX + "-" + startTime));
+            availableRefs.deleteOnExit();
+        }
+        return availableRefs;
+    }
+
+    /**
+     * Creates the gc candidates file.
+     *
+     * @return the file
+     */
+    private File createGcCandidatesFile() {
+        if (gcCandidates == null) {
+            gcCandidates = new File(home,
+                    (GC_CANDIDATE_PREFIX + "-" + startTime));
+            gcCandidates.deleteOnExit();
+        }
+        return gcCandidates;
+    }
+
+    /**
+     * Creates the garbage file.
+     *
+     * @return the file
+     */
+    private File createGarbageFile() {
+        if (garbage == null) {
+            garbage = new File(home,
+                    (GC_PREFIX + "-" + startTime));
+            garbage.deleteOnExit();
+        }
+        return garbage;
+    }
+
+    /**
+     * Creates a temp file.
+     *
+     * @return the file
+     * @throws IOException
+     *             Signals that an I/O exception has occurred.
+     */
+    protected File createTempFile() throws IOException {
+        return File.createTempFile("temp", null, home);
+    }
+
+    /**
+     * Completes the process by deleting the files.
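+     *
+     * The home directory is removed only when no garbage file was written, i.e.
+     * when every garbage collection candidate could actually be deleted.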
+     *
+     * @throws IOException
+     *             Signals that an I/O exception has occurred.
+     */
+    protected void complete() throws IOException {
+        if (!getGarbage().exists() ||
+                FileUtils.sizeOf(getGarbage()) == 0) {
+            FileUtils.deleteDirectory(home);
+        }
+    }
+
+    /**
+     * Sorts the given file externally.
+     *
+     * @param file
+     *            the file
+     * @throws IOException
+     *             Signals that an I/O exception has occurred.
+     */
+    public void sort(File file) throws IOException {
+        File sorted = createTempFile();
+        Comparator<String> lexComparator = new Comparator<String>() {
+            @Override
+            public int compare(String s1, String s2) {
+                return s1.compareTo(s2);
+            }
+        };
+        ExternalSort.mergeSortedFiles(
+                ExternalSort.sortInBatch(file, lexComparator, true),
+                sorted, lexComparator, true);
+        Files.move(sorted, file);
+    }
+}
\ No newline at end of file
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (working copy)
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.StandardSystemProperty;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.jackrabbit.mk.blobs.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.mk.util.IOUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mark and sweep garbage collector.
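+ *
+ * A typical invocation (an illustrative sketch, not a prescribed API contract):
+ *
+ * <pre>
+ * MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
+ * gc.init(nodeStore); // expects a DocumentNodeStore
+ * gc.collectGarbage(); // mark, difference, then sweep
+ * </pre>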
+ * + * Uses the file system to store internal state while in process to account for huge data. + * + */ +public class MarkSweepGarbageCollector implements BlobGarbageCollector { + + public static Logger LOG = LoggerFactory.getLogger(MarkSweepGarbageCollector.class); + + public static String NEWLINE = StandardSystemProperty.LINE_SEPARATOR.value(); + + public static String TEMP_DIR = StandardSystemProperty.JAVA_IO_TMPDIR.value(); + + public final static int DEFAULT_BATCH_COUNT = 2048; + + /** The configured root to store gc process files. */ + private String root = TEMP_DIR; + + /** The batch count. */ + private int batchCount = DEFAULT_BATCH_COUNT; + + /** Run concurrently when possible. */ + boolean runConcurrently = true; + + /** The number of sweeper threads to use. */ + int numSweepers = 1; + + /** The node store. */ + private DocumentNodeStore nodeStore; + + /** The garbage collector file state */ + private GarbageCollectorFileState fs; + + /** + * Gets the root. + * + * @return the root + */ + protected String getRoot() { + return root; + } + + /** + * Gets the batch count. + * + * @return the batch count + */ + protected int getBatchCount() { + return batchCount; + } + + /** + * Checks if run concurrently. + * + * @return true, if is run concurrently + */ + protected boolean isRunConcurrently() { + return runConcurrently; + } + + /** + * Gets the number sweepers. + * + * @return the number sweepers + */ + protected int getNumSweepers() { + return numSweepers; + } + + /** + * Instantiates a new blob garbage collector. + * + * @param nodeStore + * the node store + * @param root + * the root + * @param batchCount + * the batch count + * @param runBackendConcurrently + * - run the backend iterate concurrently + * @param maxSweeperThreads + * the max sweeper threads + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public void init( + NodeStore nodeStore, + String root, + int batchCount, + boolean runBackendConcurrently, + int maxSweeperThreads) + throws IOException { + this.batchCount = batchCount; + this.root = root; + this.runConcurrently = runBackendConcurrently; + this.numSweepers = maxSweeperThreads; + init(nodeStore); + } + + /** + * Instantiates a new blob garbage collector. + * + * @param nodeStore + * the node store + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public void init(NodeStore nodeStore) throws IOException { + Preconditions.checkState(!Strings.isNullOrEmpty(root)); + this.nodeStore = (DocumentNodeStore) nodeStore; + fs = new GarbageCollectorFileState(root); + } + + @Override + public void collectGarbage() throws Exception { + markAndSweep(); + } + + /** + * Mark and sweep. Main method for GC. + * + * @throws Exception + * the exception + */ + protected void markAndSweep() throws Exception { + try { + LOG.debug("Starting garbage collector"); + + mark(); + difference(); + sweep(); + + LOG.debug("garbage collector finished"); + } finally { + fs.complete(); + } + } + + /** + * Mark phase of the GC. 
+ * + * @throws Exception + * the exception + */ + protected void mark() throws Exception { + LOG.debug("Starting mark phase of the garbage collector"); + + // Find all blobs available in the blob store + Thread blobIdRetrieverThread = null; + if (runConcurrently) { + blobIdRetrieverThread = new Thread(new BlobIdRetriever(), (this.getClass().getSimpleName() + "-MarkThread")); + blobIdRetrieverThread.setDaemon(true); + blobIdRetrieverThread.start(); + } else { + (new BlobIdRetriever()).retrieve(); + } + + // Find all blob references after iterating over the whole repository + iterateNodeTree(); + + if (runConcurrently) { + if (blobIdRetrieverThread.isAlive()) { + blobIdRetrieverThread.join(); + } + } + + LOG.debug("Ending mark phase of the garbage collector"); + } + + /** + * Difference phase where the GC candidates are identified. + * + * @throws IOException + * Signals that an I/O exception has occurred. + */ + protected void difference() throws IOException { + LOG.debug("Starting difference phase of the garbage collector"); + + FileLineDifferenceIterator iter = new FileLineDifferenceIterator( + fs.getMarkedRefs(), + fs.getAvailableRefs()); + + BufferedWriter bufferWriter = null; + try { + bufferWriter = Files.newWriter(fs.getGcCandidates(), Charsets.UTF_8); + List expiredSet = Lists.newArrayList(); + + int numCandidates = 0; + while (iter.hasNext()) { + expiredSet.add(iter.next()); + if (expiredSet.size() > getBatchCount()) { + numCandidates += expiredSet.size(); + saveBatchToFile(expiredSet, bufferWriter); + } + } + + if (!expiredSet.isEmpty()) { + numCandidates += expiredSet.size(); + saveBatchToFile(expiredSet, bufferWriter); + } + LOG.debug("Found GC candidates - " + numCandidates); + } finally { + IOUtils.closeQuietly(bufferWriter); + } + + LOG.debug("Ending difference phase of the garbage collector"); + } + + /** + * Sweep phase of gc candidate deletion. + * + * @throws IOException + * Signals that an I/O exception has occurred. 
+ */ + protected void sweep() throws IOException { + LOG.debug("Starting sweep phase of the garbage collector"); + + ConcurrentLinkedQueue exceptionQueue = new ConcurrentLinkedQueue(); + ExecutorService executorService = + new ThreadPoolExecutor(getNumSweepers(), getNumSweepers(), 1, + TimeUnit.MINUTES, + new LinkedBlockingQueue(), + new ThreadFactory() { + private final AtomicInteger threadCounter = new AtomicInteger(); + + private String getName() { + return "MarkSweepGarbageCollector-Sweeper-" + threadCounter.getAndIncrement(); + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, getName()); + thread.setDaemon(true); + return thread; + } + }); + + LineIterator iterator = FileUtils.lineIterator(fs.getGcCandidates(), Charsets.UTF_8.name()); + List ids = Lists.newArrayList(); + int count = 0; + while (iterator.hasNext()) { + ids.add(iterator.next()); + + if (ids.size() > getBatchCount()) { + count += ids.size(); + executorService.execute(new Sweeper(ids, exceptionQueue)); + ids = Lists.newArrayList(); + } + } + if (!ids.isEmpty()) { + count += ids.size(); + executorService.execute(new Sweeper(ids, exceptionQueue)); + } + + try { + executorService.shutdown(); + executorService.awaitTermination(100, TimeUnit.MINUTES); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + count -= exceptionQueue.size(); + BufferedWriter writer = null; + try { + if (!exceptionQueue.isEmpty()) { + writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8); + saveBatchToFile(Lists.newArrayList(exceptionQueue), writer); + } + } finally { + LineIterator.closeQuietly(iterator); + IOUtils.closeQuietly(writer); + } + + LOG.debug("Blobs deleted count - " + count); + LOG.debug("Ending sweep phase of the garbage collector"); + } + + /** + * Save batch to file. + * + * @param ids + * the ids + * @param writer + * the writer + * @throws IOException + * Signals that an I/O exception has occurred. + */ + private void saveBatchToFile(List ids, BufferedWriter writer) throws IOException { + writer.append(Joiner.on(NEWLINE).join(ids)); + writer.append(NEWLINE); + ids.clear(); + writer.flush(); + } + + /** + * Sweeper thread. + */ + class Sweeper implements Runnable { + + /** The exception queue. */ + ConcurrentLinkedQueue exceptionQueue; + + /** The ids to sweep. */ + List ids; + + /** + * Instantiates a new sweeper. + * + * @param ids + * the ids + * @param exceptionQueue + * the exception queue + */ + public Sweeper(List ids, ConcurrentLinkedQueue exceptionQueue) { + this.exceptionQueue = exceptionQueue; + this.ids = ids; + } + + @Override + public void run() { + for (String id : ids) { + try { + boolean deleted = ((GarbageCollectableBlobStore) nodeStore.getBlobStore()) + .deleteChunk(id); + if (!deleted) { + exceptionQueue.add(id); + } + } catch (Exception e) { + e.printStackTrace(); + exceptionQueue.add(id); + } + } + } + } + + /** + * Iterates the complete node tree. 
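+     * Resolves each referenced blob into its chunk ids and writes them to the
+     * marked references file in batches.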
+     *
+     * @return the list of referenced blob chunk ids
+     * @throws Exception
+     *             the exception
+     */
+    private List<String> iterateNodeTree() throws Exception {
+        ArrayList<String> referencedBlobs = Lists.newArrayList();
+        BufferedWriter writer = null;
+        try {
+            writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8);
+
+            Iterator<Blob> blobIterator = nodeStore.getReferencedBlobsIterator();
+            referencedBlobs.ensureCapacity(getBatchCount());
+
+            int referencesFound = 0;
+            while (blobIterator.hasNext()) {
+                Blob blob = blobIterator.next();
+                if (blob.toString().length() != 0) {
+                    Iterator<String> idIter = ((GarbageCollectableBlobStore) nodeStore
+                            .getBlobStore())
+                            .resolveChunks(blob.toString());
+                    while (idIter.hasNext()) {
+                        referencedBlobs.add(idIter.next());
+                    }
+                }
+
+                if (referencedBlobs.size() >= getBatchCount()) {
+                    referencesFound += referencedBlobs.size();
+                    saveBatchToFile(referencedBlobs, writer);
+                }
+            }
+
+            if (!referencedBlobs.isEmpty()) {
+                referencesFound += referencedBlobs.size();
+                saveBatchToFile(referencedBlobs, writer);
+            }
+            // Sort the marked references file once all batches are written
+            fs.sort(fs.getMarkedRefs());
+
+            LOG.debug("Blob references found (including chunk resolution) " + referencesFound);
+        } finally {
+            IOUtils.closeQuietly(writer);
+        }
+        return referencedBlobs;
+    }
+
+    /**
+     * BlobIdRetriever class to retrieve all blob ids.
+     */
+    class BlobIdRetriever implements Runnable {
+
+        boolean finished = false;
+
+        @Override
+        public void run() {
+            retrieve();
+        }
+
+        /**
+         * Retrieves all chunk ids into the available references file.
+         */
+        protected void retrieve() {
+            LOG.debug("Starting retrieve of all blobs");
+
+            BufferedWriter bufferWriter = null;
+            try {
+                bufferWriter = Files.newWriter(fs.getAvailableRefs(), Charsets.UTF_8);
+                Iterator<String> idsIter = ((GarbageCollectableBlobStore) nodeStore.getBlobStore())
+                        .getAllChunkIds(0);
+                List<String> ids = Lists.newArrayList();
+                int blobsCount = 0;
+                while (idsIter.hasNext()) {
+                    ids.add(idsIter.next());
+                    if (ids.size() > getBatchCount()) {
+                        blobsCount += ids.size();
+                        saveBatchToFile(ids, bufferWriter);
+                    }
+                }
+
+                if (!ids.isEmpty()) {
+                    blobsCount += ids.size();
+                    saveBatchToFile(ids, bufferWriter);
+                }
+
+                // sort the file
+                fs.sort(fs.getAvailableRefs());
+
+                finished = true;
+                LOG.debug("Ending retrieve of all blobs : " + blobsCount);
+            } catch (Exception e) {
+                LOG.error("Error retrieving all chunk ids", e);
+            } finally {
+                IOUtils.closeQuietly(bufferWriter);
+            }
+        }
+    }
+
+    /**
+     * FileLineDifferenceIterator class which iterates over the difference of two files line by line.
+     */
+    class FileLineDifferenceIterator implements Iterator<String> {
+
+        /** The marked references iterator. */
+        private LineIterator markedIter;
+
+        /** The available references iterator. */
+        private LineIterator allIter;
+
+        private ArrayDeque<String> queue;
+
+        private boolean done = false;
+
+        /** Temporary buffer. */
+        private TreeSet markedBuffer;
+
+        /**
+         * Instantiates a new file line difference iterator.
+         *
+         * @param marked
+         *            the marked references file
+         * @param available
+         *            the available references file
+         * @throws IOException
+         *             Signals that an I/O exception has occurred.
+         */
+        public FileLineDifferenceIterator(File marked, File available) throws IOException {
+            this.markedIter = FileUtils.lineIterator(marked);
+            this.allIter = FileUtils.lineIterator(available);
+            queue = new ArrayDeque<String>(getBatchCount());
+            markedBuffer = Sets.newTreeSet();
+        }
+
+        /**
+         * Close.
+ */ + private void close() { + LineIterator.closeQuietly(markedIter); + LineIterator.closeQuietly(allIter); + } + + @Override + public boolean hasNext() { + if (!queue.isEmpty()) { + return true; + } else if (done) { + return false; + } else { + if (!markedIter.hasNext() && !allIter.hasNext()) { + done = true; + close(); + return false; + } else { + queue.addAll(difference()); + if (!queue.isEmpty()) { + return true; + } else { + done = true; + close(); + } + } + } + + return false; + } + + @Override + public String next() { + return nextDifference(); + } + + /** + * Next difference. + * + * @return the string + */ + public String nextDifference() { + if (!hasNext()) { + throw new NoSuchElementException("No more difference"); + } + return queue.remove(); + } + + /** + * Difference. + * + * @return the sets the + */ + protected Set difference() { + TreeSet gcSet = new TreeSet(); + + // Iterate till the gc candidate set is at least SAVE_BATCH_COUNT or + // the + // blob id set iteration is complete + while (allIter.hasNext() && + gcSet.size() < getBatchCount()) { + TreeSet allBuffer = new TreeSet(); + + while (markedIter.hasNext() && + markedBuffer.size() < getBatchCount()) { + String stre = markedIter.next(); + markedBuffer.add(stre); + } + while (allIter.hasNext() && + allBuffer.size() < getBatchCount()) { + String stre = allIter.next(); + allBuffer.add(stre); + } + + if (markedBuffer.isEmpty()) { + gcSet = allBuffer; + } else { + gcSet.addAll( + Sets.difference(allBuffer, markedBuffer)); + + if (allBuffer.last().compareTo(markedBuffer.last()) < 0) { + // filling markedLeftoverBuffer + TreeSet markedLeftoverBuffer = Sets.newTreeSet(); + markedLeftoverBuffer.addAll(markedBuffer.tailSet(allBuffer.last(), false)); + markedBuffer = markedLeftoverBuffer; + markedLeftoverBuffer = null; + } else { + markedBuffer.clear(); + } + } + } + + return gcSet; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java (revision 1567531) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIterator.java (working copy) @@ -110,10 +110,12 @@ } Map valueMap = doc.getLocalMap(key); for (String v : valueMap.values()) { + if (v != null) { loadValue(v); } } } + } return hasMore; } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1567531) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -1594,7 +1594,7 @@ return blobSerializer; } - Iterator getReferencedBlobsIterator() { + public Iterator getReferencedBlobsIterator() { return new BlobReferenceIterator(this); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java (revision 1567531) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java (working copy) @@ -17,15 +17,14 @@ package 
org.apache.jackrabbit.oak.plugins.document.mongo;
 
 import java.io.IOException;
+import java.util.Iterator;
 
-import org.apache.jackrabbit.mk.blobs.AbstractBlobStore;
-import org.apache.jackrabbit.mk.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.google.common.collect.AbstractIterator;
 import com.mongodb.BasicDBObject;
+import com.mongodb.Bytes;
 import com.mongodb.DB;
 import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.MongoException;
 import com.mongodb.QueryBuilder;
@@ -32,6 +31,11 @@
 import com.mongodb.ReadPreference;
 import com.mongodb.WriteResult;
 
+import org.apache.jackrabbit.mk.blobs.AbstractBlobStore;
+import org.apache.jackrabbit.mk.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Implementation of blob store for the MongoDB extending from
  * {@link AbstractBlobStore}. It saves blobs into a separate collection in
@@ -187,4 +191,46 @@
         return queryBuilder.get();
     }
 
+    @Override
+    public boolean deleteChunk(String chunkId) throws Exception {
+        DBCollection collection = getBlobCollection();
+        BasicDBObject removeObj = new BasicDBObject();
+        removeObj.append(MongoBlob.KEY_ID, chunkId);
+
+        WriteResult result = collection.remove(removeObj);
+        return result.getN() == 1;
+    }
+
+    @Override
+    public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
+        DBCollection collection = getBlobCollection();
+
+        DBObject fields = new BasicDBObject();
+        fields.put(MongoBlob.KEY_ID, 1);
+
+        QueryBuilder builder = new QueryBuilder();
+        if (maxLastModifiedTime != 0 && maxLastModifiedTime != -1) {
+            builder.and(MongoBlob.KEY_LAST_MOD).lessThanEquals(maxLastModifiedTime);
+        }
+
+        final DBCursor cur =
+                collection.find(builder.get(), fields).hint(fields)
+                        .addOption(Bytes.QUERYOPTION_SLAVEOK);
+
+        return new AbstractIterator<String>() {
+            protected String computeNext() {
+                // Skip over null entries instead of ending the iteration early
+                while (cur.hasNext()) {
+                    MongoBlob blob = (MongoBlob) cur.next();
+                    if (blob != null) {
+                        return blob.getId();
+                    }
+                }
+                return endOfData();
+            }
+        };
+    }
 }
\ No newline at end of file
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java (revision 1567531)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java (working copy)
@@ -25,9 +25,12 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import javax.sql.DataSource;
 
+import com.google.common.collect.AbstractIterator;
+
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mk.blobs.AbstractBlobStore;
 import org.apache.jackrabbit.mk.util.StringUtils;
@@ -35,7 +38,6 @@
 import org.slf4j.LoggerFactory;
 
 public class RDBBlobStore extends AbstractBlobStore implements Closeable {
-
     /**
      * Creates a {@linkplain RDBBlobStore} instance using an embedded H2
     * database in in-memory mode.
@@ -282,4 +284,61 @@ connection.commit(); } } + + @Override + public boolean deleteChunk(String chunkId) throws Exception { + try { + PreparedStatement prep = connection.prepareStatement( + "delete from datastore_meta where id = ?"); + PreparedStatement prepData = connection.prepareStatement( + "delete from datastore_data where id = ?"); + prep.setString(1, chunkId); + prep.execute(); + prepData.setString(1, chunkId); + prepData.execute(); + + prep.close(); + prepData.close(); + } finally { + connection.commit(); } + + return true; + } + + @Override + public Iterator getAllChunkIds(long maxLastModifiedTime) throws Exception { + PreparedStatement prep = null; + + if ((maxLastModifiedTime != 0) && (maxLastModifiedTime != -1)) { + prep = connection.prepareStatement( + "select id from datastore_meta where lastMod <= ?"); + prep.setLong(1, maxLastModifiedTime); + } else { + prep = connection.prepareStatement( + "select id from datastore_meta"); + } + + final ResultSet rs = prep.executeQuery(); + + return new AbstractIterator() { + protected String computeNext() { + try { + if (rs.next()) { + return rs.getString(1); + } else { + rs.close(); + } + } catch (SQLException e) { + try { + if ((rs != null) && !rs.isClosed()) { + rs.close(); + } + } catch (Exception se) { + } + } + return endOfData(); + } + }; + } +} Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/AbstractBlobStoreTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/AbstractBlobStoreTest.java (revision 1567531) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/AbstractBlobStoreTest.java (working copy) @@ -30,9 +30,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Sets; + import org.apache.jackrabbit.mk.blobs.BlobStore; import org.apache.jackrabbit.mk.blobs.BlobStoreInputStream; import org.apache.jackrabbit.mk.blobs.FileBlobStore; @@ -116,6 +120,7 @@ public void close() { closed.set(true); } + @Override public int read() throws IOException { return -1; @@ -133,6 +138,7 @@ public void close() { closed.set(true); } + @Override public int read() throws IOException { throw new RuntimeException("abc"); @@ -361,4 +367,52 @@ list.add(file.getAbsolutePath()); } + @Test + public void list() throws Exception { + Set ids = createArtifacts(); + + Iterator iter = store.getAllChunkIds(0); + while (iter.hasNext()) { + ids.remove(iter.next()); } + + assertTrue(ids.isEmpty()); + } + + @Test + public void delete() throws Exception { + Set ids = createArtifacts(); + + for (String id : ids) { + store.deleteChunk(id); + } + + Iterator iter = store.getAllChunkIds(0); + Set ret = Sets.newHashSet(); + while (iter.hasNext()) { + ret.add(iter.next()); + } + + assertTrue(ret.isEmpty()); + } + + private Set createArtifacts() throws Exception { + Set ids = Sets.newHashSet(); + int number = 10; + for (int i = 0; i < number; i++) { + String id = store.writeBlob(randomStream(i, 2080)); + Iterator iter = store.resolveChunks(id.toString()); + while (iter.hasNext()) { + ids.add(iter.next()); + } + } + return ids; + } + + static InputStream randomStream(int seed, int size) { + Random r = new Random(seed); + byte[] data = new byte[size]; + r.nextBytes(data); + return new ByteArrayInputStream(data); + } +} Index: 
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/CloudStoreUtils.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/CloudStoreUtils.java (revision 1567531)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/CloudStoreUtils.java (working copy)
@@ -33,7 +33,7 @@
      * @throws Exception
      *             the exception
      */
-    protected static AbstractBlobStore getBlobStore() throws Exception {
+    public static AbstractBlobStore getBlobStore() throws Exception {
         BlobStoreConfiguration config =
                 BlobStoreConfiguration.newInstance().loadFromSystemProps();
         config.addProperty(
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/MongoCloudBlobGCTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/MongoCloudBlobGCTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/MongoCloudBlobGCTest.java (working copy)
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.blob.cloud;
+
+import org.apache.jackrabbit.oak.plugins.blob.cloud.CloudBlobStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.MongoBlobGCTest;
+import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Test for MongoMK GC with {@link CloudBlobStore}.
+ */
+public class MongoCloudBlobGCTest extends MongoBlobGCTest {
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        try {
+            Assume.assumeNotNull(CloudStoreUtils.getBlobStore());
+        } catch (Exception e) {
+            Assume.assumeNoException(e);
+        }
+    }
+
+    @Before
+    public void setUpConnection() throws Exception {
+        mongoConnection = MongoUtils.getConnection();
+        MongoUtils.dropCollections(mongoConnection.getDB());
+        mk = new DocumentMK.Builder().setMongoDB(mongoConnection.getDB())
+                .setBlobStore(CloudStoreUtils.getBlobStore()).open();
+    }
+
+    @After
+    public void tearDownConnection() throws Exception {
+        ((CloudBlobStore) mk.getNodeStore().getBlobStore()).deleteBucket();
+        mk.dispose();
+        // the db might already be closed
+        mongoConnection.close();
+        mongoConnection = MongoUtils.getConnection();
+        MongoUtils.dropCollections(mongoConnection.getDB());
+        mongoConnection.close();
+    }
+}
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/DataStoreUtils.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/DataStoreUtils.java (revision 1567531)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/DataStoreUtils.java (working copy)
@@ -36,7 +36,7 @@
      * @throws Exception
      *             the exception
      */
-    protected static DataStoreBlobStore getBlobStore() throws Exception {
+    public static DataStoreBlobStore getBlobStore() throws Exception {
         BlobStoreConfiguration config =
                 BlobStoreConfiguration.newInstance().loadFromSystemProps();
         config.addProperty(
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java (working copy)
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.blob.ds;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.MongoBlobGCTest;
+import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Test for MongoMK GC with {@link DataStoreBlobStore}.
+ */
+public class MongoDataStoreBlobGCTest extends MongoBlobGCTest {
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        try {
+            Assume.assumeNotNull(DataStoreUtils.getBlobStore());
+        } catch (Exception e) {
+            Assume.assumeNoException(e);
+        }
+    }
+
+    @Before
+    public void setUpConnection() throws Exception {
+        mongoConnection = MongoUtils.getConnection();
+        MongoUtils.dropCollections(mongoConnection.getDB());
+        mk = new DocumentMK.Builder().setMongoDB(mongoConnection.getDB())
+                .setBlobStore(DataStoreUtils.getBlobStore()).open();
+    }
+
+    @After
+    public void tearDownConnection() throws Exception {
+        FileUtils.deleteDirectory(new File(DataStoreUtils.PATH));
+        mk.dispose();
+        // the db might already be closed
+        mongoConnection.close();
+        mongoConnection = MongoUtils.getConnection();
+        MongoUtils.dropCollections(mongoConnection.getDB());
+        mongoConnection.close();
+    }
+}
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (working copy)
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+
+import org.apache.jackrabbit.mk.blobs.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.Test;
+
+/**
+ * Tests for MongoMK GC.
+ */
+public class MongoBlobGCTest extends AbstractMongoConnectionTest {
+
+    public HashSet<String> setUp() throws Exception {
+        HashSet<String> set = new HashSet<String>();
+
+        DocumentNodeStore s = mk.getNodeStore();
+        NodeBuilder a = s.getRoot().builder();
+
+        int number = 10;
+        // pick a random subset of the assets to delete later
+        List<Integer> processed = Lists.newArrayList();
+        Random rand = new Random();
+        for (int i = 0; i < 5; i++) {
+            int n = rand.nextInt(number);
+            if (!processed.contains(n)) {
+                processed.add(n);
+            }
+        }
+        for (int i = 0; i < number; i++) {
+            Blob b = s.createBlob(randomStream(i, 4160));
+            if (processed.contains(i)) {
+                // remember the chunk ids of the blobs that will be orphaned
+                Iterator<String> idIter =
+                        ((GarbageCollectableBlobStore) s.getBlobStore())
+                                .resolveChunks(b.toString());
+                while (idIter.hasNext()) {
+                    set.add(idIter.next());
+                }
+            }
+            a.child("c" + i).setProperty("x", b);
+        }
+        s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        for (int id : processed) {
+            delete("c" + id);
+        }
+
+        return set;
+    }
+
+    private void delete(String nodeId) {
+        DBCollection coll = mongoConnection.getDB().getCollection("nodes");
+        BasicDBObject blobNodeObj = new BasicDBObject();
+        blobNodeObj.put("_id", "1:/" + nodeId);
+        coll.remove(blobNodeObj);
+    }
+
+    @Test
+    public void gc() throws Exception {
+        HashSet<String> set = setUp();
+
+        DocumentNodeStore s = mk.getNodeStore();
+        MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
+        gc.init(s, "./target", 2048, true, 2);
+        gc.collectGarbage();
+
+        Set<String> existing = iterate();
+        assertTrue(Sets.intersection(set, existing).isEmpty());
+    }
+
+    protected Set<String> iterate() throws Exception {
+        GarbageCollectableBlobStore store = (GarbageCollectableBlobStore)
+                mk.getNodeStore().getBlobStore();
+        Iterator<String> cur = store.getAllChunkIds(0);
+
+        Set<String> existing = Sets.newHashSet();
+        while (cur.hasNext()) {
+            existing.add(cur.next());
+        }
+        return existing;
+    }
+
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
+}
Index: oak-mk/pom.xml
===================================================================
--- oak-mk/pom.xml (revision 1567531)
+++ oak-mk/pom.xml (working copy)
@@ -119,6 +119,13 @@
       <scope>provided</scope>
     </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>1.4</version>
+    </dependency>
+
     <dependency>
       <groupId>com.googlecode.json-simple</groupId>
@@ -136,12 +143,6 @@
       <artifactId>logback-classic</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <version>1.4</version>
-      <scope>test</scope>
-    </dependency>
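MongoBlobGCTest above exercises the end-to-end flow: write blobs, drop some of the referencing node documents directly in Mongo, run the collector, and verify the orphaned chunks are gone. For reference, the collector invocation the test uses is shown below; MarkSweepGarbageCollector itself is not part of this patch, so treat the parameter values as the test's choices rather than recommendations:

    MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
    gc.init(nodeStore, "./target", 2048, true, 2);
    gc.collectGarbage();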
Index: oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStore.java
===================================================================
--- oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStore.java (revision 1567531)
+++ oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStore.java (working copy)
@@ -16,10 +16,6 @@
  */
 package org.apache.jackrabbit.mk.blobs;
 
-import org.apache.jackrabbit.mk.util.Cache;
-import org.apache.jackrabbit.mk.util.IOUtils;
-import org.apache.jackrabbit.mk.util.StringUtils;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -29,13 +25,20 @@
 import java.lang.ref.WeakReference;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.WeakHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.jackrabbit.mk.util.Cache;
+import org.apache.jackrabbit.mk.util.IOUtils;
+import org.apache.jackrabbit.mk.util.StringUtils;
+
 /**
  * An abstract data store that splits the binaries in relatively small blocks,
  * so that each block fits in memory.
@@ -421,6 +424,11 @@
         }
     }
 
+    @Override
+    public Iterator<String> resolveChunks(String blobId) throws IOException {
+        return new ChunkIterator(blobId);
+    }
+
     /**
     * A block id. Blocks are small enough to fit in memory, so they can be
     * cached.
@@ -493,4 +501,80 @@
     }
 
+    class ChunkIterator implements Iterator<String> {
+
+        private static final int BATCH = 2048;
+
+        private final ArrayDeque<String> queue;
+        private final ArrayDeque<ByteArrayInputStream> streamsStack;
+
+        public ChunkIterator(String blobId) {
+            byte[] id = StringUtils.convertHexToBytes(blobId);
+            ByteArrayInputStream idStream = new ByteArrayInputStream(id);
+            queue = new ArrayDeque<String>(BATCH);
+            streamsStack = new ArrayDeque<ByteArrayInputStream>();
+            streamsStack.push(idStream);
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (!queue.isEmpty()) {
+                return true;
+            }
+            try {
+                while (queue.size() < BATCH && streamsStack.peekFirst() != null) {
+                    ByteArrayInputStream idStream = streamsStack.peekFirst();
+
+                    int type = idStream.read();
+                    if (type == -1) {
+                        // this id stream is exhausted
+                        streamsStack.pop();
+                    } else if (type == TYPE_DATA) {
+                        // inlined data, no chunk to report; skip over it
+                        int len = IOUtils.readVarInt(idStream);
+                        IOUtils.skipFully(idStream, len);
+                    } else if (type == TYPE_HASH) {
+                        int level = IOUtils.readVarInt(idStream);
+                        // totalLength (ignored)
+                        IOUtils.readVarLong(idStream);
+                        if (level > 0) {
+                            // block length (ignored)
+                            IOUtils.readVarLong(idStream);
+                        }
+                        byte[] digest = new byte[IOUtils.readVarInt(idStream)];
+                        IOUtils.readFully(idStream, digest, 0, digest.length);
+                        queue.add(StringUtils.convertBytesToHex(digest));
+                        if (level > 0) {
+                            // indirection block: its content is again a list of ids
+                            byte[] block = readBlock(digest, 0);
+                            streamsStack.push(new ByteArrayInputStream(block));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                // unreadable or corrupt id stream: report no further chunks
+            }
+            return !queue.isEmpty();
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("No data");
+            }
+            return queue.remove();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Remove not supported");
+        }
+    }
+}
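For readers new to AbstractBlobStore ids: a blob id is the hex encoding of a small byte stream, and ChunkIterator above is a depth-first walk over it. The layout below is reconstructed from the reads in hasNext() (varint/varlong are the IOUtils encodings), so treat it as a reading aid rather than a specification:

    // entry := TYPE_DATA  varint(length)  data[length]          -- inlined bytes, no chunk id
    //        | TYPE_HASH  varint(level)  varlong(totalLength)
    //          varlong(blockLength)   (only present if level > 0)
    //          varint(digestLength)  digest[digestLength]       -- the digest is the chunk id
    //
    // For level > 0 the digest names an indirection block: readBlock(digest, 0)
    // yields another id stream, which is pushed onto streamsStack and walked
    // the same way.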
Index: oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/DbBlobStore.java
===================================================================
--- oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/DbBlobStore.java (revision 1567531)
+++ oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/DbBlobStore.java (working copy)
@@ -16,9 +16,6 @@
  */
 package org.apache.jackrabbit.mk.blobs;
 
-import org.apache.jackrabbit.mk.util.StringUtils;
-import org.h2.jdbcx.JdbcConnectionPool;
-
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -26,7 +23,13 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Iterator;
 
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.jackrabbit.mk.util.StringUtils;
+import org.h2.jdbcx.JdbcConnectionPool;
+
 /**
  * A database blob store.
  */
@@ -210,4 +213,63 @@
         return count;
     }
 
+    @Override
+    public boolean deleteChunk(String chunkId) throws Exception {
+        Connection conn = cp.getConnection();
+        try {
+            PreparedStatement prep = conn.prepareStatement(
+                    "delete from datastore_meta where id = ?");
+            PreparedStatement prepData = conn.prepareStatement(
+                    "delete from datastore_data where id = ?");
+            try {
+                prep.setString(1, chunkId);
+                prep.execute();
+                prepData.setString(1, chunkId);
+                prepData.execute();
+            } finally {
+                prep.close();
+                prepData.close();
+            }
+        } finally {
+            conn.commit();
+            conn.close();
+        }
+
+        return true;
+    }
+
+    @Override
+    public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
+        final Connection conn = cp.getConnection();
+        PreparedStatement prep;
+
+        if (maxLastModifiedTime != 0 && maxLastModifiedTime != -1) {
+            prep = conn.prepareStatement(
+                    "select id from datastore_meta where lastMod <= ?");
+            prep.setLong(1, maxLastModifiedTime);
+        } else {
+            prep = conn.prepareStatement(
+                    "select id from datastore_meta");
+        }
+
+        final ResultSet rs = prep.executeQuery();
+
+        return new AbstractIterator<String>() {
+            @Override
+            protected String computeNext() {
+                try {
+                    if (rs.next()) {
+                        return rs.getString(1);
+                    }
+                    // exhausted: closing the connection also closes the result set
+                    conn.close();
+                } catch (SQLException e) {
+                    try {
+                        if (!conn.isClosed()) {
+                            conn.close();
+                        }
+                    } catch (SQLException ignore) {
+                        // already failing; nothing more to do
+                    }
+                }
+                return endOfData();
+            }
+        };
+    }
+}
Index: oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/FileBlobStore.java
===================================================================
--- oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/FileBlobStore.java (revision 1567531)
+++ oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/FileBlobStore.java (working copy)
@@ -16,9 +16,6 @@
  */
 package org.apache.jackrabbit.mk.blobs;
 
-import org.apache.jackrabbit.mk.util.IOUtils;
-import org.apache.jackrabbit.mk.util.StringUtils;
-
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -29,7 +26,20 @@
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.FluentIterable;
+import com.google.common.io.Files;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.jackrabbit.mk.util.IOUtils;
+import org.apache.jackrabbit.mk.util.StringUtils;
+
 /**
  * A file blob store.
  */
@@ -36,6 +46,7 @@
 public class FileBlobStore extends AbstractBlobStore {
 
     private static final String OLD_SUFFIX = "_old";
+    private static final String FILE_SUFFIX = ".dat";
 
     private final File baseDir;
     private final byte[] buffer = new byte[16 * 1024];
@@ -117,7 +128,7 @@
         if (old) {
             sub2 += OLD_SUFFIX;
         }
-        return new File(new File(new File(baseDir, sub1), sub2), id + ".dat");
+        return new File(new File(new File(baseDir, sub1), sub2), id + FILE_SUFFIX);
     }
 
     @Override
@@ -207,4 +218,44 @@
         return count;
     }
 
+    @Override
+    public boolean deleteChunk(String chunkId) throws Exception {
+        byte[] digest = StringUtils.convertHexToBytes(chunkId);
+        File f = getFile(digest, false);
+        if (!f.exists()) {
+            File old = getFile(digest, true);
+            f.getParentFile().mkdir();
+            old.renameTo(f);
+            f = getFile(digest, false);
+        }
+        return f.delete();
+    }
+
+    @Override
+    public Iterator<String> getAllChunkIds(final long maxLastModifiedTime) throws Exception {
+        FluentIterable<File> iterable = Files.fileTreeTraverser().postOrderTraversal(baseDir);
+        final Iterator<File> iter =
+                iterable.filter(new Predicate<File>() {
+                    // ignore directories; if a cutoff is given, also ignore
+                    // files modified at or after maxLastModifiedTime
+                    @Override
+                    public boolean apply(@Nullable File input) {
+                        return !input.isDirectory()
+                                && (maxLastModifiedTime == 0
+                                    || maxLastModifiedTime == -1
+                                    || FileUtils.isFileOlder(input, maxLastModifiedTime));
+                    }
+                }).iterator();
+        return new AbstractIterator<String>() {
+            @Override
+            protected String computeNext() {
+                if (iter.hasNext()) {
+                    File file = iter.next();
+                    return FilenameUtils.removeExtension(file.getName());
+                }
+                return endOfData();
+            }
+        };
+    }
 }
\ No newline at end of file
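A note on the time filter in FileBlobStore.getAllChunkIds(): commons-io's FileUtils.isFileOlder(File, long) compares File.lastModified() against the given epoch milliseconds, so the cutoff must be a millisecond timestamp, and files modified at or after it are skipped. The predicate reduces to this sketch (the method name is illustrative):

    import java.io.File;

    import org.apache.commons.io.FileUtils;

    static boolean isChunkCandidate(File f, long maxLastModifiedTime) {
        // directories never represent chunks; 0 and -1 disable the cutoff
        return !f.isDirectory()
                && (maxLastModifiedTime == 0
                    || maxLastModifiedTime == -1
                    || FileUtils.isFileOlder(f, maxLastModifiedTime));
    }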
Index: oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/GarbageCollectableBlobStore.java
===================================================================
--- oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/GarbageCollectableBlobStore.java (revision 1567531)
+++ oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/GarbageCollectableBlobStore.java (working copy)
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.mk.blobs;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 /**
  * A blob store that supports garbage collection.
@@ -70,4 +71,34 @@
      */
     long getBlockSizeMin();
 
+    /**
+     * Gets all the chunk identifiers.
+     *
+     * @param maxLastModifiedTime
+     *            the max last modified time to consider for retrieval,
+     *            with 0 or -1 meaning no limit
+     * @return the identifiers
+     * @throws Exception
+     *             the exception
+     */
+    Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception;
+
+    /**
+     * Deletes the chunk with the given id.
+     *
+     * @param chunkId the chunk id
+     * @return true, if successful
+     * @throws Exception
+     *             the exception
+     */
+    boolean deleteChunk(String chunkId) throws Exception;
+
+    /**
+     * Resolves the chunk ids that make up the given blob id.
+     *
+     * @param blobId the blob id
+     * @return an iterator over the chunk ids
+     * @throws IOException
+     *             Signals that an I/O exception has occurred.
+     */
+    Iterator<String> resolveChunks(String blobId) throws IOException;
 }
Index: oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/MemoryBlobStore.java
===================================================================
--- oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/MemoryBlobStore.java (revision 1567531)
+++ oak-mk/src/main/java/org/apache/jackrabbit/mk/blobs/MemoryBlobStore.java (working copy)
@@ -18,7 +18,13 @@
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
 
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Maps;
+
+import org.apache.jackrabbit.mk.util.StringUtils;
+
 /**
  * A memory blob store. Useful for testing.
  */
@@ -74,4 +80,38 @@
         return count;
     }
 
+    @Override
+    public boolean deleteChunk(String chunkId) throws Exception {
+        BlockId id = new BlockId(StringUtils.convertHexToBytes(chunkId), 0);
+        if (map.containsKey(id)) {
+            map.remove(id);
+        } else if (old.containsKey(id)) {
+            old.remove(id);
+        }
+        return true;
+    }
+
+    /**
+     * Ignores the maxLastModifiedTime.
+     */
+    @Override
+    public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
+        HashMap<BlockId, byte[]> combinedMap = Maps.newHashMap();
+        combinedMap.putAll(map);
+        combinedMap.putAll(old);
+        final Iterator<BlockId> iter = combinedMap.keySet().iterator();
+
+        return new AbstractIterator<String>() {
+            @Override
+            protected String computeNext() {
+                if (iter.hasNext()) {
+                    BlockId blockId = iter.next();
+                    if (blockId != null) {
+                        return StringUtils.convertBytesToHex(blockId.getDigest());
+                    }
+                }
+                return endOfData();
+            }
+        };
+    }
+
+}
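Taken together, the three interface methods are enough to express a full sweep. A naive end-to-end sketch, assuming the caller already knows which blob ids are still referenced (MarkSweepGarbageCollector, used by the tests above, is the real implementation of this idea; the NaiveSweep class and its parameters are illustrative):

    import java.util.Iterator;
    import java.util.Set;

    import com.google.common.collect.Sets;
    import org.apache.jackrabbit.mk.blobs.GarbageCollectableBlobStore;

    public class NaiveSweep {
        public static int sweep(GarbageCollectableBlobStore store, Set<String> liveBlobIds)
                throws Exception {
            // mark: expand every live blob id into its chunk ids
            Set<String> liveChunks = Sets.newHashSet();
            for (String blobId : liveBlobIds) {
                Iterator<String> chunks = store.resolveChunks(blobId);
                while (chunks.hasNext()) {
                    liveChunks.add(chunks.next());
                }
            }
            // sweep: delete every stored chunk that is not referenced
            int deleted = 0;
            Iterator<String> all = store.getAllChunkIds(0);
            while (all.hasNext()) {
                String chunkId = all.next();
                if (!liveChunks.contains(chunkId) && store.deleteChunk(chunkId)) {
                    deleted++;
                }
            }
            return deleted;
        }
    }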
Index: oak-mk/src/test/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStoreTest.java
===================================================================
--- oak-mk/src/test/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStoreTest.java (revision 1567531)
+++ oak-mk/src/test/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStoreTest.java (working copy)
@@ -26,13 +26,19 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import junit.framework.TestCase;
+
+import com.google.common.collect.Sets;
+
 import org.apache.jackrabbit.mk.json.JsopBuilder;
 import org.apache.jackrabbit.mk.json.JsopTokenizer;
 import org.apache.jackrabbit.mk.util.IOUtilsTest;
+import org.junit.Test;
 
 /**
  * Tests a BlobStore implementation.
@@ -96,6 +102,7 @@
             public void close() {
                 closed.set(true);
             }
+
             public int read() throws IOException {
                 return -1;
             }
@@ -110,6 +117,7 @@
             public void close() {
                 closed.set(true);
             }
+
             public int read() throws IOException {
                 throw new RuntimeException("abc");
             }
@@ -123,7 +131,6 @@
         assertTrue(closed.get());
     }
 
-
     public void testIllegalIdentifier() throws Exception {
         byte[] data = new byte[1];
         try {
@@ -335,4 +342,52 @@
         list.add(file.getAbsolutePath());
     }
 
+    @Test
+    public void testList() throws Exception {
+        Set<String> ids = createArtifacts();
+
+        Iterator<String> iter = store.getAllChunkIds(0);
+        while (iter.hasNext()) {
+            ids.remove(iter.next());
+        }
+
+        assertTrue(ids.isEmpty());
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        Set<String> ids = createArtifacts();
+
+        for (String id : ids) {
+            store.deleteChunk(id);
+        }
+
+        Iterator<String> iter = store.getAllChunkIds(0);
+        Set<String> ret = Sets.newHashSet();
+        while (iter.hasNext()) {
+            ret.add(iter.next());
+        }
+
+        assertTrue(ret.isEmpty());
+    }
+
+    private Set<String> createArtifacts() throws Exception {
+        Set<String> ids = Sets.newHashSet();
+        int number = 10;
+        for (int i = 0; i < number; i++) {
+            String id = store.writeBlob(randomStream(i, 4160));
+            Iterator<String> iter = store.resolveChunks(id);
+            while (iter.hasNext()) {
+                ids.add(iter.next());
+            }
+        }
+        return ids;
+    }
+
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
+}