Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java (revision ) @@ -24,6 +24,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.igfs.client.IgfsClientManager; import org.jetbrains.annotations.Nullable; /** @@ -39,6 +40,9 @@ /** Managers. */ private List mgrs = new LinkedList<>(); + /** Closure manager. */ + private final IgfsClientManager cloMgr; + /** Meta manager. */ private final IgfsMetaManager metaMgr; @@ -55,8 +59,11 @@ private final IgfsEx igfs; /** + * Constructor. + * * @param ctx Kernal context. * @param cfg IGFS configuration. + * @param cloMgr Closure manager. * @param metaMgr Meta manager. * @param dataMgr Data manager. * @param srvMgr Server manager. @@ -66,6 +73,7 @@ public IgfsContext( GridKernalContext ctx, FileSystemConfiguration cfg, + IgfsClientManager cloMgr, IgfsMetaManager metaMgr, IgfsDataManager dataMgr, IgfsServerManager srvMgr, @@ -74,6 +82,8 @@ this.ctx = ctx; this.cfg = cfg; + this.cloMgr = cloMgr; + this.metaMgr = add(metaMgr); this.dataMgr = add(dataMgr); this.srvMgr = add(srvMgr); @@ -108,6 +118,13 @@ */ public List managers() { return mgrs; + } + + /** + * @return Closure manager. + */ + public IgfsClientManager closure() { + return cloMgr; } /** \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInOperation.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInOperation.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInOperation.java (revision ) @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.UUID; + +/** + * IGFS client closure incoming operation descriptor. + */ +public class IgfsClientInOperation { + /** Target node ID. */ + private final UUID nodeId; + + /** Request. */ + private final IgfsClientRequest req; + + /** + * Constructor. + * + * @param nodeId Target node ID. + * @param req Request. + */ + public IgfsClientInOperation(UUID nodeId, IgfsClientRequest req) { + this.nodeId = nodeId; + this.req = req; + } + + /** + * @return Target node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Target operation. + */ + public IgfsClientRequest request() { + return req; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientInOperation.class, this); + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java (revision ) @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +import java.nio.ByteBuffer; + +/** + * IGFS client closure execute response. + */ +public class IgfsClientResponse implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Message ID. */ + private long msgId; + + /** Response type. */ + private IgfsClientResponseType typ; + + /** Result. */ + @GridToStringInclude + private Object res; + + /** Result bytes. */ + @GridToStringExclude + private byte[] resBytes; + + /** + * Default constructor. + */ + public IgfsClientResponse() { + // No-op. + } + + /** + * Constructor. + * + * @param msgId Message ID. + * @param typ Type. + * @param res Result. + * @param resBytes Result bytes. + */ + public IgfsClientResponse(long msgId, IgfsClientResponseType typ, @Nullable Object res, + @Nullable byte[] resBytes) { + this.msgId = msgId; + this.typ = typ; + this.res = res; + this.resBytes = resBytes; + } + + /** + * @return Message ID. + */ + public long messageId() { + return msgId; + } + + /** + * @return Type. + */ + public IgfsClientResponseType type() { + return typ; + } + + /** + * @return Result. + */ + @Nullable public Object result() { + return res; + } + + /** + * @return Result bytes. + */ + @Nullable public byte[] resultBytes() { + return resBytes; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -28; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return (byte)(typ == IgfsClientResponseType.NULL || typ == IgfsClientResponseType.MARSH_ERR ? 2 : 3); + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("msgId", msgId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("typ", typ.ordinal())) + return false; + + writer.incrementState(); + + default: { + if (typ == IgfsClientResponseType.BOOL) { + if (!writer.writeBoolean("res", (boolean)res)) + return false; + } + else if (typ == IgfsClientResponseType.OBJ || typ == IgfsClientResponseType.ERR) { + if (!writer.writeByteArray("resBytes", resBytes)) + return false; + } + + writer.incrementState(); + } + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + msgId = reader.readLong("msgId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + int typOrd; + + typOrd = reader.readInt("typ"); + + if (!reader.isLastRead()) + return false; + + typ = IgfsClientResponseType.fromOrdinal(typOrd); + + reader.incrementState(); + + default: { + if (typ == IgfsClientResponseType.BOOL) { + res = reader.readBoolean("res"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + else if (typ == IgfsClientResponseType.OBJ || typ == IgfsClientResponseType.ERR) { + resBytes = reader.readByteArray("resBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + } + } + + return reader.afterMessageRead(IgfsClientResponse.class); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientResponse.class, this); + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRenameCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRenameCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRenameCallable.java (revision ) @@ -24,12 +24,18 @@ import org.apache.ignite.internal.processors.igfs.IgfsContext; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** * IGFS client rename callable. */ public class IgfsClientRenameCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 9; + /** */ private static final long serialVersionUID = 0L; @@ -51,13 +57,13 @@ * @param destPath Destination path. */ public IgfsClientRenameCallable(@Nullable String igfsName, IgfsPath srcPath, IgfsPath destPath) { - super(igfsName, srcPath); + super(TYPE_ID, igfsName, srcPath); this.destPath = destPath; } /** {@inheritDoc} */ - @Override protected Void call0(IgfsContext ctx) throws Exception { + @Override public Void call0(IgfsContext ctx) throws Exception { ctx.igfs().rename(path, destPath); return null; @@ -71,6 +77,28 @@ /** {@inheritDoc} */ @Override public void readBinary0(BinaryRawReader reader) throws BinaryObjectException { destPath = IgfsUtils.readPath(reader); + } + + /** {@inheritDoc} */ + @Override protected byte fieldsCount0() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected boolean writeTo0(MessageWriter writer, int fieldId) { + assert fieldId == 0; + + return writer.writeString("destPath", destPath.toString()); + } + + /** {@inheritDoc} */ + @Override protected void readFrom0(MessageReader reader, int fieldId) { + assert fieldId == 0; + + String dstPathStr = reader.readString("destPath"); + + if (reader.isLastRead()) + destPath = new IgfsPath(dstPathStr); } /** {@inheritDoc} */ Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientMkdirsCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientMkdirsCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientMkdirsCallable.java (revision ) @@ -24,6 +24,9 @@ import org.apache.ignite.internal.processors.igfs.IgfsContext; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; import java.util.Map; @@ -32,6 +35,9 @@ * IGFS client mkdirs callable. */ public class IgfsClientMkdirsCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 8; + /** */ private static final long serialVersionUID = 0L; @@ -53,13 +59,13 @@ * @param props Properties. */ public IgfsClientMkdirsCallable(@Nullable String igfsName, IgfsPath path, @Nullable Map props) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); this.props = props; } /** {@inheritDoc} */ - @Override protected Void call0(IgfsContext ctx) throws Exception { + @Override public Void call0(IgfsContext ctx) throws Exception { ctx.igfs().mkdirs(path, props); return null; @@ -73,6 +79,25 @@ /** {@inheritDoc} */ @Override public void readBinary0(BinaryRawReader reader) throws BinaryObjectException { props = IgfsUtils.readProperties(reader); + } + + /** {@inheritDoc} */ + @Override protected byte fieldsCount0() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected boolean writeTo0(MessageWriter writer, int fieldId) { + assert fieldId == 0; + + return writer.writeMap("props", props, MessageCollectionItemType.STRING, MessageCollectionItemType.STRING); + } + + /** {@inheritDoc} */ + @Override protected void readFrom0(MessageReader reader, int fieldId) { + assert fieldId == 0; + + props = reader.readMap("recursive", MessageCollectionItemType.STRING, MessageCollectionItemType.STRING, false); } /** {@inheritDoc} */ Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSizeCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSizeCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSizeCallable.java (revision ) @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.igfs.client; -import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.processors.igfs.IgfsContext; import org.apache.ignite.internal.util.typedef.internal.S; @@ -27,6 +26,9 @@ * IGFS client size callable. */ public class IgfsClientSizeCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 11; + /** */ private static final long serialVersionUID = 0L; @@ -44,11 +46,11 @@ * @param path Path. */ public IgfsClientSizeCallable(@Nullable String igfsName, IgfsPath path) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); } /** {@inheritDoc} */ - @Override protected Long call0(IgfsContext ctx) throws Exception { + @Override public Long call0(IgfsContext ctx) throws Exception { return ctx.igfs().size(path); } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientListPathsCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientListPathsCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientListPathsCallable.java (revision ) @@ -28,6 +28,9 @@ * IGFS client list paths callable. */ public class IgfsClientListPathsCallable extends IgfsClientAbstractCallable> { + /** Type ID. */ + public static final short TYPE_ID = 7; + /** */ private static final long serialVersionUID = 0L; @@ -45,11 +48,11 @@ * @param path Path. */ public IgfsClientListPathsCallable(@Nullable String igfsName, IgfsPath path) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); } /** {@inheritDoc} */ - @Override protected Collection call0(IgfsContext ctx) throws Exception { + @Override public Collection call0(IgfsContext ctx) throws Exception { return ctx.igfs().listPaths(path); } Index: modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java (revision ) @@ -318,7 +318,12 @@ else clientLog = IgfsLogger.disabledLogger(); + try { - modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); + modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); + } + catch (IgniteCheckedException ice) { + throw new IOException(ice); + } boolean initSecondary = paths.defaultMode() == PROXY; \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java (revision ) @@ -33,6 +33,7 @@ import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.processors.igfs.client.IgfsClientManager; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.ipc.IpcServerEndpoint; import org.apache.ignite.internal.util.typedef.C1; @@ -83,11 +84,16 @@ private final ConcurrentMap igfsCache = new ConcurrentHashMap8<>(); + /** Client closure manager. */ + private final IgfsClientManager cloMgr; + /** * @param ctx Kernal context. */ public IgfsProcessor(GridKernalContext ctx) { super(ctx); + + cloMgr = new IgfsClientManager(ctx); } /** {@inheritDoc} */ @@ -127,10 +133,11 @@ IgfsContext igfsCtx = new IgfsContext( ctx, cfg0, - new IgfsMetaManager(cfg0.isRelaxedConsistency(), metaClient), - new IgfsDataManager(), - new IgfsServerManager(), - new IgfsFragmentizerManager()); + cloMgr, + new IgfsMetaManager(ctx, cfg0.isRelaxedConsistency(), metaClient), + new IgfsDataManager(ctx), + new IgfsServerManager(ctx), + new IgfsFragmentizerManager(ctx)); // Start managers first. for (IgfsManager mgr : igfsCtx.managers()) @@ -139,6 +146,9 @@ igfsCache.put(maskName(cfg0.getName()), igfsCtx); } + // Start closure manager last. + cloMgr.start(null); + if (log.isDebugEnabled()) log.debug("IGFS processor started."); @@ -201,10 +211,14 @@ for (IgfsContext igfsCtx : igfsCache.values()) for (IgfsManager mgr : igfsCtx.managers()) mgr.onKernalStart(); + + cloMgr.onKernalStart(); } /** {@inheritDoc} */ @Override public void stop(boolean cancel) { + cloMgr.stop(cancel); + // Stop IGFS instances. for (IgfsContext igfsCtx : igfsCache.values()) { if (log.isDebugEnabled()) @@ -229,6 +243,8 @@ /** {@inheritDoc} */ @Override public void onKernalStop(boolean cancel) { + cloMgr.onKernalStop(cancel); + for (IgfsContext igfsCtx : igfsCache.values()) { if (log.isDebugEnabled()) log.debug("Stopping igfs: " + igfsCtx.configuration().getName()); @@ -388,8 +404,6 @@ if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs)) return; - - assert rmtAttrs != null && locAttrs != null; for (IgfsAttributes rmtAttr : rmtAttrs) for (IgfsAttributes locAttr : locAttrs) { \ No newline at end of file Index: modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsOneClientNodeTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsOneClientNodeTest.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsOneClientNodeTest.java (revision ) @@ -100,6 +100,7 @@ /** * @throws Exception If failed. */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public void testStartIgfs() throws Exception { final IgfsImpl igfs = (IgfsImpl) grid(0).fileSystem("igfs"); @@ -108,7 +109,7 @@ IgfsAbstractSelfTest.create(igfs, new IgfsPath[]{new IgfsPath("/dir")}, null); return null; } - }, IgfsException.class, "Failed to execute operation because there are no IGFS metadata nodes."); + }, IgfsException.class, "Failed to execute operation because there are no IGFS metadata nodes [igfs=igfs]"); GridTestUtils.assertThrows(log, new Callable() { @Override public Object call() throws Exception { @@ -118,7 +119,7 @@ return null; } - }, IgfsException.class, "Failed to execute operation because there are no IGFS metadata nodes."); + }, IgfsException.class, "Failed to execute operation because there are no IGFS metadata nodes [igfs=igfs]"); GridTestUtils.assertThrows(log, new Callable() { @Override public Object call() throws Exception { @@ -128,6 +129,6 @@ return null; } - }, IgfsException.class, "Failed to execute operation because there are no IGFS metadata nodes."); + }, IgfsException.class, "Failed to execute operation because there are no IGFS metadata nodes [igfs=igfs]"); } } \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java (revision ) @@ -43,8 +43,8 @@ * will not be affected. Other properties will be added or overwritten. Passed properties with {@code null} values * will be removed from the stored properties or ignored if they don't exist in the file info. *

- * When working in {@code DUAL_SYNC} or {@code DUAL_ASYNC} modes only the following properties will be propagated - * to the secondary file system: + * When working in {@code DUAL_SYNC} or {@code DUAL_ASYNC} modes with Hadoop secondary file system only the + * following properties will be updated on the secondary file system: *

    *
  • {@code usrName} - file owner name;
  • *
  • {@code grpName} - file owner group;
  • \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java (revision ) @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.igfs; +import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; @@ -66,6 +67,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT; import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -819,11 +821,13 @@ * * @param dfltMode The root mode. Must always be not null. * @param modes The subdirectory modes. + * @param dualParentsContainingPrimaryChildren The set to store parents into. * @return Descending list of filtered and checked modes. - * @throws IgniteCheckedException On error or + * @throws IgniteCheckedException On error. */ public static ArrayList> preparePathModes(final IgfsMode dfltMode, - @Nullable List> modes) throws IgniteCheckedException { + @Nullable List> modes, Set dualParentsContainingPrimaryChildren) + throws IgniteCheckedException { if (modes == null) return null; @@ -856,6 +860,10 @@ // Add to the 1st position (deep first). resModes.add(0, mode); + + // Store primary paths inside dual paths in separate collection: + if (mode.getValue() == PRIMARY) + dualParentsContainingPrimaryChildren.add(mode.getKey().parent()); break; } \ No newline at end of file Index: modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java (revision ) @@ -309,7 +309,12 @@ else clientLog = IgfsLogger.disabledLogger(); + try { - modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); + modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); + } + catch (IgniteCheckedException ice) { + throw new IOException(ice); + } boolean initSecondary = paths.defaultMode() == PROXY; \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java (revision ) @@ -0,0 +1,567 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.igfs.IgfsException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.igfs.IgfsContext; +import org.apache.ignite.internal.processors.igfs.IgfsImpl; +import org.apache.ignite.internal.processors.igfs.IgfsManager; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.Nullable; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; + +/** + * Manager to handle IGFS client closures. + */ +public class IgfsClientManager extends IgfsManager { + /** Outgoing operations. */ + private final Map outOps = new ConcurrentHashMap<>(); + + /** Marshaller. */ + private final Marshaller marsh; + + /** RW lock for synchronization. */ + private final StripedCompositeReadWriteLock rwLock = + new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors() * 2); + + /** Discovery listener. */ + private final GridLocalEventListener discoLsnr = new DiscoveryListener(); + + /** IO message listener. */ + private final MessageListener msgLsnr = new MessageListener(); + + /** Pending input operations received when manager is not started yet. */ + private final ConcurrentLinkedDeque pendingOps = new ConcurrentLinkedDeque<>(); + + /** Message ID generator. */ + private final AtomicLong msgIdGen = new AtomicLong(); + + /** Worker to process pending requests. */ + private PendingRequestsWorker pendingWorker; + + /** Whether manager is fully started and ready to process requests. */ + private volatile boolean ready; + + /** Stopping flag. */ + private volatile boolean stopping; + + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public IgfsClientManager(GridKernalContext ctx) { + super(ctx); + + marsh = ctx.config().getMarshaller(); + } + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + ctx.io().addMessageListener(GridTopic.TOPIC_IGFS_CLI, msgLsnr); + + ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); + } + + /** {@inheritDoc} */ + @Override protected void onKernalStart0() throws IgniteCheckedException { + rwLock.writeLock().lock(); + + try { + ready = true; + + if (!pendingOps.isEmpty()) { + pendingWorker = new PendingRequestsWorker(ctx.gridName(), "igfs-client-pending-request-worker", log); + + new IgniteThread(pendingWorker).start(); + } + } + finally { + rwLock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + ctx.event().removeLocalEventListener(discoLsnr); + + ctx.io().removeMessageListener(GridTopic.TOPIC_IGFS_CLI, msgLsnr); + + PendingRequestsWorker pendingWorker0; + + rwLock.writeLock().lock(); + + try { + stopping = true; + + pendingWorker0 = pendingWorker; + } + finally { + rwLock.writeLock().unlock(); + } + + if (pendingWorker0 != null) { + U.cancel(pendingWorker0); + U.join(pendingWorker0, log); + } + } + + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + pendingOps.clear(); + outOps.clear(); + } + + /** + * Execute IGFS closure. + * + * @param igfsCtx IGFS context. + * @param clo Closure. + * @return Result. + */ + public T execute(IgfsContext igfsCtx, IgfsClientAbstractCallable clo) { + return execute(igfsCtx, clo, IgfsClientNodeSelectionStrategy.RANDOM); + } + + /** + * Execute IGFS closure. + * + * @param igfsCtx IGFS context. + * @param clo Closure. + * @param strategy Node selection strategy. + * @return Result. + */ + public T execute(IgfsContext igfsCtx, IgfsClientAbstractCallable clo, + IgfsClientNodeSelectionStrategy strategy) { + try { + return executeAsync(igfsCtx, clo, strategy).get(); + } + catch (IgniteCheckedException e) { + throw IgfsUtils.toIgfsException(e); + } + } + + /** + * Execute IGFS closure asynchronously. + * + * @param igfsCtx IGFS context. + * @param clo Closure. + * @return Future. + */ + public IgniteInternalFuture executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable clo) { + return executeAsync(igfsCtx, clo, IgfsClientNodeSelectionStrategy.RANDOM); + } + + /** + * Internal execution logic. + * + * @param op Output operation. + */ + private void executeAsync0(IgfsClientOutOperation op) { + while (true) { + rwLock.readLock().lock(); + + try { + if (stopping) { + op.future().onDone(new IgfsException("Failed to execute IGFS task because node is stopping.")); + + return; + } + + // Get suitable node. + ClusterNode node; + + try { + node = selectNode(op.igfsContext(), op.strategy()); + } + catch (Exception e) { + op.future().onDone(e); + + return; + } + + assert node != null; + + op.nodeId(node.id()); + + // Add operation to pending set. + long msgId = msgIdGen.incrementAndGet(); + + outOps.put(msgId, op); + + // Send request. + try { + ctx.io().send(node, GridTopic.TOPIC_IGFS_CLI, new IgfsClientRequest(msgId, op.target()), + GridIoPolicy.PUBLIC_POOL); + + return; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send message to node, will retry [nodeId=" + node.id() + + ", err=" + e + ']'); + } + } + finally { + rwLock.readLock().unlock(); + } + } + } + + /** + * Execute IGFS closure asynchronously. + * + * @param igfsCtx IGFS context. + * @param clo Closure. + * @param strategy Node selection strategy. + * @return Future. + */ + @SuppressWarnings("unchecked") + public IgniteInternalFuture executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable clo, + IgfsClientNodeSelectionStrategy strategy) { + GridFutureAdapter fut = new GridFutureAdapter(); + + IgfsClientOutOperation op = new IgfsClientOutOperation(igfsCtx, clo, strategy, fut); + + executeAsync0(op); + + return fut; + } + + /** + * Select the most appropriate node for the task. + * + * @param igfsCtx IGFS context. + * @param strategy Strategy. + * @return Node. + */ + @Nullable private ClusterNode selectNode(IgfsContext igfsCtx, IgfsClientNodeSelectionStrategy strategy) { + FileSystemConfiguration igfsCfg = igfsCtx.configuration(); + + IgniteEx ignite = igfsCtx.kernalContext().grid(); + + switch (strategy) { + case RANDOM: + ClusterGroup cluster = ignite.cluster().forIgfsMetadataDataNodes( + igfsCfg.getName(), igfsCfg.getMetaCacheName()); + + Collection nodes = cluster.nodes(); + + if (nodes.isEmpty()) + throw new IgfsException("Failed to execute operation because there are no " + + "IGFS metadata nodes [igfs=" + igfsCtx.igfs().name() + ']'); + + return F.rand(nodes); + + default: + assert strategy == IgfsClientNodeSelectionStrategy.ROOT_ID; + + return ignite.affinity(igfsCfg.getMetaCacheName()).mapKeyToNode(IgfsUtils.ROOT_ID); + } + } + + /** + * Create closure response. + * + * @param msgId Message ID. + * @param res Response. + * @return Response. + */ + private IgfsClientResponse createResponse(long msgId, @Nullable Object res, @Nullable Throwable resErr) { + try { + if (resErr != null) + return new IgfsClientResponse(msgId, IgfsClientResponseType.ERR, null, + marsh.marshal(resErr)); + else { + if (res == null) + return new IgfsClientResponse(msgId, IgfsClientResponseType.NULL, null, null); + else if (res instanceof Boolean) + return new IgfsClientResponse(msgId, IgfsClientResponseType.BOOL, res, null); + else + return new IgfsClientResponse(msgId, IgfsClientResponseType.OBJ, null, + marsh.marshal(res)); + } + } + catch (Exception e) { + U.error(log, "Failed to marshal IGFS closure result [msgId=" + msgId + ", res=" + res + + ", resErr=" + resErr + ']', e); + + return new IgfsClientResponse(msgId, IgfsClientResponseType.MARSH_ERR, null, null); + } + } + + /** + * Handle node leave event. + * + * @param nodeId Node ID. + */ + private void onNodeLeft(UUID nodeId) { + Collection retryOps = new LinkedList<>(); + + rwLock.writeLock().lock(); + + try { + if (!stopping) { + // Get the list of affected requests. + Iterator> iter = outOps.entrySet().iterator(); + + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + + IgfsClientOutOperation op = entry.getValue(); + + if (F.eq(nodeId, op.nodeId())) { + iter.remove(); + + retryOps.add(op); + } + } + } + } + finally { + rwLock.writeLock().unlock(); + } + + // Re-send affected ops. + for (IgfsClientOutOperation retryOp : retryOps) { + retryOp.nodeId(null); + + executeAsync0(retryOp); + } + } + + /** + * Handle request. + * + * @param nodeId Node ID. + * @param req Request. + */ + private void onRequest(UUID nodeId, IgfsClientRequest req) { + rwLock.readLock().lock(); + + try { + if (stopping) + return; // Discovery listener on remote node will handle node leave. + + if (ready) + processRequest(nodeId, req); // Normal execution flow. + else + // Add to pending set if manager is not operational yet. + pendingOps.addLast(new IgfsClientInOperation(nodeId, req)); + } + finally { + rwLock.readLock().unlock(); + } + } + + /** + * Handle response. + * + * @param resp Response. + */ + @SuppressWarnings("unchecked") + private void onResponse(IgfsClientResponse resp) { + rwLock.readLock().lock(); + + try { + IgfsClientOutOperation op = outOps.remove(resp.messageId()); + + // Op might be null in case of concurreny local node stop or remote node stop.= discovery notification. + if (op != null) { + // Restore result. + Object res = null; + Throwable err = null; + + try { + switch (resp.type()) { + case BOOL: + res = resp.result(); + + break; + + case OBJ: + res = marsh.unmarshal(resp.resultBytes(), U.resolveClassLoader(ctx.config())); + + break; + + case ERR: + err = marsh.unmarshal(resp.resultBytes(), U.resolveClassLoader(ctx.config())); + + break; + + case MARSH_ERR: + err = new IgfsException("Failed to marshal IGFS task result on remote node " + + "(see remote node logs for more information) [nodeId + " + op.nodeId() + ']'); + + break; + + default: + assert resp.type() == IgfsClientResponseType.NULL; + } + } + catch (Exception e) { + // Something went wrong during unmarshalling. + err = new IgfsException("Failed to unmarshal IGFS task result." , e); + } + + op.future().onDone(res, err); + } + } + finally { + rwLock.readLock().unlock(); + } + } + + /** + * Actual request processing. Happens inside appropriate thread pool. + * + * @param nodeId Node ID. + * @param req Request. + */ + private void processRequest(UUID nodeId, IgfsClientRequest req) { + IgfsClientResponse resp; + + try { + IgfsClientAbstractCallable target = req.target(); + + IgfsImpl igfs = (IgfsImpl) ctx.igfs().igfs(target.igfsName()); + + if (igfs == null) + throw new IgfsException("IGFS with the given name is not configured on the node: " + target.igfsName()); + + Object res = target.call0(igfs.context()); + + resp = createResponse(req.messageId(), res, null); + } + catch (Exception e) { + // Wrap exception. + resp = createResponse(req.messageId(), null, e); + } + + // Send response. + try { + ctx.io().send(nodeId, GridTopic.TOPIC_IGFS_CLI, resp, GridIoPolicy.PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send IGFS client response [nodeId=" + nodeId + + ", msgId=" + req.messageId() + ']', e); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientManager.class, this); + } + + /** + * Message listener. + */ + private class MessageListener implements GridMessageListener { + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + assert nodeId != null; + assert msg != null; + + if (msg instanceof IgfsClientRequest) + onRequest(nodeId, (IgfsClientRequest)msg); + else if (msg instanceof IgfsClientResponse) + onResponse((IgfsClientResponse)msg); + else + U.error(log, "IGFS client message listener received unknown message: " + msg); + } + } + + /** + * Discovery listener. + */ + private class DiscoveryListener implements GridLocalEventListener { + /** {@inheritDoc} */ + @Override public void onEvent(Event evt) { + switch (evt.type()) { + case EVT_NODE_LEFT: + case EVT_NODE_FAILED: + DiscoveryEvent evt0 = (DiscoveryEvent) evt; + + onNodeLeft(evt0.eventNode().id()); + + break; + + default: + assert false : "Unknown event: " + evt; + } + } + } + + /** + * Pending requests worker. + */ + private class PendingRequestsWorker extends GridWorker { + /** + * Consturctor. + * + * @param gridName Grid name. + * @param name WOrker name. + * @param log Logger. + */ + public PendingRequestsWorker(@Nullable String gridName, String name, IgniteLogger log) { + super(gridName, name, log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + IgfsClientInOperation inOp; + + while ((inOp = pendingOps.pollFirst()) != null) + processRequest(inOp.nodeId(), inOp.request()); + } + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolver.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolver.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolver.java (revision ) @@ -17,9 +17,12 @@ package org.apache.ignite.internal.processors.igfs; -import java.util.Collections; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; @@ -42,19 +45,25 @@ /** Cached modes per path. */ private Map modesCache; + /** Set to store parent dual paths that have primary children. */ + private final Set dualParentsWithPrimaryChildren; + /** * Constructor * * @param dfltMode Default IGFS mode. * @param modes List of configured modes. The order is significant as modes are added in order of occurrence. */ - public IgfsModeResolver(IgfsMode dfltMode, @Nullable List> modes) { + public IgfsModeResolver(IgfsMode dfltMode, @Nullable ArrayList> modes) + throws IgniteCheckedException { assert dfltMode != null; this.dfltMode = dfltMode; - this.modes = modes; + this.dualParentsWithPrimaryChildren = new HashSet<>(); + this.modes = IgfsUtils.preparePathModes(dfltMode, modes, dualParentsWithPrimaryChildren); + if (modes != null) modesCache = new GridBoundedConcurrentLinkedHashMap<>(MAX_PATH_CACHE); } @@ -94,10 +103,20 @@ } /** - * @return Unmodifiable copy of properly ordered modes prefixes + * @return Copy of properly ordered modes prefixes * or {@code null} if no modes set. */ - @Nullable public List> modesOrdered() { - return modes != null ? Collections.unmodifiableList(modes) : null; + @Nullable public ArrayList> modesOrdered() { + return modes != null ? new ArrayList<>(modes) : null; + } + + /** + * Answers if the given path has an immediate child of PRIMARY mode. + * + * @param path The path to query. + * @return If the given path has an immediate child of PRIMARY mode. + */ + public boolean hasPrimaryChild(IgfsPath path) { + return dualParentsWithPrimaryChildren.contains(path); } } \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java (revision ) @@ -362,8 +362,7 @@ * will not be affected. Other properties will be added or overwritten. Passed properties with {@code null} values * will be removed from the stored properties or ignored if they don't exist in the file info. *

    - * When working in {@code DUAL_SYNC} or {@code DUAL_ASYNC} modes only the following properties will be propagated - * to the secondary file system: + * When working in {@code DUAL_SYNC} or {@code DUAL_ASYNC} modes with Hadoop secondary file system only the following properties will be updated: *

      *
    • {@code usrName} - file owner name;
    • *
    • {@code grpName} - file owner group;
    • \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java (revision ) @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.nio.ByteBuffer; + +/** + * IGFS client closure execute request. + */ +public class IgfsClientRequest implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Base fields (all except of target) count. */ + private static final byte BASE_FIELDS_CNT = 2; + + /** Message ID. */ + private long msgId; + + /** Target callable. */ + @GridToStringInclude + private IgfsClientAbstractCallable target; + + /** + * Default constructor. + */ + public IgfsClientRequest() { + // No-op. + } + + /** + * Constructor. + * + * @param msgId Message ID. + * @param target Target callable. + */ + public IgfsClientRequest(long msgId, IgfsClientAbstractCallable target) { + assert target != null; + + this.msgId = msgId; + this.target = target; + } + + /** + * @return Message ID. + */ + public long messageId() { + return msgId; + } + + /** + * @return Target callable. + */ + public IgfsClientAbstractCallable target() { + return target; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -27; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return (byte)(BASE_FIELDS_CNT + target.fieldsCount()); + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + byte fieldsCount = fieldsCount(); + + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount)) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("msgId", msgId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeShort("typeId", target.typeId())) + return false; + + writer.incrementState(); + + default: + while (writer.state() < fieldsCount) { + if (!target.writeTo(writer, writer.state() - BASE_FIELDS_CNT)) + return false; + + writer.incrementState(); + } + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + msgId = reader.readLong("msgId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + short typeId; + + typeId = reader.readShort("typeId"); + + if (!reader.isLastRead()) + return false; + + target = IgfsClientAbstractCallable.callableForTypeId(typeId); + + reader.incrementState(); + + default: + while (reader.state() < fieldsCount()) { + target.readFrom(reader, reader.state() - BASE_FIELDS_CNT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + } + + return reader.afterMessageRead(IgfsClientRequest.class); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientRequest.class, this); + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java (revision ) @@ -120,6 +120,8 @@ import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerRequest; import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse; import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage; +import org.apache.ignite.internal.processors.igfs.client.IgfsClientRequest; +import org.apache.ignite.internal.processors.igfs.client.IgfsClientResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; @@ -160,6 +162,16 @@ Message msg = null; switch (type) { + case -28: + msg = new IgfsClientResponse(); + + break; + + case -27: + msg = new IgfsClientRequest(); + + break; + case -26: msg = new TxLockList(); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientListFilesCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientListFilesCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientListFilesCallable.java (revision ) @@ -29,6 +29,9 @@ * IGFS client list files callable. */ public class IgfsClientListFilesCallable extends IgfsClientAbstractCallable> { + /** Type ID. */ + public static final short TYPE_ID = 6; + /** */ private static final long serialVersionUID = 0L; @@ -46,11 +49,11 @@ * @param path Path. */ public IgfsClientListFilesCallable(@Nullable String igfsName, IgfsPath path) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); } /** {@inheritDoc} */ - @Override protected Collection call0(IgfsContext ctx) throws Exception { + @Override public Collection call0(IgfsContext ctx) throws Exception { return ctx.igfs().listFiles(path); } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java (revision ) @@ -28,6 +28,7 @@ /** * Context for secondary file system create request. + * Note that it is never used for dual mode append operation. */ public class IgfsSecondaryFileSystemCreateContext { /** File system. */ @@ -68,6 +69,8 @@ */ public IgfsSecondaryFileSystemCreateContext(IgfsSecondaryFileSystem fs, IgfsPath path, boolean overwrite, boolean simpleCreate, @Nullable Map props, short replication, long blockSize, int bufSize) { + assert fs != null; + this.fs = fs; this.path = path; this.overwrite = overwrite; Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAffinityCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAffinityCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAffinityCallable.java (revision ) @@ -24,6 +24,8 @@ import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.processors.igfs.IgfsContext; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; import java.util.Collection; @@ -32,6 +34,9 @@ * IGFS client affinity callable. */ public class IgfsClientAffinityCallable extends IgfsClientAbstractCallable> { + /** Type ID. */ + public static final short TYPE_ID = 2; + /** */ private static final long serialVersionUID = 0L; @@ -61,7 +66,7 @@ * @param maxLen Maximum length. */ public IgfsClientAffinityCallable(@Nullable String igfsName, IgfsPath path, long start, long len, long maxLen) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); this.start = start; this.len = len; @@ -69,7 +74,7 @@ } /** {@inheritDoc} */ - @Override protected Collection call0(IgfsContext ctx) throws Exception { + @Override public Collection call0(IgfsContext ctx) throws Exception { return ctx.igfs().affinity(path, start, len, maxLen); } @@ -85,6 +90,47 @@ start = reader.readLong(); len = reader.readLong(); maxLen = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override protected byte fieldsCount0() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected boolean writeTo0(MessageWriter writer, int fieldId) { + switch (fieldId) { + case 0: + return writer.writeLong("start", start); + + case 1: + return writer.writeLong("len", len); + + default: + assert fieldId == 2; + + return writer.writeLong("maxLen", maxLen); + } + } + + /** {@inheritDoc} */ + @Override protected void readFrom0(MessageReader reader, int fieldId) { + switch (fieldId) { + case 0: + start = reader.readLong("start"); + + break; + + case 1: + len = reader.readLong("len"); + + break; + + default: + assert fieldId == 2; + + maxLen = reader.readLong("maxLen"); + } } /** {@inheritDoc} */ Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientDeleteCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientDeleteCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientDeleteCallable.java (revision ) @@ -23,12 +23,17 @@ import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.processors.igfs.IgfsContext; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** * IGFS client delete callable. */ public class IgfsClientDeleteCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 3; + /** */ private static final long serialVersionUID = 0L; @@ -50,13 +55,13 @@ * @param recursive Recursive flag. */ public IgfsClientDeleteCallable(@Nullable String igfsName, IgfsPath path, boolean recursive) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); this.recursive = recursive; } /** {@inheritDoc} */ - @Override protected Boolean call0(IgfsContext ctx) throws Exception { + @Override public Boolean call0(IgfsContext ctx) throws Exception { return ctx.igfs().delete(path, recursive); } @@ -68,6 +73,25 @@ /** {@inheritDoc} */ @Override public void readBinary0(BinaryRawReader reader) throws BinaryObjectException { recursive = reader.readBoolean(); + } + + /** {@inheritDoc} */ + @Override protected byte fieldsCount0() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected boolean writeTo0(MessageWriter writer, int fieldId) { + assert fieldId == 0; + + return writer.writeBoolean("recursive", recursive); + } + + /** {@inheritDoc} */ + @Override protected void readFrom0(MessageReader reader, int fieldId) { + assert fieldId == 0; + + recursive = reader.readBoolean("recursive"); } /** {@inheritDoc} */ Index: modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java (revision ) @@ -245,6 +245,13 @@ } /** + * @return Relaxed consistency flag. + */ + protected boolean initializeDefaultPathModes() { + return false; + } + + /** * @return Client flag. */ protected boolean client() { @@ -369,6 +376,8 @@ igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); igfsCfg.setRelaxedConsistency(relaxedConsistency()); + igfsCfg.setInitializeDefaultPathModes(initializeDefaultPathModes()); + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); dataCacheCfg.setName("dataCache"); @@ -1071,17 +1080,36 @@ * @throws Exception If failed. */ public void testRootPropertiesPersistAfterFormat() throws Exception { - igfs.update(new IgfsPath("/"), Collections.singletonMap("foo", "moo")); + if (dual && !(igfsSecondaryFileSystem instanceof IgfsSecondaryFileSystemImpl)) { + // In case of Hadoop dual mode only user name, group name, and permission properties are updated, + // an arbitrary named property is just ignored: + checkRootPropertyUpdate("foo", "moo", null); + checkRootPropertyUpdate(IgfsUtils.PROP_PERMISSION, "0777", "0777"); + } + else { + checkRootPropertyUpdate("foo", "moo", "moo"); + checkRootPropertyUpdate(IgfsUtils.PROP_PERMISSION, "0777", "0777"); + } + } + /** + * + * @throws Exception + */ + private void checkRootPropertyUpdate(String prop, String setVal, String expGetVal) throws Exception { + final IgfsPath rootPath = new IgfsPath("/"); + + igfs.update(rootPath, Collections.singletonMap(prop, setVal)); + igfs.format(); - IgfsFile file = igfs.info(new IgfsPath("/")); + IgfsFile file = igfs.info(rootPath); assert file != null; Map props = file.properties(); - assertEquals("moo", props.get("foo")); + assertEquals(expGetVal, props.get(prop)); } /** \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSummaryCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSummaryCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSummaryCallable.java (revision ) @@ -27,6 +27,9 @@ * IGFS client summary callable. */ public class IgfsClientSummaryCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 12; + /** */ private static final long serialVersionUID = 0L; @@ -44,11 +47,11 @@ * @param path Path. */ public IgfsClientSummaryCallable(@Nullable String igfsName, IgfsPath path) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); } /** {@inheritDoc} */ - @Override protected IgfsPathSummary call0(IgfsContext ctx) throws Exception { + @Override public IgfsPathSummary call0(IgfsContext ctx) throws Exception { return ctx.igfs().summary(path); } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java (revision ) @@ -18,13 +18,10 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.events.EventType; import org.apache.ignite.events.IgfsEvent; @@ -40,7 +37,7 @@ import org.apache.ignite.igfs.IgfsPathNotFoundException; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; @@ -150,16 +147,15 @@ /** Client flag. */ private final boolean client; - /** Compute facade for client tasks. */ - private IgniteCompute cliCompute; - /** * Constructor. * * @param relaxed Relaxed mode flag. * @param client Client flag. */ - public IgfsMetaManager(boolean relaxed, boolean client) { + public IgfsMetaManager(GridKernalContext ctx, boolean relaxed, boolean client) { + super(ctx); + this.relaxed = relaxed; this.client = client; } @@ -182,21 +178,21 @@ cfg = igfsCtx.configuration(); - evts = igfsCtx.kernalContext().event(); + evts = ctx.event(); sampling = new IgfsSamplingKey(cfg.getName()); - log = igfsCtx.kernalContext().log(IgfsMetaManager.class); + log = ctx.log(IgfsMetaManager.class); } /** {@inheritDoc} */ @SuppressWarnings("RedundantCast") @Override protected void onKernalStart0() throws IgniteCheckedException { - metaCache = igfsCtx.kernalContext().cache().getOrStartCache(cfg.getMetaCacheName()); + metaCache = ctx.cache().getOrStartCache(cfg.getMetaCacheName()); assert metaCache != null; - igfsCtx.kernalContext().cache().internalCache(cfg.getMetaCacheName()).preloader().startFuture() + ctx.cache().internalCache(cfg.getMetaCacheName()).preloader().startFuture() .listen(new CI1>() { @Override public void apply(IgniteInternalFuture f) { metaCacheStartLatch.countDown(); @@ -205,7 +201,7 @@ id2InfoPrj = (IgniteInternalCache)metaCache.cache(); - locNode = igfsCtx.kernalContext().discovery().localNode(); + locNode = ctx.discovery().localNode(); // Start background delete worker. if (!client) { @@ -247,40 +243,10 @@ * @return Result. */ T runClientTask(IgfsClientAbstractCallable task) { - try { - return clientCompute().call(task); + return igfsCtx.closure().execute(igfsCtx, task); - } + } - catch (ClusterTopologyException e) { - throw new IgfsException("Failed to execute operation because there are no IGFS metadata nodes." , e); - } - } /** - * Get compute facade for client tasks. - * - * @return Compute facade. - */ - private IgniteCompute clientCompute() { - assert client; - - IgniteCompute cliCompute0 = cliCompute; - - if (cliCompute0 == null) { - IgniteEx ignite = igfsCtx.kernalContext().grid(); - - ClusterGroup cluster = ignite.cluster().forIgfsMetadataDataNodes(cfg.getName(), cfg.getMetaCacheName()); - - cliCompute0 = ignite.compute(cluster); - - cliCompute = cliCompute0; - } - - assert cliCompute0 != null; - - return cliCompute0; - } - - /** * Gets file ID for specified path. * * @param path Path. @@ -1011,7 +977,7 @@ // Fire events. IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName); - IgfsUtils.sendEvents(igfsCtx.kernalContext(), srcPath, newPath, + IgfsUtils.sendEvents(ctx, srcPath, newPath, srcInfo.isFile() ? EVT_IGFS_FILE_RENAMED : EVT_IGFS_DIR_RENAMED); } } @@ -2916,7 +2882,7 @@ tx.commit(); - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE); + IgfsUtils.sendEvents(ctx, path, EventType.EVT_IGFS_FILE_OPENED_WRITE); return info; } @@ -3051,28 +3017,18 @@ if (secondaryCtx != null) { secondaryOut = secondaryCtx.create(); - IgfsFile secondaryFile = secondaryCtx.info(); - - if (secondaryFile == null) - throw fsException("Failed to open output stream to the file created in " + - "the secondary file system because it no longer exists: " + path); - else if (secondaryFile.isDirectory()) - throw fsException("Failed to open output stream to the file created in " + - "the secondary file system because the path points to a directory: " + path); - - newAccessTime = secondaryFile.accessTime(); - newModificationTime = secondaryFile.modificationTime(); - newProps = secondaryFile.properties(); - newLen = secondaryFile.length(); - newBlockSize = secondaryFile.blockSize(); + newAccessTime = 0L; + newModificationTime = 0L; + newProps = null; } else { newAccessTime = System.currentTimeMillis(); newModificationTime = newAccessTime; newProps = fileProps; + } + - newLen = 0L; - newBlockSize = blockSize; + newLen = 0L; + newBlockSize = blockSize; - } IgfsEntryInfo newInfo = invokeAndGet(overwriteId, new IgfsMetaFileCreateProcessor(newAccessTime, newModificationTime, newProps, @@ -3081,7 +3037,7 @@ // Prepare result and commit. tx.commit(); - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE); + IgfsUtils.sendEvents(ctx, path, EventType.EVT_IGFS_FILE_OPENED_WRITE); return new IgfsCreateResult(newInfo, secondaryOut); } @@ -3254,18 +3210,9 @@ Map props; if (secondaryCtx != null) { - IgfsFile secondaryInfo = secondaryCtx.fileSystem().info(lastCreatedPath); - - if (secondaryInfo == null) - throw new IgfsException("Failed to perform operation because secondary file system path was " + - "modified concurrently: " + lastCreatedPath); - else if (secondaryInfo.isFile()) - throw new IgfsException("Failed to perform operation because secondary file system entity is " + - "not directory: " + lastCreatedPath); - - accessTime = secondaryInfo.accessTime(); - modificationTime = secondaryInfo.modificationTime(); - props = secondaryInfo.properties(); + accessTime = 0L; + modificationTime = 0L; + props = null; } else { accessTime = curTime; @@ -3293,18 +3240,9 @@ Map props; if (secondaryCtx != null) { - IgfsFile secondaryInfo = secondaryCtx.fileSystem().info(pathIds.path()); - - if (secondaryInfo == null) - throw new IgfsException("Failed to perform operation because secondary file system path was " + - "modified concurrnetly: " + pathIds.path()); - else if (secondaryInfo.isFile()) - throw new IgfsException("Failed to perform operation because secondary file system entity is " + - "not directory: " + lastCreatedPath); - - accessTime = secondaryInfo.accessTime(); - modificationTime = secondaryInfo.modificationTime(); - props = secondaryInfo.properties(); + accessTime = 0L; + modificationTime = 0L; + props = null; } else { accessTime = curTime; @@ -3322,28 +3260,18 @@ int newBlockSize; if (secondaryCtx != null) { - IgfsFile secondaryFile = secondaryCtx.info(); - - if (secondaryFile == null) - throw fsException("Failed to open output stream to the file created in " + - "the secondary file system because it no longer exists: " + pathIds.path()); - else if (secondaryFile.isDirectory()) - throw fsException("Failed to open output stream to the file created in " + - "the secondary file system because the path points to a directory: " + pathIds.path()); - - newAccessTime = secondaryFile.accessTime(); - newModificationTime = secondaryFile.modificationTime(); - newProps = secondaryFile.properties(); - newLen = secondaryFile.length(); - newBlockSize = secondaryFile.blockSize(); + newAccessTime = 0L; + newModificationTime = 0L; + newProps = null; } else { newAccessTime = curTime; newModificationTime = curTime; newProps = fileProps; + } + - newLen = 0L; - newBlockSize = blockSize; + newLen = 0L; + newBlockSize = blockSize; - } procMap.put(curId, new IgfsMetaFileCreateProcessor(newAccessTime, newModificationTime, newProps, newBlockSize, affKey, createFileLockId(false), evictExclude, newLen)); @@ -3368,18 +3296,18 @@ private void generateCreateEvents(List createdPaths, boolean file) { if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) { for (int i = 0; i < createdPaths.size() - 1; i++) - IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPaths.get(i), + IgfsUtils.sendEvents(ctx, createdPaths.get(i), EventType.EVT_IGFS_DIR_CREATED); } IgfsPath leafPath = createdPaths.get(createdPaths.size() - 1); if (file) { - IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_CREATED); - IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_OPENED_WRITE); + IgfsUtils.sendEvents(ctx, leafPath, EventType.EVT_IGFS_FILE_CREATED); + IgfsUtils.sendEvents(ctx, leafPath, EventType.EVT_IGFS_FILE_OPENED_WRITE); } else - IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED); + IgfsUtils.sendEvents(ctx, leafPath, EventType.EVT_IGFS_DIR_CREATED); } /** \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInfoCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInfoCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInfoCallable.java (revision ) @@ -27,6 +27,9 @@ * IGFS client info callable. */ public class IgfsClientInfoCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 5; + /** */ private static final long serialVersionUID = 0L; @@ -44,11 +47,11 @@ * @param path Path. */ public IgfsClientInfoCallable(@Nullable String igfsName, IgfsPath path) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); } /** {@inheritDoc} */ - @Override protected IgfsFile call0(IgfsContext ctx) throws Exception { + @Override public IgfsFile call0(IgfsContext ctx) throws Exception { return ctx.igfs().info(path); } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java (revision ) @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; @@ -99,13 +100,22 @@ /** Message topic. */ private Object topic; + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public IgfsFragmentizerManager(GridKernalContext ctx) { + super(ctx); + } + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { if (!igfsCtx.configuration().isFragmentizerEnabled()) return; // We care only about node leave and fail events. - igfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { + ctx.event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(Event evt) { assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; @@ -121,7 +131,7 @@ topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName); - igfsCtx.kernalContext().io().addMessageListener(topic, fragmentizerWorker); + ctx.io().addMessageListener(topic, fragmentizerWorker); new IgniteThread(fragmentizerWorker).start(); } @@ -130,7 +140,7 @@ @Override protected void onKernalStart0() throws IgniteCheckedException { if (igfsCtx.configuration().isFragmentizerEnabled()) { // Check at startup if this node is a fragmentizer coordinator. - DiscoveryEvent locJoinEvt = igfsCtx.kernalContext().discovery().localJoinEvent(); + DiscoveryEvent locJoinEvt = ctx.discovery().localJoinEvent(); checkLaunchCoordinator(locJoinEvt); } @@ -191,7 +201,7 @@ return; } catch (IgniteCheckedException e) { - if (!igfsCtx.kernalContext().discovery().alive(nodeId)) + if (!ctx.discovery().alive(nodeId)) throw new ClusterTopologyCheckedException("Failed to send message (node left the grid) " + "[nodeId=" + nodeId + ", msg=" + msg + ']'); @@ -226,7 +236,7 @@ minNodeOrder = node.order(); } - ClusterNode locNode = igfsCtx.kernalContext().grid().localNode(); + ClusterNode locNode = ctx.grid().localNode(); if (locNode.order() == minNodeOrder) { if (log.isDebugEnabled()) @@ -263,7 +273,7 @@ */ @SuppressWarnings("fallthrough") private void processFragmentizerRequest(IgfsFragmentizerRequest req) throws IgniteCheckedException { - req.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null); + req.finishUnmarshal(ctx.config().getMarshaller(), null); Collection ranges = req.fragmentRanges(); IgniteUuid fileId = req.fileId(); @@ -356,11 +366,11 @@ * Constructor. */ protected FragmentizerCoordinator() { - super(igfsCtx.kernalContext().gridName(), "fragmentizer-coordinator", - igfsCtx.kernalContext().log(IgfsFragmentizerManager.class)); + super(ctx.gridName(), "fragmentizer-coordinator", + ctx.log(IgfsFragmentizerManager.class)); - igfsCtx.kernalContext().event().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED); - igfsCtx.kernalContext().io().addMessageListener(topic, this); + ctx.event().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED); + ctx.io().addMessageListener(topic, this); } /** {@inheritDoc} */ @@ -481,7 +491,7 @@ else if (msg instanceof IgfsSyncMessage) { IgfsSyncMessage sync = (IgfsSyncMessage)msg; - if (sync.response() && sync.order() == igfsCtx.kernalContext().grid().localNode().order()) { + if (sync.response() && sync.order() == ctx.grid().localNode().order()) { if (log.isDebugEnabled()) log.debug("Received fragmentizer sync response from remote node: " + nodeId); @@ -523,7 +533,7 @@ private void syncStart() throws InterruptedException { Collection startSync0 = startSync = new GridConcurrentHashSet<>( F.viewReadOnly( - igfsCtx.kernalContext().discovery().allNodes(), + ctx.discovery().allNodes(), F.node2id(), new P1() { @Override public boolean apply(ClusterNode n) { @@ -531,7 +541,7 @@ } })); - ClusterNode locNode = igfsCtx.kernalContext().grid().localNode(); + ClusterNode locNode = ctx.grid().localNode(); while (!startSync0.isEmpty()) { for (UUID nodeId : startSync0) { @@ -545,7 +555,7 @@ sendWithRetries(nodeId, syncReq); // Close window between message sending and discovery event. - if (!igfsCtx.kernalContext().discovery().alive(nodeId)) + if (!ctx.discovery().alive(nodeId)) startSync0.remove(nodeId); } catch (IgniteCheckedException e) { @@ -669,8 +679,7 @@ * Constructor. */ protected FragmentizerWorker() { - super(igfsCtx.kernalContext().gridName(), "fragmentizer-worker", - igfsCtx.kernalContext().log(IgfsFragmentizerManager.class)); + super(ctx.gridName(), "fragmentizer-worker", ctx.log(IgfsFragmentizerManager.class)); } /** {@inheritDoc} */ Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java (revision ) @@ -48,7 +48,7 @@ private IgfsMode dfltMode; /** Path modes. */ - private List> pathModes; + private ArrayList> pathModes; /** * Empty constructor required by {@link Externalizable}. @@ -65,7 +65,7 @@ * @param pathModes Path modes. * @throws IgniteCheckedException If failed. */ - public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List> pathModes) + public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable ArrayList> pathModes) throws IgniteCheckedException { this.dfltMode = dfltMode; this.pathModes = pathModes; @@ -91,7 +91,7 @@ /** * @return Path modes. */ - @Nullable public List> pathModes() { + @Nullable public ArrayList> pathModes() { return pathModes; } \ No newline at end of file Index: modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java (revision ) @@ -55,6 +55,12 @@ assert mode == DUAL_SYNC || mode == DUAL_ASYNC; } + /** {@inheritDoc} */ + @Override protected boolean initializeDefaultPathModes() { + // Enable default modes in order to test various modes. + return true; + } + /** * @throws Exception If failed. */ @@ -68,6 +74,13 @@ for (IgfsPath p : paths) assert igfs.exists(p); + assert igfs.modeResolver().resolveMode(gg) == mode; + assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "sync")) == IgfsMode.DUAL_SYNC; + assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "async")) == IgfsMode.DUAL_ASYNC; + assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "primary")) == IgfsMode.PRIMARY; + assert !igfsSecondary.exists("/ignite/primary"); // PRIMARY mode path must exist in upper level fs only. + + // All the child paths of "/ignite/" must be visible in listings: assert igfs.listFiles(gg).size() == 3; assert igfs.listPaths(gg).size() == 3; } \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java (revision ) @@ -33,6 +33,7 @@ import org.apache.ignite.igfs.IgfsOutOfSpaceException; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -143,8 +144,17 @@ new ConcurrentHashMap8<>(); /** + * Constructor. * + * @param ctx Kernal context. */ + public IgfsDataManager(GridKernalContext ctx) { + super(ctx); + } + + /** + * Await data cache initiaization. + */ void awaitInit() { try { dataCacheStartLatch.await(); @@ -162,7 +172,7 @@ topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName); - igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() { + ctx.io().addMessageListener(topic, new GridMessageListener() { @Override public void onMessage(UUID nodeId, Object msg) { if (msg instanceof IgfsBlocksMessage) processBlocksMessage(nodeId, (IgfsBlocksMessage)msg); @@ -171,7 +181,7 @@ } }); - igfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { + ctx.event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(Event evt) { assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; @@ -186,16 +196,15 @@ } }, EVT_NODE_LEFT, EVT_NODE_FAILED); - igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService(); + igfsSvc = ctx.getIgfsExecutorService(); - delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(), - "igfs-" + igfsName + "-delete-worker", log); + delWorker = new AsyncDeleteWorker(ctx.gridName(), "igfs-" + igfsName + "-delete-worker", log); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected void onKernalStart0() throws IgniteCheckedException { - dataCachePrj = igfsCtx.kernalContext().cache().getOrStartCache(igfsCtx.configuration().getDataCacheName()); + dataCachePrj = ctx.cache().getOrStartCache(igfsCtx.configuration().getDataCacheName()); assert dataCachePrj != null; @@ -203,7 +212,7 @@ metrics = igfsCtx.igfs().localMetrics(); - AffinityKeyMapper mapper = igfsCtx.kernalContext().cache() + AffinityKeyMapper mapper = ctx.cache() .internalCache(igfsCtx.configuration().getDataCacheName()).configuration().getAffinityMapper(); grpSize = mapper instanceof IgfsGroupDataBlocksKeyMapper ? @@ -213,7 +222,7 @@ assert grpBlockSize != 0; - igfsCtx.kernalContext().cache().internalCache(igfsCtx.configuration().getDataCacheName()).preloader() + ctx.cache().internalCache(igfsCtx.configuration().getDataCacheName()).preloader() .startFuture().listen(new CI1>() { @Override public void apply(IgniteInternalFuture f) { dataCacheStartLatch.countDown(); @@ -266,7 +275,7 @@ if (!dataCache.context().affinityNode()) return null; - UUID nodeId = igfsCtx.kernalContext().localNodeId(); + UUID nodeId = ctx.localNodeId(); if (prevAffKey != null && dataCache.affinity().mapKeyToNode(prevAffKey).isLocal()) return prevAffKey; @@ -296,7 +305,7 @@ */ private IgniteDataStreamer dataStreamer() { IgniteDataStreamer ldr = - igfsCtx.kernalContext().dataStream().dataStreamer(dataCachePrj.name()); + ctx.dataStream().dataStreamer(dataCachePrj.name()); FileSystemConfiguration cfg = igfsCtx.configuration(); @@ -331,7 +340,7 @@ final IgfsBlockKey key = blockKey(blockIdx, fileInfo); if (log.isDebugEnabled() && - dataCache.affinity().isPrimaryOrBackup(igfsCtx.kernalContext().discovery().localNode(), key)) { + dataCache.affinity().isPrimaryOrBackup(ctx.discovery().localNode(), key)) { log.debug("Reading non-local data block [path=" + path + ", fileInfo=" + fileInfo + ", blockIdx=" + blockIdx + ']'); } @@ -1022,7 +1031,7 @@ ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'); completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + - igfsCtx.kernalContext().localNodeId(), e)); + ctx.localNodeId(), e)); return; } @@ -1149,7 +1158,7 @@ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) { try { - ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null); + ackMsg.finishUnmarshal(ctx.config().getMarshaller(), null); } catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal message (will ignore): " + ackMsg, e); \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSetTimesCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSetTimesCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSetTimesCallable.java (revision ) @@ -23,12 +23,17 @@ import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.processors.igfs.IgfsContext; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** * IGFS client set times callable. */ public class IgfsClientSetTimesCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 10; + /** */ private static final long serialVersionUID = 0L; @@ -55,14 +60,14 @@ */ public IgfsClientSetTimesCallable(@Nullable String igfsName, IgfsPath path, long accessTime, long modificationTime) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); this.accessTime = accessTime; this.modificationTime = modificationTime; } /** {@inheritDoc} */ - @Override protected Void call0(IgfsContext ctx) throws Exception { + @Override public Void call0(IgfsContext ctx) throws Exception { ctx.igfs().setTimes(path, accessTime, modificationTime); return null; @@ -78,6 +83,39 @@ @Override public void readBinary0(BinaryRawReader reader) throws BinaryObjectException { accessTime = reader.readLong(); modificationTime = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override protected byte fieldsCount0() { + return 2; + } + + /** {@inheritDoc} */ + @Override protected boolean writeTo0(MessageWriter writer, int fieldId) { + switch (fieldId) { + case 0: + return writer.writeLong("accessTime", accessTime); + + default: + assert fieldId == 1; + + return writer.writeLong("modificationTime", modificationTime); + } + } + + /** {@inheritDoc} */ + @Override protected void readFrom0(MessageReader reader, int fieldId) { + switch (fieldId) { + case 0: + accessTime = reader.readLong("accessTime"); + + break; + + default: + assert fieldId == 1; + + modificationTime = reader.readLong("modificationTime"); + } } /** {@inheritDoc} */ Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientExistsCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientExistsCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientExistsCallable.java (revision ) @@ -26,6 +26,9 @@ * IGFS client exists callable. */ public class IgfsClientExistsCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 4; + /** */ private static final long serialVersionUID = 0L; @@ -43,11 +46,11 @@ * @param path Path. */ public IgfsClientExistsCallable(@Nullable String igfsName, IgfsPath path) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); } /** {@inheritDoc} */ - @Override protected Boolean call0(IgfsContext ctx) throws Exception { + @Override public Boolean call0(IgfsContext ctx) throws Exception { return ctx.igfs().exists(path); } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java (revision ) @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.jetbrains.annotations.Nullable; + +/** + * IGFS client closure response type. + */ +public enum IgfsClientResponseType { + /** Object. */ + OBJ, + + /** Null result. */ + NULL, + + /** Boolean result. */ + BOOL, + + /** Error. */ + ERR, + + /** Marshalling error. */ + MARSH_ERR; + + /** Enum values. */ + private static final IgfsClientResponseType[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value. + */ + @Nullable public static IgfsClientResponseType fromOrdinal(int ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java (revision ) @@ -23,7 +23,7 @@ import java.io.OutputStream; /** - * IGFS file create result. + * IGFS file create or append result. */ public class IgfsCreateResult { /** File info in the primary file system. */ \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java (revision ) @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.igfs.client; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.binary.BinaryRawReader; import org.apache.ignite.binary.BinaryRawWriter; @@ -28,7 +29,11 @@ import org.apache.ignite.internal.processors.igfs.IgfsContext; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaIdsForPathCallable; +import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaInfoForPathCallable; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.resources.IgniteInstanceResource; import org.jetbrains.annotations.Nullable; @@ -39,8 +44,14 @@ /** */ private static final long serialVersionUID = 0L; + /** Base fields count. */ + private static final byte BASE_FIELDS_CNT = 2; + + /** Type ID. */ + private short typeId; + /** IGFS name. */ - protected String igfsName; + private String igfsName; /** Path for operation. */ protected IgfsPath path; @@ -50,6 +61,61 @@ private transient Ignite ignite; /** + * Create callable for the given type ID. + * + * @param typeId Type ID. + * @return Callable. + */ + public static IgfsClientAbstractCallable callableForTypeId(short typeId) { + switch (typeId) { + case IgfsClientMetaIdsForPathCallable.TYPE_ID: + return new IgfsClientMetaIdsForPathCallable(); + + case IgfsClientMetaInfoForPathCallable.TYPE_ID: + return new IgfsClientMetaInfoForPathCallable(); + + case IgfsClientAffinityCallable.TYPE_ID: + return new IgfsClientAffinityCallable(); + + case IgfsClientDeleteCallable.TYPE_ID: + return new IgfsClientDeleteCallable(); + + case IgfsClientExistsCallable.TYPE_ID: + return new IgfsClientExistsCallable(); + + case IgfsClientInfoCallable.TYPE_ID: + return new IgfsClientInfoCallable(); + + case IgfsClientListFilesCallable.TYPE_ID: + return new IgfsClientListFilesCallable(); + + case IgfsClientListPathsCallable.TYPE_ID: + return new IgfsClientListPathsCallable(); + + case IgfsClientMkdirsCallable.TYPE_ID: + return new IgfsClientMkdirsCallable(); + + case IgfsClientRenameCallable.TYPE_ID: + return new IgfsClientRenameCallable(); + + case IgfsClientSetTimesCallable.TYPE_ID: + return new IgfsClientSetTimesCallable(); + + case IgfsClientSizeCallable.TYPE_ID: + return new IgfsClientSizeCallable(); + + case IgfsClientSummaryCallable.TYPE_ID: + return new IgfsClientSummaryCallable(); + + case IgfsClientUpdateCallable.TYPE_ID: + return new IgfsClientUpdateCallable(); + + default: + throw new IgniteException("Unsupported IGFS callable type ID: " + typeId); + } + } + + /** * Default constructor. */ protected IgfsClientAbstractCallable() { @@ -59,15 +125,18 @@ /** * Constructor. * + * @param typeId Type ID. * @param igfsName IGFS name. * @param path Path. */ - protected IgfsClientAbstractCallable(@Nullable String igfsName, @Nullable IgfsPath path) { + protected IgfsClientAbstractCallable(short typeId, @Nullable String igfsName, @Nullable IgfsPath path) { + this.typeId = typeId; this.igfsName = igfsName; this.path = path; } /** {@inheritDoc} */ + // TODO. @Override public final T call() throws Exception { assert ignite != null; @@ -83,8 +152,15 @@ * @return Result. * @throws Exception If failed. */ - protected abstract T call0(IgfsContext ctx) throws Exception; + public abstract T call0(IgfsContext ctx) throws Exception; + /** + * @return IGFS name. + */ + public String igfsName() { + return igfsName; + } + /** {@inheritDoc} */ @Override public final void writeBinary(BinaryWriter writer) throws BinaryObjectException { BinaryRawWriter rawWriter = writer.rawWriter(); @@ -121,5 +197,95 @@ */ protected void readBinary0(BinaryRawReader rawReader) { // No-op. + } + + /** + * @return Type ID. + */ + public short typeId() { + return typeId; + } + + /** + * @return Fields count. + */ + public final byte fieldsCount() { + return (byte)(BASE_FIELDS_CNT + fieldsCount0()); + } + + /** + * @return Additional fields count of concrete class. + */ + protected byte fieldsCount0() { + return 0; + } + + /** + * Write callable to writer. + * + * @param writer Writer. + * @param fieldId Field ID. + * @return Result. + */ + public final boolean writeTo(MessageWriter writer, int fieldId) { + switch (fieldId) { + case 0: + return writer.writeString("igfsName", igfsName); + + case 1: + return writer.writeString("path", path.toString()); + + default: + return writeTo0(writer, fieldId - BASE_FIELDS_CNT); + } + } + + /** + * Write callable to writer (for child classes). + * + * @param writer Writer. + * @param fieldId Field ID. + * @return Result. + */ + protected boolean writeTo0(MessageWriter writer, int fieldId) { + assert false : "Should not be called."; + + return true; + } + + /** + * Read callable content from reader. + * + * @param reader Reader. + * @param fieldId Field ID. + */ + public final void readFrom(MessageReader reader, int fieldId) { + switch (fieldId) { + case 0: + igfsName = reader.readString("igfsName"); + + break; + + case 1: + String pathStr = reader.readString("path"); + + if (reader.isLastRead()) + path = new IgfsPath(pathStr); + + break; + + default: + readFrom0(reader, fieldId - BASE_FIELDS_CNT); + } + } + + /** + * Read callable content from reader (for child classes). + * + * @param reader Reader. + * @param fieldId Field ID. + */ + protected void readFrom0(MessageReader reader, int fieldId) { + assert false : "Should not be called."; } } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientNodeSelectionStrategy.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientNodeSelectionStrategy.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientNodeSelectionStrategy.java (revision ) @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.jetbrains.annotations.Nullable; + +/** + * IGFS node selection strategy. + */ +public enum IgfsClientNodeSelectionStrategy { + /** Pick random node. */ + RANDOM, + + /** Pick a node where root ID resides. */ + ROOT_ID; + + /** Enum values. */ + private static final IgfsClientNodeSelectionStrategy[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value. + */ + @Nullable public static IgfsClientNodeSelectionStrategy fromOrdinal(int ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java (revision ) @@ -94,7 +94,10 @@ TOPIC_QUERY, /** */ - TOPIC_TX; + TOPIC_TX, + + /** Topic to handle IGFS closures. */ + TOPIC_IGFS_CLI; /** Enum values. */ private static final GridTopic[] VALS = values(); \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java (revision ) @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.igfs; +import java.util.Set; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -248,7 +249,7 @@ } } - modeRslvr = new IgfsModeResolver(dfltMode, IgfsUtils.preparePathModes(dfltMode, modes)); + modeRslvr = new IgfsModeResolver(dfltMode, modes); Object secondaryFsPayload = null; @@ -816,12 +817,14 @@ } } + if (!IgfsUtils.isDualMode(mode) || modeRslvr.hasPrimaryChild(path)) { - IgniteUuid fileId = meta.fileId(path); + IgniteUuid fileId = meta.fileId(path); - if (fileId != null) - files.addAll(meta.directoryListing(fileId).keySet()); - else if (mode == PRIMARY) - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + if (fileId != null) + files.addAll(meta.directoryListing(fileId).keySet()); + else if (mode == PRIMARY) + throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + } return F.viewReadOnly(files, new C1() { @Override public IgfsPath apply(String e) { @@ -846,7 +849,7 @@ IgfsMode mode = resolveMode(path); - Collection files = new HashSet<>(); + Set files = new HashSet<>(); if (IgfsUtils.isDualMode(mode)) { assert secondaryFs != null; @@ -859,6 +862,9 @@ files.add(impl); } + + if (!modeRslvr.hasPrimaryChild(path)) + return files; } catch (Exception e) { U.error(log, "List files in DUAL mode failed [path=" + path + ']', e); @@ -1345,29 +1351,6 @@ return fut; } - /** - * Get file descriptor for specified path. - * - * @param path Path to file. - * @return Detailed file descriptor or {@code null}, if file does not exist. - * @throws IgniteCheckedException If failed. - */ - @Nullable private FileDescriptor getFileDescriptor(IgfsPath path) throws IgniteCheckedException { - assert path != null; - - List ids = meta.idsForPath(path); - - IgfsEntryInfo fileInfo = meta.info(ids.get(ids.size() - 1)); - - if (fileInfo == null) - return null; // File does not exist. - - // Resolve parent ID for removed file. - IgniteUuid parentId = ids.size() >= 2 ? ids.get(ids.size() - 2) : null; - - return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile()); - } - /** {@inheritDoc} */ @Override public R execute(IgfsTask task, @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) { @@ -1519,20 +1502,16 @@ case DUAL_SYNC: case DUAL_ASYNC: - info = meta.infoForPath(path); - - if (info == null) { - try { - IgfsFile status = secondaryFs.info(path); + try { + IgfsFile status = secondaryFs.info(path); - if (status != null) - return new IgfsFileImpl(status, data.groupBlockSize()); - } - catch (Exception e) { - U.error(log, "File info operation in DUAL mode failed [path=" + path + ']', e); + if (status != null) + return new IgfsFileImpl(status, data.groupBlockSize()); + } + catch (Exception e) { + U.error(log, "File info operation in DUAL mode failed [path=" + path + ']', e); - throw e; + throw e; - } } break; \ No newline at end of file Index: modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolverSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolverSelfTest.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolverSelfTest.java (revision ) @@ -17,8 +17,11 @@ package org.apache.ignite.internal.processors.igfs; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import junit.framework.TestCase; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.igfs.IgfsMode; @@ -38,8 +41,9 @@ /** {@inheritDoc} */ @Override protected void setUp() throws Exception { - reslvr = new IgfsModeResolver(DUAL_SYNC, Arrays.asList(new T2<>(new IgfsPath("/a/b/c/d"), PROXY), new T2<> - (new IgfsPath("/a/P/"), PRIMARY), new T2<>(new IgfsPath("/a/b/"), DUAL_ASYNC))); + reslvr = new IgfsModeResolver(DUAL_SYNC, new ArrayList<>(Arrays.asList(new T2<>( + new IgfsPath("/a/b/c/d"), PROXY), new T2<>(new IgfsPath("/a/P/"), PRIMARY), new T2<>(new IgfsPath("/a/b/"), + DUAL_ASYNC)))); } /** @@ -90,7 +94,7 @@ try { IgfsUtils.preparePathModes(DUAL_SYNC, Arrays.asList( new T2<>(new IgfsPath("/a/"), PRIMARY), - new T2<>(new IgfsPath("/a/b/"), DUAL_ASYNC))); + new T2<>(new IgfsPath("/a/b/"), DUAL_ASYNC)), new HashSet()); fail("IgniteCheckedException expected"); } @@ -102,7 +106,8 @@ for (IgfsMode m: IgfsMode.values()) { if (m != IgfsMode.PRIMARY) { try { - IgfsUtils.preparePathModes(PRIMARY, Arrays.asList(new T2<>(new IgfsPath("/a/"), DUAL_ASYNC))); + IgfsUtils.preparePathModes(PRIMARY, Arrays.asList(new T2<>(new IgfsPath("/a/"), DUAL_ASYNC)), + new HashSet()); fail("IgniteCheckedException expected"); } @@ -117,7 +122,7 @@ new T2<>(new IgfsPath("/a"), PRIMARY), new T2<>(new IgfsPath("/c/d/"), PRIMARY), new T2<>(new IgfsPath("/c/d/e/f"), PRIMARY) - )); + ), new HashSet()); assertNotNull(modes); assertEquals(2, modes.size()); assertEquals(modes, Arrays.asList( @@ -130,7 +135,7 @@ new T2<>(new IgfsPath("/a/b"), DUAL_ASYNC), new T2<>(new IgfsPath("/a/b/c"), DUAL_SYNC), new T2<>(new IgfsPath("/a/b/c/d"), DUAL_ASYNC) - )); + ), new HashSet()); assertNotNull(modes); assertEquals(modes.size(), 3); assertEquals(modes, Arrays.asList( @@ -138,5 +143,39 @@ new T2<>(new IgfsPath("/a/b/c"), DUAL_SYNC), new T2<>(new IgfsPath("/a/b"), DUAL_ASYNC) )); + } + + /** + * @throws Exception If failed. + */ + public void testDualParentsWithPrimaryChild() throws Exception { + Set set = new HashSet<>(); + + IgfsUtils.preparePathModes(DUAL_SYNC, Arrays.asList( + new T2<>(new IgfsPath("/a/b"), DUAL_ASYNC), + new T2<>(new IgfsPath("/a/b/c"), PRIMARY), + new T2<>(new IgfsPath("/a/b/x/y"), PRIMARY), + new T2<>(new IgfsPath("/a/b/x/z"), PRIMARY), + new T2<>(new IgfsPath("/m"), PRIMARY) + ), set); + assertEquals(set, new HashSet() {{ + add(new IgfsPath("/a/b")); + add(new IgfsPath("/a/b/x")); + add(new IgfsPath("/")); + }}); + + set = new HashSet<>(); + + IgfsUtils.preparePathModes(DUAL_ASYNC, Arrays.asList( + new T2<>(new IgfsPath("/a/b/x/y/z"), PRIMARY), + new T2<>(new IgfsPath("/a/b/c"), PRIMARY), + new T2<>(new IgfsPath("/a/k"), PRIMARY), + new T2<>(new IgfsPath("/a/z"), PRIMARY) + ), set); + assertEquals(set, new HashSet() {{ + add(new IgfsPath("/a/b")); + add(new IgfsPath("/a")); + add(new IgfsPath("/a/b/x/y")); + }}); } } \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java (revision ) @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.igfs.IgfsContext; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.UUID; + +/** + * IGFS client closure outgoing operation descriptor. + */ +public class IgfsClientOutOperation { + /** IGFS context. */ + private final IgfsContext igfsCtx; + + /** Target operation. */ + private final IgfsClientAbstractCallable target; + + /** Node selection strategy. */ + private final IgfsClientNodeSelectionStrategy strategy; + + /** Future completed when operation is ready. */ + private final GridFutureAdapter fut; + + /** Target node ID. */ + private UUID nodeId; + + /** + * Constructor. + * + * @param igfsCtx IGFS context. + * @param target Target operation. + * @param strategy Node selection strategy. + * @param fut Future completed when operation is ready. + */ + public IgfsClientOutOperation(IgfsContext igfsCtx, IgfsClientAbstractCallable target, + IgfsClientNodeSelectionStrategy strategy, GridFutureAdapter fut) { + this.igfsCtx = igfsCtx; + this.target = target; + this.strategy = strategy; + this.fut = fut; + } + + /** + * @return Target node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @param nodeId Node ID. + */ + public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + + /** + * @return IGFS context. + */ + public IgfsContext igfsContext() { + return igfsCtx; + } + + /** + * @return Target operation. + */ + public IgfsClientAbstractCallable target() { + return target; + } + + /** + * @return Node selection strategy. + */ + public IgfsClientNodeSelectionStrategy strategy() { + return strategy; + } + + /** + * @return Future completed when operation is ready. + */ + public GridFutureAdapter future() { + return fut; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientOutOperation.class, this); + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/meta/IgfsClientMetaIdsForPathCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/meta/IgfsClientMetaIdsForPathCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/meta/IgfsClientMetaIdsForPathCallable.java (revision ) @@ -31,6 +31,9 @@ * Get entry info for the given path. */ public class IgfsClientMetaIdsForPathCallable extends IgfsClientAbstractCallable> { + /** Type ID. */ + public static final short TYPE_ID = 0; + /** */ private static final long serialVersionUID = 0L; @@ -48,11 +51,11 @@ * @param path Path. */ public IgfsClientMetaIdsForPathCallable(@Nullable String igfsName, IgfsPath path) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); } /** {@inheritDoc} */ - @Override protected List call0(IgfsContext ctx) throws Exception { + @Override public List call0(IgfsContext ctx) throws Exception { IgfsMetaManager meta = ctx.meta(); return meta.idsForPath(path); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java (revision ) @@ -25,6 +25,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.util.ipc.IpcEndpointBindException; import org.apache.ignite.internal.util.ipc.IpcServerEndpoint; import org.apache.ignite.internal.util.typedef.C1; @@ -52,6 +53,15 @@ /** Kernal start latch. */ private CountDownLatch kernalStartLatch = new CountDownLatch(1); + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public IgfsServerManager(GridKernalContext ctx) { + super(ctx); + } + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { FileSystemConfiguration igfsCfg = igfsCtx.configuration(); @@ -163,8 +173,7 @@ * Constructor. */ private BindWorker() { - super(igfsCtx.kernalContext().gridName(), "bind-worker", - igfsCtx.kernalContext().log(IgfsServerManager.class)); + super(ctx.gridName(), "bind-worker", ctx.log(IgfsServerManager.class)); } /** Index: modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java (revision ) @@ -48,7 +48,6 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java (revision ) @@ -20,12 +20,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.jetbrains.annotations.Nullable; /** * Abstract class for IGFS managers. */ public abstract class IgfsManager { - /** IGFS context. */ + /** Kernal context. */ + protected final GridKernalContext ctx; + + /** IGFS context (not set for shared managers). */ protected IgfsContext igfsCtx; /** Logger. */ @@ -35,19 +40,26 @@ private AtomicBoolean starting = new AtomicBoolean(); /** + * Constructor. + * + * @param ctx Kernal context. + */ + protected IgfsManager(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** * Called when IGFS processor is started. * * @param igfsCtx IGFS context. */ - public void start(IgfsContext igfsCtx) throws IgniteCheckedException { + public void start(@Nullable IgfsContext igfsCtx) throws IgniteCheckedException { if (!starting.compareAndSet(false, true)) assert false : "Method start is called more than once for manager: " + this; - assert igfsCtx != null; - this.igfsCtx = igfsCtx; - log = igfsCtx.kernalContext().log(getClass()); + log = ctx.log(getClass()); start0(); @@ -129,27 +141,27 @@ * @return Start info. */ protected String startInfo() { - return "Cache manager started: " + getClass().getSimpleName(); + return "IGFS manager started: " + getClass().getSimpleName(); } /** * @return Stop info. */ protected String stopInfo() { - return "Cache manager stopped: " + getClass().getSimpleName(); + return "IGFS manager stopped: " + getClass().getSimpleName(); } /** * @return Start info. */ protected String kernalStartInfo() { - return "Cache manager received onKernalStart() callback: " + getClass().getSimpleName(); + return "IGFS manager received onKernalStart() callback: " + getClass().getSimpleName(); } /** * @return Stop info. */ protected String kernalStopInfo() { - return "Cache manager received onKernalStop() callback: " + getClass().getSimpleName(); + return "IGFS manager received onKernalStop() callback: " + getClass().getSimpleName(); } } \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientUpdateCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientUpdateCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientUpdateCallable.java (revision ) @@ -25,6 +25,9 @@ import org.apache.ignite.internal.processors.igfs.IgfsContext; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; import java.util.Map; @@ -33,6 +36,9 @@ * IGFS client update callable. */ public class IgfsClientUpdateCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 13; + /** */ private static final long serialVersionUID = 0L; @@ -54,13 +60,13 @@ * @param props Properties. */ public IgfsClientUpdateCallable(@Nullable String igfsName, IgfsPath path, @Nullable Map props) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); this.props = props; } /** {@inheritDoc} */ - @Override protected IgfsFile call0(IgfsContext ctx) throws Exception { + @Override public IgfsFile call0(IgfsContext ctx) throws Exception { return ctx.igfs().update(path, props); } @@ -72,6 +78,25 @@ /** {@inheritDoc} */ @Override public void readBinary0(BinaryRawReader reader) throws BinaryObjectException { props = IgfsUtils.readProperties(reader); + } + + /** {@inheritDoc} */ + @Override protected byte fieldsCount0() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected boolean writeTo0(MessageWriter writer, int fieldId) { + assert fieldId == 0; + + return writer.writeMap("props", props, MessageCollectionItemType.STRING, MessageCollectionItemType.STRING); + } + + /** {@inheritDoc} */ + @Override protected void readFrom0(MessageReader reader, int fieldId) { + assert fieldId == 0; + + props = reader.readMap("props", MessageCollectionItemType.STRING, MessageCollectionItemType.STRING, false); } /** {@inheritDoc} */ Index: modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java (revision ) @@ -41,6 +41,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; import org.apache.ignite.igfs.IgfsMode; @@ -123,8 +124,16 @@ * @param p The path. * @return The owner. */ - private static String getOwner(IgfsEx i, IgfsPath p) { - return i.info(p).property(IgfsUtils.PROP_USER_NAME); + private static String getOwner(final IgfsEx i, final IgfsPath p) { + return IgfsUserContext.doAs(USER, new IgniteOutClosure() { + @Override public String apply() { + IgfsFile f = i.info(p); + + assert f != null; + + return f.property(IgfsUtils.PROP_USER_NAME); + } + }); } /** \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/meta/IgfsClientMetaInfoForPathCallable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/meta/IgfsClientMetaInfoForPathCallable.java (revision de8684390cb2c5664f4bb8401f4169e930a07e10) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/meta/IgfsClientMetaInfoForPathCallable.java (revision ) @@ -29,6 +29,9 @@ * Get entry info for the given path. */ public class IgfsClientMetaInfoForPathCallable extends IgfsClientAbstractCallable { + /** Type ID. */ + public static final short TYPE_ID = 1; + /** */ private static final long serialVersionUID = 0L; @@ -46,11 +49,11 @@ * @param path Path. */ public IgfsClientMetaInfoForPathCallable(@Nullable String igfsName, IgfsPath path) { - super(igfsName, path); + super(TYPE_ID, igfsName, path); } /** {@inheritDoc} */ - @Override protected IgfsEntryInfo call0(IgfsContext ctx) throws Exception { + @Override public IgfsEntryInfo call0(IgfsContext ctx) throws Exception { IgfsMetaManager meta = ctx.meta(); return meta.infoForPath(path);