it = inactiveIds.iterator(); it
+ .hasNext();) {
+ Integer anInstance = it.next();
+ builder.value(anInstance);
+ }
+ builder.endArray();
+ builder.endObject();
+ return builder.toString();
+ }
+
+ /** Debugging toString() **/
+ @Override
+ public String toString() {
+ return "a ClusterView["+json+"]";
+ }
+
+ /** This is the main getter that will be polled via the repository descriptor **/
+ String asDescriptorValue() {
+ return json;
+ }
+
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java (working copy)
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the document stored in the settings collection containing
+ * a 'cluster view'.
+ *
+ * A 'cluster view' is the state of the membership of the instances that
+ * are or have ever been connected to the same oak repository.
+ * The 'cluster view' is maintained by all instances in the cluster
+ * concurrently - the fastest one wins. Its information is derived
+ * from the clusterNodes collection, from which instances are grouped
+ * into the following three states:
+ *
 + * Active: an instance is active and the recoveryLock is not currently
 + * acquired. The lease timeout is ignored. When the lease times out, this
+ * is noticed by one of the instances at some point and a recovery
+ * is started, at which point the instance transitions from 'Active'
+ * to 'Recovering'.
+ * Recovering: an instance that was active but currently has the
+ * recoveryLock acquired by one of the instances.
+ * Inactive: an instance is not set to active (in which case
+ * the recoveryLock is never set)
+ *
+ *
+ * Note that the states managed in this ClusterViewDocument differ
+ * from those in ClusterView - since ClusterView also manages
+ * the fact that after a recovery of a crashed instance there could
+ * be a 'backlog' of changes which the local instance doesn't see until a
+ * background read is performed.
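+ *
+ * For illustration, such a settings document looks roughly as follows
+ * (hypothetical values; cf the sample quoted in ClusterViewBuilder's asDoc()):
+ *
+ * { "_id" : "clusterView", "seqNum" : 4,
+ *   "clusterViewId" : "882f8926-1112-493a-81a0-f946087b2986",
+ *   "active" : "1,2", "recovering" : "3", "inactive" : "4",
+ *   "created" : "2015-06-30T08:21:29.393+0200", "creator" : 1,
+ *   "clusterViewHistory" : { } }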
+ */
+class ClusterViewDocument {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClusterViewDocument.class);
+
+ /** the id of this document is always 'clusterView' **/
+ private static final String CLUSTERVIEW_DOC_ID = "clusterView";
+
+ // keys that we store in the root document - and in the history
+ /** document key that stores the stable id of the cluster (will never change)
+ * (Note: a better term would have been just clusterId -
+ * but that one is already occupied with what should actually be called clusterNodeId or just nodeId) **/
+ static final String CLUSTER_VIEW_ID_KEY = "clusterViewId";
+
+ /** document key that stores the monotonically incrementing sequence number of the cluster view. Any update will increase this by 1 **/
+ static final String VIEW_SEQ_NUM_KEY = "seqNum";
+
+ /** document key that stores the comma-separated list of active instance ids **/
+ static final String ACTIVE_KEY = "active";
+
+ /** document key that stores the comma-separated list of inactive instance ids
+ * (they might still have a backlog - that is handled in ClusterView though, and never persisted) **/
+ static final String INACTIVE_KEY = "inactive";
+
+ /** document key that stores the comma-separated list of recovering instance ids **/
+ static final String RECOVERING_KEY = "recovering";
+
+ /** document key that stores the date and time when this view was created - for debugging purpose only **/
+ private static final String CREATED_KEY = "created";
+
+ /** document key that stores the id of the instance that created this view - for debugging purpose only **/
+ private static final String CREATOR_KEY = "creator";
+
+ /** document key that stores the date and time when this view was retired - for debugging purpose only **/
+ private static final String RETIRED_KEY = "retired";
+
+ /** document key that stores the id of the instance that retired this view - for debugging purpose only **/
+ private static final String RETIRER_KEY = "retirer";
+
+ /** document key that stores a short, limited history of previous cluster views - for debugging purpose only **/
+ private static final String CLUSTER_VIEW_HISTORY_KEY = "clusterViewHistory";
+
+ /** the format used when storing date+time **/
+ private static final DateFormat standardDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+ /** number of elements kept in the CLUSTER_VIEW_HISTORY_KEY field **/
+ private static final int HISTORY_LIMIT = 10;
+
+ /** the monotonically incrementing sequence number of this cluster view **/
+ private final long viewSeqNum;
+
+ /** the stable id of this cluster **/
+ private final String clusterViewId;
+
+ /** the ids of instances that are active at this moment **/
+ private final Integer[] activeIds;
+
+ /** the ids of instances that are recovering (lastRev-recovery) at this moment **/
+ private final Integer[] recoveringIds;
+
+ /** the ids of instances that are inactive at this moment **/
+ private final Integer[] inactiveIds;
+
+ /** the short, limited history of previous cluster views, for debugging only **/
+ private final Map<Object, String> viewHistory;
+
+ /** the date+time at which this view was created, for debugging only **/
+ private final String createdAt;
+
+ /** the id of the instance that created this view, for debugging only **/
+ private final Integer createdBy;
+
+ /**
+ * Main method by which the ClusterViewDocument is updated in the settings collection
+ * @return the resulting ClusterViewDocument as just updated in the settings collection - or null
+ * if another instance was updating the clusterview concurrently (in which case the caller
+ * should re-read first and possibly re-update if needed)
+ */
+ static ClusterViewDocument readOrUpdate(DocumentNodeStore documentNodeStore,
+ Set<Integer> activeIds, Set<Integer> recoveringIds, Set<Integer> inactiveIds) {
+ logger.trace("readOrUpdate: expected: activeIds: {}, recoveringIds: {}, inactiveIds: {}",
+ activeIds, recoveringIds, inactiveIds);
+ if (activeIds==null || activeIds.size()==0) {
+ logger.info("readOrUpdate: activeIds must not be null or empty");
+ throw new IllegalStateException("activeIds must not be null or empty");
+ }
+ final int localClusterId = documentNodeStore.getClusterId();
+
+ final ClusterViewDocument previousView = doRead(documentNodeStore);
+ if (previousView!=null) {
+ if (previousView.matches(activeIds, recoveringIds, inactiveIds)) {
+ logger.trace("readOrUpdate: view unchanged, returning: {}", previousView);
+ return previousView;
+ }
+ }
+ logger.trace("readOrUpdate: view change detected, going to update from {} to activeIds: {}, recoveringIds: {}, inactiveIds: {}",
+ previousView, activeIds, recoveringIds, inactiveIds);
+ final UpdateOp updateOp = new UpdateOp(CLUSTERVIEW_DOC_ID, true);
+ final Date now = new Date();
+ updateOp.set(ACTIVE_KEY, setToCsv(activeIds));
+ updateOp.set(RECOVERING_KEY, setToCsv(recoveringIds));
+ updateOp.set(INACTIVE_KEY, setToCsv(inactiveIds));
+ updateOp.set(CREATED_KEY, standardDateFormat.format(now));
+ updateOp.set(CREATOR_KEY, localClusterId);
+ Map<Object, String> historyMap = new HashMap<Object, String>();
+ if (previousView!=null) {
+ Map<Object, String> previousHistory = previousView.getHistory();
+ if (previousHistory!=null) {
+ historyMap.putAll(previousHistory);
+ }
+
+ historyMap.put(Revision.newRevision(localClusterId).toString(),
+ asHistoryEntry(previousView, localClusterId, now));
+ }
+ applyHistoryLimit(historyMap);
+ updateOp.set(CLUSTER_VIEW_HISTORY_KEY, historyMap);
+
+ final Long newViewSeqNum;
+ if (previousView==null) {
+ // looks like we are the first ever to define the clusterView
+ // so we can use viewId==1 and we make sure no other cluster node
+ // tries to create this first one simultaneously - so we use
+ // 'create'
+
+ // going via 'create' requires ID to be set again (not only in new UpdateOp(id,isNew)):
+ updateOp.set(Document.ID, CLUSTERVIEW_DOC_ID);
+ final ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
+ newViewSeqNum=1L;
+ updateOp.setNew(true); // paranoia as that's already set above
+ updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
+ // first view ever => choose a new unique clusterViewId
+ final String clusterViewId = UUID.randomUUID().toString();
+ updateOp.set(CLUSTER_VIEW_ID_KEY, clusterViewId);
+ updateOps.add(updateOp);
+ logger.debug("updateAndRead: trying to create the first ever clusterView - hence {}={} and {}={}", VIEW_SEQ_NUM_KEY, newViewSeqNum, CLUSTER_VIEW_ID_KEY, clusterViewId);
+ if (!documentNodeStore.getDocumentStore().create(Collection.SETTINGS, updateOps)) {
+ logger.debug("updateAndRead: someone else just created the first view ever while I tried - reread that one later");
+ return null;
+ }
+ } else {
+ // there were earlier clusterViews (the normal case) - thus we
+ // use 'findAndUpdate' with the condition that
+ // the view sequence number is still at the previous view's value
+ final Long previousViewSeqNum = previousView.getViewSeqNum();
+ updateOp.setNew(false); // change to false from true above
+ updateOp.equals(VIEW_SEQ_NUM_KEY, null, previousViewSeqNum);
+ newViewSeqNum = previousViewSeqNum+1;
+ updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
+ logger.debug("updateAndRead: trying to update the clusterView to {}={} ",VIEW_SEQ_NUM_KEY,newViewSeqNum);
+ if (documentNodeStore.getDocumentStore().findAndUpdate(Collection.SETTINGS, updateOp)==null) {
+ logger.debug("updateAndRead: someone else just updated the view which I wanted to do as well - reread that one later");
+ return null;
+ }
+ }
+
+ // whatever the outcome of the above - we don't care -
+ // re-reading will in any case definitely show what has been persisted
+ // and if the re-read view contains the same id, it is what we have written
+ // - otherwise someone else came in between and we have to step back and retry
+ final ClusterViewDocument readResult = doRead(documentNodeStore);
+ if (readResult==null) {
+ logger.debug("updateAndRead: got null from read - whatever the exact reason, we must retry in a moment.");
+ return null;
+ } else if (newViewSeqNum.equals(readResult.getViewSeqNum())) {
+ logger.debug("updateAndRead: matching view - no change");
+ return readResult;
+ } else {
+ logger.debug("updateAndRead: someone else in the cluster was updating right after I also succeeded - re-read in a bit");
+ return null;
+ }
+ }
+
+ /**
+ * Pruning method that makes sure the history never gets larger than HISTORY_LIMIT
+ * @param historyMap the pruning is done directly on this map
+ */
+ private static void applyHistoryLimit(Map<Object, String> historyMap) {
+ while (historyMap.size()>HISTORY_LIMIT) {
+ // remove the oldest
+ String oldestRevision = null;
+ for (Iterator<Object> it = historyMap.keySet().iterator(); it.hasNext();) {
+ final Object obj = it.next();
+ String r;
+ if (obj instanceof Revision) {
+ r = ((Revision)obj).toString();
+ } else {
+ r = (String) obj;
+ }
+ if (oldestRevision==null) {
+ oldestRevision = r;
+ } else if (Revision.getTimestampDifference(Revision.fromString(r), Revision.fromString(oldestRevision))<0) {
+ oldestRevision = r;
+ }
+ }
+ if (oldestRevision==null) {
+ break;
+ } else {
+ if (historyMap.remove(oldestRevision)==null) {
+ if (historyMap.remove(Revision.fromString(oldestRevision))==null) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /** Converts a previous clusterView document into a history 'string' **/
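+ // An illustrative (hypothetical) history entry produced by this method:
+ // {"seqNum":3,"created":"2015-06-30T08:21:29.393+0200","creator":1,
+ //  "retired":"2015-06-30T08:22:01.001+0200","retirer":2,
+ //  "active":"1,2","recovering":null,"inactive":"3"}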
+ private static String asHistoryEntry(final ClusterViewDocument previousView, int retiringClusterNodeId, Date retireTime) {
+ String h;
+ JsopBuilder b = new JsopBuilder();
+ b.object();
+ b.key(VIEW_SEQ_NUM_KEY);
+ b.value(previousView.getViewSeqNum());
+ b.key(CREATED_KEY);
+ b.value(String.valueOf(previousView.getCreatedAt()));
+ b.key(CREATOR_KEY);
+ b.value(previousView.getCreatedBy());
+ b.key(RETIRED_KEY);
+ b.value(String.valueOf(standardDateFormat.format(retireTime)));
+ b.key(RETIRER_KEY);
+ b.value(retiringClusterNodeId);
+ b.key(ACTIVE_KEY);
+ b.value(arrayToCsv(previousView.getActiveIds().toArray(new Integer[0])));
+ b.key(RECOVERING_KEY);
+ b.value(arrayToCsv(previousView.getRecoveringIds().toArray(new Integer[0])));
+ b.key(INACTIVE_KEY);
+ b.value(arrayToCsv(previousView.getInactiveIds().toArray(new Integer[0])));
+ b.endObject();
+ h = b.toString();
+ return h;
+ }
+
+ /** helper method to convert a set to a comma-separated string (without using toString() for safety) **/
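+ // e.g. the set {2, 3, 5} yields "2,3,5" (in the set's iteration order), a null or empty set yields null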
+ private static String setToCsv(Set<Integer> ids) {
+ if (ids==null || ids.size()==0) {
+ return null;
+ }
+ StringBuilder sb = new StringBuilder();
+ Iterator<Integer> it = ids.iterator();
+ while(it.hasNext()) {
+ final Integer id = it.next();
+ if (sb.length()!=0) {
+ sb.append(",");
+ }
+ sb.append(id);
+ }
+ return sb.toString();
+ }
+
+ /** helper method to convert an array to a comma-separated string **/
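+ // e.g. the array {2, 3, 5} yields "2,3,5", a null or empty array yields null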
+ static String arrayToCsv(Integer[] arr) {
+ if (arr==null || arr.length==0) {
+ return null;
+ } else if (arr.length==1) {
+ return String.valueOf(arr[0]);
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append(String.valueOf(arr[0]));
+ for (int i = 1; i < arr.length; i++) {
+ final Object a = arr[i];
+ sb.append(",");
+ sb.append(String.valueOf(a));
+ }
+ return sb.toString();
+ }
+
+ /** inverse helper method which converts a comma-separated string into an integer array **/
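+ // e.g. "2,3,5" becomes {2, 3, 5} - note that a NumberFormatException propagates for malformed input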
+ static Integer[] csvToIntegerArray(String csv) {
+ if (csv==null) {
+ return null;
+ }
+ String[] split = csv.split(",");
+ Integer[] result = new Integer[split.length];
+ for (int i = 0; i < split.length; i++) {
+ result[i] = Integer.parseInt(split[i]);
+ }
+ return result;
+ }
+
+ /** internal reader of an existing clusterView document from the settings collection **/
+ private static ClusterViewDocument doRead(DocumentNodeStore documentNodeStore) {
+ final DocumentStore documentStore = documentNodeStore.getDocumentStore();
+ final Document doc = documentStore.find(Collection.SETTINGS, CLUSTERVIEW_DOC_ID, -1 /* -1; avoid caching */);
+ if (doc==null) {
+ return null;
+ } else {
+ final ClusterViewDocument clusterView = new ClusterViewDocument(doc);
+ if (clusterView.isValid()) {
+ return clusterView;
+ } else {
+ logger.warn("read: clusterView document is not valid: "+doc.format());
+ return null;
+ }
+ }
+ }
+
+ /** comparison helper that compares an integer array with a set **/
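+ // e.g. the array {2, 3} matches the set {3, 2} - the comparison is order-insensitive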
+ static boolean matches(Integer[] expected, Set<Integer> actual) {
+ final boolean expectedIsEmpty = expected==null || expected.length==0;
+ final boolean actualIsEmpty = actual==null || actual.size()==0;
+ if (expectedIsEmpty && actualIsEmpty) {
+ // if both are null or empty, they match
+ return true;
+ }
+ if (expectedIsEmpty!=actualIsEmpty) {
+ // if one of them is only empty, but the other not, then they don't match
+ return false;
+ }
+ if (expected.length!=actual.size()) {
+ // different size
+ return false;
+ }
+ for (int i = 0; i < expected.length; i++) {
+ Integer aMemberId = expected[i];
+ if (!actual.contains(aMemberId)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ ClusterViewDocument(Document doc) {
+ if (doc==null) {
+ throw new IllegalArgumentException("doc must not be null");
+ }
+ this.clusterViewId = (String) doc.get(CLUSTER_VIEW_ID_KEY);
+ this.viewSeqNum = (Long) doc.get(VIEW_SEQ_NUM_KEY);
+ this.createdAt = (String) doc.get(CREATED_KEY);
+ this.createdBy = (Integer) doc.get(CREATOR_KEY);
+
+ final Object obj = doc.get(ACTIVE_KEY);
+ if (obj==null || !(obj instanceof String)) {
+ logger.trace(": {} : {}", ACTIVE_KEY, obj);
+ this.activeIds = new Integer[0];
+ } else {
+ this.activeIds = csvToIntegerArray((String) obj);
+ }
+
+ final Object obj2 = doc.get(RECOVERING_KEY);
+ if (obj2==null || !(obj2 instanceof String)) {
+ logger.trace(": {} : {}", RECOVERING_KEY, obj2);
+ this.recoveringIds= new Integer[0];
+ } else {
+ this.recoveringIds = csvToIntegerArray((String) obj2);
+ }
+
+ final Object obj3 = doc.get(INACTIVE_KEY);
+ if (obj3==null || !(obj3 instanceof String)) {
+ logger.trace(": {} : {}", INACTIVE_KEY, obj3);
+ this.inactiveIds = new Integer[0];
+ } else {
+ this.inactiveIds = csvToIntegerArray((String) obj3);
+ }
+
+ final Object obj4 = doc.get(CLUSTER_VIEW_HISTORY_KEY);
+ if (obj4==null || !(obj4 instanceof Map)) {
+ logger.trace(" viewHistory is null");
+ this.viewHistory = null;
+ } else {
+ this.viewHistory = (Map<Object, String>) obj4;
+ }
+ }
+
+ /** Returns the set of active ids of this cluster view **/
+ Set<Integer> getActiveIds() {
+ return new HashSet<Integer>(Arrays.asList(activeIds));
+ }
+
+ /** Returns the set of recovering ids of this cluster view **/
+ Set<Integer> getRecoveringIds() {
+ return new HashSet<Integer>(Arrays.asList(recoveringIds));
+ }
+
+ /** Returns the set of inactive ids of this cluster view **/
+ Set<Integer> getInactiveIds() {
+ return new HashSet<Integer>(Arrays.asList(inactiveIds));
+ }
+
+ /** Returns the history map **/
+ private Map<Object, String> getHistory() {
+ return viewHistory;
+ }
+
+ @Override
+ public String toString() {
+ return "a ClusterView[valid="+isValid()+", viewSeqNum="+viewSeqNum+", clusterViewId="+clusterViewId+
+ ", activeIds="+arrayToCsv(activeIds)+", recoveringIds="+arrayToCsv(recoveringIds)+
+ ", inactiveIds="+arrayToCsv(inactiveIds)+"]";
+ }
+
+ boolean isValid() {
+ return viewSeqNum>=0 && activeIds!=null && activeIds.length>0;
+ }
+
+ /** Returns the date+time when this view was created, for debugging purpose only **/
+ String getCreatedAt() {
+ return createdAt;
+ }
+
+ /** Returns the id of the instance that created this view, for debugging purpose only **/
+ int getCreatedBy() {
+ return createdBy;
+ }
+
+ /** Returns the monotonically incrementing sequence number of this view **/
+ long getViewSeqNum() {
+ return viewSeqNum;
+ }
+
+ /** Returns a UUID representing this cluster - will never change, propagates from view to view **/
+ String getClusterViewId() {
+ return clusterViewId;
+ }
+
+ private boolean matches(Set<Integer> activeIds, Set<Integer> recoveringIds, Set<Integer> inactiveIds) {
+ if (!matches(this.activeIds, activeIds)) {
+ return false;
+ }
+ if (!matches(this.recoveringIds, recoveringIds)) {
+ return false;
+ }
+ if (!matches(this.inactiveIds, inactiveIds)) {
+ return false;
+ }
+ return true;
+ }
+
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java (working copy)
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.jcr.Value;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.commons.SimpleValueFactory;
+import org.apache.jackrabbit.oak.api.Descriptors;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The DocumentDiscoveryLiteService takes care of providing a
+ * repository descriptor that contains the current cluster-view details.
+ *
+ * The clusterView is provided via a repository descriptor (see
+ * OAK_DISCOVERYLITE_CLUSTERVIEW)
+ *
+ * The cluster-view lists all instances (ever) known in the cluster
+ * in one of the following states:
+ *
+ * active: the instance is currently running and has an up-to-date
+ * lease
+ * deactivating: the instance failed to update the lease recently,
+ * thus a recovery is happening - or it has just finished and the local
+ * instance has yet to do a backgroundRead before it has finished
+ * reading the crashed/shutdown instance's last changes
+ * inactive: the instance is currently not running and all its
+ * changes have been seen by the local instance
+ *
+ *
+ * Additionally, the cluster-view is assigned a monotonically increasing
+ * sequence number. This sequence number is persisted, thus all
+ * instances in the cluster will show the same sequence number for
+ * a particular cluster-view at any point in time.
+ *
+ * Note that the 'deactivating' state might be hiding some complexity
+ * that is deliberately not shown: for the documentNS the state
+ * 'deactivating' consists of two substates: 'recovering' as in
+ * _lastRevs are updated, and 'backlog processing' for a pending
+ * backgroundRead to get the latest head state of a crashed/shutdown
+ * instance. So when an instance is in 'deactivating' state, it is
+ * not indicated via the cluster-view whether it is recovering or
+ * has backlog to process. However, the fact that an instance has
+ * yet to do a backgroundRead to get changes is a per-instance
+ * story: other instances might already have done the backgroundRead
+ * and thus no longer have a backlog for the instance(s) that left.
+ * Even though 'deactivating' therefore is dependent on the instance
+ * you get the information from, the cluster-view must have a
+ * sequence number that uniquely identifies it in the cluster.
+ * These two constraints conflict. As a simple solution to handle
+ * this case nevertheless, the 'final' flag has been introduced:
+ * the cluster-view has this flag 'final' set to true when the
+ * view is final and nothing will be changed in this sequence number
+ * anymore. If the 'final' flag is false however it indicates
+ * that the cluster-view with this particular sequence number might
+ * still experience a change (more concretely: the deactivating instances
+ * might change). Note that alternatives to this 'final' flag
+ * have been discussed, such as using vector-counters, but there
+ * was no obvious gain to be achieved using an alternative approach.
+ *
+ * In other words: whenever the 'final' flag is false, the view
+ * must be interpreted as 'in flux' wrt the deactivating/inactive instances
+ * and any action that depends on stable deactivating/inactive instances
+ * must not yet be done until the 'final' flag becomes true.
+ *
+ * Underneath, the DocumentDiscoveryLiteService uses the clusterNodes
+ * collection to derive the clusterView, which it stores in the settings
+ * collection. Whenever it updates the clusterView it increments the
+ * sequence number by 1.
+ *
+ * While this new 'clusterView' document in the settings collection
+ * sounds like redundant data (since it is just derived from the clusterNodes),
+ * it actually is not. By persisting the clusterView it becomes the
+ * new source of truth wrt what the clusterView looks like. And no
+ * two instances in the same cluster can make different conclusions
+ * based eg on different clocks they have or based on reading the clusterNodes
+ * in a slightly different moment etc. Also, the clusterView allows
+ * storing two additional attributes: the clusterViewId
+ * (which is the permanent id for this cluster, similar to the slingId being
+ * a permanent id for an instance) as well as the sequence number
+ * (which allows the instances to refer to the same clusterView,
+ * and to easily detect whether anything has changed)
+ *
+ * Prerequisites for the clusterView mechanism to be stable:
+ *
+ * the machine clocks are reasonably in sync - that is, they should
+ * be off by magnitudes less than the lease updateFrequency/timeout
+ * the write-delays from any instance to the mongo server where
+ * the clusterNodes and settings collections are stored should be
+ * very fast - at least orders of magnitude lower again than the
+ * lease timeout
+ * when this instance notices that others have kicked it out of the
+ * clusterView (which it can find out when either its clusterNodes
+ * document is set to recovering or it is not in the clusterView
+ * anymore, although it just was - ie not just because of a fresh start),
+ * then this instance must step back gracefully.
+ * The exact definition is to be applied elsewhere - but it should
+ * include: stopping to update its own lease, waiting for the view
+ * to have stabilized - waiting for recovery of its own instance
+ * by the remaining instances in the cluster to have finished -
+ * and then probably waiting for another gracePeriod until it
+ * might rejoin the cluster. In between, any commit should
+ * fail with BannedFromClusterException
+ *
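+ * For illustration, the descriptor value is a JSON string roughly along
+ * these lines (hypothetical values - the exact key names are defined by
+ * ClusterView's JSON serialization, not shown here):
+ * {"seq":4,"final":true,"id":"882f8926-1112-493a-81a0-f946087b2986",
+ *  "me":1,"active":[1,2],"deactivating":[3],"inactive":[4]}
+ *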
+ * @see #OAK_DISCOVERYLITE_CLUSTERVIEW
+ */
+@Component(immediate = true, name = DocumentDiscoveryLiteService.COMPONENT_NAME)
+@Service(value = { DocumentDiscoveryLiteService.class, Observer.class })
+public class DocumentDiscoveryLiteService implements ClusterStateChangeListener, Observer {
+
+ static final String COMPONENT_NAME = "org.apache.jackrabbit.oak.plugins.document.DocumentDiscoveryLiteService";
+
+ /** Name of the repository descriptor via which the clusterView is published - which is the raison d'etre of the DocumentDiscoveryLiteService **/
+ public static final String OAK_DISCOVERYLITE_CLUSTERVIEW = "oak.discoverylite.clusterview";
+
+ private static final Logger logger = LoggerFactory.getLogger(DocumentDiscoveryLiteService.class);
+
+ /** describes the reason why the BackgroundWorker should be woken up **/
+ private static enum WakeupReason {
+ CLUSTER_STATE_CHANGED,
+ BACKGROUND_READ_FINISHED
+ }
+
+ /** The BackgroundWorker takes care of regularly invoking checkView - which in turn will detect if anything changed **/
+ private class BackgroundWorker implements Runnable {
+
+ final Random random = new Random();
+
+ volatile boolean stopped = false; // volatile: written via stop(), read by the worker thread
+
+ private void stop() {
+ logger.trace("stop: start");
+ synchronized(BackgroundWorker.this) {
+ stopped = true;
+ }
+ logger.trace("stop: end");
+ }
+
+ @Override
+ public void run() {
+ logger.info("BackgroundWorker.run: start");
+ try{
+ doRun();
+ } finally {
+ logger.info("BackgroundWorker.run: end {finally}");
+ }
+ }
+
+ private void doRun() {
+ while(!stopped) {
+ try{
+ logger.trace("BackgroundWorker.doRun: going to call checkView");
+ final boolean shortSleep = checkView();
+ logger.trace("BackgroundWorker.doRun: checkView terminated with {} (=shortSleep)", shortSleep);
+ final long sleepMillis = shortSleep ? (50 + random.nextInt(450)) : 5000;
+ logger.trace("BackgroundWorker.doRun: sleeping {}ms", sleepMillis);
+ synchronized(BackgroundWorker.this) {
+ if (stopped) return;
+ BackgroundWorker.this.wait(sleepMillis);
+ if (stopped) return;
+ }
+ logger.trace("BackgorundWorker.doRun: done sleeping, looping");
+ } catch(Exception e) {
+ logger.error("doRun: got an exception: "+e, e);
+ try{
+ Thread.sleep(5000);
+ } catch(Exception e2) {
+ logger.error("doRun: got an exception while sleeping due to another exception: "+e2, e2);
+ }
+ }
+ }
+ }
+
+ }
+
+ /** This provides the 'clusterView' repository descriptors **/
+ private class DiscoveryLiteDescriptor implements Descriptors {
+
+ final SimpleValueFactory factory = new SimpleValueFactory();
+
+ @Override
+ public String[] getKeys() {
+ return new String[] {OAK_DISCOVERYLITE_CLUSTERVIEW};
+ }
+
+ @Override
+ public boolean isStandardDescriptor(String key) {
+ return OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key);
+ }
+
+ @Override
+ public boolean isSingleValueDescriptor(String key) {
+ return OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key);
+ }
+
+ @Override
+ public Value getValue(String key) {
+ if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+ return null;
+ }
+ return factory.createValue(getClusterViewAsDescriptorValue());
+ }
+
+ @Override
+ public Value[] getValues(String key) {
+ if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+ return null;
+ }
+ return new Value[] {getValue(key)};
+ }
+
+ }
+
+ /** DocumentNodeStore's (hence local) clusterId **/
+ private int clusterNodeId = -1;
+
+ /** the DocumentNodeStore - used to get the active/inactive cluster ids from **/
+ private DocumentNodeStore documentNodeStore;
+
+ /** background job that periodically verifies and updates the clusterView **/
+ private BackgroundWorker backgroundWorker;
+
+ /** the ClusterViewDocument which was used in the last checkView run **/
+ private ClusterViewDocument previousClusterViewDocument;
+
+ /** the ClusterView that was valid as a result of the previous checkView run **/
+ private ClusterView previousClusterView;
+
+ /** kept volatile as this is frequently read in contentChanged which is better kept unsynchronized as long as possible **/
+ private volatile boolean hasInstancesWithBacklog;
+
+ /** Require a static reference to the NodeStore. Note that this implies the service is activated for both segment and document node stores **/
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY,
+ policy = ReferencePolicy.STATIC)
+ private volatile NodeStore nodeStore;
+
+ /** inactive nodes that have been so for a while, ie they have no backlog anymore, so no need to check for backlog every time **/
+ private Set<Integer> longTimeInactives = new HashSet<Integer>();
+
+ /** returns the clusterView as a json value for it to be provided via the repository descriptor **/
+ private String getClusterViewAsDescriptorValue() {
+ if (previousClusterView==null) {
+ return null;
+ } else {
+ return previousClusterView.asDescriptorValue();
+ }
+ }
+
+ /** On activate the DocumentDiscoveryLiteService tries to start the background job */
+ @Activate
+ public void activate(ComponentContext context) {
+ logger.trace("activate: start");
+
+ // Document vs Segment check
+ if (!(nodeStore instanceof DocumentNodeStore)) {
+ // that is the case when running on SegmentNodeStore - and atm
+ // there's no other simple dependency that could guarantee that
+ // DocumentDiscoveryLiteService is only activated when we are
+ // indeed running on DocumentNodeStore.
+ // So that's why there's this instanceof check.
+ // and if it fails, then disable the Document-discoveryliteservice
+ logger.info("activate: nodeStore is not a DocumentNodeStore, thus disabling: "+COMPONENT_NAME);
+ context.disableComponent(COMPONENT_NAME);
+ return;
+ }
+
+ // set the ClusterStateChangeListener with the DocumentNodeStore
+ this.documentNodeStore = (DocumentNodeStore) nodeStore;
+ documentNodeStore.setClusterStateChangeListener(this);
+
+ // retrieve the clusterId
+ clusterNodeId = documentNodeStore.getClusterId();
+
+ // start the background worker
+ backgroundWorker = new BackgroundWorker();
+ Thread th = new Thread(backgroundWorker, "DiscoveryLite-BackgroundWorker-["+clusterNodeId+"]");
+ th.setDaemon(true);
+ th.start();
+
+ // register the Descriptors - for Oak to pass it upwards
+ if (context!=null) {
+ OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
+ whiteboard.register(Descriptors.class, new DiscoveryLiteDescriptor(), Collections.emptyMap());
+ }
+ logger.trace("activate: end");
+ }
+
+ /** On deactivate the background job is stopped - if it was running at all **/
+ @Deactivate
+ protected void deactivate() {
+ logger.trace("deactivate: deactivated");
+
+ if (backgroundWorker!=null) {
+ backgroundWorker.stop();
+ backgroundWorker = null;
+ }
+ logger.trace("deactivate: end");
+ }
+
+ /**
+ * Checks if anything changed in the current view and updates the service
+ * fields accordingly.
+ * @return true if anything changed or is about to be changed (eg recovery/backlog), false if
+ * the view is stable
+ */
+ private boolean checkView() {
+ logger.trace("checkView: start");
+ List<ClusterNodeInfoDocument> allClusterNodes =
+ ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore());
+
+ final Map<Integer, ClusterNodeInfoDocument> allNodeIds = new HashMap<Integer, ClusterNodeInfoDocument>();
+ final Map<Integer, ClusterNodeInfoDocument> activeNotTimedOutNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+ final Map<Integer, ClusterNodeInfoDocument> activeButTimedOutNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+ final Map<Integer, ClusterNodeInfoDocument> recoveringNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+ final Map<Integer, ClusterNodeInfoDocument> backlogNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+ final Map<Integer, ClusterNodeInfoDocument> inactiveNoBacklogNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+
+ for (Iterator<ClusterNodeInfoDocument> it = allClusterNodes.iterator(); it.hasNext();) {
+ ClusterNodeInfoDocument clusterNode = it.next();
+ allNodeIds.put(clusterNode.getClusterId(), clusterNode);
+ if (clusterNode.isBeingRecovered()) {
+ recoveringNodes.put(clusterNode.getClusterId(), clusterNode);
+ } else if (!clusterNode.isActive()) {
+ if (hasBacklog(clusterNode)) {
+ backlogNodes.put(clusterNode.getClusterId(), clusterNode);
+ } else {
+ inactiveNoBacklogNodes.put(clusterNode.getClusterId(), clusterNode);
+ }
+ } else if (clusterNode.getLeaseEndTime() < System.currentTimeMillis()) {
+ activeButTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
+ } else {
+ activeNotTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
+ }
+ }
+ final Map<Integer, ClusterNodeInfoDocument> allActives;
+ allActives = new HashMap<Integer, ClusterNodeInfoDocument>(activeNotTimedOutNodes);
+ allActives.putAll(activeButTimedOutNodes);
+
+ // terminology:
+ // 'inactivating' are nodes that are either 'recovering' or 'backlog' ones
+ // 'recovering' are nodes for which one node is doing the recover() of lastRevs
+ // 'backlog' ones are nodes that are no longer active, that have finished the
+ // recover() but for which a backgroundRead is still pending to read
+ // the latest root changes.
+
+ logger.debug("checkView: active nodes: {}, timed out nodes: {}, recovering nodes: {}, backlog nodes: {}, inactive nodes: {}, total: {}, hence view nodes: {}",
+ activeNotTimedOutNodes.size(), activeButTimedOutNodes.size(), recoveringNodes.size(), backlogNodes.size(), inactiveNoBacklogNodes.size(), allNodeIds.size(), allActives.size());
+
+ final ClusterViewDocument originalView = previousClusterViewDocument;
+ final ClusterViewDocument newView = doCheckView(allActives.keySet(), recoveringNodes.keySet(), backlogNodes.keySet(), inactiveNoBacklogNodes.keySet());
+ if (newView==null) {
+ logger.trace("checkView: end. newView: null");
+ return true;
+ }
+ boolean newHasInstancesWithBacklog = recoveringNodes.size()>0 || backlogNodes.size()>0;
+ boolean changed = originalView==null || (newView.getViewSeqNum()!=originalView.getViewSeqNum()) || (newHasInstancesWithBacklog!=hasInstancesWithBacklog);
+ logger.debug("checkView: viewFine: {}, changed: {}, originalView: {}, newView: {}", newView!=null, changed, originalView, newView);
+
+ if (longTimeInactives.addAll(inactiveNoBacklogNodes.keySet())) {
+ logger.debug("checkView: updated longTimeInactives to {} (inactiveNoBacklogNodes: {})", longTimeInactives, inactiveNoBacklogNodes);
+ }
+
+ if (changed) {
+ ClusterView v = ClusterView.fromDocument(clusterNodeId, newView, backlogNodes.keySet());
+ final ClusterView previousView = previousClusterView;
+ previousClusterView = v;
+ hasInstancesWithBacklog = newHasInstancesWithBacklog;
+ logger.info("checkView: view changed from: "+previousView+", to: "+v+", hasInstancesWithBacklog: "+hasInstancesWithBacklog);
+ return true;
+ } else {
+ logger.debug("checkView: no changes whatsoever, still at view: "+previousClusterView);
+ return hasInstancesWithBacklog;
+ }
+ }
+
+ private Revision getLastKnownRevision(int clusterNodeId) {
+ String[] lastKnownRevisions = documentNodeStore.getMBean().getLastKnownRevisions();
+ for (int i = 0; i < lastKnownRevisions.length; i++) {
+ String aLastKnownRevisionStr = lastKnownRevisions[i];
+ String[] split = aLastKnownRevisionStr.split("=");
+ if (split.length==2) {
+ try{
+ Integer id = Integer.parseInt(split[0]);
+ if (id==clusterNodeId) {
+ final Revision lastKnownRev = Revision.fromString(split[1]);
+ logger.trace("getLastKnownRevision: end. clusterNode: {}, lastKnownRevision: {}", clusterNodeId, lastKnownRev);
+ return lastKnownRev;
+ }
+ } catch(NumberFormatException nfe) {
+ logger.warn("getLastKnownRevision: could not parse integer '"+split[0]+"': "+nfe, nfe);
+ }
+ } else {
+ logger.warn("getLastKnownRevision: cannot parse lastKnownRevision: "+aLastKnownRevisionStr);
+ }
+ }
+ logger.warn("getLastKnownRevision: no lastKnownRevision found for "+clusterNodeId);
+ return null;
+ }
+
+ private boolean hasBacklog(ClusterNodeInfoDocument clusterNode) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("hasBacklog: start. clusterNodeId: {}", clusterNode.getClusterId());
+ }
+ final Revision lastKnownRevision = getLastKnownRevision(clusterNode.getClusterId());
+ if (lastKnownRevision==null) {
+ logger.warn("hasBacklog: no lastKnownRevision found, hence cannot determine backlog for node "+clusterNode.getClusterId());
+ return false;
+ }
+
+ // The lastKnownRevision is what the local instance has last read/seen from another instance.
+ // This must be compared to what the other instance *actually* has written as the very last thing.
+ // Now the knowledge what the other instance has last written (after recovery) would sit
+ // in the root document - so that could in theory be used. But reading the root document
+ // would have to be done *uncached*. And that's quite a change to what the original
+ // idea was: that the root document would only be read every second, to avoid contention.
+ // So this 'what the other instance has last written' information is retrieved via
+ // a new, dedicated property in the clusterNodes collection: the 'lastWrittenRootRev'.
+ // The 'lastWrittenRootRev' is written by 'UnsavedModifications' during backgroundUpdate
+ // and retrieved here quite regularly (but it should not be a big deal, as the
+ // discovery-lite is the only one reading this field so frequently and it does not
+ // interfere with normal (jcr) nodes at all).
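+ // For illustration (hypothetical revisions): if the local instance has
+ // lastKnownRevision "r15000000000-0-2" for clusterNode 2, while that node's
+ // lastWrittenRootRev is the newer "r15000005000-0-2", then node 2 still has
+ // a backlog from the local instance's point of view.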
+ String lastWrittenRootRevStr = clusterNode.getLastWrittenRootRev();
+ if (lastWrittenRootRevStr==null) {
+ logger.warn("hasBacklog: node has lastWrittenRootRev=null");
+ return false;
+ }
+ Revision lastWrittenRootRev = Revision.fromString(lastWrittenRootRevStr);
+ if (lastWrittenRootRev==null) {
+ logger.warn("hasBacklog: node has no lastWrittenRootRev: "+clusterNode.getClusterId());
+ return false;
+ }
+
+ final boolean hasBacklog = Revision.getTimestampDifference(lastKnownRevision, lastWrittenRootRev)<0;
+ if (logger.isDebugEnabled()) {
+ logger.debug("hasBacklog: clusterNodeId: {}, lastKnownRevision: {}, lastWrittenRootRev: {}, hasBacklog: {}",
+ clusterNode.getClusterId(), lastKnownRevision, lastWrittenRootRev, hasBacklog);
+ }
+ return hasBacklog;
+ }
+
+
+ private ClusterViewDocument doCheckView(final Set<Integer> activeNodes,
+ final Set<Integer> recoveringNodes, final Set<Integer> backlogNodes, final Set<Integer> inactiveNodes) {
+ logger.trace("doCheckView: start: activeNodes: {}, recoveringNodes: {}, backlogNodes: {}, inactiveNodes: {}",
+ activeNodes, recoveringNodes, backlogNodes, inactiveNodes);
+
+ Set<Integer> allInactives = new HashSet<Integer>();
+ allInactives.addAll(inactiveNodes);
+ allInactives.addAll(backlogNodes);
+
+ if (activeNodes.size()==0) {
+ // then we have zero active nodes - which is unexpected, as it would imply that even our own instance is not active
+ // hence handle with care - ie wait until we get an active node
+ logger.warn("doCheckView: empty active ids. activeNodes:{}, recoveringNodes:{}, inactiveNodes:{}", activeNodes, recoveringNodes, inactiveNodes);
+ return null;
+ }
+ ClusterViewDocument newViewOrNull;
+ try{
+ newViewOrNull = ClusterViewDocument.readOrUpdate(documentNodeStore, activeNodes, recoveringNodes, allInactives);
+ } catch(RuntimeException re) {
+ logger.error("doCheckView: RuntimeException: re: "+re, re);
+ return null;
+ } catch(Error er) {
+ logger.error("doCheckView: Error: er: "+er, er);
+ return null;
+ }
+ logger.trace("doChckView: readOrUpdate result: {}", newViewOrNull);
+
+ // and now for some verbose documentation and logging:
+ if (newViewOrNull==null) {
+ // then there was a concurrent update of the clusterView
+ // and we should do some quick backoff sleeping
+ logger.debug("doCheckView: newViewOrNull is null: "+newViewOrNull);
+ return null;
+ } else {
+ // otherwise we now hold the newly valid view
+ // it could be the same or different to the previous one, let's check
+ if (previousClusterViewDocument==null) {
+ // oh ok, this is the very first one
+ previousClusterViewDocument = newViewOrNull;
+ logger.debug("doCheckView: end. first ever view: {}", newViewOrNull);
+ return newViewOrNull;
+ } else if (previousClusterViewDocument.getViewSeqNum()==newViewOrNull.getViewSeqNum()) {
+ // that's the normal case: the viewId matches, nothing has changed, we've already
+ // processed the previousClusterView, so:
+ logger.debug("doCheckView: end. seqNum did not change. view: {}", newViewOrNull);
+ return newViewOrNull;
+ } else {
+ // otherwise the view has changed
+ logger.info("doCheckView: view has changed from: {} to: {} - sending event...", previousClusterViewDocument, newViewOrNull);
+ previousClusterViewDocument = newViewOrNull;
+ logger.debug("doCheckView: end. changed view: {}", newViewOrNull);
+ return newViewOrNull;
+ }
+ }
+ }
+
+ @Override
+ public void handleClusterStateChange() {
+ // handleClusterStateChange is needed to learn about any state change in the clusterNodes
+ // collection asap and to be able to react to it - so this will wake up the
+ // backgroundWorker which in turn will - in a separate thread - check the view
+ // and send out events accordingly
+ wakeupBackgroundWorker(WakeupReason.CLUSTER_STATE_CHANGED);
+ }
+
+ private void wakeupBackgroundWorker(WakeupReason wakeupReason) {
+ final BackgroundWorker bw = backgroundWorker;
+ if (bw!=null) {
+ // take a local copy of this.hasInstancesWithBacklog for consistent use in the code below
+ final boolean hasInstancesWithBacklog = this.hasInstancesWithBacklog;
+
+ if (wakeupReason==WakeupReason.BACKGROUND_READ_FINISHED) {
+ // then only forward the notify if 'hasInstancesWithBacklog' -
+ // ie if we have anything we could be waiting for - otherwise
+ // we don't need to wake up the background thread
+ if (!hasInstancesWithBacklog) {
+ logger.trace("wakeupBackgroundWorker: not waking up backgroundWorker, as we do not have any instances with backlog");
+ return;
+ }
+ }
+ logger.trace("wakeupBackgroundWorker: waking up backgroundWorker, reason: {} (hasInstancesWithBacklog: {})",
+ wakeupReason, hasInstancesWithBacklog);
+ synchronized(bw) {
+ bw.notifyAll();
+ }
+ }
+ }
+
+ /**
+ *
+ * Additionally the DocumentDiscoveryLiteService must be notified when the
+ * background-read has finished - as it could be waiting for a crashed node's recovery
+ * to finish - which it can only do by checking the lastKnownRevision of the crashed
+ * instance - and that check is best done after the background read is just finished
+ * (it could optionally do that purely time-based as well, but going via a listener
+ * is more timely, that's why this approach has been chosen).
+ */
+ @Override
+ public void contentChanged(NodeState root, CommitInfo info) {
+ // contentChanged is only used to react as quickly as possible
+ // when we have instances that have a 'backlog' - ie when instances crashed
+ // and are being recovered - then we must wait until the recovery is finished
+ // AND until the subsequent background read actually reads that instance's
+ // last changes. To catch that moment as quickly as possible,
+ // this contentChanged is used.
+ // Now from the above it also results that this only wakes up the
+ // backgroundWorker if we have any pending 'backlogy instances'
+ // otherwise this is a no-op
+ if (info==null) {
+ // then ignore this as this is likely an external change
+ // note: it could be a compacted change, in which case we should
+ // probably still process it - but we have a 5sec fallback
+ // in the BackgroundWorker to handle that case too,
+ // so:
+ logger.trace("contentChanged: ignoring content change due to commit info being null");
+ return;
+ }
+ logger.trace("contentChanged: handling content changed by waking up worker if necessary");
+ wakeupBackgroundWorker(WakeupReason.BACKGROUND_READ_FINISHED);
+ }
+
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1696301)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy)
@@ -348,6 +348,14 @@
private final BlobStore blobStore;
/**
+ * The clusterStateChangeListener is invoked on any noticed change in the clusterNodes collection.
+ *
+ * Note that there is no synchronization between setting this one and using it, but arguably
+ * that is not necessary since it will be set at startup time and then never be changed.
+ */
+ private ClusterStateChangeListener clusterStateChangeListener;
+
+ /**
* The BlobSerializer.
*/
private final BlobSerializer blobSerializer = new BlobSerializer() {
@@ -1637,7 +1645,8 @@
runBackgroundReadOperations();
}
- private void runBackgroundUpdateOperations() {
+ /** Note: made package-protected for testing purposes, would otherwise be private **/
+ void runBackgroundUpdateOperations() {
if (isDisposed.get()) {
return;
}
@@ -1680,7 +1689,8 @@
//----------------------< background read operations >----------------------
- private void runBackgroundReadOperations() {
+ /** Note: made package-protected for testing purposes, would otherwise be private **/
+ void runBackgroundReadOperations() {
if (isDisposed.get()) {
return;
}
@@ -1724,8 +1734,11 @@
/**
* Updates the state about cluster nodes in {@link #activeClusterNodes}
* and {@link #inactiveClusterNodes}.
+ * @return true if the cluster state has changed, false if the cluster state
+ * remained unchanged
*/
- void updateClusterState() {
+ boolean updateClusterState() {
+ boolean hasChanged = false;
long now = clock.getTime();
Set<Integer> inactive = Sets.newHashSet();
for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(store)) {
@@ -1733,14 +1746,15 @@
if (cId != this.clusterId && !doc.isActive()) {
inactive.add(cId);
} else {
- activeClusterNodes.put(cId, doc.getLeaseEndTime());
+ hasChanged |= activeClusterNodes.put(cId, doc.getLeaseEndTime())==null;
}
}
- activeClusterNodes.keySet().removeAll(inactive);
- inactiveClusterNodes.keySet().retainAll(inactive);
+ hasChanged |= activeClusterNodes.keySet().removeAll(inactive);
+ hasChanged |= inactiveClusterNodes.keySet().retainAll(inactive);
for (Integer clusterId : inactive) {
- inactiveClusterNodes.putIfAbsent(clusterId, now);
+ hasChanged |= inactiveClusterNodes.putIfAbsent(clusterId, now)==null;
}
+ return hasChanged;
}
/**
@@ -2436,6 +2450,15 @@
return blobGC;
}
+ void setClusterStateChangeListener(ClusterStateChangeListener clusterStateChangeListener) {
+ this.clusterStateChangeListener = clusterStateChangeListener;
+ }
+
+ void signalClusterStateChange() {
+ if (clusterStateChangeListener!=null) {
+ clusterStateChangeListener.handleClusterStateChange();
+ }
+ }
//-----------------------------< DocumentNodeStoreMBean >---------------------------------
public DocumentNodeStoreMBean getMBean() {
@@ -2607,10 +2630,16 @@
@Override
protected void execute(@Nonnull DocumentNodeStore nodeStore) {
- if (nodeStore.renewClusterIdLease()) {
- nodeStore.updateClusterState();
+ // first renew the clusterId lease
+ nodeStore.renewClusterIdLease();
+
+ // then, independently if the lease had to be updated or not, check the status:
+ if (nodeStore.updateClusterState()) {
+ // then inform the discovery lite listener - if it is registered
+ nodeStore.signalClusterStateChange();
}
}
+
}
public BlobStore getBlobStore() {
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (revision 1696301)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (working copy)
@@ -278,8 +278,16 @@
return recover(suspects.iterator(), clusterId);
} finally {
Utils.closeIfCloseable(suspects);
+
// Relinquish the lock on the recovery for the cluster on the clusterInfo
+ //TODO: in case recover throws a RuntimeException (or Error..) then
+ // the recovery might have failed, yet the instance is marked
+ // as 'recovered' (by setting the state to NONE).
+ // is this really fine here? or should we not retry - or at least
+ // log the throwable?
missingLastRevUtil.releaseRecoveryLock(clusterId);
+
+ nodeStore.signalClusterStateChange();
}
}
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (revision 1696301)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (working copy)
@@ -214,6 +214,17 @@
lastRev = null;
}
}
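+ // remember this instance's last written root revision ('lastWrittenRootRev')
+ // in its own clusterNodes entry - the discovery-lite service reads it to
+ // determine whether an instance still has a backlog (cf DocumentDiscoveryLiteService.hasBacklog)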
+ Revision writtenRootRev = pending.get("/");
+ if (writtenRootRev!=null) {
+ int cid = writtenRootRev.getClusterId();
+ if (store.getDocumentStore().find(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, String.valueOf(cid))!=null) {
+ UpdateOp update = new UpdateOp(String.valueOf(cid), false);
+ update.equals(Document.ID, null, String.valueOf(cid));
+ update.set(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY, writtenRootRev.toString());
+ store.getDocumentStore().findAndUpdate(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, update);
+ }
+ }
+
stats.write = clock.getTime() - time;
return stats;
}
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java (working copy)
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Test helper class that simplifies creating ClusterView and ClusterViewDocument objects **/
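+// Typical usage (cf ClusterViewDocumentTest; values are hypothetical):
+//   ClusterViewDocument doc = new ClusterViewBuilder(1 /*viewSeqNum*/, "some-uuid" /*clusterViewId*/, 2 /*myId*/)
+//       .active(2, 3).recovering(4).inactive(5).asDoc();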
+class ClusterViewBuilder {
+
+ private final Set<Integer> activeIds = new HashSet<Integer>();
+ private final Set<Integer> recoveringIds = new HashSet<Integer>();
+ private final Set<Integer> backlogIds = new HashSet<Integer>();
+ private final Set<Integer> inactiveIds = new HashSet<Integer>();
+ private final long viewSeqNum;
+ private final String clusterViewId;
+ private final int myId;
+
+ ClusterViewBuilder(long viewSeqNum, String clusterViewId, int myId) {
+ this.viewSeqNum = viewSeqNum;
+ this.clusterViewId = clusterViewId;
+ this.myId = myId;
+ }
+
+ public ClusterViewBuilder active(int... instanceIds) {
+ for (int anId : instanceIds) {
+ activeIds.add(anId);
+ }
+ return this;
+ }
+
+ public ClusterViewBuilder recovering(int... instanceIds) {
+ for (int anId : instanceIds) {
+ recoveringIds.add(anId);
+ }
+ return this;
+ }
+
+ public ClusterViewBuilder backlogs(int... instanceIds) {
+ for (int anId : instanceIds) {
+ backlogIds.add(anId);
+ }
+ return this;
+ }
+
+ public ClusterViewBuilder inactive(int... instanceIds) {
+ for (int anId : instanceIds) {
+ inactiveIds.add(anId);
+ }
+ return this;
+ }
+
+ public ClusterViewDocument asDoc() {
+ /*
+ * "_id" : "clusterView",
+ "seqNum" : NumberLong(1),
+ "inactive" : null,
+ "clusterViewHistory" : {
+
+ },
+ "deactivating" : null,
+ "created" : "2015-06-30T08:21:29.393+0200",
+ "clusterViewId" : "882f8926-1112-493a-81a0-f946087b2986",
+ "active" : "1",
+ "creator" : 1,
+ "_modCount" : NumberLong(1)
+ */
+ Document doc = new Document();
+ doc.put(ClusterViewDocument.VIEW_SEQ_NUM_KEY, viewSeqNum);
+ doc.put(ClusterViewDocument.INACTIVE_KEY, asArrayStr(inactiveIds));
+ doc.put(ClusterViewDocument.RECOVERING_KEY, asArrayStr(recoveringIds));
+ doc.put(ClusterViewDocument.ACTIVE_KEY, asArrayStr(activeIds));
+ doc.put(ClusterViewDocument.CLUSTER_VIEW_ID_KEY, clusterViewId);
+ ClusterViewDocument clusterViewDoc = new ClusterViewDocument(doc);
+ return clusterViewDoc;
+ }
+ public ClusterView asView() {
+ return ClusterView.fromDocument(myId, asDoc(), backlogIds);
+ }
+
+ private String asArrayStr(Set<Integer> ids) {
+ return ClusterViewDocument.arrayToCsv(asArray(ids));
+ }
+
+ private Integer[] asArray(Set<Integer> set) {
+ if (set.size()==0) {
+ return null;
+ } else {
+ return set.toArray(new Integer[set.size()]);
+ }
+ }
+
+}
Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java (working copy)
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.junit.Test;
+
+public class ClusterViewDocumentTest {
+
+ @Test
+ public void testConstructor() {
+ ClusterViewDocument doc = new ClusterViewBuilder(1, "2", 3).active(3,4).asDoc();
+ assertNotNull(doc);
+ assertEquals("2", doc.getClusterViewId());
+ assertEquals(0, doc.getRecoveringIds().size());
+ assertEquals(0, doc.getInactiveIds().size());
+ assertEquals(2, doc.getActiveIds().size());
+ assertTrue(doc.getActiveIds().contains(3));
+ assertTrue(doc.getActiveIds().contains(4));
+
+ doc = new ClusterViewBuilder(1, "2", 3).active(3,4).backlogs(5).inactive(5,6).asDoc();
+ assertNotNull(doc);
+ assertEquals("2", doc.getClusterViewId());
+ assertEquals(0, doc.getRecoveringIds().size());
+ assertEquals(2, doc.getInactiveIds().size());
+ assertEquals(2, doc.getActiveIds().size());
+ assertTrue(doc.getActiveIds().contains(3));
+ assertTrue(doc.getActiveIds().contains(4));
+ assertTrue(doc.getInactiveIds().contains(5));
+ assertTrue(doc.getInactiveIds().contains(6));
+
+ doc = new ClusterViewBuilder(11, "x", 4).active(3,4,5).recovering(6).inactive(7,8).asDoc();
+ assertNotNull(doc);
+ assertEquals(11, doc.getViewSeqNum());
+ assertEquals("x", doc.getClusterViewId());
+ assertEquals(1, doc.getRecoveringIds().size());
+ assertEquals(2, doc.getInactiveIds().size());
+ assertEquals(3, doc.getActiveIds().size());
+ assertTrue(doc.getActiveIds().contains(3));
+ assertTrue(doc.getActiveIds().contains(4));
+ assertTrue(doc.getActiveIds().contains(5));
+ assertTrue(doc.getRecoveringIds().contains(6));
+ assertTrue(doc.getInactiveIds().contains(7));
+ assertTrue(doc.getInactiveIds().contains(8));
+
+ }
+
+ @Test
+ public void testReadUpdate() throws Exception {
+ final int localClusterId = 11;
+ final DocumentNodeStore ns = createMK(localClusterId).nodeStore;
+
+ try{
+ ClusterViewDocument.readOrUpdate(ns, null, null, null);
+ fail("should complain");
+ } catch(Exception ok) {
+ // ok
+ }
+
+ try{
+ ClusterViewDocument.readOrUpdate(ns, new HashSet<Integer>(), null, null);
+ fail("should complain");
+ } catch(Exception ok) {
+ // ok
+ }
+
+ Set<Integer> activeIds = new HashSet<Integer>();
+ activeIds.add(2);
+ Set<Integer> recoveringIds = null;
+ Set<Integer> inactiveIds = null;
+ // first ever view:
+ ClusterViewDocument doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ final String id = doc.getClusterViewId();
+ assertTrue(id!=null && id.length()>0);
+ String createdAt = doc.getCreatedAt();
+ assertTrue(createdAt!=null && createdAt.length()>0);
+ int createdBy = doc.getCreatedBy();
+ assertEquals(localClusterId, createdBy);
+ assertEquals(1, doc.getViewSeqNum());
+ assertEquals(1, doc.getActiveIds().size());
+ assertTrue(doc.getActiveIds().contains(2));
+ assertEquals(0, doc.getRecoveringIds().size());
+ assertEquals(0, doc.getInactiveIds().size());
+
+ // now let's check if it doesn't change anything when we're not doing any update
+ doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ assertEquals(1, doc.getViewSeqNum());
+
+ // and now add a new active id
+ activeIds.add(3);
+ doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ assertEquals(id, doc.getClusterViewId());
+ createdAt = doc.getCreatedAt();
+ assertTrue(createdAt!=null && createdAt.length()>0);
+ createdBy = doc.getCreatedBy();
+ assertEquals(localClusterId, createdBy);
+ assertEquals(2, doc.getViewSeqNum());
+ assertEquals(2, doc.getActiveIds().size());
+ assertTrue(doc.getActiveIds().contains(2));
+ assertTrue(doc.getActiveIds().contains(3));
+ assertEquals(0, doc.getRecoveringIds().size());
+ assertEquals(0, doc.getInactiveIds().size());
+
+ // now let's check if it doesn't change anything when we're not doing any update
+ doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ assertEquals(2, doc.getViewSeqNum());
+
+ // and now add a new recovering id
+ recoveringIds = new HashSet<Integer>();
+ recoveringIds.add(4);
+ doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ assertEquals(id, doc.getClusterViewId());
+ createdAt = doc.getCreatedAt();
+ assertTrue(createdAt!=null && createdAt.length()>0);
+ createdBy = doc.getCreatedBy();
+ assertEquals(localClusterId, createdBy);
+ assertEquals(3, doc.getViewSeqNum());
+ assertEquals(2, doc.getActiveIds().size());
+ assertTrue(doc.getActiveIds().contains(2));
+ assertTrue(doc.getActiveIds().contains(3));
+ assertEquals(1, doc.getRecoveringIds().size());
+ assertTrue(doc.getRecoveringIds().contains(4));
+ assertEquals(0, doc.getInactiveIds().size());
+
+ // now let's check if it doesn't change anything when we're not doing any update
+ doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ assertEquals(3, doc.getViewSeqNum());
+
+ // and now move that one to inactive
+ recoveringIds = new HashSet<Integer>();
+ inactiveIds = new HashSet<Integer>();
+ inactiveIds.add(4);
+ doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ assertEquals(id, doc.getClusterViewId());
+ createdAt = doc.getCreatedAt();
+ assertTrue(createdAt!=null && createdAt.length()>0);
+ createdBy = doc.getCreatedBy();
+ assertEquals(localClusterId, createdBy);
+ assertEquals(4, doc.getViewSeqNum());
+ assertEquals(2, doc.getActiveIds().size());
+ assertTrue(doc.getActiveIds().contains(2));
+ assertTrue(doc.getActiveIds().contains(3));
+ assertEquals(0, doc.getRecoveringIds().size());
+ assertEquals(1, doc.getInactiveIds().size());
+ assertTrue(doc.getInactiveIds().contains(4));
+
+ // now let's check if it doesn't change anything when we're not doing any update
+ doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ assertEquals(4, doc.getViewSeqNum());
+ doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ assertEquals(4, doc.getViewSeqNum());
+ doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+ assertEquals(4, doc.getViewSeqNum());
+ }
+
+ private static DocumentMK createMK(int clusterId){
+ return create(new MemoryDocumentStore(), clusterId);
+ }
+
+ private static DocumentMK create(DocumentStore ds, int clusterId){
+ return new DocumentMK.Builder()
+ .setAsyncDelay(0)
+ .setDocumentStore(ds)
+ .setClusterId(clusterId)
+ .setPersistentCache("target/persistentCache,time")
+ .open();
+ }
+
+}
Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
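The sequence above pins down the readOrUpdate contract: a new view is only written (and the
sequence number only incremented) when the reported membership actually changes. A minimal
sketch, assuming a DocumentNodeStore ns and the readOrUpdate signature exercised above:

    Set<Integer> activeIds = new HashSet<Integer>();
    activeIds.add(2);
    ClusterViewDocument v1 = ClusterViewDocument.readOrUpdate(ns, activeIds, null, null);
    ClusterViewDocument v2 = ClusterViewDocument.readOrUpdate(ns, activeIds, null, null);
    // unchanged input: no new view is created
    assertEquals(v1.getViewSeqNum(), v2.getViewSeqNum());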
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java (working copy)
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.commons.json.JsonObject;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
+import org.junit.Test;
+
+/** Simple paranoia tests for constructor and getters of ClusterView **/
+public class ClusterViewTest {
+
+ @Test
+ public void testConstructor() throws Exception {
+ final Integer viewId = 3;
+ final String clusterViewId = UUID.randomUUID().toString();
+ final Integer instanceId = 2;
+ final Set<Integer> emptyInstanceIds = new HashSet<Integer>();
+ final Set<Integer> instanceIds = new HashSet<Integer>();
+ instanceIds.add(1);
+ final Set<Integer> deactivating = new HashSet<Integer>();
+ final Set<Integer> inactive = new HashSet<Integer>();
+ try{
+ new ClusterView(-1, true, clusterViewId, instanceId, instanceIds, deactivating, inactive);
+ fail("should complain");
+ } catch(IllegalStateException e) {
+ // ok
+ }
+ try{
+ new ClusterView(viewId, true, null, instanceId, instanceIds, deactivating, inactive);
+ fail("should complain");
+ } catch(IllegalStateException e) {
+ // ok
+ }
+ try{
+ new ClusterView(viewId, true, clusterViewId, -1, instanceIds, deactivating, inactive);
+ fail("should complain");
+ } catch(IllegalStateException e) {
+ // ok
+ }
+ try{
+ new ClusterView(viewId, true, clusterViewId, instanceId, emptyInstanceIds, deactivating, inactive);
+ fail("should complain");
+ } catch(IllegalStateException e) {
+ // ok
+ }
+ try{
+ new ClusterView(viewId, true, clusterViewId, instanceId, null, deactivating, inactive);
+ fail("should complain");
+ } catch(IllegalStateException e) {
+ // ok
+ }
+ try{
+ new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, null, inactive);
+ fail("should complain");
+ } catch(Exception e) {
+ // ok
+ }
+ try{
+ new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, deactivating, null);
+ fail("should complain");
+ } catch(Exception e) {
+ // ok
+ }
+ final Set<Integer> nonEmptyDeactivating = new HashSet<Integer>();
+ nonEmptyDeactivating.add(3);
+ new ClusterView(viewId, false, clusterViewId, instanceId, instanceIds, nonEmptyDeactivating, inactive);
+ new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, nonEmptyDeactivating, inactive);
+ // should not complain about:
+ new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, deactivating, inactive);
+ }
+
+ @Test
+ public void testGetters() throws Exception {
+ final Integer viewId = 3;
+ final String clusterViewId = UUID.randomUUID().toString();
+ final Set<Integer> instanceIds = new HashSet<Integer>();
+ instanceIds.add(1);
+ final Integer instanceId = 2;
+ final Set<Integer> deactivating = new HashSet<Integer>();
+ final Set<Integer> inactive = new HashSet<Integer>();
+ final ClusterView cv = new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, deactivating, inactive);
+ assertNotNull(cv);
+ assertTrue(cv.asDescriptorValue().length()>0);
+ assertTrue(cv.toString().length()>0);
+ }
+
+ @Test
+ public void testOneActiveOnly() throws Exception {
+ String clusterViewId = UUID.randomUUID().toString();
+ ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 21);
+ ClusterView view = builder.active(21).asView();
+
+ // {"seq":10,"id":"35f60ed3-508d-4a81-b812-89f07f57db20","me":2,"active":[2],"deactivating":[],"inactive":[3]}
+ JsonObject o = asJsonObject(view);
+ Map<String, String> props = o.getProperties();
+ assertEquals("10", props.get("seq"));
+ assertEquals(clusterViewId, unwrapString(props.get("id")));
+ assertEquals("21", props.get("me"));
+ assertEquals(asJsonArray(21), props.get("active"));
+ assertEquals(asJsonArray(), props.get("deactivating"));
+ assertEquals(asJsonArray(), props.get("inactive"));
+ }
+
+ @Test
+ public void testOneActiveOneInactive() throws Exception {
+ String clusterViewId = UUID.randomUUID().toString();
+ ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+ ClusterView view = builder.active(2).inactive(3).asView();
+
+ // {"seq":10,"id":"35f60ed3-508d-4a81-b812-89f07f57db20","me":2,"active":[2],"deactivating":[],"inactive":[3]}
+ JsonObject o = asJsonObject(view);
+ Map<String, String> props = o.getProperties();
+ assertEquals("10", props.get("seq"));
+ assertEquals(clusterViewId, unwrapString(props.get("id")));
+ assertEquals("2", props.get("me"));
+ assertEquals(asJsonArray(2), props.get("active"));
+ assertEquals(asJsonArray(), props.get("deactivating"));
+ assertEquals(asJsonArray(3), props.get("inactive"));
+ }
+
+ @Test
+ public void testSeveralActiveOneInactive() throws Exception {
+ String clusterViewId = UUID.randomUUID().toString();
+ ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+ ClusterView view = builder.active(2,5,6).inactive(3).asView();
+
+ // {"seq":10,"id":"35f60ed3-508d-4a81-b812-89f07f57db20","me":2,"active":[2],"deactivating":[],"inactive":[3]}
+ JsonObject o = asJsonObject(view);
+ Map<String, String> props = o.getProperties();
+ assertEquals("10", props.get("seq"));
+ assertEquals("true", props.get("final"));
+ assertEquals(clusterViewId, unwrapString(props.get("id")));
+ assertEquals("2", props.get("me"));
+ assertEquals(asJsonArray(2,5,6), props.get("active"));
+ assertEquals(asJsonArray(), props.get("deactivating"));
+ assertEquals(asJsonArray(3), props.get("inactive"));
+ }
+
+ @Test
+ public void testOneActiveSeveralInactive() throws Exception {
+ String clusterViewId = UUID.randomUUID().toString();
+ ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+ ClusterView view = builder.active(2).inactive(3,4,5,6).asView();
+
+ // {"seq":10,"id":"35f60ed3-508d-4a81-b812-89f07f57db20","me":2,"active":[2],"deactivating":[],"inactive":[3]}
+ JsonObject o = asJsonObject(view);
+ Map<String, String> props = o.getProperties();
+ assertEquals("10", props.get("seq"));
+ assertEquals("true", props.get("final"));
+ assertEquals(clusterViewId, unwrapString(props.get("id")));
+ assertEquals("2", props.get("me"));
+ assertEquals(asJsonArray(2), props.get("active"));
+ assertEquals(asJsonArray(), props.get("deactivating"));
+ assertEquals(asJsonArray(3,4,5,6), props.get("inactive"));
+ }
+
+ @Test
+ public void testWithRecoveringOnly() throws Exception {
+ String clusterViewId = UUID.randomUUID().toString();
+ ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+ ClusterView view = builder.active(2,3).recovering(4).inactive(5,6).asView();
+
+ JsonObject o = asJsonObject(view);
+ Map<String, String> props = o.getProperties();
+ assertEquals("10", props.get("seq"));
+ assertEquals("true", props.get("final"));
+ assertEquals(clusterViewId, unwrapString(props.get("id")));
+ assertEquals("2", props.get("me"));
+ assertEquals(asJsonArray(2,3), props.get("active"));
+ assertEquals(asJsonArray(4), props.get("deactivating"));
+ assertEquals(asJsonArray(5,6), props.get("inactive"));
+ }
+
+ @Test
+ public void testWithRecoveringAndBacklog() throws Exception {
+ String clusterViewId = UUID.randomUUID().toString();
+ ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+ ClusterView view = builder.active(2,3).recovering(4).inactive(5,6).backlogs(5).asView();
+
+ JsonObject o = asJsonObject(view);
+ Map<String, String> props = o.getProperties();
+ assertEquals("10", props.get("seq"));
+ assertEquals(clusterViewId, unwrapString(props.get("id")));
+ assertEquals("2", props.get("me"));
+ assertEquals("false", props.get("final"));
+ assertEquals(asJsonArray(2,3), props.get("active"));
+ assertEquals(asJsonArray(4,5), props.get("deactivating"));
+ assertEquals(asJsonArray(6), props.get("inactive"));
+ }
+
+ @Test
+ public void testBacklogButNotInactive() throws Exception {
+ String clusterViewId = UUID.randomUUID().toString();
+ ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+ try{
+ ClusterView view = builder.active(2,3).backlogs(5).asView();
+ fail("should complain");
+ } catch(Exception ok) {
+ // ok
+ }
+ }
+
+ private JsonObject asJsonObject(final ClusterView view) {
+ final String json = view.asDescriptorValue();
+ System.out.println(json);
+ JsopTokenizer t = new JsopTokenizer(json);
+ t.read('{');
+ JsonObject o = JsonObject.create(t);
+ return o;
+ }
+
+ private String unwrapString(String stringWithQuotes) {
+ //TODO: I'm not really sure why the JsonObject parses this string including the "
+ // perhaps that's rather a bug ..
+ assertTrue(stringWithQuotes.startsWith("\""));
+ assertTrue(stringWithQuotes.endsWith("\""));
+ return stringWithQuotes.substring(1, stringWithQuotes.length()-1);
+ }
+
+ static String asJsonArray(int... ids) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (int i = 0; i < ids.length; i++) {
+ int anId = ids[i];
+ if (i!=0) {
+ sb.append(",");
+ }
+ sb.append(String.valueOf(anId));
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+}
Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
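Regarding the unwrapString workaround above: JsonObject apparently keeps string values in
their raw, still-quoted JSON form. Assuming oak-commons' JsopTokenizer.decodeQuoted behaves
as its name suggests, a shorter alternative would be:

    String quotedId = props.get("id");                // e.g. "\"882f8926-...\""
    String id = JsopTokenizer.decodeQuoted(quotedId); // quotes stripped, escapes decoded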
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java (working copy)
@@ -0,0 +1,1002 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jcr.PropertyType;
+import javax.jcr.Value;
+import javax.jcr.ValueFormatException;
+
+import junitx.util.PrivateAccessor;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Descriptors;
+import org.apache.jackrabbit.oak.commons.json.JsonObject;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.mongodb.DB;
+
+/**
+ * Tests for the DocumentDiscoveryLiteService
+ */
+public class DocumentDiscoveryLiteServiceTest {
+
+ /** Container for what should represent an instance, but is not a complete one, hence 'simplified'.
+ * Most importantly it contains the DocumentNodeStore and the discoveryLite service.
+ */
+ class SimplifiedInstance {
+
+ private DocumentDiscoveryLiteService service;
+ private DocumentNodeStore ns;
+ private final Descriptors descriptors;
+ private Map<String, Object> registeredServices;
+ private final long lastRevInterval;
+ private volatile boolean lastRevStopped = false;
+ private volatile boolean writeSimulationStopped = false;
+ private Thread lastRevThread;
+ private Thread writeSimulationThread;
+ public String workingDir;
+
+ SimplifiedInstance(DocumentDiscoveryLiteService service,
+ DocumentNodeStore ns, Descriptors descriptors,
+ Map<String, Object> registeredServices, long lastRevInterval,
+ String workingDir) {
+ this.service = service;
+ this.ns = ns;
+ this.workingDir = workingDir;
+ this.descriptors = descriptors;
+ this.registeredServices = registeredServices;
+ this.lastRevInterval = lastRevInterval;
+ if (lastRevInterval>0) {
+ startLastRevThread();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SimplifiedInstance[cid="+ns.getClusterId()+"]";
+ }
+
+ private void startLastRevThread() {
+ lastRevStopped = false;
+ lastRevThread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ while(!lastRevStopped) {
+ SimplifiedInstance.this.ns.getLastRevRecoveryAgent().performRecoveryIfNeeded();
+ try {
+ Thread.sleep(SimplifiedInstance.this.lastRevInterval);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ });
+ lastRevThread.setDaemon(true);
+ lastRevThread.setName("lastRevThread[cid="+ns.getClusterId()+"]");
+ lastRevThread.start();
+ }
+
+ void stopLastRevThread() throws InterruptedException {
+ lastRevStopped = true;
+ lastRevThread.join();
+ }
+
+ boolean isFinal() throws Exception {
+ final JsonObject clusterViewObj = getClusterViewObj();
+ if (clusterViewObj==null) {
+ throw new IllegalStateException("should always have that final flag set");
+ }
+
+ String finalStr = clusterViewObj.getProperties().get("final");
+
+ return Boolean.valueOf(finalStr);
+ }
+
+ boolean hasActiveIds(String clusterViewStr, int... expected) throws Exception {
+ return hasIds(clusterViewStr, "active", expected);
+ }
+
+ boolean hasDeactivatingIds(String clusterViewStr, int... expected) throws Exception {
+ return hasIds(clusterViewStr, "deactivating", expected);
+ }
+
+ boolean hasInactiveIds(String clusterViewStr, int... expected) throws Exception {
+ return hasIds(clusterViewStr, "inactive", expected);
+ }
+
+ private boolean hasIds(final String clusterViewStr, final String key, int... expectedIds)
+ throws Exception {
+ final JsonObject clusterViewObj = asJsonObject(clusterViewStr);
+ String actualIdsStr = clusterViewObj==null ? null : clusterViewObj.getProperties().get(key);
+
+ boolean actualEmpty = actualIdsStr==null || actualIdsStr.length()==0 || actualIdsStr.equals("[]");
+ boolean expectedEmpty = expectedIds==null || expectedIds.length==0;
+
+ if (actualEmpty && expectedEmpty) {
+ return true;
+ }
+ if (actualEmpty != expectedEmpty) {
+ return false;
+ }
+
+ final List<Integer> actualList = Arrays.asList(ClusterViewDocument.csvToIntegerArray(actualIdsStr.substring(1, actualIdsStr.length()-1)));
+ if (expectedIds.length!=actualList.size()) {
+ return false;
+ }
+ for (int i = 0; i < expectedIds.length; i++) {
+ int anExpectedId = expectedIds[i];
+ if (!actualList.contains(anExpectedId)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ JsonObject getClusterViewObj() throws Exception {
+ final String json = getClusterViewStr();
+ return asJsonObject(json);
+ }
+
+ private JsonObject asJsonObject(final String json) {
+ if (json==null) {
+ return null;
+ }
+ JsopTokenizer t = new JsopTokenizer(json);
+ t.read('{');
+ JsonObject o = JsonObject.create(t);
+ return o;
+ }
+
+ String getClusterViewStr() throws Exception {
+ return getDescriptor(DocumentDiscoveryLiteService.OAK_DISCOVERYLITE_CLUSTERVIEW);
+ }
+
+ String getDescriptor(String key) throws Exception {
+ final Value value = descriptors.getValue(key);
+ if (value==null) {
+ return null;
+ }
+ if (value.getType()!=PropertyType.STRING) {
+ return null;
+ }
+ try{
+ return value.getString();
+ } catch(ValueFormatException vfe) {
+ return null;
+ }
+ }
+
+ public void dispose() {
+ logger.info("Disposing "+this);
+ try {
+ stopSimulatingWrites();
+ } catch (InterruptedException e) {
+ fail("interrupted");
+ }
+ if (lastRevThread!=null) {
+ try{
+ stopLastRevThread();
+ } catch(InterruptedException ok) {
+ fail("interrupted");
+ }
+ lastRevThread = null;
+ }
+ if (service!=null) {
+ service.deactivate();
+ service = null;
+ }
+ if (ns!=null) {
+ ns.dispose();
+ ns = null;
+ }
+ if (registeredServices!=null) {
+ registeredServices.clear();
+ registeredServices = null;
+ }
+ }
+
+ /** shutdown simulates the normal, graceful, shutdown
+ * @throws InterruptedException */
+ public void shutdown() throws InterruptedException {
+ stopSimulatingWrites();
+ stopLastRevThread();
+ ns.dispose();
+ service.deactivate();
+ }
+
+ /** crash simulates a kill -9, sort of
+ * @throws Throwable */
+ public void crash() throws Throwable {
+ logger.info("crash: stopping simulating writes...");
+ stopSimulatingWrites();
+ logger.info("crash: stopping lastrev thread...");
+ stopLastRevThread();
+ logger.info("crash: stopped lastrev thread, now setting least to end within 1 sec");
+
+ boolean renewed = setLeaseTime(1000 /* 1 sec */);
+ if (!renewed) {
+ logger.info("halt");
+ fail("did not renew clusterid lease");
+ }
+
+ logger.info("crash: now stopping background read/update");
+ stopAllBackgroundThreads();
+ // but don't do the following from DocumentNodeStore.dispose():
+ // * don't do the last internalRunBackgroundUpdateOperations - as we're trying to simulate a crash here
+ // * don't dispose clusterNodeInfo to leave the node in active state
+
+ // the DocumentDiscoveryLiteService currently can simply be deactivated, doesn't differ much from crashing
+ service.deactivate();
+ logger.info("crash: crash simulation done.");
+ }
+
+ /**
+ * Very hacky way of doing the following:
+ * make sure this instance's clusterNodes entry is marked with a very short (1 sec) lease end time,
+ * so that the crash detection doesn't take a minute (as it would by default)
+ */
+ private boolean setLeaseTime(final int leaseTime)
+ throws NoSuchFieldException {
+ ns.getClusterInfo().setLeaseTime(leaseTime);
+ PrivateAccessor.setField(ns.getClusterInfo(), "leaseEndTime", System.currentTimeMillis()+(leaseTime/2));
+ boolean renewed = ns.renewClusterIdLease();
+ return renewed;
+ }
+
+ private AtomicBoolean getIsDisposed() throws NoSuchFieldException {
+ AtomicBoolean isDisposed = (AtomicBoolean) PrivateAccessor.getField(ns, "isDisposed");
+ return isDisposed;
+ }
+
+ private void stopAllBackgroundThreads() throws NoSuchFieldException {
+ // get all those background threads...
+ Thread backgroundReadThread = (Thread) PrivateAccessor.getField(ns, "backgroundReadThread");
+ assertNotNull(backgroundReadThread);
+ Thread backgroundUpdateThread = (Thread) PrivateAccessor.getField(ns, "backgroundUpdateThread");
+ assertNotNull(backgroundUpdateThread);
+ Thread leaseUpdateThread = (Thread) PrivateAccessor.getField(ns, "leaseUpdateThread");
+ assertNotNull(leaseUpdateThread);
+
+ // start doing what DocumentNodeStore.dispose() would do - except in a very fine-controlled manner, basically:
+ // make sure to stop backgroundReadThread, backgroundUpdateThread and leaseUpdateThread
+ // but then nothing else.
+ final AtomicBoolean isDisposed = getIsDisposed();
+ assertFalse(isDisposed.getAndSet(true));
+ // notify background threads waiting on isDisposed
+ synchronized (isDisposed) {
+ isDisposed.notifyAll();
+ }
+ try {
+ backgroundReadThread.join(5000);
+ assertTrue(!backgroundReadThread.isAlive());
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ try {
+ backgroundUpdateThread.join(5000);
+ assertTrue(!backgroundUpdateThread.isAlive());
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ try {
+ leaseUpdateThread.join(5000);
+ assertTrue(!leaseUpdateThread.isAlive());
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
+ public void stopBgReadThread() throws NoSuchFieldException {
+ final Thread backgroundReadThread = (Thread) PrivateAccessor.getField(ns, "backgroundReadThread");
+ assertNotNull(backgroundReadThread);
+ final Runnable bgReadRunnable = (Runnable) PrivateAccessor.getField(backgroundReadThread, "target");
+ assertNotNull(bgReadRunnable);
+ final AtomicBoolean bgReadIsDisposed = new AtomicBoolean(false);
+ PrivateAccessor.setField(bgReadRunnable, "isDisposed", bgReadIsDisposed);
+ assertFalse(bgReadIsDisposed.getAndSet(true));
+ try {
+ backgroundReadThread.join(5000);
+ assertTrue(!backgroundReadThread.isAlive());
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ // bit of heavy work, but now the backgroundReadThread is stopped and all the others are still running
+ }
+
+ public void addNode(String path) throws CommitFailedException {
+ NodeBuilder root = ns.getRoot().builder();
+ NodeBuilder child = root;
+ String[] split = path.split("/");
+ for (int i = 1; i < split.length; i++) {
+ child = child.child(split[i]);
+ }
+ ns.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ // [gap: the rest of SimplifiedInstance (setProperty, setLeastTimeout,
+ // startSimulatingWrites, stopSimulatingWrites and the class end), the
+ // Expectation/ViewExpectation test helpers and the MONGO_DB flag did not
+ // survive in this copy of the patch]
+
+ private List<DocumentMK> mks = Lists.newArrayList();
+ private MemoryDocumentStore ds;
+ private MemoryBlobStore bs;
+
+ final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private List<SimplifiedInstance> allInstances = new LinkedList<SimplifiedInstance>();
+
+ @Test
+ public void testActivateDeactivate() throws Exception {
+ // test the variant where we're not running with DocumentNodeStore
+ DocumentDiscoveryLiteService discoveryLite = new DocumentDiscoveryLiteService();
+ ComponentContext c = mock(ComponentContext.class);
+ discoveryLite.activate(c);
+ verify(c, times(1)).disableComponent(DocumentDiscoveryLiteService.COMPONENT_NAME);
+
+ // then test normal start with a DocumentNodeStore
+ DocumentMK mk1 = createMK(1, 0);
+ discoveryLite = new DocumentDiscoveryLiteService();
+ PrivateAccessor.setField(discoveryLite, "nodeStore", mk1.nodeStore);
+ BundleContext bc = mock(BundleContext.class);
+ c = mock(ComponentContext.class);
+ when(c.getBundleContext()).thenReturn(bc);
+ discoveryLite.activate(c);
+ verify(c, times(0)).disableComponent(DocumentDiscoveryLiteService.COMPONENT_NAME);
+ discoveryLite.deactivate();
+ }
+
+ /** Borrowed from http://stackoverflow.com/questions/3301635/change-private-static-final-field-using-java-reflection */
+ static void setFinalStatic(Field field, Object newValue) throws Exception {
+ field.setAccessible(true);
+
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+ field.set(null, newValue);
+ }
+
+ // subsequent tests should get a DocumentDiscoveryLiteService setup from the start
+ private DocumentNodeStore createNodeStore(String workingDir) throws SecurityException, Exception {
+ // ensure that we always get a fresh cluster[node]id
+ System.setProperty("user.dir", workingDir);
+ setFinalStatic(ClusterNodeInfo.class.getDeclaredField("WORKING_DIR"), workingDir);
+
+ // then create the DocumentNodeStore
+ DocumentMK mk1 = createMK(0 /* to make sure the clusterNodes collection is used */, 500 /* asyncDelay: background interval */);
+
+ logger.info("createNodeStore: created DocumentNodeStore with cid="+mk1.nodeStore.getClusterId()+", workingDir="+workingDir);
+ return mk1.nodeStore;
+ }
+
+ private SimplifiedInstance createInstance() throws Exception {
+ final String workingDir = UUID.randomUUID().toString();
+ return createInstance(workingDir);
+ }
+
+ private SimplifiedInstance createInstance(String workingDir) throws SecurityException, Exception {
+ DocumentNodeStore ns = createNodeStore(workingDir);
+ return createInstance(ns, workingDir);
+ }
+ private SimplifiedInstance createInstance(DocumentNodeStore ns, String workingDir) throws NoSuchFieldException {
+ DocumentDiscoveryLiteService discoveryLite = new DocumentDiscoveryLiteService();
+ PrivateAccessor.setField(discoveryLite, "nodeStore", ns);
+ BundleContext bc = mock(BundleContext.class);
+ ComponentContext c = mock(ComponentContext.class);
+ when(c.getBundleContext()).thenReturn(bc);
+ final Map<String, Object> registeredServices = new HashMap<String, Object>();
+ when(bc.registerService(anyString(), anyObject(), (Properties) anyObject())).then(new Answer<ServiceRegistration>() {
+ @Override
+ public ServiceRegistration answer(InvocationOnMock invocation) {
+ registeredServices.put((String) invocation.getArguments()[0], invocation.getArguments()[1]);
+ return null;
+ }
+ });
+ discoveryLite.activate(c);
+ Descriptors d = (Descriptors) registeredServices.get(Descriptors.class.getName());
+ final SimplifiedInstance result = new SimplifiedInstance(discoveryLite, ns, d, registeredServices, 500, workingDir);
+ allInstances.add(result);
+ logger.info("Created "+result);
+ return result;
+ }
+
+ private void waitFor(Expectation expectation, int timeout, String msg) throws Exception {
+ final long tooLate = System.currentTimeMillis() + timeout;
+ while(true) {
+ final String fulfillmentResult = expectation.fulfilled();
+ if (fulfillmentResult==null) {
+ // everything's fine
+ return;
+ }
+ if (System.currentTimeMillis()>tooLate) {
+ fail("expectation not fulfilled within "+timeout+"ms: "+msg+", fulfillment result: "+fulfillmentResult);
+ }
+ Thread.sleep(100);
+ }
+ }
+
+ @Test
+ public void testOneNode() throws Exception {
+ final SimplifiedInstance s1 = createInstance();
+ final ViewExpectation expectation = new ViewExpectation(s1);
+ expectation.setActiveIds(s1.ns.getClusterId());
+ waitFor(expectation, 2000, "see myself as active");
+ }
+
+ @Test
+ public void testTwoNodesWithCleanShutdown() throws Exception {
+ final SimplifiedInstance s1 = createInstance();
+ final SimplifiedInstance s2 = createInstance();
+ final ViewExpectation expectation1 = new ViewExpectation(s1);
+ final ViewExpectation expectation2 = new ViewExpectation(s2);
+ expectation1.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+ expectation2.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+ waitFor(expectation1, 2000, "first should see both as active");
+ waitFor(expectation2, 2000, "second should see both as active");
+
+ s2.shutdown();
+ final ViewExpectation expectation1AfterShutdown = new ViewExpectation(s1);
+ expectation1AfterShutdown.setActiveIds(s1.ns.getClusterId());
+ expectation1AfterShutdown.setInactiveIds(s2.ns.getClusterId());
+ waitFor(expectation1AfterShutdown, 2000, "first should only see itself after shutdown");
+ }
+
+ @Test
+ public void testTwoNodesWithCrash() throws Throwable {
+ final SimplifiedInstance s1 = createInstance();
+ final SimplifiedInstance s2 = createInstance();
+ final ViewExpectation expectation1 = new ViewExpectation(s1);
+ final ViewExpectation expectation2 = new ViewExpectation(s2);
+ expectation1.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+ expectation2.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+ waitFor(expectation1, 2000, "first should see both as active");
+ waitFor(expectation2, 2000, "second should see both as active");
+
+ s2.crash();
+
+ final ViewExpectation expectation1AfterShutdown = new ViewExpectation(s1);
+ expectation1AfterShutdown.setActiveIds(s1.ns.getClusterId());
+ expectation1AfterShutdown.setInactiveIds(s2.ns.getClusterId());
+ waitFor(expectation1AfterShutdown, 2000, "first should only see itself after shutdown");
+ }
+
+ @Test
+ public void testTwoNodesWithCrashAndLongduringRecovery() throws Throwable {
+ doTestTwoNodesWithCrashAndLongduringDeactivation(false);
+ }
+
+ @Test
+ public void testTwoNodesWithCrashAndLongduringRecoveryAndBacklog() throws Throwable {
+ doTestTwoNodesWithCrashAndLongduringDeactivation(true);
+ }
+
+ void doTestTwoNodesWithCrashAndLongduringDeactivation(boolean withBacklog) throws Throwable {
+ final int TEST_WAIT_TIMEOUT = 10000;
+ final SimplifiedInstance s1 = createInstance();
+ final SimplifiedInstance s2 = createInstance();
+ final ViewExpectation expectation1 = new ViewExpectation(s1);
+ final ViewExpectation expectation2 = new ViewExpectation(s2);
+ expectation1.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+ expectation2.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+ waitFor(expectation1, TEST_WAIT_TIMEOUT, "first should see both as active");
+ waitFor(expectation2, TEST_WAIT_TIMEOUT, "second should see both as active");
+
+ // before crashing s2, make sure that s1's lastRevRecovery thread doesn't run
+ s1.stopLastRevThread();
+ if (withBacklog) {
+ // plus also stop s1's backgroundReadThread - in case we want to test backlog handling
+ s1.stopBgReadThread();
+
+ // and then, if we want to do backlog testing, then s2 should write something
+ // before it crashes, so here it comes:
+ s2.addNode("/foo/bar");
+ s2.setProperty("/foo/bar", "prop", "value");
+ }
+
+ // then crash s2
+ s2.crash();
+
+ // then wait 2 sec
+ Thread.sleep(2000);
+
+ // at this stage, while s2 has crashed, we have stopped s1's lastRevRecoveryThread, so we should still see both as active
+ logger.info(s1.getClusterViewStr());
+ final ViewExpectation expectation1AfterCrashBeforeLastRevRecovery = new ViewExpectation(s1);
+ expectation1AfterCrashBeforeLastRevRecovery.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+ waitFor(expectation1AfterCrashBeforeLastRevRecovery, TEST_WAIT_TIMEOUT, "first should still see both as active");
+
+ // the next part is a bit tricky: we want to fine-control the lastRevRecoveryThread's acquire/release locking.
+ // the chosen way to do this is to make heavy use of mockito and two semaphores:
+ // when acquireRecoveryLock is called, that thread should wait for the waitBeforeLocking semaphore to be released
+ final MissingLastRevSeeker missingLastRevUtil =
+ (MissingLastRevSeeker) PrivateAccessor.getField(s1.ns.getLastRevRecoveryAgent(), "missingLastRevUtil");
+ assertNotNull(missingLastRevUtil);
+ MissingLastRevSeeker mockedLongduringMissingLastRevUtil = mock(MissingLastRevSeeker.class, delegatesTo(missingLastRevUtil));
+ final Semaphore waitBeforeLocking = new Semaphore(0);
+ when(mockedLongduringMissingLastRevUtil.acquireRecoveryLock(anyInt())).then(new Answer<Boolean>() {
+
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ logger.info("going to waitBeforeLocking");
+ waitBeforeLocking.acquire();
+ logger.info("done with waitBeforeLocking");
+ return missingLastRevUtil.acquireRecoveryLock((Integer)invocation.getArguments()[0]);
+ }
+ });
+ PrivateAccessor.setField(s1.ns.getLastRevRecoveryAgent(), "missingLastRevUtil", mockedLongduringMissingLastRevUtil);
+
+ // so let's start the lastRevThread again and wait for that waitBeforeLocking semaphore to be hit
+ s1.startLastRevThread();
+ waitFor(new Expectation() {
+
+ @Override
+ public String fulfilled() throws Exception {
+ if (!waitBeforeLocking.hasQueuedThreads()) {
+ return "no thread queued";
+ }
+ return null;
+ }
+
+ }, TEST_WAIT_TIMEOUT, "lastRevRecoveryThread should acquire a lock");
+
+ // at this stage the crashed s2 is still not in recovery mode, so let's check:
+ logger.info(s1.getClusterViewStr());
+ final ViewExpectation expectation1AfterCrashBeforeLastRevRecoveryLocking = new ViewExpectation(s1);
+ expectation1AfterCrashBeforeLastRevRecoveryLocking.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+ waitFor(expectation1AfterCrashBeforeLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see both as active");
+
+ // one thing, before we let the waitBeforeLocking go, setup the release semaphore/mock:
+ final Semaphore waitBeforeUnlocking = new Semaphore(0);
+ Mockito.doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) throws InterruptedException {
+ logger.info("Going to waitBeforeUnlocking");
+ waitBeforeUnlocking.acquire();
+ logger.info("Done with waitBeforeUnlocking");
+ missingLastRevUtil.releaseRecoveryLock((Integer)invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(mockedLongduringMissingLastRevUtil).releaseRecoveryLock(anyInt());
+
+ // let go ('tschaedere loh' - Swiss German for roughly 'let it rattle')
+ waitBeforeLocking.release();
+
+ // then, right after we let the waitBeforeLocking semaphore go, we should see s2 in recovery mode
+ final ViewExpectation expectation1AfterCrashWhileLastRevRecoveryLocking = new ViewExpectation(s1);
+ expectation1AfterCrashWhileLastRevRecoveryLocking.setActiveIds(s1.ns.getClusterId());
+ expectation1AfterCrashWhileLastRevRecoveryLocking.setDeactivatingIds(s2.ns.getClusterId());
+ waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see s2 as recovering");
+
+ // ok, meanwhile, the lastRevRecoveryAgent should have hit the waitBeforeUnlocking semaphore
+ waitFor(new Expectation() {
+
+ @Override
+ public String fulfilled() throws Exception {
+ if (!waitBeforeUnlocking.hasQueuedThreads()) {
+ return "no thread queued";
+ }
+ return null;
+ }
+
+ }, TEST_WAIT_TIMEOUT, "lastRevRecoveryThread should want to release a lock");
+
+ // so then, we should still see the same state
+ waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see s2 as recovering");
+
+ logger.info("Waiting 1,5sec");
+ Thread.sleep(1500);
+ logger.info("Waiting done");
+
+ // first, lets check to see what the view looks like - should be unchanged:
+ waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see s2 as recovering");
+
+ // let waitBeforeUnlocking go
+ logger.info("releasing waitBeforeUnlocking, state: "+s1.getClusterViewStr());
+ waitBeforeUnlocking.release();
+ logger.info("released waitBeforeUnlocking");
+
+ if (!withBacklog) {
+ final ViewExpectation expectationWithoutBacklog = new ViewExpectation(s1);
+ expectationWithoutBacklog.setActiveIds(s1.ns.getClusterId());
+ expectationWithoutBacklog.setInactiveIds(s2.ns.getClusterId());
+ waitFor(expectationWithoutBacklog, TEST_WAIT_TIMEOUT, "finally we should see s2 as completely inactive");
+ } else {
+ // wait just 2 sec to see if the bgReadThread is really stopped
+ logger.info("sleeping 2 sec");
+ Thread.sleep(2000);
+ logger.info("sleeping 2 sec done, state: "+s1.getClusterViewStr());
+
+ // when that's the case, check the view - it should now be in a special 'final=false' mode
+ final ViewExpectation expectationBeforeBgRead = new ViewExpectation(s1);
+ expectationBeforeBgRead.setActiveIds(s1.ns.getClusterId());
+ expectationBeforeBgRead.setDeactivatingIds(s2.ns.getClusterId());
+ expectationBeforeBgRead.setFinal(false);
+ waitFor(expectationBeforeBgRead, TEST_WAIT_TIMEOUT, "first should only see itself after shutdown");
+
+ // ok, now we explicitly do a background read to get out of the backlog situation
+ s1.ns.runBackgroundReadOperations();
+
+ final ViewExpectation expectationAfterBgRead = new ViewExpectation(s1);
+ expectationAfterBgRead.setActiveIds(s1.ns.getClusterId());
+ expectationAfterBgRead.setInactiveIds(s2.ns.getClusterId());
+ waitFor(expectationAfterBgRead, TEST_WAIT_TIMEOUT, "finally we should see s2 as completely inactive");
+ }
+ }
+
+ /** This test creates a large number of DocumentNodeStores which it starts, runs and stops
+ * in a random fashion, each time checking that the clusterView is correct
+ */
+ @Test
+ public void testLargeStartStopFiesta() throws Throwable {
+ final List<SimplifiedInstance> instances = new LinkedList<SimplifiedInstance>();
+ final Map<Integer, String> inactiveIds = new HashMap<Integer, String>();
+ final Random random = new Random();
+ final int LOOP_CNT = 50; // with too many loops we have also seen mongo connections becoming starved, failing the test
+ final int CHECK_EVERY = 3;
+ final int MAX_NUM_INSTANCES = 8;
+ for (int i = 0; i < LOOP_CNT; i++) {
+ if (i % CHECK_EVERY == 0) {
+ checkFiestaState(instances, inactiveIds.keySet());
+ }
+ // [reconstructed: the original loop head was garbled in this copy of the patch]
+ String workingDir = UUID.randomUUID().toString();
+ switch (random.nextInt(5)) {
+ case 0: {
+ // reactivate a previously stopped instance, if there is any
+ if (inactiveIds.size() > 0) {
+ logger.info("Case 0 - reactivating an instance...");
+ final int n = random.nextInt(inactiveIds.size());
+ final Integer cid = new LinkedList<Integer>(inactiveIds.keySet()).get(n);
+ final String reactivatedWorkingDir = inactiveIds.remove(cid);
+ if (reactivatedWorkingDir==null) {
+ fail("reactivatedWorkingDir null for n="+n+", cid="+cid+", other inactives: "+inactiveIds);
+ }
+ assertNotNull(reactivatedWorkingDir);
+ logger.info("Case 0 - reactivated instance "+cid+", workingDir="+reactivatedWorkingDir);
+ workingDir = reactivatedWorkingDir;
+ logger.info("Case 0: creating instance");
+ final SimplifiedInstance newInstance = createInstance(workingDir);
+ newInstance.setLeastTimeout(5000);
+ newInstance.startSimulatingWrites(500);
+ logger.info("Case 0: created instance: "+newInstance.ns.getClusterId());
+ if (newInstance.ns.getClusterId()!=cid) {
+ logger.info("Case 0: reactivated instance did not take over cid - probably a testing artifact. expected cid: {}, actual cid: {}", cid, newInstance.ns.getClusterId());
+ inactiveIds.put(cid, reactivatedWorkingDir);
+ // remove the newly reactivated from the inactives - although it shouldn't be there, it might!
+ inactiveIds.remove(newInstance.ns.getClusterId());
+ }
+ instances.add(newInstance);
+ }
+ break;
+ }
+ case 1: {
+ // creates a new instance
+ if (instances.size() < MAX_NUM_INSTANCES) {
+ // [gap: the remainder of case 1 (creating and starting a new instance)
+ // up to the start of case 3 was garbled in this copy of the patch]
+ }
+ break;
+ }
+ case 3: {
+ // shut down an instance - but only if more than one is still running
+ if (instances.size() > 1) {
+ // before shutting down: make sure we have a stable view (we could otherwise not correctly startup too)
+ checkFiestaState(instances, inactiveIds.keySet());
+ final SimplifiedInstance instance = instances.remove(random.nextInt(instances.size()));
+ assertNotNull(instance.workingDir);
+ logger.info("Case 3: Shutdown instance: "+instance.ns.getClusterId());
+ inactiveIds.put(instance.ns.getClusterId(), instance.workingDir);
+ instance.shutdown();
+ }
+ break;
+ }
+ case 4: {
+ // crash instance
+ if (instances.size()>1) {
+ // before crashing make sure we have a stable view (we could otherwise not correctly startup too)
+ checkFiestaState(instances, inactiveIds.keySet());
+ final SimplifiedInstance instance = instances.remove(random.nextInt(instances.size()));
+ assertNotNull(instance.workingDir);
+ logger.info("Case 4: Crashing instance: "+instance.ns.getClusterId());
+ inactiveIds.put(instance.ns.getClusterId(), instance.workingDir);
+ instance.addNode("/"+instance.ns.getClusterId()+"/stuffForRecovery/"+random.nextInt(10000));
+ instance.crash();
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ private void dumpChildren(DocumentNodeState root) {
+ logger.info("testEmptyParentRecovery: root: "+root);
+ Iterator<String> it = root.getChildNodeNames().iterator();
+ while(it.hasNext()) {
+ String n = it.next();
+ logger.info("testEmptyParentRecovery: a child: '"+n+"'");
+ }
+ }
+
+ private void checkFiestaState(final List<SimplifiedInstance> instances, Set<Integer> inactiveIds) throws Exception {
+ final List<Integer> activeIds = new LinkedList<Integer>();
+ for (Iterator<SimplifiedInstance> it = instances.iterator(); it.hasNext();) {
+ SimplifiedInstance anInstance = it.next();
+ activeIds.add(anInstance.ns.getClusterId());
+ }
+ for (Iterator<SimplifiedInstance> it = instances.iterator(); it.hasNext();) {
+ SimplifiedInstance anInstance = it.next();
+
+ final ViewExpectation e = new ViewExpectation(anInstance);
+ e.setActiveIds(activeIds.toArray(new Integer[activeIds.size()]));
+ e.setInactiveIds(inactiveIds.toArray(new Integer[inactiveIds.size()]));
+ waitFor(e, 20000, "checkFiestaState failed for "+anInstance+", with instances: "+instances+", and inactiveIds: "+inactiveIds);
+ }
+ }
+
+ @Before
+ @After
+ public void clear() {
+ for(SimplifiedInstance i : allInstances) {
+ i.dispose();
+ }
+ for (DocumentMK mk : mks) {
+ mk.dispose();
+ }
+ mks.clear();
+ if (MONGO_DB) {
+ DB db = MongoUtils.getConnection().getDB();
+ MongoUtils.dropCollections(db);
+ }
+ }
+
+ private DocumentMK createMK(int clusterId, int asyncDelay) {
+ if (MONGO_DB) {
+ DB db = MongoUtils.getConnection().getDB();
+ return register(new DocumentMK.Builder().setMongoDB(db)
+ .setLeaseCheck(false)
+ .setClusterId(clusterId).setAsyncDelay(asyncDelay).open());
+ } else {
+ if (ds == null) {
+ ds = new MemoryDocumentStore();
+ }
+ if (bs == null) {
+ bs = new MemoryBlobStore();
+ }
+ return createMK(clusterId, asyncDelay, ds, bs);
+ }
+ }
+
+ private DocumentMK createMK(int clusterId, int asyncDelay,
+ DocumentStore ds, BlobStore bs) {
+ return register(new DocumentMK.Builder().setDocumentStore(ds)
+ .setBlobStore(bs).setClusterId(clusterId)
+ .setLeaseCheck(false)
+ .setAsyncDelay(asyncDelay).open());
+ }
+
+ private DocumentMK register(DocumentMK mk) {
+ mks.add(mk);
+ return mk;
+ }
+
+}
Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
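Note: the Expectation/ViewExpectation helpers that waitFor() polls did not survive in this
copy of the patch. Reconstructed from the call sites above (an assumption, not the patch's
actual code), the contract they must satisfy is roughly:

    /** Polled by waitFor(): null means fulfilled, anything else is a diagnostic message **/
    interface Expectation {
        String fulfilled() throws Exception;
    }

    // ViewExpectation(SimplifiedInstance) would implement this by reading the instance's
    // discovery-lite descriptor and comparing the configured active/deactivating/inactive
    // id sets (plus, via setFinal(), the 'final' flag) against the parsed JSON.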