Index: src/test/java/org/apache/jackrabbit/core/persistence/PersistenceManagerTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/core/persistence/PersistenceManagerTest.java (revision 1348513) +++ src/test/java/org/apache/jackrabbit/core/persistence/PersistenceManagerTest.java (working copy) @@ -35,6 +35,7 @@ import org.apache.jackrabbit.core.persistence.PersistenceManager; import org.apache.jackrabbit.core.persistence.mem.InMemBundlePersistenceManager; import org.apache.jackrabbit.core.persistence.mem.InMemPersistenceManager; +import org.apache.jackrabbit.core.persistence.mongodb.MongoBundlePersistenceManager; import org.apache.jackrabbit.core.persistence.obj.ObjectPersistenceManager; import org.apache.jackrabbit.core.persistence.xml.XMLPersistenceManager; import org.apache.jackrabbit.core.state.ChangeLog; @@ -113,6 +114,13 @@ assertPersistenceManager(manager); } + public void testMongodbPersistenceManager() throws Exception { + MongoBundlePersistenceManager theMongoBundlePersistenceManager = new MongoBundlePersistenceManager(); + theMongoBundlePersistenceManager.setSchema("mongodbtest"); + theMongoBundlePersistenceManager.setHost("localhost"); + assertPersistenceManager(theMongoBundlePersistenceManager); + + } public void testH2PoolPersistenceManager() throws Exception { org.apache.jackrabbit.core.persistence.pool.H2PersistenceManager manager = new org.apache.jackrabbit.core.persistence.pool.H2PersistenceManager(); Index: src/main/java/org/apache/jackrabbit/core/persistence/mongodb/MongoBundlePersistenceManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/persistence/mongodb/MongoBundlePersistenceManager.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/persistence/mongodb/MongoBundlePersistenceManager.java (revision 0) @@ -0,0 +1,473 @@ +package org.apache.jackrabbit.core.persistence.mongodb; + + +import com.mongodb.*; +import com.mongodb.gridfs.GridFS; +import com.mongodb.gridfs.GridFSDBFile; +import com.mongodb.gridfs.GridFSInputFile; +import org.apache.jackrabbit.core.id.NodeId; +import org.apache.jackrabbit.core.id.PropertyId; +import org.apache.jackrabbit.core.persistence.PMContext; +import org.apache.jackrabbit.core.persistence.bundle.AbstractBundlePersistenceManager; +import org.apache.jackrabbit.core.persistence.util.*; +import org.apache.jackrabbit.core.state.ItemStateException; +import org.apache.jackrabbit.core.state.NoSuchItemStateException; +import org.apache.jackrabbit.core.state.NodeReferences; +import org.bson.types.Binary; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jcr.RepositoryException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +/** + * Created with IntelliJ IDEA. + * User: linhui + * Date: 6/11/12 + * Time: 10:47 AM + * To change this template use File | Settings | File Templates. + */ + + +public class MongoBundlePersistenceManager extends AbstractBundlePersistenceManager { + + protected DB itsDb; + protected DBCollection itsNodeCollection; + + + /** + * flag for error handling + */ + protected ErrorHandling errorHandling = new ErrorHandling(); + protected BundleBinding binding; + /** + * the minimum size of a property until it gets written to the blob store + */ + private int minBlobSize = 0x1000; + /** + * the name of this persistence manager + */ + private String name = super.toString(); + + private String itsSchema; + + private boolean consistencyCheck; + + /** + * the default logger + */ + private static Logger log = LoggerFactory.getLogger(org.apache.jackrabbit.core.persistence.mongodb.MongoBundlePersistenceManager.class); + protected DBCollection itsRefCollection; + protected BLOBStore itsBlobStore; + private String itsHost; + + @Override + protected NodePropBundle loadBundle(NodeId id) throws ItemStateException { + BasicDBObject query = new BasicDBObject(); + query.put("_id", id.toString()); + BasicDBObject theDBObject = (BasicDBObject) itsNodeCollection.findOne(query); + return theDBObject == null ? null : toBundle(id, theDBObject); + } + + protected NodePropBundle toBundle(NodeId id, BasicDBObject inDBObject) throws ItemStateException { + Object theD = inDBObject.get("d"); + InputStream in = new ByteArrayInputStream(theD instanceof Binary ? ((Binary) theD).getData() : (byte[]) theD); + if (id == null) { + id = NodeId.valueOf(inDBObject.getString("_id")); + } + try { + return binding.readBundle(in, id); + } catch (IOException e) { + String msg = "failed to read bundle: " + id + ": " + e; + log.error(msg, e); + throw new ItemStateException(msg, e); + } + } + + //todo: figure out how to added bundle into bundle cache + protected List loadBundle(List ids) throws ItemStateException { + List theIds = new ArrayList(ids.size()); + for (NodeId theId : ids) { + theIds.add(theId.toString()); + } + DBObject query = QueryBuilder.start().in(theIds).get(); + List theResult = itsNodeCollection.find(query).toArray(); + List theBundles = new ArrayList(theResult.size()); + for (DBObject theDBObject : theResult) { + theBundles.add(toBundle(null, (BasicDBObject) theDBObject)); + } + return theBundles; + } + + /** + * get children id by parent id + * @param inParentId + * @return + * @throws ItemStateException + */ + protected List loadChildrenNodeId(NodeId inParentId) throws ItemStateException { + BasicDBObject query = new BasicDBObject(); + query.put("_p", inParentId.toString()); + DBCursor theCusor = itsNodeCollection.find(query); + + List theBundleDb = new LinkedList(); + while (theCusor.hasNext()) { + BasicDBObject theDBObject = (BasicDBObject) theCusor.next(); + theBundleDb.add(NodeId.valueOf(theDBObject.getString("_id"))); + } + return theBundleDb; + } + + + /** + * initial size of buffer used to serialize objects + */ + protected static final int INITIAL_BUFFER_SIZE = 1024; + + @Override + protected synchronized void storeBundle(NodePropBundle bundle) throws ItemStateException { + try { + DBObject theQuery = new BasicDBObject(); + theQuery.put("_id", bundle.getId().toString()); + + DBObject theDbObject = new BasicDBObject(); + theDbObject.put("_id", bundle.getId().toString()); + ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); + binding.writeBundle(out, bundle); + theDbObject.put("d", out.toByteArray()); + + if (bundle.getParentId() != null) { + theDbObject.put("_p", bundle.getParentId().toString()); + } + + itsNodeCollection.update(theQuery, theDbObject, true, false); + + } catch (IOException e) { + String msg = "failed to write bundle: " + bundle.getId() + ": " + e; + log.error(msg, e); + throw new ItemStateException(msg, e); + } + } + + @Override + protected synchronized void destroyBundle(NodePropBundle bundle) throws ItemStateException { + //todo: delete entire subtree in single query + DBObject theDBObject = new BasicDBObject(); + theDBObject.put("_id", bundle.getId().toString()); + itsNodeCollection.remove(theDBObject); + } + + @Override + protected synchronized void destroy(NodeReferences refs) throws ItemStateException { + DBObject theDBObject = new BasicDBObject(); + theDBObject.put("_id", refs.getTargetId().toString()); + itsRefCollection.remove(theDBObject); + } + + @Override + protected synchronized void store(NodeReferences refs) throws ItemStateException { + + DBObject theQuery = new BasicDBObject(); + theQuery.put("_id", refs.getTargetId().toString()); + + DBObject theRef = new BasicDBObject(); + theRef.put("_id", refs.getTargetId().toString()); + + ByteArrayOutputStream out = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE); + // serialize references + try { + org.apache.jackrabbit.core.persistence.util.Serializer.serialize(refs, out); + theRef.put("d", out.toByteArray()); + itsRefCollection.update(theQuery, theRef, true, false); + } catch (Exception e) { + String msg = "failed to write " + refs; + log.error(msg, e); + throw new ItemStateException(msg, e); + } + + + } + + + public void init(PMContext inContext) throws Exception { + super.init(inContext); + log.info("init mongo db persistance manager: " + getClass().getName()); + Mongo m = new Mongo(getHost()); + itsDb = m.getDB(getSchema()); + itsNodeCollection = itsDb.getCollection("node"); + itsRefCollection = itsDb.getCollection("ref"); + itsBlobStore = new MongoGridFSBLOBStore(); + // load namespaces + binding = new BundleBinding(errorHandling, itsBlobStore, getNsIndex(), getNameIndex(), context.getDataStore()); + binding.setMinBlobSize(minBlobSize); + if(isConsistencyCheck()) { + checkConsistency(null, true, true); + } + log.info("complete init mongo db persistance manager: " + getClass().getName()); + } + + public NodeReferences loadReferencesTo(NodeId id) throws NoSuchItemStateException, ItemStateException { + + BasicDBObject theQuery = new BasicDBObject(); + theQuery.put("_id", id.toString()); + + BasicDBObject theRefDb = (BasicDBObject) itsRefCollection.findOne(theQuery); + if (theRefDb == null) { + throw new NoSuchItemStateException(id.toString()); + } + + Object theD = theRefDb.get("d"); + InputStream in = new ByteArrayInputStream(theD instanceof Binary ? ((Binary) theD).getData() : (byte[]) theD); + + NodeReferences theRef = new NodeReferences(id); + try { + Serializer.deserialize(theRef, in); + return theRef; + } catch (Exception e) { + if (e instanceof NoSuchItemStateException) { + throw (NoSuchItemStateException) e; + } + String msg = "failed to read references: " + id; + log.error(msg, e); + throw new ItemStateException(msg, e); + } + } + + public synchronized boolean existsReferencesTo(NodeId targetId) throws ItemStateException { + + BasicDBObject theDBObject = new BasicDBObject(); + theDBObject.put("_id", targetId.toString()); + BasicDBObject theRefDb = (BasicDBObject) itsRefCollection.findOne(theDBObject); + return theRefDb != null; + } + + public List getAllNodeIds(NodeId after, int maxCount) throws ItemStateException, RepositoryException { + DBObject theSort = new BasicDBObject(); + theSort.put("_id", "1"); + + DBObject theQuery = new BasicDBObject(); + theQuery.put("_id", after.toString()); + + theQuery = QueryBuilder.start().greaterThan(theQuery).get(); + DBCursor theCursor = itsRefCollection.find(theQuery).sort(theSort).limit(maxCount); + + ArrayList result = new ArrayList(); + + for (DBObject theDBObject : theCursor) { + result.add(NodeId.valueOf(theDBObject.get("_id").toString())); + } + + return result; + } + + protected void checkBundleConsistency(NodeId id, NodePropBundle bundle, + boolean fix, Collection modifications) { + //log.info(name + ": checking bundle '" + id + "'"); + + // skip all system nodes except root node + if (id.toString().endsWith("babecafebabe") + && !id.toString().equals("cafebabe-cafe-babe-cafe-babecafebabe")) { + return; + } + + Collection missingChildren = new ArrayList(); + + try { + List theChildren = loadChildrenNodeId(bundle.getId()); + for (NodePropBundle.ChildNodeEntry theChildNodeEntry : bundle.getChildNodeEntries()) { + if (!theChildren.contains(theChildNodeEntry.getId())) { + missingChildren.add(theChildNodeEntry); + } + } + } catch (ItemStateException e) { + + } + + // remove child node entry (if fixing is enabled) + if (fix && !missingChildren.isEmpty()) { + for (NodePropBundle.ChildNodeEntry entry : missingChildren) { + bundle.getChildNodeEntries().remove(entry); + } + modifications.add(bundle); + } + + // check parent reference + NodeId parentId = bundle.getParentId(); + try { + // skip root nodes (that point to itself) + if (parentId != null && !id.toString().endsWith("babecafebabe")) { + if (exists(parentId)) { + log.error("NodeState '" + id + "' references inexistent parent uuid '" + parentId + "'"); + } + } + } catch (ItemStateException e) { + log.error("Error reading node '" + parentId + "' (parent of '" + id + "'): " + e); + } + } + + public void checkConsistency(String[] uuids, boolean recursive, boolean fix) { + int count = 0; + int total = 0; + Collection modifications = new ArrayList(); + + if (uuids == null) { + // get all node bundles in the database with a single sql statement, + // which is (probably) faster than loading each bundle and traversing the tree + DBCursor rs; + try { + rs = itsNodeCollection.find(); + total = rs.count(); + // iterate over all node bundles in the db + while (rs.hasNext()) { + BasicDBObject theDBObject = (BasicDBObject) rs.next(); + NodeId id = NodeId.valueOf(theDBObject.getString("_id")); + checkBundleConsistency(id, toBundle(id, theDBObject), fix, modifications); + count++; + if (count % 1000 == 0) { + log.info(name + ": checked " + count + "/" + total + " bundles..."); + } + } + } catch (Exception e) { + log.error("Error loading bundle", e); + } finally { + total = count; + } + } else { + // check only given uuids, handle recursive flag + + // 1) convert uuid array to modifiable list + // 2) for each uuid do + // a) load node bundle + // b) check bundle, store any bundle-to-be-modified in collection + // c) if recursive, add child uuids to list of uuids + + List idList = new ArrayList(uuids.length); + // convert uuid string array to list of UUID objects + for (int i = 0; i < uuids.length; i++) { + try { + idList.add(new NodeId(uuids[i])); + } catch (IllegalArgumentException e) { + log.error("Invalid uuid for consistency check, skipping: '" + uuids[i] + "': " + e); + } + } + + try { + while (idList.size() > 0) { + List theBundles = loadBundle(idList); + idList.clear(); + + for (NodePropBundle theBundle : theBundles) { + checkBundleConsistency(theBundle.getId(), theBundle, fix, modifications); + if (recursive) { + for (NodePropBundle.ChildNodeEntry theChildNodeEntry : theBundle.getChildNodeEntries()) { + idList.add(theChildNodeEntry.getId()); + } + } + count++; + if (count % 1000 == 0) { + log.info(name + ": checked " + count + "/" + idList.size() + " bundles..."); + } + } + } + } catch (ItemStateException e) { + } + total = idList.size(); + } + + // repair collected broken bundles + if (fix && !modifications.isEmpty()) { + log.info(name + ": Fixing " + modifications.size() + " inconsistent bundle(s)..."); + for (NodePropBundle bundle : modifications) { + try { + log.info(name + ": Fixing bundle '" + bundle.getId() + "'"); + bundle.markOld(); // use UPDATE instead of INSERT + storeBundle(bundle); + evictBundle(bundle.getId()); + } catch (ItemStateException e) { + log.error(name + ": Error storing fixed bundle: " + e); + } + } + } + + log.info(name + ": checked " + count + "/" + total + " bundles."); + } + + @Override + protected BLOBStore getBlobStore() { + return itsBlobStore; //To change body of implemented methods use File | Settings | File Templates. + } + + public String getSchema() { + return itsSchema; + } + + public void setSchema(String inSchema) { + itsSchema = inSchema; + } + + public boolean isConsistencyCheck() { + return consistencyCheck; + } + + public void setConsistencyCheck(boolean inConsistencyCheck) { + consistencyCheck = inConsistencyCheck; + } + + public void setHost(String inLocalhost) { + itsHost = inLocalhost; + } + + public String getHost() { + return itsHost; + } + + + private class MongoGridFSBLOBStore implements BLOBStore { + + private GridFS itsGripFS; + + public MongoGridFSBLOBStore() { + + itsGripFS = new GridFS(itsDb); + + } + + public String createId(PropertyId id, int index) { + + StringBuffer buf = new StringBuffer(); + buf.append(id.getParentId().toString()); + buf.append('.'); + buf.append(getNsIndex().stringToIndex(id.getName().getNamespaceURI())); + buf.append('.'); + buf.append(getNameIndex().stringToIndex(id.getName().getLocalName())); + buf.append('.'); + buf.append(index); + return buf.toString(); + } + + public void put(String blobId, InputStream in, long size) throws Exception { + GridFSInputFile theGridFSInputFile = itsGripFS.createFile(in, true); + theGridFSInputFile.put("_id", blobId); + theGridFSInputFile.save(); + + + } + + public InputStream get(String blobId) throws Exception { + GridFSDBFile out = itsGripFS.findOne(new BasicDBObject("_id", blobId)); + return out.getInputStream(); + } + + public boolean remove(String blobId) throws Exception { + itsGripFS.remove(new BasicDBObject("_id", blobId)); + return true; + } + } +} Index: pom.xml =================================================================== --- pom.xml (revision 1348513) +++ pom.xml (working copy) @@ -275,6 +275,13 @@ org.apache.derby derby + + + org.mongodb + mongo-java-driver + 2.7.2 + + org.apache.jackrabbit jackrabbit-jcr-tests