From e61dfd1517db69a820471af6818fd767da3b6cbd Mon Sep 17 00:00:00 2001 From: AKuznetsov Date: Sat, 28 Feb 2015 23:10:53 +0700 Subject: [PATCH] IGNITE-187 Refactoring of node attributes. --- .../apache/ignite/internal/GridKernalContext.java | 30 +++ .../ignite/internal/GridKernalContextImpl.java | 23 ++ .../org/apache/ignite/internal/IgniteKernal.java | 242 +++++++++------------ .../ignite/internal/managers/GridManager.java | 14 +- .../internal/managers/GridManagerAdapter.java | 64 +++--- .../managers/checkpoint/GridCheckpointManager.java | 2 + .../managers/collision/GridCollisionManager.java | 2 + .../managers/communication/GridIoManager.java | 2 + .../managers/deployment/GridDeploymentManager.java | 2 + .../managers/discovery/GridDiscoveryManager.java | 32 ++- .../eventstorage/GridEventStorageManager.java | 2 + .../managers/failover/GridFailoverManager.java | 2 + .../managers/indexing/GridIndexingManager.java | 2 + .../loadbalancer/GridLoadBalancerManager.java | 2 + .../managers/swapspace/GridSwapSpaceManager.java | 2 + .../ignite/internal/processors/GridProcessor.java | 10 - .../internal/processors/GridProcessorAdapter.java | 5 - .../processors/cache/GridCacheProcessor.java | 56 +++-- .../processors/clock/GridClockSyncProcessor.java | 9 +- .../internal/processors/igfs/IgfsProcessor.java | 102 ++++----- .../processors/rest/GridRestProcessor.java | 64 +++--- .../processors/streamer/GridStreamProcessor.java | 31 +-- .../org/apache/ignite/plugin/PluginProvider.java | 3 +- ...tyFunctionExcludeNeighborsAbstractSelfTest.java | 1 + 24 files changed, 339 insertions(+), 365 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 100ad28..16e1189 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -508,4 +508,34 @@ public interface GridKernalContext extends Iterable { * @return Exception registry. */ public IgniteExceptionRegistry exceptionRegistry(); + + /** + * Get node attribute by name. + * + * @param key Attribute name. + * @return Attribute value. + */ + public Object nodeAttribute(String key); + + /** + * Check if node has specified attribute. + * + * @param key Attribute name. + * @return {@code true} If node has attribute with specified name. + */ + public boolean hasNodeAttribute(String key); + + /** + * Add attribute to node attributes. + * + * @param key Attribute name. + * @param val Attribute value. + * @return Previous attribute value associated with attribute name. + */ + public Object addNodeAttribute(String key, Object val); + + /** + * @return Node attributes. + */ + public Map nodeAttributes(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 395ad52..da082b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -275,6 +275,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @GridToStringExclude protected ExecutorService restExecSvc; + /** */ + @GridToStringExclude + private Map attrs = new HashMap<>(150); /** */ private IgniteEx grid; @@ -854,6 +857,26 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public Object nodeAttribute(String key) { + return attrs.get(key); + } + + /** {@inheritDoc} */ + @Override public boolean hasNodeAttribute(String key) { + return attrs.containsKey(key); + } + + /** {@inheritDoc} */ + @Override public Object addNodeAttribute(String key, Object val) { + return attrs.put(key, val); + } + + /** {@inheritDoc} */ + @Override public Map nodeAttributes() { + return attrs; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalContextImpl.class, this); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 9c92edd..92864d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -477,16 +477,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** - * @param attrs Current attributes. * @param name New attribute name. * @param val New attribute value. * @throws IgniteCheckedException If duplicated SPI name found. */ - private void add(Map attrs, String name, @Nullable Serializable val) throws IgniteCheckedException { - assert attrs != null; + private void add(String name, @Nullable Serializable val) throws IgniteCheckedException { assert name != null; - if (attrs.put(name, val) != null) { + if (ctx.addNodeAttribute(name, val) != null) { if (name.endsWith(ATTR_SPI_CLASS)) // User defined duplicated names for the different SPIs. throw new IgniteCheckedException("Failed to set SPI attribute. Duplicated SPI name found: " + @@ -661,8 +659,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Ack configuration. ackSpis(); - Map attrs = createNodeAttributes(cfg, BUILD_TSTAMP_STR); - // Spin out SPIs & managers. try { ctx = new GridKernalContextImpl(log, @@ -678,6 +674,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { igfsExecSvc, restExecSvc); + fillNodeAttributes(); + cluster = new IgniteClusterImpl(ctx); U.onGridStart(); @@ -690,7 +688,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { scheduler = new IgniteSchedulerImpl(ctx); - startProcessor(ctx, rsrcProc, attrs); + startProcessor(rsrcProc); // Inject resources into lifecycle beans. if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) { @@ -706,71 +704,71 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Starts lifecycle aware components. U.startLifecycleAware(lifecycleAwares(cfg)); - addHelper(ctx, IGFS_HELPER.create(F.isEmpty(cfg.getIgfsConfiguration()))); + addHelper(IGFS_HELPER.create(F.isEmpty(cfg.getIgfsConfiguration()))); - startProcessor(ctx, new IgnitePluginProcessor(ctx, cfg), attrs); + startProcessor(new IgnitePluginProcessor(ctx, cfg)); // Off-heap processor has no dependencies. - startProcessor(ctx, new GridOffHeapProcessor(ctx), attrs); + startProcessor(new GridOffHeapProcessor(ctx)); // Closure processor should be started before all others // (except for resource processor), as many components can depend on it. - startProcessor(ctx, new GridClosureProcessor(ctx), attrs); + startProcessor(new GridClosureProcessor(ctx)); // Start some other processors (order & place is important). - startProcessor(ctx, new GridPortProcessor(ctx), attrs); - startProcessor(ctx, new GridJobMetricsProcessor(ctx), attrs); + startProcessor(new GridPortProcessor(ctx)); + startProcessor(new GridJobMetricsProcessor(ctx)); // Timeout processor needs to be started before managers, // as managers may depend on it. - startProcessor(ctx, new GridTimeoutProcessor(ctx), attrs); + startProcessor(new GridTimeoutProcessor(ctx)); // Start security processors. - startProcessor(ctx, createComponent(GridSecurityProcessor.class, ctx), attrs); + startProcessor(createComponent(GridSecurityProcessor.class, ctx)); // Start SPI managers. // NOTE: that order matters as there are dependencies between managers. - startManager(ctx, new GridIoManager(ctx), attrs); - startManager(ctx, new GridCheckpointManager(ctx), attrs); + startManager(new GridIoManager(ctx)); + startManager(new GridCheckpointManager(ctx)); - startManager(ctx, new GridEventStorageManager(ctx), attrs); - startManager(ctx, new GridDeploymentManager(ctx), attrs); - startManager(ctx, new GridLoadBalancerManager(ctx), attrs); - startManager(ctx, new GridFailoverManager(ctx), attrs); - startManager(ctx, new GridCollisionManager(ctx), attrs); - startManager(ctx, new GridSwapSpaceManager(ctx), attrs); - startManager(ctx, new GridIndexingManager(ctx), attrs); + startManager(new GridEventStorageManager(ctx)); + startManager(new GridDeploymentManager(ctx)); + startManager(new GridLoadBalancerManager(ctx)); + startManager(new GridFailoverManager(ctx)); + startManager(new GridCollisionManager(ctx)); + startManager(new GridSwapSpaceManager(ctx)); + startManager(new GridIndexingManager(ctx)); - ackSecurity(ctx); + ackSecurity(); // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. - startProcessor(ctx, new GridClockSyncProcessor(ctx), attrs); - startProcessor(ctx, new GridAffinityProcessor(ctx), attrs); - startProcessor(ctx, createComponent(GridSegmentationProcessor.class, ctx), attrs); - startProcessor(ctx, createComponent(GridPortableProcessor.class, ctx), attrs); - startProcessor(ctx, new GridQueryProcessor(ctx), attrs); - startProcessor(ctx, new GridCacheProcessor(ctx), attrs); - startProcessor(ctx, new GridTaskSessionProcessor(ctx), attrs); - startProcessor(ctx, new GridJobProcessor(ctx), attrs); - startProcessor(ctx, new GridTaskProcessor(ctx), attrs); - startProcessor(ctx, (GridProcessor)SCHEDULE.createOptional(ctx), attrs); - startProcessor(ctx, new GridRestProcessor(ctx), attrs); - startProcessor(ctx, new GridDataLoaderProcessor(ctx), attrs); - startProcessor(ctx, new GridStreamProcessor(ctx), attrs); - startProcessor(ctx, (GridProcessor) IGFS.create(ctx, F.isEmpty(cfg.getIgfsConfiguration())), attrs); - startProcessor(ctx, new GridContinuousProcessor(ctx), attrs); - startProcessor(ctx, (GridProcessor)(cfg.isPeerClassLoadingEnabled() ? + startProcessor(new GridClockSyncProcessor(ctx)); + startProcessor(new GridAffinityProcessor(ctx)); + startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); + startProcessor(createComponent(GridPortableProcessor.class, ctx)); + startProcessor(new GridQueryProcessor(ctx)); + startProcessor(new GridCacheProcessor(ctx)); + startProcessor(new GridTaskSessionProcessor(ctx)); + startProcessor(new GridJobProcessor(ctx)); + startProcessor(new GridTaskProcessor(ctx)); + startProcessor((GridProcessor)SCHEDULE.createOptional(ctx)); + startProcessor(new GridRestProcessor(ctx)); + startProcessor(new GridDataLoaderProcessor(ctx)); + startProcessor(new GridStreamProcessor(ctx)); + startProcessor((GridProcessor) IGFS.create(ctx, F.isEmpty(cfg.getIgfsConfiguration()))); + startProcessor(new GridContinuousProcessor(ctx)); + startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ? IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled. - IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)), attrs); - startProcessor(ctx, new GridServiceProcessor(ctx), attrs); - startProcessor(ctx, new DataStructuresProcessor(ctx), attrs); + IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null))); + startProcessor(new GridServiceProcessor(ctx)); + startProcessor(new DataStructuresProcessor(ctx)); // Start plugins. for (PluginProvider provider : ctx.plugins().allProviders()) { ctx.add(new GridPluginComponent(provider)); - provider.start(ctx.plugins().pluginContextForProvider(provider), attrs); + provider.start(ctx.plugins().pluginContextForProvider(provider)); } gw.writeLock(); @@ -779,7 +777,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { gw.setState(STARTED); // Start discovery manager last to make sure that grid is fully initialized. - startManager(ctx, new GridDiscoveryManager(ctx), attrs); + startManager(new GridDiscoveryManager(ctx)); } finally { gw.writeUnlock(); @@ -789,7 +787,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { checkPhysicalRam(); // Suggest configuration optimizations. - suggestOptimizations(ctx, cfg); + suggestOptimizations(cfg); // Notify discovery manager the first to make sure that topology is discovered. ctx.discovery().onKernalStart(); @@ -884,12 +882,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { private long lastCompletedCnt; @Override protected void safeRun() { - ExecutorService e = execSvc; - - if (!(e instanceof ThreadPoolExecutor)) + if (!(execSvc instanceof ThreadPoolExecutor)) return; - ThreadPoolExecutor exec = (ThreadPoolExecutor)e; + ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc; long completedCnt = exec.getCompletedTaskCount(); @@ -951,10 +947,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { int pubPoolIdleThreads = 0; int pubPoolQSize = 0; - ExecutorService pubExec = execSvc; - - if (pubExec instanceof ThreadPoolExecutor) { - ThreadPoolExecutor exec = (ThreadPoolExecutor)pubExec; + if (execSvc instanceof ThreadPoolExecutor) { + ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc; int poolSize = exec.getPoolSize(); @@ -967,10 +961,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { int sysPoolIdleThreads = 0; int sysPoolQSize = 0; - ExecutorService sysExec = sysExecSvc; - - if (sysExec instanceof ThreadPoolExecutor) { - ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExec; + if (sysExecSvc instanceof ThreadPoolExecutor) { + ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc; int poolSize = exec.getPoolSize(); @@ -1075,10 +1067,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** - * @param ctx Context. * @param cfg Configuration to check for possible performance issues. */ - private void suggestOptimizations(GridKernalContext ctx, IgniteConfiguration cfg) { + private void suggestOptimizations(IgniteConfiguration cfg) { GridPerformanceSuggestions perf = ctx.performance(); if (ctx.collision().enabled()) @@ -1104,25 +1095,21 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * Creates attributes map and fills it in. * - * @param cfg Grid configuration. - * @param build Build string. - * @return Map of all node attributes. * @throws IgniteCheckedException thrown if was unable to set up attribute. */ @SuppressWarnings({"SuspiciousMethodCalls", "unchecked", "TypeMayBeWeakened"}) - private Map createNodeAttributes(IgniteConfiguration cfg, String build) throws IgniteCheckedException { - Map attrs = new HashMap<>(); - + private void fillNodeAttributes() throws IgniteCheckedException { final String[] incProps = cfg.getIncludeProperties(); try { // Stick all environment settings into node attributes. - attrs.putAll(F.view(System.getenv(), new P1() { - @Override public boolean apply(String name) { - return incProps == null || U.containsStringArray(incProps, name, true) || - U.isVisorNodeStartProperty(name) || U.isVisorRequiredProperty(name); - } - })); + for (Map.Entry sysEntry : System.getenv().entrySet()) { + String name = sysEntry.getKey(); + + if (incProps == null || U.containsStringArray(incProps, name, true) || + U.isVisorNodeStartProperty(name) || U.isVisorRequiredProperty(name)) + ctx.addNodeAttribute(name, sysEntry.getValue()); + } if (log.isDebugEnabled()) log.debug("Added environment properties to node attributes."); @@ -1143,13 +1130,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { U.isVisorRequiredProperty(name); } }).entrySet()) { - Object val = attrs.get(e.getKey()); + String key = (String)e.getKey(); + + Object val = ctx.nodeAttribute(key); if (val != null && !val.equals(e.getValue())) - U.warn(log, "System property will override environment variable with the same name: " - + e.getKey()); + U.warn(log, "System property will override environment variable with the same name: " + key); - attrs.put((String)e.getKey(), e.getValue()); + ctx.addNodeAttribute(key, e.getValue()); } if (log.isDebugEnabled()) @@ -1177,23 +1165,23 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { "Ignite is starting on loopback address..."); // Stick in network context into attributes. - add(attrs, ATTR_IPS, (ips.isEmpty() ? "" : ips)); - add(attrs, ATTR_MACS, (macs.isEmpty() ? "" : macs)); + add(ATTR_IPS, (ips.isEmpty() ? "" : ips)); + add(ATTR_MACS, (macs.isEmpty() ? "" : macs)); // Stick in some system level attributes - add(attrs, ATTR_JIT_NAME, U.getCompilerMx() == null ? "" : U.getCompilerMx().getName()); - add(attrs, ATTR_BUILD_VER, VER_STR); - add(attrs, ATTR_BUILD_DATE, build); - add(attrs, ATTR_COMPATIBLE_VERS, (Serializable)compatibleVersions()); - add(attrs, ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName()); - add(attrs, ATTR_USER_NAME, System.getProperty("user.name")); - add(attrs, ATTR_GRID_NAME, gridName); + add(ATTR_JIT_NAME, U.getCompilerMx() == null ? "" : U.getCompilerMx().getName()); + add(ATTR_BUILD_VER, VER_STR); + add(ATTR_BUILD_DATE, BUILD_TSTAMP_STR); + add(ATTR_COMPATIBLE_VERS, (Serializable)compatibleVersions()); + add(ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName()); + add(ATTR_USER_NAME, System.getProperty("user.name")); + add(ATTR_GRID_NAME, gridName); - add(attrs, ATTR_PEER_CLASSLOADING, cfg.isPeerClassLoadingEnabled()); - add(attrs, ATTR_DEPLOYMENT_MODE, cfg.getDeploymentMode()); - add(attrs, ATTR_LANG_RUNTIME, getLanguage()); + add(ATTR_PEER_CLASSLOADING, cfg.isPeerClassLoadingEnabled()); + add(ATTR_DEPLOYMENT_MODE, cfg.getDeploymentMode()); + add(ATTR_LANG_RUNTIME, getLanguage()); - add(attrs, ATTR_JVM_PID, U.jvmPid()); + add(ATTR_JVM_PID, U.jvmPid()); // Build a string from JVM arguments, because parameters with spaces are split. SB jvmArgs = new SB(512); @@ -1207,11 +1195,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { jvmArgs.a(arg); } // Add it to attributes. - add(attrs, ATTR_JVM_ARGS, jvmArgs.toString()); + add(ATTR_JVM_ARGS, jvmArgs.toString()); // Check daemon system property and override configuration if it's set. if (isDaemon()) - add(attrs, ATTR_DAEMON, "true"); + add(ATTR_DAEMON, "true"); // In case of the parsing error, JMX remote disabled or port not being set // node attribute won't be set. @@ -1220,7 +1208,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (portStr != null) try { - add(attrs, ATTR_JMX_PORT, Integer.parseInt(portStr)); + add(ATTR_JMX_PORT, Integer.parseInt(portStr)); } catch (NumberFormatException ignore) { // No-op. @@ -1228,49 +1216,46 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } // Whether restart is enabled and stick the attribute. - add(attrs, ATTR_RESTART_ENABLED, Boolean.toString(isRestartEnabled())); + add(ATTR_RESTART_ENABLED, Boolean.toString(isRestartEnabled())); // Save port range, port numbers will be stored by rest processor at runtime. if (cfg.getConnectorConfiguration() != null) - add(attrs, ATTR_REST_PORT_RANGE, cfg.getConnectorConfiguration().getPortRange()); + add(ATTR_REST_PORT_RANGE, cfg.getConnectorConfiguration().getPortRange()); // Stick in SPI versions and classes attributes. - addAttributes(attrs, cfg.getCollisionSpi()); - addAttributes(attrs, cfg.getSwapSpaceSpi()); - addAttributes(attrs, cfg.getDiscoverySpi()); - addAttributes(attrs, cfg.getFailoverSpi()); - addAttributes(attrs, cfg.getCommunicationSpi()); - addAttributes(attrs, cfg.getEventStorageSpi()); - addAttributes(attrs, cfg.getCheckpointSpi()); - addAttributes(attrs, cfg.getLoadBalancingSpi()); - addAttributes(attrs, cfg.getDeploymentSpi()); + addSpiAttributes(cfg.getCollisionSpi()); + addSpiAttributes(cfg.getSwapSpaceSpi()); + addSpiAttributes(cfg.getDiscoverySpi()); + addSpiAttributes(cfg.getFailoverSpi()); + addSpiAttributes(cfg.getCommunicationSpi()); + addSpiAttributes(cfg.getEventStorageSpi()); + addSpiAttributes(cfg.getCheckpointSpi()); + addSpiAttributes(cfg.getLoadBalancingSpi()); + addSpiAttributes(cfg.getDeploymentSpi()); // Set user attributes for this node. if (cfg.getUserAttributes() != null) { for (Map.Entry e : cfg.getUserAttributes().entrySet()) { - if (attrs.containsKey(e.getKey())) + if (ctx.hasNodeAttribute(e.getKey())) U.warn(log, "User or internal attribute has the same name as environment or system " + "property and will take precedence: " + e.getKey()); - attrs.put(e.getKey(), e.getValue()); + ctx.addNodeAttribute(e.getKey(), e.getValue()); } } - - return attrs; } /** * Add SPI version and class attributes into node attributes. * - * @param attrs Node attributes map to add SPI attributes to. * @param spiList Collection of SPIs to get attributes from. * @throws IgniteCheckedException Thrown if was unable to set up attribute. */ - private void addAttributes(Map attrs, IgniteSpi... spiList) throws IgniteCheckedException { + private void addSpiAttributes(IgniteSpi... spiList) throws IgniteCheckedException { for (IgniteSpi spi : spiList) { Class spiCls = spi.getClass(); - add(attrs, U.spiAttribute(spi, ATTR_SPI_CLASS), spiCls.getName()); + add(U.spiAttribute(spi, ATTR_SPI_CLASS), spiCls.getName()); } } @@ -1387,23 +1372,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** - * @param ctx Kernal context. * @param mgr Manager to start. - * @param attrs SPI attributes to set. * @throws IgniteCheckedException Throw in case of any errors. */ - private void startManager(GridKernalContextImpl ctx, GridManager mgr, Map attrs) - throws IgniteCheckedException { - mgr.addSpiAttributes(attrs); - - // Set all node attributes into discovery manager, - // so they can be distributed to all nodes. - if (mgr instanceof GridDiscoveryManager) - ((GridDiscoveryManager)mgr).setNodeAttributes(attrs, VER); - - // Add manager to registry before it starts to avoid - // cases when manager is started but registry does not - // have it yet. + private void startManager(GridManager mgr) throws IgniteCheckedException { + // Add manager to registry before it starts to avoid cases when manager is started + // but registry does not have it yet. ctx.add(mgr); try { @@ -1415,19 +1389,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** - * @param ctx Kernal context. * @param proc Processor to start. - * @param attrs Attributes. * @throws IgniteCheckedException Thrown in case of any error. */ - private void startProcessor(GridKernalContextImpl ctx, GridProcessor proc, Map attrs) - throws IgniteCheckedException { + private void startProcessor(GridProcessor proc) throws IgniteCheckedException { ctx.add(proc); try { proc.start(); - - proc.addAttributes(attrs); } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Failed to start processor: " + proc, e); @@ -1437,10 +1406,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * Add helper. * - * @param ctx Context. * @param helper Helper. */ - private void addHelper(GridKernalContextImpl ctx, Object helper) { + private void addHelper(Object helper) { ctx.addHelper(helper); } @@ -1772,7 +1740,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (gw.tryWriteLock(10)) break; } - catch (InterruptedException e) { + catch (InterruptedException ignored) { // Preserve interrupt status & ignore. // Note that interrupted flag is cleared. interrupted = true; @@ -2087,10 +2055,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * Prints security status. - * - * @param ctx Kernal context. */ - private void ackSecurity(GridKernalContext ctx) { + private void ackSecurity() { assert log != null; if (log.isInfoEnabled()) @@ -2197,7 +2163,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @SuppressWarnings("unchecked") @Override public String executeTask(String taskName, String arg) throws JMException { try { - return compute().execute(taskName, arg); + return compute().execute(taskName, arg); } catch (IgniteException e) { throw U.jmException(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManager.java index 15148e0..236d83b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManager.java @@ -17,12 +17,9 @@ package org.apache.ignite.internal.managers; -import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; -import java.util.*; - /** * This interface defines life-cycle for kernal manager. Managers provide layer of indirection * between kernal and SPI modules. Kernel never calls SPI modules directly but @@ -31,16 +28,7 @@ import java.util.*; @GridToStringExclude public interface GridManager extends GridComponent { /** - * Adds attributes from underlying SPI to map of all attributes. - * - * @param attrs Map of all attributes gotten from SPI's so far. - * @throws IgniteCheckedException Wrapper for exception thrown by underlying SPI. - */ - public void addSpiAttributes(Map attrs) throws IgniteCheckedException; - - /** - * @return Returns {@code true} if at least one SPI does not have a {@code NO-OP} - * implementation, {@code false} otherwise. + * @return {@code true} if at least one SPI does not have a {@code NO-OP} implementation, {@code false} otherwise. */ public boolean enabled(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 35e8989..ce80bfb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -90,6 +90,38 @@ public abstract class GridManagerAdapter implements GridMan log = ctx.log(getClass()); } + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + for (T spi : spis) { + // Inject all spi resources. + ctx.resource().inject(spi); + + // Inject SPI internal objects. + inject(spi); + + try { + Map retval = spi.getNodeAttributes(); + + if (retval != null) { + for (Map.Entry e : retval.entrySet()) { + if (ctx.hasNodeAttribute(e.getKey())) + throw new IgniteCheckedException("SPI attribute collision for attribute [spi=" + spi + + ", attr=" + e.getKey() + ']' + + ". Attribute set by one SPI implementation has the same name (name collision) as " + + "attribute set by other SPI implementation. Such overriding is not allowed. " + + "Please check your Ignite configuration and/or SPI implementation to avoid " + + "attribute name collisions."); + + ctx.addNodeAttribute(e.getKey(), e.getValue()); + } + } + } + catch (IgniteSpiException e) { + throw new IgniteCheckedException("Failed to get SPI attributes.", e); + } + } + } + /** * Gets wrapped SPI. * @@ -130,38 +162,6 @@ public abstract class GridManagerAdapter implements GridMan return spis; } - /** {@inheritDoc} */ - @Override public final void addSpiAttributes(Map attrs) throws IgniteCheckedException { - for (T spi : spis) { - // Inject all spi resources. - ctx.resource().inject(spi); - - // Inject SPI internal objects. - inject(spi); - - try { - Map retval = spi.getNodeAttributes(); - - if (retval != null) { - for (Map.Entry e : retval.entrySet()) { - if (attrs.containsKey(e.getKey())) - throw new IgniteCheckedException("SPI attribute collision for attribute [spi=" + spi + - ", attr=" + e.getKey() + ']' + - ". Attribute set by one SPI implementation has the same name (name collision) as " + - "attribute set by other SPI implementation. Such overriding is not allowed. " + - "Please check your Ignite configuration and/or SPI implementation to avoid " + - "attribute name collisions."); - - attrs.put(e.getKey(), e.getValue()); - } - } - } - catch (IgniteSpiException e) { - throw new IgniteCheckedException("Failed to get SPI attributes.", e); - } - } - } - /** * @param spi SPI whose internal objects need to be injected. * @throws IgniteCheckedException If injection failed. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java index 291930b..e3776a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java @@ -75,6 +75,8 @@ public class GridCheckpointManager extends GridManagerAdapter { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { + super.start(); + if (ctx.config().isDaemon()) return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java index 2f00fb7..5ac0026 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java @@ -44,6 +44,8 @@ public class GridCollisionManager extends GridManagerAdapter { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { + super.start(); + if (ctx.config().isDaemon()) return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 968e93a..b2d4b40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -179,6 +179,8 @@ public class GridIoManager extends GridManagerAdapter 0, "discoveryStartupDelay > 0"); startSpi(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java index 3d3bdc8..bbfbcb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java @@ -81,6 +81,8 @@ public class GridDeploymentManager extends GridManagerAdapter { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { + super.start(); + GridProtocolHandler.registerDeploymentManager(this); assertParameter(ctx.config().getDeploymentMode() != null, "ctx.config().getDeploymentMode() != null"); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 68f0a4a..b0d1fe1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -53,6 +53,7 @@ import java.util.zip.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.IgniteNodeAttributes.*; +import static org.apache.ignite.internal.IgniteVersionUtils.*; import static org.apache.ignite.plugin.segmentation.GridSegmentationPolicy.*; /** @@ -189,14 +190,10 @@ public class GridDiscoveryManager extends GridManagerAdapter { } } - /** - * Sets local node attributes into discovery SPI. - * - * @param attrs Attributes to set. - * @param ver Version. - */ - public void setNodeAttributes(Map attrs, IgniteProductVersion ver) { - // TODO GG-7574 move to metrics processor? + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + super.start(); + long totSysMemory = -1; try { @@ -206,13 +203,12 @@ public class GridDiscoveryManager extends GridManagerAdapter { // No-op. } - attrs.put(IgniteNodeAttributes.ATTR_PHY_RAM, totSysMemory); + ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_PHY_RAM, totSysMemory); - getSpi().setNodeAttributes(attrs, ver); - } + DiscoverySpi spi = getSpi(); + + spi.setNodeAttributes(ctx.nodeAttributes(), VER); - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { discoOrdered = discoOrdered(); histSupported = historySupported(); @@ -236,10 +232,10 @@ public class GridDiscoveryManager extends GridManagerAdapter { new IgniteThread(metricsUpdater).start(); - getSpi().setMetricsProvider(createMetricsProvider()); + spi.setMetricsProvider(createMetricsProvider()); if (ctx.security().enabled()) { - getSpi().setAuthenticator(new DiscoverySpiNodeAuthenticator() { + spi.setAuthenticator(new DiscoverySpiNodeAuthenticator() { @Override public SecurityContext authenticateNode(ClusterNode node, GridSecurityCredentials cred) { try { return ctx.security().authenticateNode(node, cred); @@ -255,7 +251,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { }); } - getSpi().setListener(new DiscoverySpiListener() { + spi.setListener(new DiscoverySpiListener() { @Override public void onDiscovery( int type, long topVer, @@ -316,7 +312,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { } }); - getSpi().setDataExchange(new DiscoverySpiDataExchange() { + spi.setDataExchange(new DiscoverySpiDataExchange() { @Override public Map collect(UUID nodeId) { assert nodeId != null; @@ -368,7 +364,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { checkAttributes(discoCache().remoteNodes()); - locNode = getSpi().getLocalNode(); + locNode = spi.getLocalNode(); topVer.setIfGreater(locNode.order()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 82af8bf..d936a86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -222,6 +222,8 @@ public class GridEventStorageManager extends GridManagerAdapter /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { + super.start(); + Map, int[]> evtLsnrs = ctx.config().getLocalEventListeners(); if (evtLsnrs != null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java index 714cccb..26cf308 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java @@ -39,6 +39,8 @@ public class GridFailoverManager extends GridManagerAdapter { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { + super.start(); + startSpi(); if (log.isDebugEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java index 3fe7839..ebe72de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java @@ -45,6 +45,8 @@ public class GridIndexingManager extends GridManagerAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ @Override public void start() throws IgniteCheckedException { + super.start(); + if (ctx.config().isDaemon()) return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/loadbalancer/GridLoadBalancerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/loadbalancer/GridLoadBalancerManager.java index ac961b7..338d346 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/loadbalancer/GridLoadBalancerManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/loadbalancer/GridLoadBalancerManager.java @@ -45,6 +45,8 @@ public class GridLoadBalancerManager extends GridManagerAdapter { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { + super.start(); + if (ctx.config().isDaemon()) return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessor.java index 5d9181e..49de3a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessor.java @@ -17,22 +17,12 @@ package org.apache.ignite.internal.processors; -import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; -import java.util.*; - /** * Interface for all processors. */ @GridToStringExclude public interface GridProcessor extends GridComponent { - /** - * Adds attributes from this component to map of all node attributes. - * - * @param attrs Map of all attributes. - * @throws IgniteCheckedException If failed. - */ - public void addAttributes(Map attrs) throws IgniteCheckedException; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java index f3d0a81..cbd8991 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java @@ -113,11 +113,6 @@ public abstract class GridProcessorAdapter implements GridProcessor { } /** {@inheritDoc} */ - @Override public void addAttributes(Map attrs) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e99c706..a4e8e52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -896,6 +896,32 @@ public class GridCacheProcessor extends GridProcessorAdapter { transactions = new IgniteTransactionsImpl(sharedCtx); + if (!(ctx.isDaemon() || F.isEmpty(ctx.config().getCacheConfiguration()))) { + GridCacheAttributes[] attrVals = new GridCacheAttributes[ctx.config().getCacheConfiguration().length]; + + Map interceptors = new HashMap<>(); + + int i = 0; + + for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { + assert caches.containsKey(cfg.getName()) : cfg.getName(); + + GridCacheContext ctx = caches.get(cfg.getName()).context(); + + attrVals[i++] = new GridCacheAttributes(cfg, ctx.store().configuredStore()); + + if (cfg.getInterceptor() != null) + interceptors.put(cfg.getName(), cfg.getInterceptor().getClass().getName()); + } + + ctx.addNodeAttribute(ATTR_CACHE, attrVals); + + ctx.addNodeAttribute(ATTR_TX_CONFIG, ctx.config().getTransactionConfiguration()); + + if (!interceptors.isEmpty()) + ctx.addNodeAttribute(ATTR_CACHE_INTERCEPTORS, interceptors); + } + if (log.isDebugEnabled()) log.debug("Started cache processor."); } @@ -926,36 +952,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { ); } - /** {@inheritDoc} */ - @Override public void addAttributes(Map attrs) throws IgniteCheckedException { - if (ctx.isDaemon() || F.isEmpty(ctx.config().getCacheConfiguration())) - return; - - GridCacheAttributes[] attrVals = new GridCacheAttributes[ctx.config().getCacheConfiguration().length]; - - Map interceptors = new HashMap<>(); - - int i = 0; - - for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { - assert caches.containsKey(cfg.getName()) : cfg.getName(); - - GridCacheContext ctx = caches.get(cfg.getName()).context(); - - attrVals[i++] = new GridCacheAttributes(cfg, ctx.store().configuredStore()); - - if (cfg.getInterceptor() != null) - interceptors.put(cfg.getName(), cfg.getInterceptor().getClass().getName()); - } - - attrs.put(ATTR_CACHE, attrVals); - - attrs.put(ATTR_TX_CONFIG, ctx.config().getTransactionConfiguration()); - - if (!interceptors.isEmpty()) - attrs.put(ATTR_CACHE_INTERCEPTORS, interceptors); - } - /** * Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java index 5beef88..19b4f5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java @@ -109,14 +109,9 @@ public class GridClockSyncProcessor extends GridProcessorAdapter { timeCoord0.onDiscoveryEvent(discoEvt); } }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED); - } - - /** {@inheritDoc} */ - @Override public void addAttributes(Map attrs) throws IgniteCheckedException { - super.addAttributes(attrs); - attrs.put(ATTR_TIME_SERVER_HOST, srv.host()); - attrs.put(ATTR_TIME_SERVER_PORT, srv.port()); + ctx.addNodeAttribute(ATTR_TIME_SERVER_HOST, srv.host()); + ctx.addNodeAttribute(ATTR_TIME_SERVER_PORT, srv.port()); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java index 847cd50..4c161ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java @@ -98,6 +98,54 @@ public class IgfsProcessor extends IgfsProcessorAdapter { if (log.isDebugEnabled()) log.debug("IGFS processor started."); + + IgniteConfiguration gridCfg = ctx.config(); + + // Node doesn't have IGFS if it: + // is daemon; + // doesn't have configured IGFS; + // doesn't have configured caches. + if (gridCfg.isDaemon() || F.isEmpty(gridCfg.getIgfsConfiguration()) || + F.isEmpty(gridCfg.getCacheConfiguration())) + return; + + final Map cacheCfgs = new HashMap<>(); + + F.forEach(gridCfg.getCacheConfiguration(), new CI1() { + @Override public void apply(CacheConfiguration c) { + cacheCfgs.put(c.getName(), c); + } + }); + + Collection attrVals = new ArrayList<>(); + + assert gridCfg.getIgfsConfiguration() != null; + + for (IgfsConfiguration igfsCfg : gridCfg.getIgfsConfiguration()) { + CacheConfiguration cacheCfg = cacheCfgs.get(igfsCfg.getDataCacheName()); + + if (cacheCfg == null) + continue; // No cache for the given IGFS configuration. + + CacheAffinityKeyMapper affMapper = cacheCfg.getAffinityMapper(); + + if (!(affMapper instanceof IgfsGroupDataBlocksKeyMapper)) + // Do not create IGFS attributes for such a node nor throw error about invalid configuration. + // Configuration will be validated later, while starting IgfsProcessor. + continue; + + attrVals.add(new IgfsAttributes( + igfsCfg.getName(), + igfsCfg.getBlockSize(), + ((IgfsGroupDataBlocksKeyMapper)affMapper).groupSize(), + igfsCfg.getMetaCacheName(), + igfsCfg.getDataCacheName(), + igfsCfg.getDefaultMode(), + igfsCfg.getPathModes(), + igfsCfg.isFragmentizerEnabled())); + } + + ctx.addNodeAttribute(ATTR_IGFS, attrVals.toArray(new IgfsAttributes[attrVals.size()])); } /** {@inheritDoc} */ @@ -191,60 +239,6 @@ public class IgfsProcessor extends IgfsProcessorAdapter { return new IgfsJobImpl(job, igfsName, path, start, len, recRslv); } - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void addAttributes(Map attrs) throws IgniteCheckedException { - super.addAttributes(attrs); - - IgniteConfiguration gridCfg = ctx.config(); - - // Node doesn't have IGFS if it: - // is daemon; - // doesn't have configured IGFS; - // doesn't have configured caches. - if (gridCfg.isDaemon() || F.isEmpty(gridCfg.getIgfsConfiguration()) || - F.isEmpty(gridCfg.getCacheConfiguration())) - return; - - final Map cacheCfgs = new HashMap<>(); - - F.forEach(gridCfg.getCacheConfiguration(), new CI1() { - @Override public void apply(CacheConfiguration c) { - cacheCfgs.put(c.getName(), c); - } - }); - - Collection attrVals = new ArrayList<>(); - - assert gridCfg.getIgfsConfiguration() != null; - - for (IgfsConfiguration igfsCfg : gridCfg.getIgfsConfiguration()) { - CacheConfiguration cacheCfg = cacheCfgs.get(igfsCfg.getDataCacheName()); - - if (cacheCfg == null) - continue; // No cache for the given IGFS configuration. - - CacheAffinityKeyMapper affMapper = cacheCfg.getAffinityMapper(); - - if (!(affMapper instanceof IgfsGroupDataBlocksKeyMapper)) - // Do not create IGFS attributes for such a node nor throw error about invalid configuration. - // Configuration will be validated later, while starting IgfsProcessor. - continue; - - attrVals.add(new IgfsAttributes( - igfsCfg.getName(), - igfsCfg.getBlockSize(), - ((IgfsGroupDataBlocksKeyMapper)affMapper).groupSize(), - igfsCfg.getMetaCacheName(), - igfsCfg.getDataCacheName(), - igfsCfg.getDefaultMode(), - igfsCfg.getPathModes(), - igfsCfg.isFragmentizerEnabled())); - } - - attrs.put(ATTR_IGFS, attrVals.toArray(new IgfsAttributes[attrVals.size()])); - } - /** * @param name Cache name. * @return Masked name accounting for {@code nulls}. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 8c9ef1d..97d3aa0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -55,6 +55,9 @@ public class GridRestProcessor extends GridProcessorAdapter { private static final String HTTP_PROTO_CLS = "org.apache.ignite.internal.processors.rest.protocols.http.jetty.GridJettyRestProtocol"; + /** */ + public static final byte[] ZERO_BYTES = new byte[0]; + /** Protocols. */ private final Collection protos = new ArrayList<>(); @@ -177,13 +180,8 @@ public class GridRestProcessor extends GridProcessorAdapter { GridRestResponse res = new GridRestResponse(STATUS_SECURITY_CHECK_FAILED, e.getMessage()); - try { - updateSession(req, subjCtx); - res.sessionTokenBytes(new byte[0]); - } - catch (IgniteCheckedException e1) { - U.warn(log, "Cannot update response session token: " + e1.getMessage()); - } + updateSession(req, subjCtx); + res.sessionTokenBytes(ZERO_BYTES); return new GridFinishedFuture<>(ctx, res); } @@ -223,13 +221,8 @@ public class GridRestProcessor extends GridProcessorAdapter { assert res != null; if (ctx.security().enabled()) { - try { - updateSession(req, subjCtx0); - res.sessionTokenBytes(new byte[0]); - } - catch (IgniteCheckedException e) { - U.warn(log, "Cannot update response session token: " + e.getMessage()); - } + updateSession(req, subjCtx0); + res.sessionTokenBytes(ZERO_BYTES); } interceptResponse(res, req); @@ -260,6 +253,25 @@ public class GridRestProcessor extends GridProcessorAdapter { // Start protocols. startTcpProtocol(); startHttpProtocol(); + + for (GridRestProtocol proto : protos) { + Collection> props = proto.getProperties(); + + if (props != null) { + for (IgniteBiTuple p : props) { + String key = p.getKey(); + + if (key == null) + continue; + + if (ctx.hasNodeAttribute(key)) + throw new IgniteCheckedException( + "Node attribute collision for attribute [processor=GridRestProcessor, attr=" + key + ']'); + + ctx.addNodeAttribute(key, p.getValue()); + } + } + } } } @@ -307,28 +319,6 @@ public class GridRestProcessor extends GridProcessorAdapter { } } - /** {@inheritDoc} */ - @Override public void addAttributes(Map attrs) throws IgniteCheckedException { - for (GridRestProtocol proto : protos) { - Collection> props = proto.getProperties(); - - if (props != null) { - for (IgniteBiTuple p : props) { - String key = p.getKey(); - - if (key == null) - continue; - - if (attrs.containsKey(key)) - throw new IgniteCheckedException( - "Node attribute collision for attribute [processor=GridRestProcessor, attr=" + key + ']'); - - attrs.put(key, p.getValue()); - } - } - } - } - /** * Applies {@link ConnectorMessageInterceptor} * from {@link ConnectorConfiguration#getMessageInterceptor()} ()} @@ -514,7 +504,7 @@ public class GridRestProcessor extends GridProcessorAdapter { * @param req REST request. * @param sCtx Security context. */ - private void updateSession(GridRestRequest req, SecurityContext sCtx) throws IgniteCheckedException { + private void updateSession(GridRestRequest req, SecurityContext sCtx) { if (sCtx != null) { UUID id = req.clientId(); sesMap.put(id, sCtx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java index 030a3ea..fb6cb85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java @@ -255,6 +255,18 @@ public class GridStreamProcessor extends GridProcessorAdapter { "assign unique name to each streamer): " + c.getName()); } } + + if (F.isEmpty(cfg)) + return; + + GridStreamerAttributes[] arr = new GridStreamerAttributes[cfg.length]; + + int i = 0; + + for (StreamerConfiguration c : cfg) + arr[i++] = new GridStreamerAttributes(c); + + ctx.addNodeAttribute(ATTR_STREAMER, arr); } /** {@inheritDoc} */ @@ -290,25 +302,6 @@ public class GridStreamProcessor extends GridProcessorAdapter { s.stop(cancel); } - /** {@inheritDoc} */ - @Override public void addAttributes(Map attrs) throws IgniteCheckedException { - super.addAttributes(attrs); - - StreamerConfiguration[] cfg = ctx.config().getStreamerConfiguration(); - - if (F.isEmpty(cfg)) - return; - - GridStreamerAttributes[] arr = new GridStreamerAttributes[cfg.length]; - - int i = 0; - - for (StreamerConfiguration c : cfg) - arr[i++] = new GridStreamerAttributes(c); - - attrs.put(ATTR_STREAMER, arr); - } - /** * @return Default no-name streamer. */ diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java index e41d6f7..1d965f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java @@ -67,10 +67,9 @@ public interface PluginProvider { * Starts grid component. * * @param ctx Plugin context. - * @param attrs Attributes. * @throws IgniteCheckedException Throws in case of any errors. */ - public void start(PluginContext ctx, Map attrs) throws IgniteCheckedException; + public void start(PluginContext ctx) throws IgniteCheckedException; /** * Stops grid component. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java index 7470174..d5fcddf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java @@ -53,6 +53,7 @@ public abstract class GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception { IgniteConfiguration c = super.getConfiguration(gridName); + // Override node attributes in discovery spi. TcpDiscoverySpi spi = new TcpDiscoverySpi() { @Override public void setNodeAttributes(Map attrs, IgniteProductVersion ver) { super.setNodeAttributes(attrs, ver); -- 1.8.5.2.msysgit.0