From 95c90f0b2b8e34c7e73be51a0a7e10c4b31bc216 Mon Sep 17 00:00:00 2001 From: Dave Harvey Date: Mon, 17 Sep 2018 18:23:23 -0400 Subject: [PATCH] # p2p_two_hops --- .../deployment/GridDeploymentClassLoader.java | 51 ++++++++++++--- .../deployment/GridDeploymentCommunication.java | 74 +++++++++++++++------- 2 files changed, 94 insertions(+), 31 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentClassLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentClassLoader.java index ca9ce328b6..71a23ac846 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentClassLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentClassLoader.java @@ -596,9 +596,15 @@ class GridDeploymentClassLoader extends ClassLoader implements GridDeploymentInf continue; } + + // Must not send to the participants excluded from recursion, if any. + if (this.comm.nodeOnRecursionExclusionList(node)) { + continue; + } try { - GridDeploymentResponse res = comm.sendResourceRequest(path, ldrId, node, endTime); + // Tell the destination not to recurse to any of the nodes we are about to directly search. + GridDeploymentResponse res = comm.sendResourceRequest(path, ldrId, node, endTime, nodeListCp); if (res == null) { String msg = "Failed to send class-loading request to node (is node alive?) [node=" + @@ -615,8 +621,19 @@ class GridDeploymentClassLoader extends ClassLoader implements GridDeploymentInf continue; } - if (res.success()) + if (res.success()) { + + // For the next time, start with the node that just succeeded. + if (nodeList.peekFirst() != nodeId) { + synchronized (mux) { + if (nodeList.remove(nodeId)) { + nodeList.addFirst(nodeId); + } + } + } + return res.byteSource(); + } // In case of shared resources/classes all nodes should have it. if (log.isDebugEnabled()) @@ -628,8 +645,9 @@ class GridDeploymentClassLoader extends ClassLoader implements GridDeploymentInf missedRsrcs.add(path); } - throw new ClassNotFoundException("Failed to peer load class [class=" + name + ", nodeClsLdrs=" + - nodeLdrMapCp + ", parentClsLoader=" + getParent() + ", reason=" + res.errorMessage() + ']'); + // Continue to ask other participants, since this node may not have recursed to find the + // class due to exclusions we passed, or because its path to the class was lost due to + // a node failure. } catch (IgniteCheckedException e) { // This thread should be interrupted again in communication if it @@ -736,10 +754,16 @@ class GridDeploymentClassLoader extends ClassLoader implements GridDeploymentInf continue; } - + + // Must not send to the participants excluded from recursion + if (this.comm.nodeOnRecursionExclusionList(node)) { + continue; + } + try { // Request is sent with timeout that is why we can use synchronization here. - GridDeploymentResponse res = comm.sendResourceRequest(name, ldrId, node, endTime); + // Tell the destination not to recurse to any of the nodes we are about to directly search. + GridDeploymentResponse res = comm.sendResourceRequest(name, ldrId, node, endTime, nodeListCp); if (res == null) { U.warn(log, "Failed to get resource from node (is node alive?) [nodeId=" + @@ -767,10 +791,21 @@ class GridDeploymentClassLoader extends ClassLoader implements GridDeploymentInf node.id() + ", clsLdrId=" + ldrId + ", resName=" + name + ", parentClsLdr=" + getParent() + ", msg=" + res.errorMessage() + ']'); - // Do not ask other nodes in case of shared mode all of them should have the resource. - return null; + // Continue to ask other participants, since this node may not have recursed to find the + // class due to exclusions we passed, or because its path to the class was lost due to + // a node failure. } else { + + // For the next time, start with the node that just succeeded. + if (nodeList.peekFirst() != nodeId) { + synchronized (mux) { + if (nodeList.remove(nodeId)) { + nodeList.addFirst(nodeId); + } + } + } + return new ByteArrayInputStream(res.byteSource().internalArray(), 0, res.byteSource().size()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java index 2a5f7cae12..0934b664be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collection; import java.util.HashSet; +import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -337,50 +338,77 @@ class GridDeploymentCommunication { GridIoPolicy.P2P_POOL); } } - + + /** + * Returns true if the given node was excluded by the node that sent us the + * request we are serving. + * {@linkplain GridDeploymentRequest request} (which is not an + * {@linkplain GridDeploymentRequest#isUndeploy undeploy}) being processed by this thread. + * + * @param node The node to query about + * @return true if the node is excluded + * + * @pre node != null + */ + boolean nodeOnRecursionExclusionList(ClusterNode node) { + assert node != null; + + // activeReqNodeIds is thread local. + Collection nodeIds = activeReqNodeIds.get(); + + + return (nodeIds != null && nodeIds.contains(node.id())); + } + /** * Sends request to the remote node and wait for response. If there is - * no response until threshold time, method returns null. + * no response until threshold time, method returns null. The receiver + * may forward the request to other nodes, but not to nodes that have been + * searched already, or nodes we intend to search directly. * + * The exclusion list severely weakens the potential n-squared behavior when a + * resource does not exist. If the resource does not exist, all nodes + * in the graph must be touched at least once. To avoid all multiple touches + * for the resource-not-found case, the response would need to include the + * nodes forwarded to, so they could be added to the exclusion list. However + * that seems like overkill for the kinds of graphs peer class loading will generate. * * @param rsrcName Resource name. * @param clsLdrId Class loader ID. - * @param dstNode Remote node request should be sent to. + * @param dstNode Remote node request should be sent to. + * {@linkplain GridDeploymentCommunication.nodeOnRecursionExclusionList + * nodeOnRecursionExclusionList(dstNode) must be false.} * @param threshold Time in milliseconds when request is decided to * be obsolete. + * @param exclusionList - Nodes to avoid recursively searching, in addition to + * the node(s) that forwarded this request initially. These nodes will + * end up on the receiver's exclusion list, in addition to nodes on our + * current exclusion list. * @return Either response value or {@code null} if timeout occurred. * @throws IgniteCheckedException Thrown if there is no connection with remote node. + * */ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"}) GridDeploymentResponse sendResourceRequest(final String rsrcName, IgniteUuid clsLdrId, - final ClusterNode dstNode, long threshold) throws IgniteCheckedException { + final ClusterNode dstNode, long threshold, Collection nodesToSkip) throws IgniteCheckedException { assert rsrcName != null; assert dstNode != null; assert clsLdrId != null; - - Collection nodeIds = activeReqNodeIds.get(); - - if (nodeIds != null && nodeIds.contains(dstNode.id())) { - if (log.isDebugEnabled()) - log.debug("Node attempts to load resource from one of the requesters " + - "[rsrcName=" + rsrcName + ", dstNodeId=" + dstNode.id() + - ", requesters=" + nodeIds + ']'); - - GridDeploymentResponse fake = new GridDeploymentResponse(); - - fake.success(false); - fake.errorMessage("Node attempts to load resource from one of the requesters " + - "[rsrcName=" + rsrcName + ", dstNodeId=" + dstNode.id() + - ", requesters=" + nodeIds + ']'); - - return fake; - } + assert nodesToSkip != null; + assert(!nodeOnRecursionExclusionList(dstNode)); Object resTopic = TOPIC_CLASSLOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId())); GridDeploymentRequest req = new GridDeploymentRequest(resTopic, clsLdrId, rsrcName, false); + + // Receiver should not forward to nodes that originated request nor + // nodes this node intends to send to. + Set nodeIds = new HashSet(); + if (activeReqNodeIds.get() != null) { + nodeIds.addAll(activeReqNodeIds.get()); + } + nodeIds.addAll(nodesToSkip); - // Send node IDs chain with request. req.nodeIds(nodeIds); final Object qryMux = new Object(); -- 2.15.1