Index: oak-core/pom.xml =================================================================== --- oak-core/pom.xml (revision 1696301) +++ oak-core/pom.xml (working copy) @@ -317,6 +317,12 @@ test + junit-addons + junit-addons + 1.4 + test + + org.mockito mockito-core 1.9.5 Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (revision 1696301) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (working copy) @@ -67,6 +67,13 @@ public static final String LEASE_END_KEY = "leaseEnd"; /** + * The key for the root-revision of the last background write (of unsaved modifications) + * - that is: the last root-revision written by the instance in case of a clear shutdown + * or via recovery of another instance in case of a crash + */ + public static final String LAST_WRITTEN_ROOT_REV_KEY = "lastWrittenRootRev"; + + /** * The state of the cluster. On proper shutdown the state should be cleared. * * @see org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java (revision 1696301) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java (working copy) @@ -77,4 +77,9 @@ private RecoverLockState getRecoveryState(){ return RecoverLockState.fromString((String) get(ClusterNodeInfo.REV_RECOVERY_LOCK)); } + + /** the root-revision of the last background write (of unsaved modifications) **/ + public String getLastWrittenRootRev() { + return (String) get(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY); + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java (working copy) @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +/** + * DocumentNS-internal listener that gets invoked when a change in the + * clusterNodes collection (active/inactive/timed out/recovering) is detected. 
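+ * <p>
+ * A minimal sketch of an implementation (hypothetical, for illustration only):
+ * <pre>
+ * ClusterStateChangeListener listener = new ClusterStateChangeListener() {
+ *     public void handleClusterStateChange() {
+ *         // e.g. wake up a background worker that re-reads the
+ *         // clusterNodes collection and recomputes the view
+ *     }
+ * };
+ * </pre>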
+ */ +public interface ClusterStateChangeListener { + + /** + * Informs the listener that DocumentNodeStore has discovered a change in the + * clusterNodes collection. + */ + public void handleClusterStateChange(); + +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java (working copy) @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.jackrabbit.oak.commons.json.JsopBuilder; + +/** + * A ClusterView represents the state of a cluster at a particular + * moment in time. + *

+ * This is a combination of what is stored in the ClusterViewDocument + * and the list of instances that currently have a backlog. + *

+ * In order to be able to differentiate and clearly identify the
+ * different states an instance is in, the ClusterView uses a
+ * slightly different terminology for the states it reports:
+ * 'active', 'deactivating' and 'inactive' (see DocumentDiscoveryLiteService
+ * for details on these states).
+ *
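+ * For illustration only (all ids and values here are invented): a view in
+ * which instances 1 and 2 are active and instance 3 is inactive would
+ * roughly be reported, via the JSON described below, as:
+ * <pre>
+ * {"seq":5,"final":true,"id":"a3f9-...","me":1,"active":[1,2],"deactivating":[],"inactive":[3]}
+ * </pre>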

+ * The JSON generated by the ClusterView (which is propagated to JMX)
+ * has the following fields: 'seq' (the view sequence number), 'final'
+ * (whether this view is final or might still change), 'id' (the stable
+ * cluster view id), 'me' (the local instance id) and 'active',
+ * 'deactivating', 'inactive' (the instance ids grouped by state).
+ */
+class ClusterView {
+
+    /** the json containing the complete information of the state of this ClusterView.
+     * Created at constructor time for performance reasons (json will be polled via
+     * JMX very frequently, thus must be provided fast)
+     */
+    private final String json;
+
+    /**
+     * Factory method that creates a ClusterView given a ClusterViewDocument and a list
+     * of instances that currently have a backlog.
+     *

+ * The ClusterViewDocument contains instances in the following states:
+ * active, recovering and inactive.
+ *

+ * The ClusterView however reports these upwards as follows: + * + * @param localInstanceId the id of the local instance (me) + * @param clusterViewDoc the ClusterViewDocument which contains the currently persisted cluster view + * @param backlogIds the ids that the local instances still has not finished a background read for + * and thus still have a backlog + * @return the ClusterView representing the provided info + */ + static ClusterView fromDocument(int localInstanceId, ClusterViewDocument clusterViewDoc, Set backlogIds) { + final Set activeIds = clusterViewDoc.getActiveIds(); + final Set deactivatingIds = new HashSet(); + deactivatingIds.addAll(clusterViewDoc.getRecoveringIds()); + deactivatingIds.addAll(backlogIds); + final Set inactiveIds = new HashSet(); + inactiveIds.addAll(clusterViewDoc.getInactiveIds()); + if (!inactiveIds.removeAll(backlogIds) && backlogIds.size()>0) { + // then not all backlogIds were listed is inactive - which is contrary to the expectation + // in which case we indeed do a paranoia exception here: + throw new IllegalStateException("not all backlogIds ("+backlogIds+") are part of inactiveIds ("+clusterViewDoc.getInactiveIds()+")"); + } + return new ClusterView(clusterViewDoc.getViewSeqNum(), backlogIds.size()==0, clusterViewDoc.getClusterViewId(), localInstanceId, + activeIds, deactivatingIds, inactiveIds); + } + + ClusterView(final long viewSeqNum, + final boolean viewFinal, + final String clusterViewId, + final int localId, + final Set activeIds, + final Set deactivatingIds, + final Set inactiveIds) { + if (viewSeqNum<0) { + throw new IllegalStateException("viewSeqNum must be zero or higher: "+viewSeqNum); + } + if (clusterViewId==null || clusterViewId.length()==0) { + throw new IllegalStateException("clusterViewId must not be zero or empty: "+clusterViewId); + } + if (localId<0) { + throw new IllegalStateException("localId must not be zero or higher: "+localId); + } + if (activeIds==null || activeIds.size()==0) { + throw new IllegalStateException("activeIds must not be null or empty"); + } + if (deactivatingIds==null) { + throw new IllegalStateException("deactivatingIds must not be null"); + } + if (inactiveIds==null) { + throw new IllegalStateException("inactiveIds must not be null"); + } + + json = asJson(viewSeqNum, viewFinal, clusterViewId, localId, activeIds, deactivatingIds, inactiveIds); + } + + /** Converts the provided parameters into the clusterview json that will be provided via JMX **/ + private String asJson(final long viewSeqNum, final boolean viewFinal, final String clusterViewId, final int localId, + final Set activeIds, + final Set deactivatingIds, + final Set inactiveIds) { + JsopBuilder builder = new JsopBuilder(); + builder.object(); + builder.key("seq").value(viewSeqNum); + builder.key("final").value(viewFinal); + builder.key("id").value(clusterViewId); + builder.key("me").value(localId); + builder.key("active").array(); + for (Iterator it = activeIds.iterator(); it + .hasNext();) { + Integer anInstance = it.next(); + builder.value(anInstance); + } + builder.endArray(); + builder.key("deactivating").array(); + for (Iterator it = deactivatingIds.iterator(); it + .hasNext();) { + Integer anInstance = it.next(); + builder.value(anInstance); + } + builder.endArray(); + builder.key("inactive").array(); + for (Iterator it = inactiveIds.iterator(); it + .hasNext();) { + Integer anInstance = it.next(); + builder.value(anInstance); + } + builder.endArray(); + builder.endObject(); + return builder.toString(); + } + + /** Debugging toString() 
**/ + @Override + public String toString() { + return "a ClusterView["+json+"]"; + } + + /** This is the main getter that will be polled via JMX **/ + String asDescriptorValue() { + return json; + } + +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java (working copy) @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.jackrabbit.oak.commons.json.JsopBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents the document stored in the settings collection containing + * a 'cluster view'. + *

+ * A 'cluster view' is the state of the membership of instances that are + * or have all been connected to the same oak repository. + * The 'cluster view' is maintained by all instances in the cluster + * concurrently - the faster one wins. Its information is derived + * from the clusterNodes collection. From there the following three + * states are derived and instances are grouped into these: + *

+ * <ul>
+ * <li>Active: an instance is active and no recoveryLock is currently
+ * acquired. The lease timeout is ignored. When the lease times out, this
+ * is noticed by one of the instances at some point and a recovery
+ * is started, at which point the instance transitions from 'Active'
+ * to 'Recovering'.</li>
+ * <li>Recovering: an instance that was active but currently has the
+ * recoveryLock acquired by one of the instances.</li>
+ * <li>Inactive: an instance that is not set to active (in which case
+ * the recoveryLock is never set).</li>
+ * </ul>
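+ * A sample of the resulting document in the settings collection
+ * (all values invented, for illustration only):
+ * <pre>
+ * { "_id" : "clusterView",
+ *   "clusterViewId" : "8f21-...",
+ *   "seqNum" : 7,
+ *   "active" : "1,2",
+ *   "recovering" : null,
+ *   "inactive" : "3",
+ *   "created" : "2015-08-28T10:15:30.123+0200",
+ *   "creator" : 1 }
+ * </pre>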

+ * Note that the states managed in this ClusterViewDocument differs + * from the one from ClusterView - since ClusterView also manages + * the fact that after a recovery of a crashed instance there could + * be a 'backlog' of changes which it doesn't yet see until a + * background read is performed. + */ +class ClusterViewDocument { + + private static final Logger logger = LoggerFactory.getLogger(ClusterViewDocument.class); + + /** the id of this document is always 'clusterView' **/ + private static final String CLUSTERVIEW_DOC_ID = "clusterView"; + + // keys that we store in the root document - and in the history + /** document key that stores the stable id of the cluster (will never change) + * (Note: a better term would have been just clusterId - + * but that one is already occupied with what should actually be called clusterNodeId or just nodeId) **/ + static final String CLUSTER_VIEW_ID_KEY = "clusterViewId"; + + /** document key that stores the monotonically incrementing sequence number of the cluster view. Any update will increase this by 1 **/ + static final String VIEW_SEQ_NUM_KEY = "seqNum"; + + /** document key that stores the comma-separated list of active instance ids **/ + static final String ACTIVE_KEY = "active"; + + /** document key that stores the comma-separated list of inactive instance ids + * (they might still have a backlog, that is handled in ClusterView though, never persisted */ + static final String INACTIVE_KEY = "inactive"; + + /** document key that stores the comma-separated list of recovering instance ids **/ + static final String RECOVERING_KEY = "recovering"; + + /** document key that stores the date and time when this view was created - for debugging purpose only **/ + private static final String CREATED_KEY = "created"; + + /** document key that stores the id of the instance that created this view - for debugging purpose only **/ + private static final String CREATOR_KEY = "creator"; + + /** document key that stores the date and time when this was was retired - for debugging purpose only **/ + private static final String RETIRED_KEY = "retired"; + + /** document key that stores the id of the instance that retired this view - for debugging purpose only **/ + private static final String RETIRER_KEY = "retirer"; + + /** document key that stores a short, limited history of previous cluster views - for debugging purpose only **/ + private static final String CLUSTER_VIEW_HISTORY_KEY = "clusterViewHistory"; + + /** the format used when storing date+time **/ + private static final DateFormat standardDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + + /** number of elements kept in the CLUSTERVIEW_HISTORY_KEY field **/ + private static final int HISTORY_LIMIT = 10; + + /** the monotonically incrementing sequence number of this cluster view **/ + private final long viewSeqNum; + + /** the stable id of this cluster **/ + private final String clusterViewId; + + /** the ids of instances that are active at this moment **/ + private final Integer[] activeIds; + + /** the ids of instances that are recovering (lastRev-recovery) at this moment **/ + private final Integer[] recoveringIds; + + /** the ids of instances that are inactive at this moment **/ + private final Integer[] inactiveIds; + + /** the short, limited history of previous cluster views, for debugging only **/ + private final Map viewHistory; + + /** the date+time at which this view was created, for debugging only **/ + private final String createdAt; + + /** the id of the instance that 
created this view, for debugging only **/ + private final Integer createdBy; + + /** + * Main method by which the ClusterViewDocument is updated in the settings collection + * @return the resulting ClusterViewDocument as just updated in the settings collection - or null + * if another instance was updating the clusterview concurrently (in which case the caller + * should re-read first and possibly re-update if needed) + */ + static ClusterViewDocument readOrUpdate(DocumentNodeStore documentNodeStore, + Set activeIds, Set recoveringIds, Set inactiveIds) { + logger.trace("readOrUpdate: expected: activeIds: {}, recoveringIds: {}, inactiveIds: {}", + activeIds, recoveringIds, inactiveIds); + if (activeIds==null || activeIds.size()==0) { + logger.info("readOrUpdate: activeIds must not be null or empty"); + throw new IllegalStateException("activeIds must not be null or empty"); + } + final int localClusterId = documentNodeStore.getClusterId(); + + final ClusterViewDocument previousView = doRead(documentNodeStore); + if (previousView!=null) { + if (previousView.matches(activeIds, recoveringIds, inactiveIds)) { + logger.trace("readOrUpdate: view unchanged, returning: {}", previousView); + return previousView; + } + } + logger.trace("readOrUpdate: view change detected, going to update from {} to activeIds: {}, recoveringIds: {}, inactiveIds: {}", + previousView, activeIds, recoveringIds, inactiveIds); + final UpdateOp updateOp = new UpdateOp(CLUSTERVIEW_DOC_ID, true); + final Date now = new Date(); + updateOp.set(ACTIVE_KEY, setToCsv(activeIds)); + updateOp.set(RECOVERING_KEY, setToCsv(recoveringIds)); + updateOp.set(INACTIVE_KEY, setToCsv(inactiveIds)); + updateOp.set(CREATED_KEY, standardDateFormat.format(now)); + updateOp.set(CREATOR_KEY, localClusterId); + Map historyMap = new HashMap(); + if (previousView!=null) { + Map previousHistory = previousView.getHistory(); + if (previousHistory!=null) { + historyMap.putAll(previousHistory); + } + + historyMap.put(Revision.newRevision(localClusterId).toString(), + asHistoryEntry(previousView, localClusterId, now)); + } + applyHistoryLimit(historyMap); + updateOp.set(CLUSTER_VIEW_HISTORY_KEY, historyMap); + + final Long newViewSeqNum; + if (previousView==null) { + // we are the first ever, looks like, that the clusterview is defined + // so we can use viewId==1 and we make sure no other cluster node + // tries to create this first one simultaneously - so we use + // 'create' + + // going via 'create' requires ID to be set again (not only in new UpdateOp(id,isNew)): + updateOp.set(Document.ID, CLUSTERVIEW_DOC_ID); + final ArrayList updateOps = new ArrayList(); + newViewSeqNum=1L; + updateOp.setNew(true); // paranoia as that's already set above + updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum); + // first view ever => choose a new unique clusterViewId + final String clusterViewId = UUID.randomUUID().toString(); + updateOp.set(CLUSTER_VIEW_ID_KEY, clusterViewId); + updateOps.add(updateOp); + logger.debug("updateAndRead: trying to create the first ever clusterView - hence {}={} and {}={}", VIEW_SEQ_NUM_KEY, newViewSeqNum, CLUSTER_VIEW_ID_KEY, clusterViewId); + if (!documentNodeStore.getDocumentStore().create(Collection.SETTINGS, updateOps)) { + logger.debug("updateAndRead: someone else just created the first view ever while I tried - reread that one later"); + return null; + } + } else { + // there were earlier clusterViews (the normal case) - thus we + // use 'findAndUpdate' with the condition that + // the view id is still at the previousview one + final 
Long previousViewSeqNum = previousView.getViewSeqNum(); + updateOp.setNew(false); // change to false from true above + updateOp.equals(VIEW_SEQ_NUM_KEY, null, previousViewSeqNum); + newViewSeqNum = previousViewSeqNum+1; + updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum); + logger.debug("updateAndRead: trying to update the clusterView to {}={} ",VIEW_SEQ_NUM_KEY,newViewSeqNum); + if (documentNodeStore.getDocumentStore().findAndUpdate(Collection.SETTINGS, updateOp)==null) { + logger.debug("updateAndRead: someone else just updated the view which I wanted to do as well - reread that one later"); + return null; + } + } + + // whatever the outcome of the above - we don't care - + // re-reading will in any case definitely show what has been persisted + // and if the re-read view contains the same id, it is what we have written + // - otherwise someone else came in between and we have to step back and retry + final ClusterViewDocument readResult = doRead(documentNodeStore); + if (readResult==null) { + logger.debug("updateAndRead: got null from read - whatever the exact reason, we must retry in a moment."); + return null; + } else if (newViewSeqNum.equals(readResult.getViewSeqNum())) { + logger.debug("updateAndRead: matching view - no change"); + return readResult; + } else { + logger.debug("updateAndRead: someone else in the cluster was updating right after I also succeeded - re-read in a bit"); + return null; + } + } + + /** + * Pruning method that makes sure the history never gets larger than HISTORY_LIMIT + * @param historyMap the pruning is done directly on this map + */ + private static void applyHistoryLimit(Map historyMap) { + while (historyMap.size()>HISTORY_LIMIT) { + // remove the oldest + String oldestRevision = null; + for (Iterator it = historyMap.keySet().iterator(); it.hasNext();) { + final Object obj = it.next(); + String r; + if (obj instanceof Revision) { + r = ((Revision)obj).toString(); + } else { + r = (String) obj; + } + if (oldestRevision==null) { + oldestRevision = r; + } else if (Revision.getTimestampDifference(Revision.fromString(r), Revision.fromString(oldestRevision))<0) { + oldestRevision = r; + } + } + if (oldestRevision==null) { + break; + } else { + if (historyMap.remove(oldestRevision)==null) { + if (historyMap.remove(Revision.fromString(oldestRevision))==null) { + break; + } + } + } + } + } + + /** Converts a previous clusterView document into a history 'string' **/ + private static String asHistoryEntry(final ClusterViewDocument previousView, int retiringClusterNodeId, Date retireTime) { + String h; + JsopBuilder b = new JsopBuilder(); + b.object(); + b.key(VIEW_SEQ_NUM_KEY); + b.value(previousView.getViewSeqNum()); + b.key(CREATED_KEY); + b.value(String.valueOf(previousView.getCreatedAt())); + b.key(CREATOR_KEY); + b.value(previousView.getCreatedBy()); + b.key(RETIRED_KEY); + b.value(String.valueOf(standardDateFormat.format(retireTime))); + b.key(RETIRER_KEY); + b.value(retiringClusterNodeId); + b.key(ACTIVE_KEY); + b.value(arrayToCsv((Integer[])previousView.getActiveIds().toArray(new Integer[0]))); + b.key(RECOVERING_KEY); + b.value(arrayToCsv((Integer[])previousView.getRecoveringIds().toArray(new Integer[0]))); + b.key(INACTIVE_KEY); + b.value(arrayToCsv((Integer[])previousView.getInactiveIds().toArray(new Integer[0]))); + b.endObject(); + h = b.toString(); + return h; + } + + /** helper method to convert a set to a comma-separated string (without using toString() for safety) **/ + private static String setToCsv(Set ids) { + if (ids==null || ids.size()==0) { + 
return null; + } + StringBuffer sb = new StringBuffer(); + Iterator it = ids.iterator(); + while(it.hasNext()) { + final Integer id = it.next(); + if (sb.length()!=0) { + sb.append(","); + } + sb.append(id); + } + return sb.toString(); + } + + /** helper method to convert an array to a comma-separated string **/ + static String arrayToCsv(Integer[] arr) { + if (arr==null || arr.length==0) { + return null; + } else if (arr.length==1) { + return String.valueOf(arr[0]); + } + StringBuffer sb = new StringBuffer(); + sb.append(String.valueOf(arr[0])); + for (int i = 1; i < arr.length; i++) { + final Object a = arr[i]; + sb.append(","); + sb.append(String.valueOf(a)); + } + return sb.toString(); + } + + /** inverse helper method which converts a comma-separated string into an integer array **/ + static Integer[] csvToIntegerArray(String csv) { + if (csv==null) { + return null; + } + String[] split = csv.split(","); + Integer[] result = new Integer[split.length]; + for (int i = 0; i < split.length; i++) { + result[i] = Integer.parseInt(split[i]); + } + return result; + } + + /** internal reader of an existing clusterView document from the settings collection **/ + private static ClusterViewDocument doRead(DocumentNodeStore documentNodeStore) { + final DocumentStore documentStore = documentNodeStore.getDocumentStore(); + final Document doc = documentStore.find(Collection.SETTINGS, "clusterView", -1 /* -1; avoid caching */); + if (doc==null) { + return null; + } else { + final ClusterViewDocument clusterView = new ClusterViewDocument(doc); + if (clusterView.isValid()) { + return clusterView; + } else { + logger.warn("read: clusterView document is not valid: "+doc.format()); + return null; + } + } + } + + /** comparison helper that compares an integer array with a set **/ + static boolean matches(Integer[] expected, Set actual) { + final boolean expectedIsEmpty = expected==null || expected.length==0; + final boolean actualIsEmpty = actual==null || actual.size()==0; + if (expectedIsEmpty && actualIsEmpty) { + // if both are null or empty, they match + return true; + } + if (expectedIsEmpty!=actualIsEmpty) { + // if one of them is only empty, but the other not, then they don't match + return false; + } + if (expected.length!=actual.size()) { + // different size + return false; + } + for (int i = 0; i < expected.length; i++) { + Integer aMemberId = expected[i]; + if (!actual.contains(aMemberId)) { + return false; + } + } + return true; + } + + @SuppressWarnings("unchecked") + ClusterViewDocument(Document doc) { + if (doc==null) { + throw new IllegalArgumentException("doc must not be null"); + } + this.clusterViewId = (String) doc.get(CLUSTER_VIEW_ID_KEY); + this.viewSeqNum = (Long) doc.get(VIEW_SEQ_NUM_KEY); + this.createdAt = (String) doc.get(CREATED_KEY); + this.createdBy = (Integer) doc.get(CREATOR_KEY); + + final Object obj = doc.get(ACTIVE_KEY); + if (obj==null || !(obj instanceof String)) { + logger.trace(": {} : {}", ACTIVE_KEY, obj); + this.activeIds = new Integer[0]; + } else { + this.activeIds = csvToIntegerArray((String) obj); + } + + final Object obj2 = doc.get(RECOVERING_KEY); + if (obj2==null || !(obj2 instanceof String)) { + logger.trace(": {} : {}", RECOVERING_KEY, obj2); + this.recoveringIds= new Integer[0]; + } else { + this.recoveringIds = csvToIntegerArray((String) obj2); + } + + final Object obj3 = doc.get(INACTIVE_KEY); + if (obj3==null || !(obj3 instanceof String)) { + logger.trace(": {} : {}", INACTIVE_KEY, obj3); + this.inactiveIds = new Integer[0]; + } else { + 
this.inactiveIds = csvToIntegerArray((String) obj3); + } + + final Object obj4 = doc.get(CLUSTER_VIEW_HISTORY_KEY); + if (obj4==null || !(obj4 instanceof Map)) { + logger.trace(" viewHistory is null"); + this.viewHistory = null; + } else { + this.viewHistory = ((Map)obj4); + } + } + + /** Returns the set of active ids of this cluster view **/ + Set getActiveIds() { + return new HashSet(Arrays.asList(activeIds)); + } + + /** Returns the set of recovering ids of this cluster view **/ + Set getRecoveringIds() { + return new HashSet(Arrays.asList(recoveringIds)); + } + + /** Returns the set of inactive ids of this cluster view **/ + Set getInactiveIds() { + return new HashSet(Arrays.asList(inactiveIds)); + } + + /** Returns the history map **/ + private Map getHistory() { + return viewHistory; + } + + @Override + public String toString() { + return "a ClusterView[valid="+isValid()+", viewSeqNum="+viewSeqNum+", clusterViewId="+clusterViewId+ + ", activeIds="+arrayToCsv(activeIds)+", recoveringIds="+arrayToCsv(recoveringIds)+ + ", inactiveIds="+arrayToCsv(inactiveIds)+"]"; + } + + boolean isValid() { + return viewSeqNum>=0 && activeIds!=null && activeIds.length>0; + } + + /** Returns the date+time when this view was created, for debugging purpose only **/ + String getCreatedAt() { + return createdAt; + } + + /** Returns the id of the instance that created this view, for debugging purpose only **/ + int getCreatedBy() { + return createdBy; + } + + /** Returns the monotonically incrementing sequenece number of this view **/ + long getViewSeqNum() { + return viewSeqNum; + } + + /** Returns a UUID representing this cluster - will never change, propagates from view to view **/ + String getClusterViewId() { + return clusterViewId; + } + + private boolean matches(Set activeIds, Set recoveringIds, Set inactiveIds) { + if (!matches(this.activeIds, activeIds)) { + return false; + } + if (!matches(this.recoveringIds, recoveringIds)) { + return false; + } + if (!matches(this.inactiveIds, inactiveIds)) { + return false; + } + return true; + } + +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java (working copy) @@ -0,0 +1,615 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import javax.jcr.Value; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.felix.scr.annotations.Service; +import org.apache.jackrabbit.commons.SimpleValueFactory; +import org.apache.jackrabbit.oak.api.Descriptors; +import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The DocumentDiscoveryLiteService is taking care of providing a + * repository descriptor that contains the current cluster-view details. + *

+ * The clusterView is provided via a repository descriptor (see
+ * OAK_DISCOVERYLITE_CLUSTERVIEW).
+ *
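+ * A consumer would read it roughly as follows (a hedged sketch; it assumes
+ * a running javax.jcr.Repository instance 'repository' that exposes this
+ * descriptor):
+ * <pre>
+ * String json = repository.getDescriptor("oak.discoverylite.clusterview");
+ * </pre>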

+ * The cluster-view lists all instances (ever) known in the cluster + * in one of the following states: + *

+ * <ul>
+ * <li>active: the instance is currently running and has an up-to-date
+ * lease</li>
+ * <li>deactivating: the instance failed to update the lease recently,
+ * thus a recovery is happening - or it has just finished and the local
+ * instance is yet to do a backgroundRead before it has finished
+ * reading the crashed/shutdown instance's last changes</li>
+ * <li>inactive: the instance is currently not running and all its
+ * changes have been seen by the local instance</li>
+ * </ul>

+ * Additionally, the cluster-view is assigned a monotonically increasing
+ * sequence number. This sequence number is persisted, thus all
+ * instances in the cluster will show the same sequence number for
+ * a particular cluster-view at any point in time.
+ *

+ * Note that the 'deactivating' state might be hiding some complexity
+ * that is deliberately not shown: for the documentNS the state
+ * 'deactivating' consists of two substates: 'recovering' as in
+ * _lastRevs are updated, and 'backlog processing' for a pending
+ * backgroundRead to get the latest head state of a crashed/shutdown
+ * instance. So when an instance is in 'deactivating' state, it is
+ * not indicated via the cluster-view whether it is recovering or
+ * has backlog to process. However, the fact that an instance has
+ * yet to do a backgroundRead to get changes is a per-instance
+ * story: other instances might already have done the backgroundRead
+ * and thus no longer have a backlog for the instance(s) that left.
+ * Even though 'deactivating' therefore is dependent on the instance
+ * you get the information from, the cluster-view must have a
+ * sequence number that uniquely identifies it in the cluster.
+ * These two constraints conflict. As a simple solution to handle
+ * this case nevertheless, the 'final' flag has been introduced:
+ * the cluster-view has this flag 'final' set to true when the
+ * view is final and nothing will be changed in this sequence number
+ * anymore. If the 'final' flag is false, however, it indicates
+ * that the cluster-view with this particular sequence number might
+ * still experience a change (more concretely: the deactivating instances
+ * might change). Note that alternatives to this 'final' flag
+ * have been discussed, such as using vector-counters, but there
+ * was no obvious gain achieved by using an alternative approach.
+ *

+ * In other words: whenever the 'final' flag is false, the view + * must be interpreted as 'in flux' wrt the deactivating/inactive instances + * and any action that depends on stable deactivating/inactive instances + * must not yet be done until the 'final' flag becomes true. + *
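+ * For example (a hedged sketch that uses a simple string check instead of
+ * real JSON parsing): a consumer that must only act on stable views could
+ * gate on the flag like this:
+ * <pre>
+ * String json = repository.getDescriptor("oak.discoverylite.clusterview");
+ * boolean isFinal = json != null &amp;&amp; json.contains("\"final\":true");
+ * if (!isFinal) {
+ *     return; // view still in flux - re-check later
+ * }
+ * // safe to act on deactivating/inactive instances now
+ * </pre>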

+ * Underneath, the DocumentDiscoveryLiteService uses the clusterNodes + * collection to derive the clusterView, which it stores in the settings + * collection. Whenever it updates the clusterView it increments the + * sequence number by 1. + *

+ * While this new 'clusterView' document in the settings collection
+ * sounds like redundant data (since it is just derived from the clusterNodes),
+ * it actually is not. By persisting the clusterView it becomes the
+ * new source of truth wrt what the clusterView looks like. And no
+ * two instances in the same cluster can draw different conclusions
+ * based eg on different clocks they have or based on reading the clusterNodes
+ * at slightly different moments etc. Also, the clusterView allows
+ * storing two additional attributes: the clusterViewId
+ * (which is the permanent id for this cluster, similar to the slingId being
+ * a permanent id for an instance) as well as the sequence number
+ * (which allows the instances to make reference to the same clusterView,
+ * and be able to simply detect whether anything has changed)
+ *

+ * Prerequisites for the clusterView mechanism to be stable:
+ *

    + *
  • the machine clocks are reasonably in sync - that is, they should + * be off by magnitudes less than the lease updateFrequency/timeout
  • + *
  • the write-delays from any instance to the mongo server where + * the clusterNodes and settings collections are stored should be + * very fast - at least orders of magnitudes lower again than the + * lease timeout
  • + *
  • when this instance notices that others have kicked it out of the + * clusterView (which it can find out when either its clusterNodes + * document is set to recovering or it is not in the clusterView + * anymore, although it just was - ie not just because of a fresh start), + * then this instance must step back gracefully. + * The exact definition is to be applied elsewhere - but it should + * include: stopping to update its own lease, waiting for the view + * to have stabilized - waiting for recovery of its own instance + * by the remaining instances in the cluster to have finished - + * and then probably waiting for another gracePeriod until it + * might rejoin the cluster. In between, any commit should + * fail with BannedFromClusterException
  • + *
+ * @see #OAK_DISCOVERYLITE_CLUSTERVIEW + */ +@Component(immediate = true, name = DocumentDiscoveryLiteService.COMPONENT_NAME) +@Service(value = { DocumentDiscoveryLiteService.class, Observer.class }) +public class DocumentDiscoveryLiteService implements ClusterStateChangeListener, Observer { + + static final String COMPONENT_NAME = "org.apache.jackrabbit.oak.plugins.document.DocumentDiscoveryLiteService"; + + /** Name of the repository descriptor via which the clusterView is published - which is the raison d'etre of the DocumentDiscoveryLiteService **/ + public static final String OAK_DISCOVERYLITE_CLUSTERVIEW = "oak.discoverylite.clusterview"; + + private static final Logger logger = LoggerFactory.getLogger(DocumentDiscoveryLiteService.class); + + /** describes the reason why the BackgroundWorker should be woken up **/ + private static enum WakeupReason { + CLUSTER_STATE_CHANGED, + BACKGROUND_READ_FINISHED + } + + /** The BackgroundWorker is taking care of regularly invoking checkView - which in turn will detect if anything changed **/ + private class BackgroundWorker implements Runnable { + + final Random random = new Random(); + + boolean stopped = false; + + private void stop() { + logger.trace("stop: start"); + synchronized(BackgroundWorker.this) { + stopped = true; + } + logger.trace("stop: end"); + } + + @Override + public void run() { + logger.info("BackgroundWorker.run: start"); + try{ + doRun(); + } finally { + logger.info("BackgroundWorker.run: end {finally}"); + } + } + + private void doRun() { + while(!stopped) { + try{ + logger.trace("BackgroundWorker.doRun: going to call checkView"); + final boolean shortSleep = checkView(); + logger.trace("BackgroundWorker.doRun: checkView terminated with {} (=shortSleep)", shortSleep); + final long sleepMillis = shortSleep ? 
(50 + random.nextInt(450)) : 5000; + logger.trace("BackgroundWorker.doRun: sleeping {}ms", sleepMillis); + synchronized(BackgroundWorker.this) { + if (stopped) return; + BackgroundWorker.this.wait(sleepMillis); + if (stopped) return; + } + logger.trace("BackgorundWorker.doRun: done sleeping, looping"); + } catch(Exception e) { + logger.error("doRun: got an exception: "+e, e); + try{ + Thread.sleep(5000); + } catch(Exception e2) { + logger.error("doRun: got an exception while sleeping due to another exception: "+e2, e2); + } + } + } + } + + } + + /** This provides the 'clusterView' repository descriptors **/ + private class DiscoveryLiteDescriptor implements Descriptors { + + final SimpleValueFactory factory = new SimpleValueFactory(); + + @Override + public String[] getKeys() { + return new String[] {OAK_DISCOVERYLITE_CLUSTERVIEW}; + } + + @Override + public boolean isStandardDescriptor(String key) { + if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) { + return false; + } + return true; + } + + @Override + public boolean isSingleValueDescriptor(String key) { + if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) { + return false; + } + return true; + } + + @Override + public Value getValue(String key) { + if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) { + return null; + } + return factory.createValue(getClusterViewAsDescriptorValue()); + } + + @Override + public Value[] getValues(String key) { + if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) { + return null; + } + return new Value[] {getValue(key)}; + } + + } + + /** DocumentNodeStore's (hence local) clusterId **/ + private int clusterNodeId = -1; + + /** the DocumentNodeStore - used to get the active/inactive cluster ids from **/ + private DocumentNodeStore documentNodeStore; + + /** background job that periodically verifies and updates the clusterView **/ + private BackgroundWorker backgroundWorker; + + /** the ClusterViewDocument which was used in the last checkView run **/ + private ClusterViewDocument previousClusterViewDocument; + + /** the ClusterView that was valid as a result of the previous checkView run **/ + private ClusterView previousClusterView; + + /** kept volatile as this is frequently read in contentChanged which is better kept unsynchronized as long as possible **/ + private volatile boolean hasInstancesWithBacklog; + + /** Require a static reference to the NodeStore. 
Note that this implies the service is active for both segment and document **/ + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY, + policy = ReferencePolicy.STATIC) + private volatile NodeStore nodeStore; + + /** inactive nodes that have been so for a while, ie they have no backlog anymore, so no need to check for backlog every time **/ + private Set longTimeInactives = new HashSet(); + + /** returns the clusterView as a json value for it to be provided via the repository descriptor **/ + private String getClusterViewAsDescriptorValue() { + if (previousClusterView==null) { + return null; + } else { + return previousClusterView.asDescriptorValue(); + } + } + + /** On activate the MongoDiscoveryService tries to start the background job */ + @Activate + public void activate(ComponentContext context) { + logger.trace("activate: start"); + + // Document vs Segment check + if (!(nodeStore instanceof DocumentNodeStore)) { + // that is the case when running on SegmentNodeStore - and atm + // there's no other simple dependency that could guarantee that + // DocumentDiscoveryLiteService is only activated when we are + // indeed running on DocumentNodeStore. + // So that's why there's this instanceof check. + // and if it fails, then disable the Document-discoveryliteservice + logger.info("activate: nodeStore is not a DocumentNodeStore, thus disabling: "+COMPONENT_NAME); + context.disableComponent(COMPONENT_NAME); + return; + } + + // set the ClusterStateChangeListener with the DocumentNodeStore + this.documentNodeStore = (DocumentNodeStore) nodeStore; + documentNodeStore.setClusterStateChangeListener(this); + + // retrieve the clusterId + clusterNodeId = documentNodeStore.getClusterId(); + + // start the background worker + backgroundWorker = new BackgroundWorker(); + Thread th = new Thread(backgroundWorker, "DiscoveryLite-BackgroundWorker-["+clusterNodeId+"]"); + th.setDaemon(true); + th.start(); + + // register the Descriptors - for Oak to pass it upwards + if (context!=null) { + OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext()); + whiteboard.register(Descriptors.class, new DiscoveryLiteDescriptor(), Collections.emptyMap()); + } + logger.trace("activate: end"); + } + + /** On deactivate the background job is stopped - if it was running at all **/ + @Deactivate + protected void deactivate() { + logger.trace("deactivate: deactivated"); + + if (backgroundWorker!=null) { + backgroundWorker.stop(); + backgroundWorker = null; + } + logger.trace("deactivate: end"); + } + + /** + * Checks if anything changed in the current view and updates the service + * fields accordingly. 
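+ * In short: checkView reads all clusterNodes documents, classifies each
+ * instance as active, recovering, backlog or inactive (see hasBacklog),
+ * persists the result via ClusterViewDocument.readOrUpdate and, on any
+ * change, rebuilds the previousClusterView field.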
+ * @return true if anything changed or is about to be changed (eg recovery/backlog), false if + * the view is stable + */ + private boolean checkView() { + logger.trace("checkView: start"); + List allClusterNodes = + ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore()); + + final Map allNodeIds = new HashMap(); + final Map activeNotTimedOutNodes = new HashMap(); + final Map activeButTimedOutNodes = new HashMap(); + final Map recoveringNodes = new HashMap(); + final Map backlogNodes = new HashMap(); + final Map inactiveNoBacklogNodes = new HashMap(); + + for (Iterator it = allClusterNodes.iterator(); it.hasNext();) { + ClusterNodeInfoDocument clusterNode = it.next(); + allNodeIds.put(clusterNode.getClusterId(), clusterNode); + if (clusterNode.isBeingRecovered()) { + recoveringNodes.put(clusterNode.getClusterId(), clusterNode); + } else if (!clusterNode.isActive()) { + if (hasBacklog(clusterNode)) { + backlogNodes.put(clusterNode.getClusterId(), clusterNode); + } else { + inactiveNoBacklogNodes.put(clusterNode.getClusterId(), clusterNode); + } + } else if (clusterNode.getLeaseEndTime() allActives; + allActives = new HashMap(activeNotTimedOutNodes); + allActives.putAll(activeButTimedOutNodes); + + // terminology: + // 'inactivating' are nodes that are either 'recovering' or 'backlog' ones + // 'recovering' are nodes for which one node is doing the recover() of lastRevs + // 'backlog' ones are nodes that are no longer active, that have finished the + // recover() but for which a backgroundRead is still pending to read + // the latest root changes. + + logger.debug("checkView: active nodes: {}, timed out nodes: {}, recovering nodes: {}, backlog nodes: {}, inactive nodes: {}, total: {}, hence view nodes: {}", + activeNotTimedOutNodes.size(), activeButTimedOutNodes.size(), recoveringNodes.size(), backlogNodes.size(), inactiveNoBacklogNodes.size(), allNodeIds.size(), allActives.size()); + + final ClusterViewDocument originalView = previousClusterViewDocument; + final ClusterViewDocument newView = doCheckView(allActives.keySet(), recoveringNodes.keySet(), backlogNodes.keySet(), inactiveNoBacklogNodes.keySet()); + if (newView==null) { + logger.trace("checkView: end. 
newView: null"); + return true; + } + boolean newHasInstancesWithBacklog = recoveringNodes.size()>0 || backlogNodes.size()>0; + boolean changed = originalView==null || (newView.getViewSeqNum()!=originalView.getViewSeqNum()) || (newHasInstancesWithBacklog!=hasInstancesWithBacklog); + logger.debug("checkView: viewFine: {}, changed: {}, originalView: {}, newView: {}", newView!=null, changed, originalView, newView); + + if (longTimeInactives.addAll(inactiveNoBacklogNodes.keySet())) { + logger.debug("checkView: updated longTimeInactives to {} (inactiveNoBacklogNodes: {})", longTimeInactives, inactiveNoBacklogNodes); + } + + if (changed) { + ClusterView v = ClusterView.fromDocument(clusterNodeId, newView, backlogNodes.keySet()); + final ClusterView previousView = previousClusterView; + previousClusterView = v; + hasInstancesWithBacklog = newHasInstancesWithBacklog; + logger.info("checkView: view changed from: "+previousView+", to: "+v+", hasInstancesWithBacklog: "+hasInstancesWithBacklog); + return true; + } else { + logger.debug("checkView: no changes whatsoever, still at view: "+previousClusterView); + return hasInstancesWithBacklog; + } + } + + private Revision getLastKnownRevision(int clusterNodeId) { + String[] lastKnownRevisions = documentNodeStore.getMBean().getLastKnownRevisions(); + for (int i = 0; i < lastKnownRevisions.length; i++) { + String aLastKnownRevisionStr = lastKnownRevisions[i]; + String[] split = aLastKnownRevisionStr.split("="); + if (split.length==2) { + try{ + Integer id = Integer.parseInt(split[0]); + if (id==clusterNodeId) { + final Revision lastKnownRev = Revision.fromString(split[1]); + logger.trace("getLastKnownRevision: end. clusterNode: {}, lastKnownRevision: {}", clusterNodeId, lastKnownRev); + return lastKnownRev; + } + } catch(NumberFormatException nfe) { + logger.warn("getLastKnownRevision: could not parse integer '"+split[0]+"': "+nfe, nfe); + } + } else { + logger.warn("getLastKnownRevision: cannot parse lastKnownRevision: "+aLastKnownRevisionStr); + } + } + logger.warn("getLastKnownRevision: no lastKnownRevision found for "+clusterNodeId); + return null; + } + + private boolean hasBacklog(ClusterNodeInfoDocument clusterNode) { + if (logger.isTraceEnabled()) { + logger.trace("hasBacklog: start. clusterNodeId: {}", clusterNode.getClusterId()); + } + final Revision lastKnownRevision = getLastKnownRevision(clusterNode.getClusterId()); + if (lastKnownRevision==null) { + logger.warn("hasBacklog: no lastKnownRevision found, hence cannot determine backlog for node "+clusterNode.getClusterId()); + return false; + } + + // The lastKnownRevision is what the local instance has last read/seen from another instance. + // This must be compared to what the other instance *actually* has written as the very last thing. + // Now the knowledge what the other instance has last written (after recovery) would sit + // in the root document - so that could in theory be used. But reading the root document + // would have to be done *uncached*. And that's quite a change to what the original + // idea was: that the root document would only be read every second, to avoid contention. + // So this 'what the other instance has last written' information is retrieved via + // a new, dedicated property in the clusterNodes collection: the 'lastWrittenRootRev'. 
+ // The 'lastWrittenRootRev' is written by 'UnsavedModifications' during backgroundUpdate + // and retrieved here quite regularly (but it should not be a big deal, as the + // discovery-lite is the only one reading this field so frequently and it does not + // interfere with normal (jcr) nodes at all). + String lastWrittenRootRevStr = clusterNode.getLastWrittenRootRev(); + if (lastWrittenRootRevStr==null) { + logger.warn("hasBacklog: node has lastWrittenRootRev=null"); + return false; + } + Revision lastWrittenRootRev = Revision.fromString(lastWrittenRootRevStr); + if (lastWrittenRootRev==null) { + logger.warn("hasBacklog: node has no lastWrittenRootRev: "+clusterNode.getClusterId()); + return false; + } + + final boolean hasBacklog = Revision.getTimestampDifference(lastKnownRevision, lastWrittenRootRev)<0; + if (logger.isDebugEnabled()) { + logger.debug("hasBacklog: clusterNodeId: {}, lastKnownRevision: {}, lastWrittenRootRev: {}, hasBacklog: {}", + clusterNode.getClusterId(), lastKnownRevision, lastWrittenRootRev, hasBacklog); + } + return hasBacklog; + } + + + private ClusterViewDocument doCheckView(final Set activeNodes, + final Set recoveringNodes, final Set backlogNodes, final Set inactiveNodes) { + logger.trace("doCheckView: start: activeNodes: {}, recoveringNodes: {}, backlogNodes: {}, inactiveNodes: {}", + activeNodes, recoveringNodes, backlogNodes, inactiveNodes); + + Set allInactives = new HashSet(); + allInactives.addAll(inactiveNodes); + allInactives.addAll(backlogNodes); + + if (activeNodes.size()==0) { + // then we have zero active nodes - that's nothing expected as that includes our own node not to be active + // hence handle with care - ie wait until we get an active node + logger.warn("doCheckView: empty active ids. activeNodes:{}, recoveringNodes:{}, inactiveNodes:{}", activeNodes, recoveringNodes, inactiveNodes); + return null; + } + ClusterViewDocument newViewOrNull; + try{ + newViewOrNull = ClusterViewDocument.readOrUpdate(documentNodeStore, activeNodes, recoveringNodes, allInactives); + } catch(RuntimeException re) { + logger.error("doCheckView: RuntimeException: re: "+re, re); + return null; + } catch(Error er) { + logger.error("doCheckView: Error: er: "+er, er); + return null; + } + logger.trace("doChckView: readOrUpdate result: {}", newViewOrNull); + + // and now for some verbose documentation and logging: + if (newViewOrNull==null) { + // then there was a concurrent update of the clusterView + // and we should do some quick backoff sleeping + logger.debug("doCheckView: newViewOrNull is null: "+newViewOrNull); + return null; + } else { + // otherwise we now hold the newly valid view + // it could be the same or different to the previous one, let's check + if (previousClusterViewDocument==null) { + // oh ok, this is the very first one + previousClusterViewDocument = newViewOrNull; + logger.debug("doCheckView: end. first ever view: {}", newViewOrNull); + return newViewOrNull; + } else if (previousClusterViewDocument.getViewSeqNum()==newViewOrNull.getViewSeqNum()) { + // that's the normal case: the viewId matches, nothing has changed, we've already + // processed the previousClusterView, so: + logger.debug("doCheckView: end. seqNum did not change. view: {}", newViewOrNull); + return newViewOrNull; + } else { + // otherwise the view has changed + logger.info("doCheckView: view has changed from: {} to: {} - sending event...", previousClusterViewDocument, newViewOrNull); + previousClusterViewDocument = newViewOrNull; + logger.debug("doCheckView: end. 
changed view: {}", newViewOrNull); + return newViewOrNull; + } + } + } + + @Override + public void handleClusterStateChange() { + // handleClusterStateChange is needed to learn about any state change in the clusternodes + // collection asap and being able to react on it - so this will wake up the + // backgroundWorker which in turn will - in a separate thread - check the view + // and send out events accordingly + wakeupBackgroundWorker(WakeupReason.CLUSTER_STATE_CHANGED); + } + + private void wakeupBackgroundWorker(WakeupReason wakeupReason) { + final BackgroundWorker bw = backgroundWorker; + if (bw!=null) { + // get a copy of this.hasInstancesWithBacklog for just the code-part in this synchronized + final boolean hasInstancesWithBacklog = this.hasInstancesWithBacklog; + + if (wakeupReason==WakeupReason.BACKGROUND_READ_FINISHED) { + // then only forward the notify if' hasInstancesWithBacklog' + // ie, we have anything we could be waiting for - otherwise + // we dont need to wakeup the background thread + if (!hasInstancesWithBacklog) { + logger.trace("wakeupBackgroundWorker: not waking up backgroundWorker, as we do not have any instances with backlog"); + return; + } + } + logger.trace("wakeupBackgroundWorker: waking up backgroundWorker, reason: {} (hasInstancesWithBacklog: {})", + wakeupReason, hasInstancesWithBacklog); + synchronized(bw) { + bw.notifyAll(); + } + } + } + + /** + *

+     * Additionally the DocumentDiscoveryLiteService must be notified when the
+     * background-read has finished - as it could be waiting for a crashed node's recovery
+     * to finish - which it can only do by checking the lastKnownRevision of the crashed
+     * instance - and that check is best done right after a background read has finished
+     * (it could optionally do that purely time-based as well, but going via a listener
+     * is more timely, that's why this approach has been chosen).
+     */
+    @Override
+    public void contentChanged(NodeState root, CommitInfo info) {
+        // contentChanged is only used to react as quickly as possible
+        // when we have instances that have a 'backlog' - ie when instances crashed
+        // and are being recovered - then we must wait until the recovery is finished
+        // AND until the subsequent background read actually reads that instance's
+        // last changes. To catch that moment as quickly as possible,
+        // this contentChanged is used.
+        // Now from the above it also results that this only wakes up the
+        // backgroundWorker if we have any pending 'backlog' instances -
+        // otherwise this is a no-op
+        if (info == null) {
+            // then ignore this as this is likely an external change
+            // note: it could be a compacted change, in which case we should
+            // probably still process it - but we have a 5sec fallback
+            // in the BackgroundWorker to handle that case too,
+            // so:
+            logger.trace("contentChanged: ignoring content change due to commit info being null");
+            return;
+        }
+        logger.trace("contentChanged: handling content changed by waking up worker if necessary");
+        wakeupBackgroundWorker(WakeupReason.BACKGROUND_READ_FINISHED);
+    }
+
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(revision 1696301)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(working copy)
@@ -348,6 +348,14 @@
     private final BlobStore blobStore;
 
     /**
+     * The clusterStateChangeListener is invoked on any noticed change in the clusterNodes collection.
+     *
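+     * It is wired up by the DocumentDiscoveryLiteService on activation via
+     * setClusterStateChangeListener(..) - see further below in this patch.
+     *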

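+ * It is set via setClusterStateChangeListener(..) - presumably by the DocumentDiscoveryLiteService - + * and invoked via signalClusterStateChange().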
+ * Note that there is no synchronization between setting this one and using it, but arguably + * that is not necessary since it will be set at startup time and then never be changed. + */ + private ClusterStateChangeListener clusterStateChangeListener; + + /** * The BlobSerializer. */ private final BlobSerializer blobSerializer = new BlobSerializer() { @@ -1637,7 +1645,8 @@ runBackgroundReadOperations(); } - private void runBackgroundUpdateOperations() { + /** Note: made package-protected for testing purposes, would otherwise be private **/ + void runBackgroundUpdateOperations() { if (isDisposed.get()) { return; } @@ -1680,7 +1689,8 @@ //----------------------< background read operations >---------------------- - private void runBackgroundReadOperations() { + /** Note: made package-protected for testing purposes, would otherwise be private **/ + void runBackgroundReadOperations() { if (isDisposed.get()) { return; } @@ -1724,8 +1734,11 @@ /** * Updates the state about cluster nodes in {@link #activeClusterNodes} * and {@link #inactiveClusterNodes}. + * @return true if the cluster state has changed, false if the cluster state + * remained unchanged */ - void updateClusterState() { + boolean updateClusterState() { + boolean hasChanged = false; long now = clock.getTime(); Set inactive = Sets.newHashSet(); for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(store)) { @@ -1733,14 +1746,15 @@ if (cId != this.clusterId && !doc.isActive()) { inactive.add(cId); } else { - activeClusterNodes.put(cId, doc.getLeaseEndTime()); + hasChanged |= activeClusterNodes.put(cId, doc.getLeaseEndTime())==null; } } - activeClusterNodes.keySet().removeAll(inactive); - inactiveClusterNodes.keySet().retainAll(inactive); + hasChanged |= activeClusterNodes.keySet().removeAll(inactive); + hasChanged |= inactiveClusterNodes.keySet().retainAll(inactive); for (Integer clusterId : inactive) { - inactiveClusterNodes.putIfAbsent(clusterId, now); + hasChanged |= inactiveClusterNodes.putIfAbsent(clusterId, now)==null; } + return hasChanged; } /** @@ -2436,6 +2450,15 @@ return blobGC; } + void setClusterStateChangeListener(ClusterStateChangeListener clusterStateChangeListener) { + this.clusterStateChangeListener = clusterStateChangeListener; + } + + void signalClusterStateChange() { + if (clusterStateChangeListener!=null) { + clusterStateChangeListener.handleClusterStateChange(); + } + } //-----------------------------< DocumentNodeStoreMBean >--------------------------------- public DocumentNodeStoreMBean getMBean() { @@ -2607,10 +2630,16 @@ @Override protected void execute(@Nonnull DocumentNodeStore nodeStore) { - if (nodeStore.renewClusterIdLease()) { - nodeStore.updateClusterState(); + // first renew the clusterId lease + nodeStore.renewClusterIdLease(); + + // then, independently of whether the lease had to be renewed or not, check the cluster state: + if (nodeStore.updateClusterState()) { + // then inform the discovery lite listener - if it is registered + nodeStore.signalClusterStateChange(); } } + } public BlobStore getBlobStore() { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (revision 1696301) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (working copy) @@ -278,8 +278,16 @@ return recover(suspects.iterator(), clusterId); } finally {
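+ // note: the cleanup below must run regardless of whether recover() succeeded or threw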
Utils.closeIfCloseable(suspects); + // Relinquish the recovery lock for this clusterId on the clusterNodes entry + //TODO: in case recover throws a RuntimeException (or an Error) then + // the recovery might have failed, yet the instance is marked + // as 'recovered' (by setting the state to NONE). + // Is this really fine here? Or should we rather retry - or at least + // log the throwable? missingLastRevUtil.releaseRecoveryLock(clusterId); + + nodeStore.signalClusterStateChange(); + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (revision 1696301) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (working copy) @@ -214,6 +214,17 @@ lastRev = null; } } + Revision writtenRootRev = pending.get("/"); + if (writtenRootRev!=null) { + int cid = writtenRootRev.getClusterId(); + if (store.getDocumentStore().find(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, String.valueOf(cid))!=null) { + UpdateOp update = new UpdateOp(String.valueOf(cid), false); + update.equals(Document.ID, null, String.valueOf(cid)); + update.set(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY, writtenRootRev.toString()); + store.getDocumentStore().findAndUpdate(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, update); + } + } + + stats.write = clock.getTime() - time; return stats; } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java (working copy) @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.HashSet; +import java.util.Set; + +/** Test helper class that simplifies creating ClusterView and ClusterViewDocument objects **/ +class ClusterViewBuilder { + + private final Set activeIds = new HashSet(); + private final Set recoveringIds = new HashSet(); + private final Set backlogIds = new HashSet(); + private final Set inactiveIds = new HashSet(); + private final long viewSeqNum; + private final String clusterViewId; + private final int myId; + + ClusterViewBuilder(long viewSeqNum, String clusterViewId, int myId) { + this.viewSeqNum = viewSeqNum; + this.clusterViewId = clusterViewId; + this.myId = myId; + } + + public ClusterViewBuilder active(int...
instanceIds) { + for (int i = 0; i < instanceIds.length; i++) { + int anId = instanceIds[i]; + activeIds.add(anId); + } + return this; + } + public ClusterViewBuilder recovering(int... instanceIds) { + for (int i = 0; i < instanceIds.length; i++) { + int anId = instanceIds[i]; + recoveringIds.add(anId); + } + return this; + } + public ClusterViewBuilder backlogs(int... instanceIds) { + for (int i = 0; i < instanceIds.length; i++) { + int anId = instanceIds[i]; + backlogIds.add(anId); + } + return this; + } + public ClusterViewBuilder inactive(int... instanceIds) { + for (int i = 0; i < instanceIds.length; i++) { + int anId = instanceIds[i]; + inactiveIds.add(anId); + } + return this; + } + + public ClusterViewDocument asDoc() { + /* + * "_id" : "clusterView", + "seqNum" : NumberLong(1), + "inactive" : null, + "clusterViewHistory" : { + + }, + "deactivating" : null, + "created" : "2015-06-30T08:21:29.393+0200", + "clusterViewId" : "882f8926-1112-493a-81a0-f946087b2986", + "active" : "1", + "creator" : 1, + "_modCount" : NumberLong(1) + */ + Document doc = new Document(); + doc.put(ClusterViewDocument.VIEW_SEQ_NUM_KEY, viewSeqNum); + doc.put(ClusterViewDocument.INACTIVE_KEY, asArrayStr(inactiveIds)); + doc.put(ClusterViewDocument.RECOVERING_KEY, asArrayStr(recoveringIds)); + doc.put(ClusterViewDocument.ACTIVE_KEY, asArrayStr(activeIds)); + doc.put(ClusterViewDocument.CLUSTER_VIEW_ID_KEY, clusterViewId); + ClusterViewDocument clusterViewDoc = new ClusterViewDocument(doc); + return clusterViewDoc; + } + public ClusterView asView() { + return ClusterView.fromDocument(myId, asDoc(), backlogIds); + } + + private String asArrayStr(Set ids) { + return ClusterViewDocument.arrayToCsv(asArray(ids)); + } + + private Integer[] asArray(Set set) { + if (set.size()==0) { + return null; + } else { + return set.toArray(new Integer[set.size()]); + } + } + +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java (working copy) @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.document; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.junit.Test; + +public class ClusterViewDocumentTest { + + @Test + public void testConstructor() { + ClusterViewDocument doc = new ClusterViewBuilder(1, "2", 3).active(3,4).asDoc(); + assertNotNull(doc); + assertEquals("2", doc.getClusterViewId()); + assertEquals(0, doc.getRecoveringIds().size()); + assertEquals(0, doc.getInactiveIds().size()); + assertEquals(2, doc.getActiveIds().size()); + assertTrue(doc.getActiveIds().contains(3)); + assertTrue(doc.getActiveIds().contains(4)); + + doc = new ClusterViewBuilder(1, "2", 3).active(3,4).backlogs(5).inactive(5,6).asDoc(); + assertNotNull(doc); + assertEquals("2", doc.getClusterViewId()); + assertEquals(0, doc.getRecoveringIds().size()); + assertEquals(2, doc.getInactiveIds().size()); + assertEquals(2, doc.getActiveIds().size()); + assertTrue(doc.getActiveIds().contains(3)); + assertTrue(doc.getActiveIds().contains(4)); + assertTrue(doc.getInactiveIds().contains(5)); + assertTrue(doc.getInactiveIds().contains(6)); + + doc = new ClusterViewBuilder(11, "x", 4).active(3,4,5).recovering(6).inactive(7,8).asDoc(); + assertNotNull(doc); + assertEquals(11, doc.getViewSeqNum()); + assertEquals("x", doc.getClusterViewId()); + assertEquals(1, doc.getRecoveringIds().size()); + assertEquals(2, doc.getInactiveIds().size()); + assertEquals(3, doc.getActiveIds().size()); + assertTrue(doc.getActiveIds().contains(3)); + assertTrue(doc.getActiveIds().contains(4)); + assertTrue(doc.getActiveIds().contains(5)); + assertTrue(doc.getRecoveringIds().contains(6)); + assertTrue(doc.getInactiveIds().contains(7)); + assertTrue(doc.getInactiveIds().contains(8)); + + } + + @Test + public void testReadUpdate() throws Exception { + final int localClusterId = 11; + final DocumentNodeStore ns = createMK(localClusterId).nodeStore; + + try{ + ClusterViewDocument.readOrUpdate(ns, null, null, null); + fail("should complain"); + } catch(Exception ok) { + // ok + } + + try{ + ClusterViewDocument.readOrUpdate(ns, new HashSet(), null, null); + fail("should complain"); + } catch(Exception ok) { + // ok + } + + Set activeIds = new HashSet(); + activeIds.add(2); + Set recoveringIds = null; + Set inactiveIds = null; + // first ever view: + ClusterViewDocument doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + final String id = doc.getClusterViewId(); + assertTrue(id!=null && id.length()>0); + String createdAt = doc.getCreatedAt(); + assertTrue(createdAt!=null && createdAt.length()>0); + int createdBy = doc.getCreatedBy(); + assertEquals(localClusterId, createdBy); + assertEquals(1, doc.getViewSeqNum()); + assertEquals(1, doc.getActiveIds().size()); + assertTrue(doc.getActiveIds().contains(2)); + assertEquals(0, doc.getRecoveringIds().size()); + assertEquals(0, doc.getInactiveIds().size()); + + // now let's check if it doesn't change anything when we're not doing any update + doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + assertEquals(1, doc.getViewSeqNum()); + + // and now add a new active id + activeIds.add(3); + doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + assertEquals(id, doc.getClusterViewId()); + 
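// the clusterViewId must stay stable across view updates - only the seqNum increments +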
createdAt = doc.getCreatedAt(); + assertTrue(createdAt!=null && createdAt.length()>0); + createdBy = doc.getCreatedBy(); + assertEquals(localClusterId, createdBy); + assertEquals(2, doc.getViewSeqNum()); + assertEquals(2, doc.getActiveIds().size()); + assertTrue(doc.getActiveIds().contains(2)); + assertTrue(doc.getActiveIds().contains(3)); + assertEquals(0, doc.getRecoveringIds().size()); + assertEquals(0, doc.getInactiveIds().size()); + + // now let's check if it doesn't change anything when we're not doing any update + doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + assertEquals(2, doc.getViewSeqNum()); + + // and now add a new recovering id + recoveringIds = new HashSet(); + recoveringIds.add(4); + doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + assertEquals(id, doc.getClusterViewId()); + createdAt = doc.getCreatedAt(); + assertTrue(createdAt!=null && createdAt.length()>0); + createdBy = doc.getCreatedBy(); + assertEquals(localClusterId, createdBy); + assertEquals(3, doc.getViewSeqNum()); + assertEquals(2, doc.getActiveIds().size()); + assertTrue(doc.getActiveIds().contains(2)); + assertTrue(doc.getActiveIds().contains(3)); + assertEquals(1, doc.getRecoveringIds().size()); + assertTrue(doc.getRecoveringIds().contains(4)); + assertEquals(0, doc.getInactiveIds().size()); + + // now let's check if it doesn't change anything when we're not doing any update + doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + assertEquals(3, doc.getViewSeqNum()); + + // and now move that one to inactive + recoveringIds = new HashSet(); + inactiveIds = new HashSet(); + inactiveIds.add(4); + doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + assertEquals(id, doc.getClusterViewId()); + createdAt = doc.getCreatedAt(); + assertTrue(createdAt!=null && createdAt.length()>0); + createdBy = doc.getCreatedBy(); + assertEquals(localClusterId, createdBy); + assertEquals(4, doc.getViewSeqNum()); + assertEquals(2, doc.getActiveIds().size()); + assertTrue(doc.getActiveIds().contains(2)); + assertTrue(doc.getActiveIds().contains(3)); + assertEquals(0, doc.getRecoveringIds().size()); + assertEquals(1, doc.getInactiveIds().size()); + assertTrue(doc.getInactiveIds().contains(4)); + + // now let's check if it doesn't change anything when we're not doing any update + doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + assertEquals(4, doc.getViewSeqNum()); + doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + assertEquals(4, doc.getViewSeqNum()); + doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds); + assertEquals(4, doc.getViewSeqNum()); + } + + private static DocumentMK createMK(int clusterId){ + return create(new MemoryDocumentStore(), clusterId); + } + + private static DocumentMK create(DocumentStore ds, int clusterId){ + return new DocumentMK.Builder() + .setAsyncDelay(0) + .setDocumentStore(ds) + .setClusterId(clusterId) + .setPersistentCache("target/persistentCache,time") + .open(); + } + +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java 
=================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java (working copy) @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.jackrabbit.oak.commons.json.JsonObject; +import org.apache.jackrabbit.oak.commons.json.JsopTokenizer; +import org.junit.Test; + +/** Simple paranoia tests for constructor and getters of ClusterViewImpl **/ +public class ClusterViewTest { + + @Test + public void testConstructor() throws Exception { + final Integer viewId = 3; + final String clusterViewId = UUID.randomUUID().toString(); + final Integer instanceId = 2; + final Set emptyInstanceIds = new HashSet(); + final Set instanceIds = new HashSet(); + instanceIds.add(1); + final Set deactivating = new HashSet(); + final Set inactive = new HashSet(); + try{ + new ClusterView(-1, true, clusterViewId, instanceId, instanceIds, deactivating, inactive); + fail("should complain"); + } catch(IllegalStateException e) { + // ok + } + try{ + new ClusterView(viewId, true, null, instanceId, instanceIds, deactivating, inactive); + fail("should complain"); + } catch(IllegalStateException e) { + // ok + } + try{ + new ClusterView(viewId, true, clusterViewId, -1, instanceIds, deactivating, inactive); + fail("should complain"); + } catch(IllegalStateException e) { + // ok + } + try{ + new ClusterView(viewId, true, clusterViewId, instanceId, emptyInstanceIds, deactivating, inactive); + fail("should complain"); + } catch(IllegalStateException e) { + // ok + } + try{ + new ClusterView(viewId, true, clusterViewId, instanceId, null, deactivating, inactive); + fail("should complain"); + } catch(IllegalStateException e) { + // ok + } + try{ + new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, null, inactive); + fail("should complain"); + } catch(Exception e) { + // ok + } + try{ + new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, deactivating, null); + fail("should complain"); + } catch(Exception e) { + // ok + } + final Set nonEmptyDeactivating = new HashSet(); + nonEmptyDeactivating.add(3); + new ClusterView(viewId, false, clusterViewId, instanceId, instanceIds, nonEmptyDeactivating, inactive); + new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, 
nonEmptyDeactivating, inactive); + // should not complain about: + new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, deactivating, inactive); + } + + @Test + public void testGetters() throws Exception { + final Integer viewId = 3; + final String clusterViewId = UUID.randomUUID().toString(); + final Set instanceIds = new HashSet(); + instanceIds.add(1); + final Integer instanceId = 2; + final Set deactivating = new HashSet(); + final Set inactive = new HashSet(); + final ClusterView cv = new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, deactivating, inactive); + assertNotNull(cv); + assertTrue(cv.asDescriptorValue().length()>0); + assertTrue(cv.toString().length()>0); + } + + @Test + public void testOneActiveOnly() throws Exception { + String clusterViewId = UUID.randomUUID().toString(); + ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 21); + ClusterView view = builder.active(21).asView(); + + // {"seq":10,"id":"35f60ed3-508d-4a81-b812-89f07f57db20","me":2,"active":[2],"deactivating":[],"inactive":[3]} + JsonObject o = asJsonObject(view); + Map props = o.getProperties(); + assertEquals("10", props.get("seq")); + assertEquals(clusterViewId, unwrapString(props.get("id"))); + assertEquals("21", props.get("me")); + assertEquals(asJsonArray(21), props.get("active")); + assertEquals(asJsonArray(), props.get("deactivating")); + assertEquals(asJsonArray(), props.get("inactive")); + } + + @Test + public void testOneActiveOneInactive() throws Exception { + String clusterViewId = UUID.randomUUID().toString(); + ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2); + ClusterView view = builder.active(2).inactive(3).asView(); + + // {"seq":10,"id":"35f60ed3-508d-4a81-b812-89f07f57db20","me":2,"active":[2],"deactivating":[],"inactive":[3]} + JsonObject o = asJsonObject(view); + Map props = o.getProperties(); + assertEquals("10", props.get("seq")); + assertEquals(clusterViewId, unwrapString(props.get("id"))); + assertEquals("2", props.get("me")); + assertEquals(asJsonArray(2), props.get("active")); + assertEquals(asJsonArray(), props.get("deactivating")); + assertEquals(asJsonArray(3), props.get("inactive")); + } + + @Test + public void testSeveralActiveOneInactive() throws Exception { + String clusterViewId = UUID.randomUUID().toString(); + ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2); + ClusterView view = builder.active(2,5,6).inactive(3).asView(); + + // {"seq":10,"id":"35f60ed3-508d-4a81-b812-89f07f57db20","me":2,"active":[2],"deactivating":[],"inactive":[3]} + JsonObject o = asJsonObject(view); + Map props = o.getProperties(); + assertEquals("10", props.get("seq")); + assertEquals("true", props.get("final")); + assertEquals(clusterViewId, unwrapString(props.get("id"))); + assertEquals("2", props.get("me")); + assertEquals(asJsonArray(2,5,6), props.get("active")); + assertEquals(asJsonArray(), props.get("deactivating")); + assertEquals(asJsonArray(3), props.get("inactive")); + } + + @Test + public void testOneActiveSeveralInactive() throws Exception { + String clusterViewId = UUID.randomUUID().toString(); + ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2); + ClusterView view = builder.active(2).inactive(3,4,5,6).asView(); + + // {"seq":10,"id":"35f60ed3-508d-4a81-b812-89f07f57db20","me":2,"active":[2],"deactivating":[],"inactive":[3]} + JsonObject o = asJsonObject(view); + Map props = o.getProperties(); + assertEquals("10", props.get("seq")); + 
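// with no recovering/backlog instances the view is expected to be marked 'final' +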
assertEquals("true", props.get("final")); + assertEquals(clusterViewId, unwrapString(props.get("id"))); + assertEquals("2", props.get("me")); + assertEquals(asJsonArray(2), props.get("active")); + assertEquals(asJsonArray(), props.get("deactivating")); + assertEquals(asJsonArray(3,4,5,6), props.get("inactive")); + } + + @Test + public void testWithRecoveringOnly() throws Exception { + String clusterViewId = UUID.randomUUID().toString(); + ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2); + ClusterView view = builder.active(2,3).recovering(4).inactive(5,6).asView(); + + JsonObject o = asJsonObject(view); + Map props = o.getProperties(); + assertEquals("10", props.get("seq")); + assertEquals("true", props.get("final")); + assertEquals(clusterViewId, unwrapString(props.get("id"))); + assertEquals("2", props.get("me")); + assertEquals(asJsonArray(2,3), props.get("active")); + assertEquals(asJsonArray(4), props.get("deactivating")); + assertEquals(asJsonArray(5,6), props.get("inactive")); + } + + @Test + public void testWithRecoveringAndBacklog() throws Exception { + String clusterViewId = UUID.randomUUID().toString(); + ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2); + ClusterView view = builder.active(2,3).recovering(4).inactive(5,6).backlogs(5).asView(); + + JsonObject o = asJsonObject(view); + Map props = o.getProperties(); + assertEquals("10", props.get("seq")); + assertEquals(clusterViewId, unwrapString(props.get("id"))); + assertEquals("2", props.get("me")); + assertEquals("false", props.get("final")); + assertEquals(asJsonArray(2,3), props.get("active")); + assertEquals(asJsonArray(4,5), props.get("deactivating")); + assertEquals(asJsonArray(6), props.get("inactive")); + } + + @Test + public void testBacklogButNotInactive() throws Exception { + String clusterViewId = UUID.randomUUID().toString(); + ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2); + try{ + ClusterView view = builder.active(2,3).backlogs(5).asView(); + fail("should complain"); + } catch(Exception ok) { + // ok + } + } + + private JsonObject asJsonObject(final ClusterView view) { + final String json = view.asDescriptorValue(); + System.out.println(json); + JsopTokenizer t = new JsopTokenizer(json); + t.read('{'); + JsonObject o = JsonObject.create(t); + return o; + } + + private String unwrapString(String stringWithQuotes) { + //TODO: I'm not really sure why the JsonObject parses this string including the " + // perhaps that's rather a bug .. + assertTrue(stringWithQuotes.startsWith("\"")); + assertTrue(stringWithQuotes.endsWith("\"")); + return stringWithQuotes.substring(1, stringWithQuotes.length()-1); + } + + static String asJsonArray(int... 
ids) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < ids.length; i++) { + int anId = ids[i]; + if (i!=0) { + sb.append(","); + } + sb.append(String.valueOf(anId)); + } + sb.append("]"); + return sb.toString(); + } +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java (working copy) @@ -0,0 +1,1002 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jcr.PropertyType; +import javax.jcr.Value; +import javax.jcr.ValueFormatException; + +import junitx.util.PrivateAccessor; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.Descriptors; +import org.apache.jackrabbit.oak.commons.json.JsonObject; +import org.apache.jackrabbit.oak.commons.json.JsopTokenizer; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.junit.After; +import 
org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.mongodb.DB; + +/** + * Tests for the DocumentDiscoveryLiteService + */ +public class DocumentDiscoveryLiteServiceTest { + + /** Container for what should represent an instance, but is not a complete one, hence 'simplified'. + * It contains, most importantly, the DocumentNodeStore and the discoveryLite service. + */ + class SimplifiedInstance { + + private DocumentDiscoveryLiteService service; + private DocumentNodeStore ns; + private final Descriptors descriptors; + private Map registeredServices; + private final long lastRevInterval; + private volatile boolean lastRevStopped = false; + private volatile boolean writeSimulationStopped = false; + private Thread lastRevThread; + private Thread writeSimulationThread; + public String workingDir; + + SimplifiedInstance(DocumentDiscoveryLiteService service, + DocumentNodeStore ns, Descriptors descriptors, + Map registeredServices, long lastRevInterval, + String workingDir) { + this.service = service; + this.ns = ns; + this.workingDir = workingDir; + this.descriptors = descriptors; + this.registeredServices = registeredServices; + this.lastRevInterval = lastRevInterval; + if (lastRevInterval>0) { + startLastRevThread(); + } + } + + @Override + public String toString() { + return "SimplifiedInstance[cid="+ns.getClusterId()+"]"; + } + + private void startLastRevThread() { + lastRevStopped = false; + lastRevThread = new Thread(new Runnable() { + + @Override + public void run() { + while(!lastRevStopped) { + SimplifiedInstance.this.ns.getLastRevRecoveryAgent().performRecoveryIfNeeded(); + try { + Thread.sleep(SimplifiedInstance.this.lastRevInterval); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + }); + lastRevThread.setDaemon(true); + lastRevThread.setName("lastRevThread[cid="+ns.getClusterId()+"]"); + lastRevThread.start(); + } + + void stopLastRevThread() throws InterruptedException { + lastRevStopped = true; + lastRevThread.join(); + } + + boolean isFinal() throws Exception { + final JsonObject clusterViewObj = getClusterViewObj(); + if (clusterViewObj==null) { + throw new IllegalStateException("should always have that final flag set"); + } + + String finalStr = clusterViewObj.getProperties().get("final"); + + return Boolean.valueOf(finalStr); + } + + boolean hasActiveIds(String clusterViewStr, int... expected) throws Exception { + return hasIds(clusterViewStr, "active", expected); + } + + boolean hasDeactivatingIds(String clusterViewStr, int... expected) throws Exception { + return hasIds(clusterViewStr, "deactivating", expected); + } + + boolean hasInactiveIds(String clusterViewStr, int... expected) throws Exception { + return hasIds(clusterViewStr, "inactive", expected); + } + + private boolean hasIds(final String clusterViewStr, final String key, int... expectedIds) + throws Exception { + final JsonObject clusterViewObj = asJsonObject(clusterViewStr); + String actualIdsStr = clusterViewObj==null ?
null : clusterViewObj.getProperties().get(key); + + boolean actualEmpty = actualIdsStr==null || actualIdsStr.length()==0 || actualIdsStr.equals("[]"); + boolean expectedEmpty = expectedIds==null || expectedIds.length==0; + + if (actualEmpty && expectedEmpty) { + return true; + } + if (actualEmpty != expectedEmpty) { + return false; + } + + final List actualList = Arrays.asList(ClusterViewDocument.csvToIntegerArray(actualIdsStr.substring(1, actualIdsStr.length()-1))); + if (expectedIds.length!=actualList.size()) { + return false; + } + for (int i = 0; i < expectedIds.length; i++) { + int anExpectedId = expectedIds[i]; + if (!actualList.contains(anExpectedId)) { + return false; + } + } + return true; + } + + JsonObject getClusterViewObj() throws Exception { + final String json = getClusterViewStr(); + return asJsonObject(json); + } + + private JsonObject asJsonObject(final String json) { + if (json==null) { + return null; + } + JsopTokenizer t = new JsopTokenizer(json); + t.read('{'); + JsonObject o = JsonObject.create(t); + return o; + } + + String getClusterViewStr() throws Exception { + return getDescriptor(DocumentDiscoveryLiteService.OAK_DISCOVERYLITE_CLUSTERVIEW); + } + + String getDescriptor(String key) throws Exception { + final Value value = descriptors.getValue(key); + if (value==null) { + return null; + } + if (value.getType()!=PropertyType.STRING) { + return null; + } + try{ + return value.getString(); + } catch(ValueFormatException vfe) { + return null; + } + } + + public void dispose() { + logger.info("Disposing "+this); + try { + stopSimulatingWrites(); + } catch (InterruptedException e) { + fail("interrupted"); + } + if (lastRevThread!=null) { + try{ + stopLastRevThread(); + } catch(InterruptedException ok) { + fail("interrupted"); + } + lastRevThread = null; + } + if (service!=null) { + service.deactivate(); + service = null; + } + if (ns!=null) { + ns.dispose(); + ns = null; + } + if (registeredServices!=null) { + registeredServices.clear(); + registeredServices = null; + } + } + + /** shutdown simulates the normal, graceful, shutdown + * @throws InterruptedException */ + public void shutdown() throws InterruptedException { + stopSimulatingWrites(); + stopLastRevThread(); + ns.dispose(); + service.deactivate(); + } + + /** crash simulates a kill -9, sort of + * @throws Throwable */ + public void crash() throws Throwable { + logger.info("crash: stopping simulating writes..."); + stopSimulatingWrites(); + logger.info("crash: stopping lastrev thread..."); + stopLastRevThread(); + logger.info("crash: stopped lastrev thread, now setting lease to end within 1 sec"); + + boolean renewed = setLeaseTime(1000 /* 1 sec */); + if (!renewed) { + logger.info("halt"); + fail("did not renew clusterid lease"); + } + + logger.info("crash: now stopping background read/update"); + stopAllBackgroundThreads(); + // but don't do the following from DocumentNodeStore.dispose(): + // * don't do the last internalRunBackgroundUpdateOperations - as we're trying to simulate a crash here + // * don't dispose clusterNodeInfo to leave the node in active state + + // the DocumentDiscoveryLiteService currently can simply be deactivated, doesn't differ much from crashing + service.deactivate(); + logger.info("crash: crash simulation done."); + } + + /** + * very hacky way of doing the following: + * make sure this instance's clusterNodes entry is marked with a very short (1 sec off) lease end time + * so that the crash detection doesn't take a minute (as it would by default) + */ + private boolean
setLeaseTime(final int leaseTime) + throws NoSuchFieldException { + ns.getClusterInfo().setLeaseTime(leaseTime); + PrivateAccessor.setField(ns.getClusterInfo(), "leaseEndTime", System.currentTimeMillis()+(leaseTime/2)); + boolean renewed = ns.renewClusterIdLease(); + return renewed; + } + + private AtomicBoolean getIsDisposed() throws NoSuchFieldException { + AtomicBoolean isDisposed = (AtomicBoolean) PrivateAccessor.getField(ns, "isDisposed"); + return isDisposed; + } + + private void stopAllBackgroundThreads() throws NoSuchFieldException { + // get all those background threads... + Thread backgroundReadThread = (Thread) PrivateAccessor.getField(ns, "backgroundReadThread"); + assertNotNull(backgroundReadThread); + Thread backgroundUpdateThread = (Thread) PrivateAccessor.getField(ns, "backgroundUpdateThread"); + assertNotNull(backgroundUpdateThread); + Thread leaseUpdateThread = (Thread) PrivateAccessor.getField(ns, "leaseUpdateThread"); + assertNotNull(leaseUpdateThread); + + // start doing what DocumentNodeStore.dispose() would do - except do it in a very fine-controlled way, basically: + // make sure to stop backgroundReadThread, backgroundUpdateThread and leaseUpdateThread + // but then nothing else. + final AtomicBoolean isDisposed = getIsDisposed(); + assertFalse(isDisposed.getAndSet(true)); + // notify background threads waiting on isDisposed + synchronized (isDisposed) { + isDisposed.notifyAll(); + } + try { + backgroundReadThread.join(5000); + assertTrue(!backgroundReadThread.isAlive()); + } catch (InterruptedException e) { + // ignore + } + try { + backgroundUpdateThread.join(5000); + assertTrue(!backgroundUpdateThread.isAlive()); + } catch (InterruptedException e) { + // ignore + } + try { + leaseUpdateThread.join(5000); + assertTrue(!leaseUpdateThread.isAlive()); + } catch (InterruptedException e) { + // ignore + } + } + + public void stopBgReadThread() throws NoSuchFieldException { + final Thread backgroundReadThread = (Thread) PrivateAccessor.getField(ns, "backgroundReadThread"); + assertNotNull(backgroundReadThread); + final Runnable bgReadRunnable = (Runnable) PrivateAccessor.getField(backgroundReadThread, "target"); + assertNotNull(bgReadRunnable); + final AtomicBoolean bgReadIsDisposed = new AtomicBoolean(false); + PrivateAccessor.setField(bgReadRunnable, "isDisposed", bgReadIsDisposed); + assertFalse(bgReadIsDisposed.getAndSet(true)); + try { + backgroundReadThread.join(5000); + assertTrue(!backgroundReadThread.isAlive()); + } catch (InterruptedException e) { + // ignore + } + // bit of heavy work, but now the backgroundReadThread is stopped and all the others are still running + } + + public void addNode(String path) throws CommitFailedException { + NodeBuilder root = ns.getRoot().builder(); + NodeBuilder child = root; + String[] split = path.split("/"); + for(int i=1; i mks = Lists.newArrayList(); + private MemoryDocumentStore ds; + private MemoryBlobStore bs; + + final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private List allInstances = new LinkedList(); + + @Test + public void testActivateDeactivate() throws Exception { + // test the variant where we're not running with DocumentNodeStore + DocumentDiscoveryLiteService discoveryLite = new DocumentDiscoveryLiteService(); + ComponentContext c = mock(ComponentContext.class); + discoveryLite.activate(c); + verify(c, times(1)).disableComponent(DocumentDiscoveryLiteService.COMPONENT_NAME); + + // then test normal start with a DocumentNodeStore + DocumentMK mk1 = createMK(1, 0); + discoveryLite = new
DocumentDiscoveryLiteService(); + PrivateAccessor.setField(discoveryLite, "nodeStore", mk1.nodeStore); + BundleContext bc = mock(BundleContext.class); + c = mock(ComponentContext.class); + when(c.getBundleContext()).thenReturn(bc); + discoveryLite.activate(c); + verify(c, times(0)).disableComponent(DocumentDiscoveryLiteService.COMPONENT_NAME); + discoveryLite.deactivate(); + } + + /** Borrowed from http://stackoverflow.com/questions/3301635/change-private-static-final-field-using-java-reflection */ + static void setFinalStatic(Field field, Object newValue) throws Exception { + field.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + + field.set(null, newValue); + } + + // subsequent tests should get a DocumentDiscoveryLiteService setup from the start + private DocumentNodeStore createNodeStore(String workingDir) throws SecurityException, Exception { + // ensure that we always get a fresh cluster[node]id + System.setProperty("user.dir", workingDir); + setFinalStatic(ClusterNodeInfo.class.getDeclaredField("WORKING_DIR"), workingDir); + + // then create the DocumentNodeStore + DocumentMK mk1 = createMK(0 /* to make sure the clusterNodes collection is used **/, 500 /* asyncDelay: background interval*/); + + logger.info("createNodeStore: created DocumentNodeStore with cid="+mk1.nodeStore.getClusterId()+", workingDir="+workingDir); + return mk1.nodeStore; + } + + private SimplifiedInstance createInstance() throws Exception { + final String workingDir = UUID.randomUUID().toString(); + return createInstance(workingDir); + } + + private SimplifiedInstance createInstance(String workingDir) throws SecurityException, Exception { + DocumentNodeStore ns = createNodeStore(workingDir); + return createInstance(ns, workingDir); + } + private SimplifiedInstance createInstance(DocumentNodeStore ns, String workingDir) throws NoSuchFieldException { + DocumentDiscoveryLiteService discoveryLite = new DocumentDiscoveryLiteService(); + PrivateAccessor.setField(discoveryLite, "nodeStore", ns); + BundleContext bc = mock(BundleContext.class); + ComponentContext c = mock(ComponentContext.class); + when(c.getBundleContext()).thenReturn(bc); + final Map registeredServices = new HashMap(); + when(bc.registerService(anyString(), anyObject(), (Properties)anyObject())).then(new Answer() { + @Override + public ServiceRegistration answer(InvocationOnMock invocation) { + registeredServices.put((String) invocation.getArguments()[0], invocation.getArguments()[1]); + return null; + } + }); + discoveryLite.activate(c); + Descriptors d = (Descriptors) registeredServices.get(Descriptors.class.getName()); + final SimplifiedInstance result = new SimplifiedInstance(discoveryLite, ns, d, registeredServices, 500, workingDir); + allInstances.add(result); + logger.info("Created "+result); + return result; + } + + private void waitFor(Expectation expectation, int timeout, String msg) throws Exception { + final long tooLate = System.currentTimeMillis() + timeout; + while(true) { + final String fulfillmentResult = expectation.fulfilled(); + if (fulfillmentResult==null) { + // everything's fine + return; + } + if (System.currentTimeMillis()>tooLate) { + fail("expectation not fulfilled within "+timeout+"ms: "+msg+", fulfillment result: "+fulfillmentResult); + } + Thread.sleep(100); + } + } + + @Test + public void testOneNode() throws Exception { + final SimplifiedInstance s1 = createInstance(); + 
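// a single instance should (eventually) see itself as the only active one +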
final ViewExpectation expectation = new ViewExpectation(s1); + expectation.setActiveIds(s1.ns.getClusterId()); + waitFor(expectation, 2000, "see myself as active"); + } + + @Test + public void testTwoNodesWithCleanShutdown() throws Exception { + final SimplifiedInstance s1 = createInstance(); + final SimplifiedInstance s2 = createInstance(); + final ViewExpectation expectation1 = new ViewExpectation(s1); + final ViewExpectation expectation2 = new ViewExpectation(s2); + expectation1.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId()); + expectation2.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId()); + waitFor(expectation1, 2000, "first should see both as active"); + waitFor(expectation2, 2000, "second should see both as active"); + + s2.shutdown(); + final ViewExpectation expectation1AfterShutdown = new ViewExpectation(s1); + expectation1AfterShutdown.setActiveIds(s1.ns.getClusterId()); + expectation1AfterShutdown.setInactiveIds(s2.ns.getClusterId()); + waitFor(expectation1AfterShutdown, 2000, "first should only see itself after shutdown"); + } + + @Test + public void testTwoNodesWithCrash() throws Throwable { + final SimplifiedInstance s1 = createInstance(); + final SimplifiedInstance s2 = createInstance(); + final ViewExpectation expectation1 = new ViewExpectation(s1); + final ViewExpectation expectation2 = new ViewExpectation(s2); + expectation1.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId()); + expectation2.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId()); + waitFor(expectation1, 2000, "first should see both as active"); + waitFor(expectation2, 2000, "second should see both as active"); + + s2.crash(); + + final ViewExpectation expectation1AfterShutdown = new ViewExpectation(s1); + expectation1AfterShutdown.setActiveIds(s1.ns.getClusterId()); + expectation1AfterShutdown.setInactiveIds(s2.ns.getClusterId()); + waitFor(expectation1AfterShutdown, 2000, "first should only see itself after shutdown"); + } + + @Test + public void testTwoNodesWithCrashAndLongduringRecovery() throws Throwable { + doTestTwoNodesWithCrashAndLongduringDeactivation(false); + } + + @Test + public void testTwoNodesWithCrashAndLongduringRecoveryAndBacklog() throws Throwable { + doTestTwoNodesWithCrashAndLongduringDeactivation(true); + } + + void doTestTwoNodesWithCrashAndLongduringDeactivation(boolean withBacklog) throws Throwable { + final int TEST_WAIT_TIMEOUT = 10000; + final SimplifiedInstance s1 = createInstance(); + final SimplifiedInstance s2 = createInstance(); + final ViewExpectation expectation1 = new ViewExpectation(s1); + final ViewExpectation expectation2 = new ViewExpectation(s2); + expectation1.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId()); + expectation2.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId()); + waitFor(expectation1, TEST_WAIT_TIMEOUT, "first should see both as active"); + waitFor(expectation2, TEST_WAIT_TIMEOUT, "second should see both as active"); + + // before crashing s2, make sure that s1's lastRevRecovery thread doesn't run + s1.stopLastRevThread(); + if (withBacklog) { + // plus also stop s1's backgroundReadThread - in case we want to test backlog handling + s1.stopBgReadThread(); + + // and then, if we want to do backlog testing, then s2 should write something + // before it crashes, so here it comes: + s2.addNode("/foo/bar"); + s2.setProperty("/foo/bar", "prop", "value"); + } + + // then crash s2 + s2.crash(); + + // then wait 2 sec + Thread.sleep(2000); + + // at this stage, while s2 has crashed, we have stopped s1's 
lastRevRecoveryThread, so we should still see both as active + logger.info(s1.getClusterViewStr()); + final ViewExpectation expectation1AfterCrashBeforeLastRevRecovery = new ViewExpectation(s1); + expectation1AfterCrashBeforeLastRevRecovery.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId()); + waitFor(expectation1AfterCrashBeforeLastRevRecovery, TEST_WAIT_TIMEOUT, "first should still see both as active"); + + // the next part is a bit tricky: we want to fine-control the lastRevRecoveryThread's acquire/release locking. + // the chosen way to do this is to make heavy use of mockito and two semaphores: + // when acquireRecoveryLock is called, that thread should wait for the waitBeforeLocking semaphore to be released + final MissingLastRevSeeker missingLastRevUtil = + (MissingLastRevSeeker) PrivateAccessor.getField(s1.ns.getLastRevRecoveryAgent(), "missingLastRevUtil"); + assertNotNull(missingLastRevUtil); + MissingLastRevSeeker mockedLongduringMissingLastRevUtil = mock(MissingLastRevSeeker.class, delegatesTo(missingLastRevUtil)); + final Semaphore waitBeforeLocking = new Semaphore(0); + when(mockedLongduringMissingLastRevUtil.acquireRecoveryLock(anyInt())).then(new Answer() { + + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + logger.info("going to waitBeforeLocking"); + waitBeforeLocking.acquire(); + logger.info("done with waitBeforeLocking"); + return missingLastRevUtil.acquireRecoveryLock((Integer)invocation.getArguments()[0]); + } + }); + PrivateAccessor.setField(s1.ns.getLastRevRecoveryAgent(), "missingLastRevUtil", mockedLongduringMissingLastRevUtil); + + // so let's start the lastRevThread again and wait for that waitBeforeLocking semaphore to be hit + s1.startLastRevThread(); + waitFor(new Expectation() { + + @Override + public String fulfilled() throws Exception { + if (!waitBeforeLocking.hasQueuedThreads()) { + return "no thread queued"; + } + return null; + } + + }, TEST_WAIT_TIMEOUT, "lastRevRecoveryThread should acquire a lock"); + + // at this stage the crashed s2 is still not in recovery mode, so let's check: + logger.info(s1.getClusterViewStr()); + final ViewExpectation expectation1AfterCrashBeforeLastRevRecoveryLocking = new ViewExpectation(s1); + expectation1AfterCrashBeforeLastRevRecoveryLocking.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId()); + waitFor(expectation1AfterCrashBeforeLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see both as active"); + + // one thing, before we let the waitBeforeLocking go, set up the release semaphore/mock: + final Semaphore waitBeforeUnlocking = new Semaphore(0); + Mockito.doAnswer(new Answer() { + public Void answer(InvocationOnMock invocation) throws InterruptedException { + logger.info("Going to waitBeforeUnlocking"); + waitBeforeUnlocking.acquire(); + logger.info("Done with waitBeforeUnlocking"); + missingLastRevUtil.releaseRecoveryLock((Integer)invocation.getArguments()[0]); + return null; + } + }).when(mockedLongduringMissingLastRevUtil).releaseRecoveryLock(anyInt()); + + // let go + waitBeforeLocking.release(); + + // then, right after we let the waitBeforeLocking semaphore go, we should see s2 in recovery mode + final ViewExpectation expectation1AfterCrashWhileLastRevRecoveryLocking = new ViewExpectation(s1); + expectation1AfterCrashWhileLastRevRecoveryLocking.setActiveIds(s1.ns.getClusterId()); + expectation1AfterCrashWhileLastRevRecoveryLocking.setDeactivatingIds(s2.ns.getClusterId()); + waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking,
TEST_WAIT_TIMEOUT, "first should still see s2 as recovering"); + + // ok, meanwhile, the lastRevRecoveryAgent should have hit the ot + waitFor(new Expectation() { + + @Override + public String fulfilled() throws Exception { + if (!waitBeforeUnlocking.hasQueuedThreads()) { + return "no thread queued"; + } + return null; + } + + }, TEST_WAIT_TIMEOUT, "lastRevRecoveryThread should want to release a lock"); + + // so then, we should still see the same state + waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see s2 as recovering"); + + logger.info("Waiting 1,5sec"); + Thread.sleep(1500); + logger.info("Waiting done"); + + // first, lets check to see what the view looks like - should be unchanged: + waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see s2 as recovering"); + + // let waitBeforeUnlocking go + logger.info("releasing waitBeforeUnlocking, state: "+s1.getClusterViewStr()); + waitBeforeUnlocking.release(); + logger.info("released waitBeforeUnlocking"); + + if (!withBacklog) { + final ViewExpectation expectationWithoutBacklog = new ViewExpectation(s1); + expectationWithoutBacklog.setActiveIds(s1.ns.getClusterId()); + expectationWithoutBacklog.setInactiveIds(s2.ns.getClusterId()); + waitFor(expectationWithoutBacklog, TEST_WAIT_TIMEOUT, "finally we should see s2 as completely inactive"); + } else { + // wait just 2 sec to see if the bgReadThread is really stopped + logger.info("sleeping 2 sec"); + Thread.sleep(2000); + logger.info("sleeping 2 sec done, state: "+s1.getClusterViewStr()); + + // when that's the case, check the view - it should now be in a special 'final=false' mode + final ViewExpectation expectationBeforeBgRead = new ViewExpectation(s1); + expectationBeforeBgRead.setActiveIds(s1.ns.getClusterId()); + expectationBeforeBgRead.setDeactivatingIds(s2.ns.getClusterId()); + expectationBeforeBgRead.setFinal(false); + waitFor(expectationBeforeBgRead, TEST_WAIT_TIMEOUT, "first should only see itself after shutdown"); + + // ook, now we explicitly do a background read to get out of the backlog situation + s1.ns.runBackgroundReadOperations(); + + final ViewExpectation expectationAfterBgRead = new ViewExpectation(s1); + expectationAfterBgRead.setActiveIds(s1.ns.getClusterId()); + expectationAfterBgRead.setInactiveIds(s2.ns.getClusterId()); + waitFor(expectationAfterBgRead, TEST_WAIT_TIMEOUT, "finally we should see s2 as completely inactive"); + } + } + + /** This test creates a large number of documentnodestores which it starts, runs, stops + * in a random fashion, always testing to make sure the clusterView is correct + */ + @Test + public void testLargeStartStopFiesta() throws Throwable { + final List instances = new LinkedList(); + final Map inactiveIds = new HashMap(); + final Random random = new Random(); + final int LOOP_CNT = 50; // with too many loops have also seen mongo connections becoming starved thus test failed + final int CHECK_EVERY = 3; + final int MAX_NUM_INSTANCES = 8; + for(int i=0; i0) { + logger.info("Case 0 - reactivating an instance..."); + final int n = random.nextInt(inactiveIds.size()); + final Integer cid = new LinkedList(inactiveIds.keySet()).get(n); + final String reactivatedWorkingDir = inactiveIds.remove(cid); + if (reactivatedWorkingDir==null) { + fail("reactivatedWorkingDir null for n="+n+", cid="+cid+", other inactives: "+inactiveIds); + } + assertNotNull(reactivatedWorkingDir); + logger.info("Case 0 - reactivated instance "+cid+", 
workingDir="+reactivatedWorkingDir); + workingDir = reactivatedWorkingDir; + logger.info("Case 0: creating instance"); + final SimplifiedInstance newInstance = createInstance(workingDir); + newInstance.setLeastTimeout(5000); + newInstance.startSimulatingWrites(500); + logger.info("Case 0: created instance: "+newInstance.ns.getClusterId()); + if (newInstance.ns.getClusterId()!=cid) { + logger.info("Case 0: reactivated instance did not take over cid - probably a testing artifact. expected cid: {}, actual cid: {}", cid, newInstance.ns.getClusterId()); + inactiveIds.put(cid, reactivatedWorkingDir); + // remove the newly reactivated from the inactives - although it shouldn't be there, it might! + inactiveIds.remove(newInstance.ns.getClusterId()); + } + instances.add(newInstance); + } + break; + } + case 1: { + // creates a new instance + if (instances.size()1) { + // before shutting down: make sure we have a stable view (we could otherwise not correctly startup too) + checkFiestaState(instances, inactiveIds.keySet()); + final SimplifiedInstance instance = instances.remove(random.nextInt(instances.size())); + assertNotNull(instance.workingDir); + logger.info("Case 3: Shutdown instance: "+instance.ns.getClusterId()); + inactiveIds.put(instance.ns.getClusterId(), instance.workingDir); + instance.shutdown(); + } + break; + } + case 4: { + // crash instance + if (instances.size()>1) { + // before crashing make sure we have a stable view (we could otherwise not correctly startup too) + checkFiestaState(instances, inactiveIds.keySet()); + final SimplifiedInstance instance = instances.remove(random.nextInt(instances.size())); + assertNotNull(instance.workingDir); + logger.info("Case 4: Crashing instance: "+instance.ns.getClusterId()); + inactiveIds.put(instance.ns.getClusterId(), instance.workingDir); + instance.addNode("/"+instance.ns.getClusterId()+"/stuffForRecovery/"+random.nextInt(10000)); + instance.crash(); + } + break; + } + } + } + } + + private void dumpChildren(DocumentNodeState root) { + logger.info("testEmptyParentRecovery: root: "+root); + Iterator it = root.getChildNodeNames().iterator(); + while(it.hasNext()) { + String n = it.next(); + logger.info("testEmptyParentRecovery: a child: '"+n+"'"); + } + } + + private void checkFiestaState(final List instances, Set inactiveIds) throws Exception { + final List activeIds = new LinkedList(); + for (Iterator it = instances.iterator(); it.hasNext();) { + SimplifiedInstance anInstance = it.next(); + activeIds.add(anInstance.ns.getClusterId()); + } + for (Iterator it = instances.iterator(); it.hasNext();) { + SimplifiedInstance anInstance = it.next(); + + final ViewExpectation e = new ViewExpectation(anInstance); + e.setActiveIds(activeIds.toArray(new Integer[activeIds.size()])); + e.setInactiveIds(inactiveIds.toArray(new Integer[inactiveIds.size()])); + waitFor(e, 20000, "checkFiestaState failed for "+anInstance+", with instances: "+instances+", and inactiveIds: "+inactiveIds); + } + } + + @Before + @After + public void clear() { + for(SimplifiedInstance i : allInstances) { + i.dispose(); + } + for (DocumentMK mk : mks) { + mk.dispose(); + } + mks.clear(); + if (MONGO_DB) { + DB db = MongoUtils.getConnection().getDB(); + MongoUtils.dropCollections(db); + } + } + + private DocumentMK createMK(int clusterId, int asyncDelay) { + if (MONGO_DB) { + DB db = MongoUtils.getConnection().getDB(); + return register(new DocumentMK.Builder().setMongoDB(db) + .setLeaseCheck(false) + .setClusterId(clusterId).setAsyncDelay(asyncDelay).open()); + } else { + if (ds 
== null) { + ds = new MemoryDocumentStore(); + } + if (bs == null) { + bs = new MemoryBlobStore(); + } + return createMK(clusterId, asyncDelay, ds, bs); + } + } + + private DocumentMK createMK(int clusterId, int asyncDelay, + DocumentStore ds, BlobStore bs) { + return register(new DocumentMK.Builder().setDocumentStore(ds) + .setBlobStore(bs).setClusterId(clusterId) + .setLeaseCheck(false) + .setAsyncDelay(asyncDelay).open()); + } + + private DocumentMK register(DocumentMK mk) { + mks.add(mk); + return mk; + } + +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property