Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java (revision ) @@ -519,7 +519,7 @@ * @return Locked file info or {@code null} if file cannot be locked or doesn't exist. * @throws IgniteCheckedException If the file with such id does not exist, or on another failure. */ - public @Nullable IgfsEntryInfo lock(IgniteUuid fileId, boolean del) throws IgniteCheckedException { + @Nullable public IgfsEntryInfo lock(IgniteUuid fileId, boolean del) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { validTxState(false); \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java (revision ) @@ -22,14 +22,12 @@ import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPathNotFoundException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import 
org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,7 +56,7 @@ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private IgfsEntryInfo fileInfo; - /** Space in file to write data. */ + /** Space in file to write data. How many bytes are waiting to be written since last flush. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private long space; @@ -68,8 +66,8 @@ /** Data length in remainder. */ private int remainderDataLen; - /** Write completion future. */ - private final IgniteInternalFuture writeCompletionFut; + /** "Aggregated" write completion future. */ + private GridCompoundFuture aggregateFut; /** IGFS mode. */ private final IgfsMode mode; @@ -123,8 +121,6 @@ this.metrics = metrics; streamRange = initialStreamRange(fileInfo); - - writeCompletionFut = data.writeStart(fileInfo); } /** @@ -164,46 +160,12 @@ } /** {@inheritDoc} */ - @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException { - int writeLen = block.remaining(); + @Override protected void storeDataBlocks(Object src, final int len) throws IOException, IgniteCheckedException { + preStoreDataBlocks(len); - preStoreDataBlocks(null, writeLen); + final int blockSize = fileInfo.blockSize(); - int blockSize = fileInfo.blockSize(); - // If data length is not enough to fill full block, fill the remainder and return. 
- if (remainderDataLen + writeLen < blockSize) { - if (remainder == null) - remainder = new byte[blockSize]; - else if (remainder.length != blockSize) { - assert remainderDataLen == remainder.length; - - byte[] allocated = new byte[blockSize]; - - U.arrayCopy(remainder, 0, allocated, 0, remainder.length); - - remainder = allocated; - } - - block.get(remainder, remainderDataLen, writeLen); - - remainderDataLen += writeLen; - } - else { - remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block, - false, streamRange, batch); - - remainderDataLen = remainder == null ? 0 : remainder.length; - } - } - - /** {@inheritDoc} */ - @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException { - preStoreDataBlocks(in, len); - - int blockSize = fileInfo.blockSize(); - - // If data length is not enough to fill full block, fill the remainder and return. if (remainderDataLen + len < blockSize) { if (remainder == null) remainder = new byte[blockSize]; @@ -217,13 +179,13 @@ remainder = allocated; } - in.readFully(remainder, remainderDataLen, len); + IgfsUtils.readData(src, remainder, remainderDataLen, len); remainderDataLen += len; } else { - remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len, - false, streamRange, batch); + remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, + src, len, false, streamRange, batch, aggregateFut); remainderDataLen = remainder == null ? 0 : remainder.length; } @@ -234,17 +196,12 @@ * * @param len Data length to be written. */ - private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException { - // Check if any exception happened while writing data. 
- if (writeCompletionFut.isDone()) { - assert ((GridFutureAdapter)writeCompletionFut).isFailed(); + private void preStoreDataBlocks(int len) throws IgniteCheckedException, IOException { + assert Thread.holdsLock(this); - if (in != null) - in.skipBytes(len); + if (aggregateFut == null) + aggregateFut = new GridCompoundFuture<>(); - writeCompletionFut.get(); - } - bytes += len; space += len; } @@ -270,19 +227,26 @@ throw new IOException("File was concurrently deleted: " + path); } + // This will store all the full blocks and update the remainder from the internal 'buf' ByteBuffer: super.flush(); try { if (remainder != null) { - data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0, - ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch); + ByteBuffer wrappedRemainderByteBuf = ByteBuffer.wrap(remainder, 0, remainderDataLen); + byte[] rem = data.storeDataBlocks(fileInfo, + fileInfo.length() + space, null/*remainder*/, 0/*remainderLen*/, + wrappedRemainderByteBuf, wrappedRemainderByteBuf.remaining(), + true/*flush*/, streamRange, batch, aggregateFut); + + assert rem == null; // The remainder must be absent. + remainder = null; remainderDataLen = 0; } if (space > 0) { - data.awaitAllAcksReceived(fileInfo.id()); + awaitAllPendingBlcoks(); IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange); @@ -295,12 +259,31 @@ space = 0; } + else + aggregateFut = null; } catch (IgniteCheckedException e) { throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e); } } + /** + * Awaits for the main block sequence future and (optional) the remainder future. + * + * @throws IgniteCheckedException On error. 
+ */ + private void awaitAllPendingBlcoks() throws IgniteCheckedException { + assert Thread.holdsLock(this); + + if (aggregateFut != null) { + aggregateFut.markInitialized(); + + aggregateFut.get(); + + aggregateFut = null; + } + } + /** {@inheritDoc} */ @Override protected void onClose() throws IOException { onClose(false); @@ -337,9 +320,8 @@ IOException err = null; try { - data.writeClose(fileInfo); - - writeCompletionFut.get(); + // Wait for all blocks to be written: + awaitAllPendingBlcoks(); } catch (IgniteCheckedException e) { err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e); @@ -377,6 +359,8 @@ throw err; } else { + aggregateFut = null; + try { if (mode == DUAL_SYNC) batch.await(); \ No newline at end of file Index: modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java (revision ) @@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -178,26 +179,26 @@ rnd.nextBytes(data); - IgniteInternalFuture fut = mgr.writeStart(info); + final GridCompoundFuture f = new GridCompoundFuture<>(); - expectsStoreFail(info, data, "Not enough space reserved to store data"); + 
expectsStoreFail(info, data, "Not enough space reserved to store data", f); info = info.length(info.length() + data.length - 3); - expectsStoreFail(info, data, "Not enough space reserved to store data"); + expectsStoreFail(info, data, "Not enough space reserved to store data", f); info = info.length(info.length() + 3); IgfsFileAffinityRange range = new IgfsFileAffinityRange(); - byte[] remainder = mgr.storeDataBlocks(info, info.length(), null, 0, ByteBuffer.wrap(data), true, - range, null); + byte[] remainder = mgr.storeDataBlocks(info, info.length(), null, 0, ByteBuffer.wrap(data), data.length, + true, range, null, f); assert remainder == null; - mgr.writeClose(info); + f.markInitialized(); - fut.get(3000); + f.get(3000); for (int j = 0; j < NODES_CNT; j++) { GridCacheContext ctx = GridTestUtils.getFieldValue(grid(j).cachex(DATA_CACHE_NAME), @@ -269,27 +270,27 @@ info = info.length(info.length() + data.length + remainder.length); - IgniteInternalFuture fut = mgr.writeStart(info); - IgfsFileAffinityRange range = new IgfsFileAffinityRange(); - byte[] left = mgr.storeDataBlocks(info, info.length(), remainder, remainder.length, ByteBuffer.wrap(data), - false, range, null); + final GridCompoundFuture f = new GridCompoundFuture<>(); + byte[] left = mgr.storeDataBlocks(info, info.length(), remainder, remainder.length, + ByteBuffer.wrap(data), data.length, false, range, null, f); + assert left.length == blockSize / 2; byte[] remainder2 = new byte[blockSize / 2]; info = info.length(info.length() + remainder2.length); - byte[] left2 = mgr.storeDataBlocks(info, info.length(), left, left.length, ByteBuffer.wrap(remainder2), - false, range, null); + byte[] left2 = mgr.storeDataBlocks(info, info.length(), left, left.length, + ByteBuffer.wrap(remainder2), remainder2.length, false, range, null, f); assert left2 == null; - mgr.writeClose(info); + f.markInitialized(); - fut.get(3000); + f.get(3000); for (int j = 0; j < NODES_CNT; j++) { GridCacheContext ctx = 
GridTestUtils.getFieldValue(grid(j).cachex(DATA_CACHE_NAME), @@ -358,22 +359,23 @@ info = info.length(info.length() + data.length * writesCnt); - IgniteInternalFuture fut = mgr.writeStart(info); + final GridCompoundFuture fut = new GridCompoundFuture<>(); for (int j = 0; j < 64; j++) { Arrays.fill(data, (byte)(j / 4)); byte[] left = mgr.storeDataBlocks(info, (j + 1) * chunkSize, null, 0, ByteBuffer.wrap(data), - true, range, null); + data.length, true, range, null, fut); assert left == null : "No remainder should be returned if flush is true: " + Arrays.toString(left); } - mgr.writeClose(info); + fut.markInitialized(); + fut.get(3000); + assertTrue(range.regionEqual(new IgfsFileAffinityRange(0, writesCnt * chunkSize - 1, null))); - fut.get(3000); for (int j = 0; j < NODES_CNT; j++) { GridCacheContext ctx = GridTestUtils.getFieldValue(grid(j).cachex(DATA_CACHE_NAME), @@ -415,7 +417,6 @@ long t = System.currentTimeMillis(); - //IgfsFileInfo info = new IgfsFileInfo(blockSize, 0); IgfsEntryInfo info = IgfsUtils.createFile(IgniteUuid.randomUuid(), blockSize, 1024 * 1024, null, null, false, null, t, t); @@ -577,47 +578,13 @@ * @param data Data to store. * @param msg Expected failure message. */ - private void expectsStoreFail(final IgfsEntryInfo reserved, final byte[] data, @Nullable String msg) { + private void expectsStoreFail(final IgfsEntryInfo reserved, final byte[] data, @Nullable String msg, final GridCompoundFuture f) { GridTestUtils.assertThrows(log, new Callable() { @Override public Object call() throws Exception { IgfsFileAffinityRange range = new IgfsFileAffinityRange(); - mgr.storeDataBlocks(reserved, reserved.length(), null, 0, ByteBuffer.wrap(data), false, range, null); - - return null; - } - }, IgfsException.class, msg); - } - - /** - * Test expected failures for 'delete' operation. - * - * @param fileInfo File to delete data for. - * @param msg Expected failure message. 
- */ - private void expectsDeleteFail(final IgfsEntryInfo fileInfo, @Nullable String msg) { - GridTestUtils.assertThrows(log, new Callable() { - @Override public Object call() throws Exception { - mgr.delete(fileInfo); - - return null; - } - }, IgfsException.class, msg); - } - - /** - * Test expected failures for 'affinity' operation. - * - * @param info File info to resolve affinity nodes for. - * @param start Start position in the file. - * @param len File part length to get affinity for. - * @param msg Expected failure message. - */ - private void expectsAffinityFail(final IgfsEntryInfo info, final long start, final long len, - @Nullable String msg) { - GridTestUtils.assertThrows(log, new Callable() { - @Override public Object call() throws Exception { - mgr.affinity(info, start, len); + mgr.storeDataBlocks(reserved, reserved.length(), null, 0, + ByteBuffer.wrap(data), data.length, false, range, null, f); return null; } \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java (revision ) @@ -25,27 +25,20 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; -import org.apache.ignite.igfs.IgfsOutOfSpaceException; import 
org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; @@ -60,12 +53,10 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; -import java.io.DataInput; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -75,8 +66,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; @@ -88,10 +77,6 @@ import 
java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -99,9 +84,6 @@ * Cache based file's data container. */ public class IgfsDataManager extends IgfsManager { - /** IGFS. */ - private IgfsEx igfs; - /** Data internal cache. */ private IgniteInternalCache dataCachePrj; @@ -120,33 +102,12 @@ /** Group size. */ private int grpSize; - /** Byte buffer writer. */ - private ByteBufferBlocksWriter byteBufWriter = new ByteBufferBlocksWriter(); - - /** Data input writer. */ - private DataInputBlocksWriter dataInputWriter = new DataInputBlocksWriter(); - - /** Pending writes future. */ - private ConcurrentMap pendingWrites = new ConcurrentHashMap8<>(); - /** Affinity key generator. */ private AtomicLong affKeyGen = new AtomicLong(); - /** IGFS executor service. */ - private ExecutorService igfsSvc; - - /** Request ID counter for write messages. */ - private AtomicLong reqIdCtr = new AtomicLong(); - - /** IGFS communication topic. */ - private Object topic; - /** Async file delete worker. */ private AsyncDeleteWorker delWorker; - /** Trash purge timeout. */ - private long trashPurgeTimeout; - /** On-going remote reads futures. */ private final ConcurrentHashMap8> rmtReadFuts = new ConcurrentHashMap8<>(); @@ -183,42 +144,10 @@ /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { - igfs = igfsCtx.igfs(); - dataCacheStartLatch = new CountDownLatch(1); String igfsName = igfsCtx.configuration().getName(); - topic = F.isEmpty(igfsName) ? 
TOPIC_IGFS : TOPIC_IGFS.topic(igfsName); - - igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - if (msg instanceof IgfsBlocksMessage) - processBlocksMessage(nodeId, (IgfsBlocksMessage)msg); - else if (msg instanceof IgfsAckMessage) - processAckMessage(nodeId, (IgfsAckMessage)msg); - } - }); - - igfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; - - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - if (igfsCtx.igfsNode(discoEvt.eventNode())) { - for (WriteCompletionFuture future : pendingWrites.values()) { - future.onError(discoEvt.eventNode().id(), - new ClusterTopologyCheckedException("Node left grid before write completed: " + evt.node().id())); - } - } - } - }, EVT_NODE_LEFT, EVT_NODE_FAILED); - - igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService(); - - trashPurgeTimeout = igfsCtx.configuration().getTrashPurgeTimeout(); - putExecSvc = igfsCtx.configuration().getDualModePutExecutorService(); if (putExecSvc != null) @@ -309,7 +238,8 @@ * @param prevAffKey Affinity key of previous block. * @return Affinity key. */ - public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) { + @SuppressWarnings("ConstantConditions") + @Nullable public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) { // Do not generate affinity key for non-affinity nodes. if (!dataCache.context().affinityNode()) return null; @@ -372,8 +302,6 @@ @Nullable public IgniteInternalFuture dataBlock(final IgfsEntryInfo fileInfo, final IgfsPath path, final long blockIdx, @Nullable final IgfsSecondaryFileSystemPositionedReadable secReader) throws IgniteCheckedException { - //assert validTxState(any); // Allow this method call for any transaction state. 
- assert fileInfo != null; assert blockIdx >= 0; @@ -472,109 +400,6 @@ } /** - * Registers write future in igfs data manager. - * - * @param fileInfo File info of file opened to write. - * @return Future that will be completed when all ack messages are received or when write failed. - */ - public IgniteInternalFuture writeStart(IgfsEntryInfo fileInfo) { - WriteCompletionFuture fut = new WriteCompletionFuture(fileInfo.id()); - - WriteCompletionFuture oldFut = pendingWrites.putIfAbsent(fileInfo.id(), fut); - - assert oldFut == null : "Opened write that is being concurrently written: " + fileInfo; - - if (log.isDebugEnabled()) - log.debug("Registered write completion future for file output stream [fileInfo=" + fileInfo + - ", fut=" + fut + ']'); - - return fut; - } - - /** - * Notifies data manager that no further writes will be performed on stream. - * - * @param fileInfo File info being written. - */ - public void writeClose(IgfsEntryInfo fileInfo) { - WriteCompletionFuture fut = pendingWrites.get(fileInfo.id()); - - if (fut != null) - fut.markWaitingLastAck(); - else { - if (log.isDebugEnabled()) - log.debug("Failed to find write completion future for file in pending write map (most likely it was " + - "failed): " + fileInfo); - } - } - - /** - * Store data blocks in file.
- * Note! If file concurrently deleted we'll get lost blocks. - * - * @param fileInfo File info. - * @param reservedLen Reserved length. - * @param remainder Remainder. - * @param remainderLen Remainder length. - * @param data Data to store. - * @param flush Flush flag. - * @param affinityRange Affinity range to update if file write can be colocated. - * @param batch Optional secondary file system worker batch. - * - * @return Remainder if data did not fill full block. - * @throws IgniteCheckedException If failed. - */ - @Nullable public byte[] storeDataBlocks( - IgfsEntryInfo fileInfo, - long reservedLen, - @Nullable byte[] remainder, - int remainderLen, - ByteBuffer data, - boolean flush, - IgfsFileAffinityRange affinityRange, - @Nullable IgfsFileWorkerBatch batch - ) throws IgniteCheckedException { - //assert validTxState(any); // Allow this method call for any transaction state. - - return byteBufWriter.storeDataBlocks(fileInfo, reservedLen, remainder, remainderLen, data, data.remaining(), - flush, affinityRange, batch); - } - - /** - * Store data blocks in file.
- * Note! If file concurrently deleted we'll got lost blocks. - * - * @param fileInfo File info. - * @param reservedLen Reserved length. - * @param remainder Remainder. - * @param remainderLen Remainder length. - * @param in Data to store. - * @param len Data length to store. - * @param flush Flush flag. - * @param affinityRange File affinity range to update if file cal be colocated. - * @param batch Optional secondary file system worker batch. - * @throws IgniteCheckedException If failed. - * @return Remainder of data that did not fit the block if {@code flush} flag is {@code false}. - * @throws IOException If store failed. - */ - @Nullable public byte[] storeDataBlocks( - IgfsEntryInfo fileInfo, - long reservedLen, - @Nullable byte[] remainder, - int remainderLen, - DataInput in, - int len, - boolean flush, - IgfsFileAffinityRange affinityRange, - @Nullable IgfsFileWorkerBatch batch - ) throws IgniteCheckedException, IOException { - //assert validTxState(any); // Allow this method call for any transaction state. - - return dataInputWriter.storeDataBlocks(fileInfo, reservedLen, remainder, remainderLen, in, len, flush, - affinityRange, batch); - } - - /** * Delete file's data from data cache. * * @param fileInfo File details to remove data for. @@ -974,6 +799,7 @@ * @param inTx Expected transaction state. * @return Transaction state is correct. */ + @SuppressWarnings("ConstantConditions") private boolean validTxState(boolean inTx) { boolean txState = inTx == (dataCachePrj.tx() != null); @@ -984,108 +810,30 @@ } /** - * @param fileId File ID. - * @param node Node to process blocks on. * @param blocks Blocks to put in cache. + * @param fut Compound future. * @throws IgniteCheckedException If batch processing failed. 
*/ - private void processBatch(IgniteUuid fileId, final ClusterNode node, - final Map blocks) throws IgniteCheckedException { - final long batchId = reqIdCtr.getAndIncrement(); + private void processBatch(final Map blocks, GridCompoundFuture fut) + throws IgniteCheckedException { + assert fut != null; + assert !blocks.isEmpty(); - final WriteCompletionFuture completionFut = pendingWrites.get(fileId); - - if (completionFut == null) { - if (log.isDebugEnabled()) - log.debug("Missing completion future for file write request (most likely exception occurred " + - "which will be thrown upon stream close) [nodeId=" + node.id() + ", fileId=" + fileId + ']'); - - return; + for (Map.Entry e: blocks.entrySet()) + fut.add(dataCachePrj.putAsync(e.getKey(), e.getValue())); - } + } - // Throw exception if future is failed in the middle of writing. - if (completionFut.isDone()) - completionFut.get(); - - completionFut.onWriteRequest(node.id(), batchId); - - final UUID nodeId = node.id(); - - if (!node.isLocal()) { - final IgfsBlocksMessage msg = new IgfsBlocksMessage(fileId, batchId, blocks); - - try { - igfsCtx.send(nodeId, topic, msg, IGFS_POOL); - } - catch (IgniteCheckedException e) { - completionFut.onError(nodeId, e); - } - } - else { - callIgfsLocalSafe(new GridPlainCallable() { - @Override @Nullable public Object call() throws Exception { - storeBlocksAsync(blocks).listen(new CI1>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); - - completionFut.onWriteAck(nodeId, batchId); - } - catch (IgniteCheckedException e) { - completionFut.onError(nodeId, e); - } - } - }); - - return null; - } - }); - } - } - /** * If partial block write is attempted, both colocated and non-colocated keys are locked and data is appended * to correct block. * - * @param fileId File ID. * @param colocatedKey Block key. * @param startOff Data start offset within block. * @param data Data to write. * @throws IgniteCheckedException If update failed. 
*/ - private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff, - byte[] data) throws IgniteCheckedException { - if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) { - try { - igfs.awaitDeletesAsync().get(trashPurgeTimeout); - } - catch (IgniteFutureTimeoutCheckedException ignore) { - // Ignore. - } - - // Additional size check. - if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) { - final WriteCompletionFuture completionFut = pendingWrites.get(fileId); - - if (completionFut == null) { - if (log.isDebugEnabled()) - log.debug("Missing completion future for file write request (most likely exception occurred " + - "which will be thrown upon stream close) [fileId=" + fileId + ']'); - - return; - } - - IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " + - "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() + - ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'); - - completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + - igfsCtx.kernalContext().localNodeId(), e)); - - return; - } - } - + private void processPartialBlockWrite(IgfsBlockKey colocatedKey, int startOff, byte[] data) + throws IgniteCheckedException { // No affinity key present, just concat and return. if (colocatedKey.affinityKey() == null) { dataCachePrj.invoke(colocatedKey, new UpdateProcessor(startOff, data)); @@ -1134,28 +882,6 @@ } /** - * Executes callable in IGFS executor service. If execution rejected, callable will be executed - * in caller thread. - * - * @param c Callable to execute. - */ - private void callIgfsLocalSafe(Callable c) { - try { - igfsSvc.submit(c); - } - catch (RejectedExecutionException ignored) { - // This exception will happen if network speed is too low and data comes faster - // than we can send it to remote nodes. 
- try { - c.call(); - } - catch (Exception e) { - log.warning("Failed to execute IGFS callable: " + c, e); - } - } - } - - /** * Put data block read from the secondary file system to the cache. * * @param key Key. @@ -1217,99 +943,6 @@ } /** - * @param blocks Blocks to write. - * @return Future that will be completed after put is done. - */ - @SuppressWarnings("unchecked") - private IgniteInternalFuture storeBlocksAsync(Map blocks) { - assert !blocks.isEmpty(); - - if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) { - try { - try { - igfs.awaitDeletesAsync().get(trashPurgeTimeout); - } - catch (IgniteFutureTimeoutCheckedException ignore) { - // Ignore. - } - - // Additional size check. - if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) - return new GridFinishedFuture( - new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " + - "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() + - ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']')); - - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(new IgniteCheckedException("Failed to store data " + - "block due to unexpected exception.", e)); - } - } - - return dataCachePrj.putAllAsync(blocks); - } - - /** - * @param nodeId Node ID. - * @param blocksMsg Write request message. - */ - private void processBlocksMessage(final UUID nodeId, final IgfsBlocksMessage blocksMsg) { - storeBlocksAsync(blocksMsg.blocks()).listen(new CI1>() { - @Override public void apply(IgniteInternalFuture fut) { - IgniteCheckedException err = null; - - try { - fut.get(); - } - catch (IgniteCheckedException e) { - err = e; - } - - try { - // Send reply back to node. - igfsCtx.send(nodeId, topic, new IgfsAckMessage(blocksMsg.fileId(), blocksMsg.id(), err), IGFS_POOL); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send batch acknowledgement (did node leave the grid?) 
[nodeId=" + nodeId + - ", fileId=" + blocksMsg.fileId() + ", batchId=" + blocksMsg.id() + ']', e); - } - } - }); - } - - /** - * @param nodeId Node ID. - * @param ackMsg Write acknowledgement message. - */ - private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) { - try { - ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal message (will ignore): " + ackMsg, e); - - return; - } - - IgniteUuid fileId = ackMsg.fileId(); - - WriteCompletionFuture fut = pendingWrites.get(fileId); - - if (fut != null) { - if (ackMsg.error() != null) - fut.onError(nodeId, ackMsg.error()); - else - fut.onWriteAck(nodeId, ackMsg.id()); - } - else { - if (log.isDebugEnabled()) - log.debug("Received write acknowledgement for non-existent write future (most likely future was " + - "failed) [nodeId=" + nodeId + ", fileId=" + fileId + ']'); - } - } - - /** * Creates block key based on block ID, file info and local affinity range. * * @param block Block ID. @@ -1346,199 +979,149 @@ } /** - * Abstract class to handle writes from different type of input data. - */ - private abstract class BlocksWriter { - /** - * Stores data blocks read from abstracted source. - * - * @param fileInfo File info. - * @param reservedLen Reserved length. - * @param remainder Remainder. - * @param remainderLen Remainder length. - * @param src Source to read bytes. - * @param srcLen Data length to read from source. - * @param flush Flush flag. - * @param affinityRange Affinity range to update if file write can be colocated. + * Stores data blocks read from abstracted source. + * + * @param fileInfo File info. + * @param reservedLen Reserved length. + * @param remainder Remainder. + * @param remainderLen Remainder length. + * @param src Source to read bytes. + * @param srcLen Data length to read from source. + * @param flush Flush flag. 
+ * @param affinityRange Affinity range to update if file write can be colocated. - * @param batch Optional secondary file system worker batch. + * @param secondaryPutWorker Optional secondary file system worker batch. - * @throws IgniteCheckedException If failed. - * @return Data remainder if {@code flush} flag is {@code false}. - */ + * @throws IgniteCheckedException If failed. + * @return Data remainder if {@code flush} flag is {@code false}. + */ - @Nullable public byte[] storeDataBlocks( - IgfsEntryInfo fileInfo, - long reservedLen, + final byte[] storeDataBlocks( + final IgfsEntryInfo fileInfo, + final long reservedLen, - @Nullable byte[] remainder, - final int remainderLen, + @Nullable byte[] remainder, + final int remainderLen, - T src, - int srcLen, - boolean flush, - IgfsFileAffinityRange affinityRange, - @Nullable IgfsFileWorkerBatch batch - ) throws IgniteCheckedException { - IgniteUuid id = fileInfo.id(); - int blockSize = fileInfo.blockSize(); + final T src, + final int srcLen, + final boolean flush, + final IgfsFileAffinityRange affinityRange, + final @Nullable IgfsFileWorkerBatch secondaryPutWorker, + final GridCompoundFuture fut + ) throws IOException, IgniteCheckedException { + final IgniteUuid id = fileInfo.id(); + final int blockSize = fileInfo.blockSize(); - int len = remainderLen + srcLen; + assert fut != null; + final int len = remainderLen + srcLen; + - if (len > reservedLen) - throw new IgfsException("Not enough space reserved to store data [id=" + id + - ", reservedLen=" + reservedLen + ", remainderLen=" + remainderLen + - ", data.length=" + srcLen + ']'); + if (len > reservedLen) + throw new IgfsException("Not enough space reserved to store data [id=" + id + + ", reservedLen=" + reservedLen + ", remainderLen=" + remainderLen + + ", data.length=" + srcLen + ']'); - long start = reservedLen - len; - long first = start / blockSize; - long limit = (start + len + blockSize - 1) / blockSize; + final long start = reservedLen - len; + final long 
first = start / blockSize; + final long limit = (start + len + blockSize - 1) / blockSize; + - int written = 0; - int remainderOff = 0; + int written = 0; + int remainderOff = 0; - Map nodeBlocks = U.newLinkedHashMap((int)(limit - first)); + Map nodeBlocks = U.newLinkedHashMap((int)(limit - first)); - ClusterNode node = null; - int off = 0; - for (long block = first; block < limit; block++) { - final long blockStartOff = block == first ? (start % blockSize) : 0; - final long blockEndOff = block == (limit - 1) ? (start + len - 1) % blockSize : (blockSize - 1); + for (long block = first; block < limit; block++) { + final long blockStartOff = block == first ? (start % blockSize) : 0; + final long blockEndOff = block == (limit - 1) ? (start + len - 1) % blockSize : (blockSize - 1); - final long size = blockEndOff - blockStartOff + 1; + final long size = blockEndOff - blockStartOff + 1; - assert size > 0 && size <= blockSize; - assert blockStartOff + size <= blockSize; + assert size > 0 && size <= blockSize; + assert blockStartOff + size <= blockSize; - final byte[] portion = new byte[(int)size]; + final byte[] portion = new byte[(int)size]; - // Data length to copy from remainder. - int portionOff = Math.min((int)size, remainderLen - remainderOff); + // Data length to copy from remainder. + int portionOff = Math.min((int)size, remainderLen - remainderOff); - if (remainderOff != remainderLen) { - U.arrayCopy(remainder, remainderOff, portion, 0, portionOff); + if (remainderOff != remainderLen) { + U.arrayCopy(remainder, remainderOff, portion, 0, portionOff); - remainderOff += portionOff; - } + remainderOff += portionOff; + } - if (portionOff < size) + if (portionOff < size) - readData(src, portion, portionOff); + IgfsUtils.readData(src, portion, portionOff, portion.length - portionOff); - // Will update range if necessary. - IgfsBlockKey key = createBlockKey(block, fileInfo, affinityRange); + // Will update range if necessary. 
+ IgfsBlockKey key = createBlockKey(block, fileInfo, affinityRange); - ClusterNode primaryNode = dataCachePrj.cache().affinity().mapKeyToNode(key); - - if (block == first) { - off = (int)blockStartOff; - node = primaryNode; - } - - if (size == blockSize) { - assert blockStartOff == 0 : "Cannot write the whole block not from start position [start=" + - start + ", block=" + block + ", blockStartOff=" + blockStartOff + ", blockEndOff=" + - blockEndOff + ", size=" + size + ", first=" + first + ", limit=" + limit + ", blockSize=" + - blockSize + ']'; - } - else { - // If partial block is being written from the beginning and not flush, return it as remainder. - if (blockStartOff == 0 && !flush) { - assert written + portion.length == len; + if (size == blockSize) { + assert blockStartOff == 0 : "Cannot write the whole block not from start position [start=" + + start + ", block=" + block + ", blockStartOff=" + blockStartOff + ", blockEndOff=" + + blockEndOff + ", size=" + size + ", first=" + first + ", limit=" + limit + ", blockSize=" + + blockSize + ']'; + } + else { + // If partial block is being written from the beginning and not flush, return it as remainder. 
+ if (blockStartOff == 0 && !flush) { + assert written + portion.length == len; - if (!nodeBlocks.isEmpty()) { + if (!nodeBlocks.isEmpty()) { - processBatch(id, node, nodeBlocks); + processBatch(nodeBlocks, fut); - metrics.addWriteBlocks(1, 0); - } + metrics.addWriteBlocks(1, 0); + } - return portion; + return portion; // exit point #1 - } - } + } + } - int writtenSecondary = 0; + int writtenSecondary = 0; - if (batch != null) { - if (!batch.write(portion)) + if (secondaryPutWorker != null) { + if (!secondaryPutWorker.write(portion)) - throw new IgniteCheckedException("Cannot write more data to the secondary file system output " + + throw new IgniteCheckedException("Cannot write more data to the secondary file system output " + - "stream because it was marked as closed: " + batch.path()); + "stream because it was marked as closed: " + secondaryPutWorker.path()); - else - writtenSecondary = 1; - } + else + writtenSecondary = 1; + } - assert primaryNode != null; - - int writtenTotal = 0; + int writtenTotal = 0; - if (!primaryNode.id().equals(node.id())) { - if (!nodeBlocks.isEmpty()) - processBatch(id, node, nodeBlocks); + if (!nodeBlocks.isEmpty()) { + processBatch(nodeBlocks, fut); - writtenTotal = nodeBlocks.size(); + writtenTotal = nodeBlocks.size(); + } - nodeBlocks = U.newLinkedHashMap((int)(limit - first)); + nodeBlocks = U.newLinkedHashMap((int)(limit - first)); - node = primaryNode; - } - assert size == portion.length; + assert size == portion.length; - if (size != blockSize) { - // Partial writes must be always synchronous. + if (size != blockSize) { + // Partial writes must be always synchronous. - processPartialBlockWrite(id, key, block == first ? off : 0, portion); + processPartialBlockWrite(key, block == first ? 
(int)blockStartOff : 0, portion); - writtenTotal++; - } - else - nodeBlocks.put(key, portion); + writtenTotal++; + } + else + nodeBlocks.put(key, portion); - metrics.addWriteBlocks(writtenTotal, writtenSecondary); + metrics.addWriteBlocks(writtenTotal, writtenSecondary); - written += portion.length; - } + written += portion.length; + } - // Process final batch, if exists. - if (!nodeBlocks.isEmpty()) { + // Process final batch, if exists. + if (!nodeBlocks.isEmpty()) { - processBatch(id, node, nodeBlocks); + processBatch(nodeBlocks, fut); - metrics.addWriteBlocks(nodeBlocks.size(), 0); - } + metrics.addWriteBlocks(nodeBlocks.size(), 0); + } - assert written == len; + assert written == len; - return null; + return null; // exit point #2 - } + } - /** + /** - * Fully reads data from specified source into the specified byte array. - * - * @param src Data source. - * @param dst Destination. - * @param dstOff Destination buffer offset. - * @throws IgniteCheckedException If read failed. - */ - protected abstract void readData(T src, byte[] dst, int dstOff) throws IgniteCheckedException; - } - - /** - * Byte buffer writer. - */ - private class ByteBufferBlocksWriter extends BlocksWriter { - /** {@inheritDoc} */ - @Override protected void readData(ByteBuffer src, byte[] dst, int dstOff) { - src.get(dst, dstOff, dst.length - dstOff); - } - } - - /** - * Data input writer. - */ - private class DataInputBlocksWriter extends BlocksWriter { - /** {@inheritDoc} */ - @Override protected void readData(DataInput src, byte[] dst, int dstOff) - throws IgniteCheckedException { - try { - src.readFully(dst, dstOff, dst.length - dstOff); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - } - - /** * Helper closure to update data in cache. 
*/ @GridInternal @@ -1638,8 +1221,6 @@ protected AsyncDeleteWorker(@Nullable String gridName, String name, IgniteLogger log) { super(gridName, name, log); - long time = System.currentTimeMillis(); - stopInfo = IgfsUtils.createDirectory(IgniteUuid.randomUuid()); } @@ -1738,180 +1319,6 @@ req = delReqs.poll(); } } - } - } - - /** - * Allows output stream to await for all current acks. - * - * @param fileId File ID. - * @throws IgniteInterruptedCheckedException In case of interrupt. - */ - void awaitAllAcksReceived(IgniteUuid fileId) throws IgniteInterruptedCheckedException { - WriteCompletionFuture fut = pendingWrites.get(fileId); - - if (fut != null) - fut.awaitAllAcksReceived(); - } - - /** - * Future that is completed when all participating - */ - private class WriteCompletionFuture extends GridFutureAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** File id to remove future from map. */ - private final IgniteUuid fileId; - - /** Pending acks. */ - private final ConcurrentMap ackMap = new ConcurrentHashMap8<>(); - - /** Lock for map-related conditions. */ - private final Lock lock = new ReentrantLock(); - - /** Condition to wait for empty map. */ - private final Condition allAcksRcvCond = lock.newCondition(); - - /** Flag indicating future is waiting for last ack. */ - private volatile boolean awaitingLast; - - /** - * @param fileId File id. - */ - private WriteCompletionFuture(IgniteUuid fileId) { - assert fileId != null; - - this.fileId = fileId; - } - - /** - * Await all pending data blockes to be acked. - * - * @throws IgniteInterruptedCheckedException In case of interrupt. 
- */ - public void awaitAllAcksReceived() throws IgniteInterruptedCheckedException { - lock.lock(); - - try { - while (!ackMap.isEmpty()) - U.await(allAcksRcvCond); - } - finally { - lock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { - if (!isDone()) { - pendingWrites.remove(fileId, this); - - if (super.onDone(res, err)) - return true; - } - - return false; - } - - /** - * Write request will be asynchronously executed on node with given ID. - * - * @param nodeId Node ID. - * @param batchId Assigned batch ID. - */ - private void onWriteRequest(UUID nodeId, long batchId) { - if (!isDone()) { - UUID pushedOut = ackMap.putIfAbsent(batchId, nodeId); - - assert pushedOut == null; - } - } - - /** - * Answers if there are some batches for the specified node we're currently waiting acks for. - * - * @param nodeId The node Id. - * @return If there are acks awaited from this node. - */ - private boolean hasPendingAcks(UUID nodeId) { - assert nodeId != null; - - for (Map.Entry e : ackMap.entrySet()) - if (nodeId.equals(e.getValue())) - return true; - - return false; - } - - /** - * Error occurred on node with given ID. - * - * @param nodeId Node ID. - * @param e Caught exception. - */ - private void onError(UUID nodeId, IgniteCheckedException e) { - // If waiting for ack from this node. - if (hasPendingAcks(nodeId)) { - ackMap.clear(); - - signalNoAcks(); - - if (e.hasCause(IgfsOutOfSpaceException.class)) - onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + nodeId, e)); - else - onDone(new IgniteCheckedException( - "Failed to wait for write completion (write failed on node): " + nodeId, e)); - } - } - - /** - * Write ack received from node with given ID for given batch ID. - * - * @param nodeId Node ID. - * @param batchId Batch ID. 
- */ - private void onWriteAck(UUID nodeId, long batchId) { - if (!isDone()) { - boolean rmv = ackMap.remove(batchId, nodeId); - - assert rmv : "Received acknowledgement message for not registered batch [nodeId=" + - nodeId + ", batchId=" + batchId + ']'; - - if (ackMap.isEmpty()) { - signalNoAcks(); - - if (awaitingLast) - onDone(true); - } - } - } - - /** - * Signal that currenlty there are no more pending acks. - */ - private void signalNoAcks() { - lock.lock(); - - try { - allAcksRcvCond.signalAll(); - } - finally { - lock.unlock(); - } - } - - /** - * Marks this future as waiting last ack. - */ - private void markWaitingLastAck() { - awaitingLast = true; - - if (log.isDebugEnabled()) - log.debug("Marked write completion future as awaiting last ack: " + fileId); - - if (ackMap.isEmpty()) - onDone(true); } } } \ No newline at end of file Index: modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java (revision ) @@ -17,6 +17,16 @@ package org.apache.ignite.internal.processors.igfs; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheMode; @@ -40,22 +50,9 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import org.jsr166.ThreadLocalRandom8; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; - import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -256,41 +253,6 @@ } /** - * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data is deleted concurrently. - * - * @throws Exception If failed. - */ - public void testPartitionedOversizeDelay() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = true; - - checkOversizeDelay(); - } - - /** - * Ensure that exception is not thrown in case co-located cache is oversized, but data is deleted concurrently. - * - * @throws Exception If failed. - */ - public void testColocatedOversizeDelay() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = false; - - checkOversizeDelay(); - } - - /** - * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data is deleted concurrently. - * - * @throws Exception If failed. - */ - public void testReplicatedOversizeDelay() throws Exception { - cacheMode = REPLICATED; - - checkOversizeDelay(); - } - - /** * Ensure that IGFS size is correctly updated in case of preloading for PARTITIONED cache. * * @throws Exception If failed. @@ -447,40 +409,42 @@ final IgfsPath path = new IgfsPath("/file"); // This write is expected to be successful. 
- IgfsOutputStream os = igfs(0).create(path, false); + try (IgfsOutputStream os = igfs(0).create(path, false)) { - os.write(chunk(BLOCK_SIZE - 1)); + os.write(chunk(BLOCK_SIZE - 1)); - os.close(); + } // This write must be successful as well. - os = igfs(0).append(path, false); + try (IgfsOutputStream os = igfs(0).append(path, false)) { - os.write(chunk(1)); + os.write(chunk(1)); - os.close(); + } - // This write must fail w/ exception. - GridTestUtils.assertThrows(log(), new Callable() { - @Override public Object call() throws Exception { + try { - IgfsOutputStream osErr = igfs(0).append(path, false); + IgfsOutputStream osErr = igfs(0).append(path, false); - try { - osErr.write(chunk(BLOCK_SIZE)); - osErr.close(); + try { + osErr.write(chunk(BLOCK_SIZE)); + osErr.close(); - - return null; - } - catch (IOException e) { - Throwable e0 = e; + } + catch (IOException e) { + Throwable e0 = e; - while (e0.getCause() != null) - e0 = e0.getCause(); + while (e0.getCause() != null) + e0 = e0.getCause(); - throw (Exception)e0; + throw (Exception) e0; - } - finally { - U.closeQuiet(osErr); - } + } + finally { + U.closeQuiet(osErr); + } + + fail("IgfsOutOfSpaceException expected."); - } + } - }, IgfsOutOfSpaceException.class, "Failed to write data block (IGFS maximum data size exceeded) [used=" + - igfsMaxData + ", allowed=" + igfsMaxData + ']'); + catch (IgfsOutOfSpaceException ioose) { + assert ioose.getMessage().startsWith("Failed to write data block " + + "(IGFS maximum data size exceeded on node) "); + + assert ioose.getMessage().endsWith(", used=" + igfsMaxData + ", allowed=" + igfsMaxData + ']'); + } } /** \ No newline at end of file Index: modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java (revision 
72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java (revision 72df110499962c177857cbeff368528094535f5c) @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.igfs; - -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.Ignition; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; -import org.apache.ignite.igfs.IgfsMode; -import org.apache.ignite.igfs.IgfsOutputStream; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.util.lang.GridAbsPredicateX; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.transactions.TransactionConcurrency; -import org.apache.ignite.transactions.TransactionIsolation; - -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; - -/** - * Test to check for system pool starvation due to {@link IgfsBlocksMessage}. - */ -public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbstractTest { - /** First node name. */ - private static final String NODE_1_NAME = "node1"; - - /** Second node name. 
*/ - private static final String NODE_2_NAME = "node2"; - - /** Data cache name. */ - private static final String DATA_CACHE_NAME = "data"; - - /** Meta cache name. */ - private static final String META_CACHE_NAME = "meta"; - - /** Key in data caceh we will use to reproduce the issue. */ - private static final Integer DATA_KEY = 1; - - /** First node. */ - private Ignite victim; - - /** Second node. */ - private Ignite attacker; - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked", "ConstantConditions"}) - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - // Start nodes. - TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - victim = Ignition.start(config(NODE_1_NAME, ipFinder)); - attacker = Ignition.start(config(NODE_2_NAME, ipFinder)); - - // Check if we selected victim correctly. - if (F.eq(dataCache(victim).affinity().mapKeyToNode(DATA_KEY).id(), attacker.cluster().localNode().id())) { - Ignite tmp = victim; - - victim = attacker; - - attacker = tmp; - } - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - G.stopAll(true); - - victim = null; - attacker = null; - - super.afterTest(); - } - - /** - * Test starvation. - * - * @throws Exception If failed. - */ - @SuppressWarnings("unchecked") - public void testStarvation() throws Exception { - // 1. Create two IGFS file to make all system threads busy. - CountDownLatch fileWriteLatch = new CountDownLatch(1); - - final IgniteInternalFuture fileFut1 = createFileAsync(new IgfsPath("/file1"), fileWriteLatch); - final IgniteInternalFuture fileFut2 = createFileAsync(new IgfsPath("/file2"), fileWriteLatch); - - // 2. Start transaction and keep it opened. 
- final CountDownLatch txStartLatch = new CountDownLatch(1); - final CountDownLatch txCommitLatch = new CountDownLatch(1); - - IgniteInternalFuture txFut = GridTestUtils.runAsync(new Callable() { - @Override public Void call() throws Exception { - GridCacheAdapter dataCache = dataCache(attacker); - - try (IgniteInternalTx tx = - dataCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { - dataCache.put(DATA_KEY, 0); - - txStartLatch.countDown(); - - txCommitLatch.await(); - - tx.commit(); - } - - return null; - } - }); - - txStartLatch.await(); - - // 3. Start async operation to drain semaphore permits. - final IgniteInternalFuture putFut = dataCache(victim).putAsync(DATA_KEY, 1); - - assert !awaitFuture(putFut); - - // 4. Write data to files and ensure we stuck. - fileWriteLatch.countDown(); - - assert !awaitFuture(fileFut1); - assert !awaitFuture(fileFut2); - - // 5. Finish transaction. - txCommitLatch.countDown(); - - assert awaitFuture(txFut); - - // 6. Async put must succeed. - assert awaitFuture(putFut); - - // 7. Writes must succeed. - assert awaitFuture(fileFut1); - assert awaitFuture(fileFut2); - } - - /** - * Await future completion. - * - * @param fut Future. - * @return {@code True} if future completed. - * @throws Exception If failed. - */ - private static boolean awaitFuture(final IgniteInternalFuture fut) throws Exception { - return GridTestUtils.waitForCondition(new GridAbsPredicateX() { - @Override public boolean applyx() throws IgniteCheckedException { - return fut.isDone(); - } - }, 1000); - } - - /** - * Create IGFS file asynchronously. - * - * @param path Path. - * @return Future. 
- */ - private IgniteInternalFuture createFileAsync(final IgfsPath path, final CountDownLatch writeStartLatch) { - return GridTestUtils.runAsync(new Callable() { - @Override public Void call() throws Exception { - IgniteFileSystem igfs = attacker.fileSystem(null); - - try (IgfsOutputStream out = igfs.create(path, true)) { - writeStartLatch.await(); - - out.write(new byte[1024]); - - out.flush(); - } - - return null; - } - }); - } - - /** - * Get data cache for node. - * - * @param node Node. - * @return Data cache. - * @throws Exception If failed. - */ - private GridCacheAdapter dataCache(Ignite node) throws Exception { - return ((IgniteKernal)node).internalCache(DATA_CACHE_NAME); - } - - /** - * Create node configuration. - * - * @param name Node name. - * @return Configuration. - * @throws Exception If failed. - */ - private IgniteConfiguration config(String name, TcpDiscoveryVmIpFinder ipFinder) throws Exception { - // Data cache configuration. - CacheConfiguration dataCcfg = new CacheConfiguration(); - - dataCcfg.setName(DATA_CACHE_NAME); - dataCcfg.setCacheMode(CacheMode.REPLICATED); - dataCcfg.setAtomicityMode(TRANSACTIONAL); - dataCcfg.setWriteSynchronizationMode(FULL_SYNC); - dataCcfg.setAffinityMapper(new DummyAffinityMapper(1)); - dataCcfg.setMaxConcurrentAsyncOperations(1); - - // Meta cache configuration. - CacheConfiguration metaCcfg = new CacheConfiguration(); - - metaCcfg.setName(META_CACHE_NAME); - metaCcfg.setCacheMode(CacheMode.REPLICATED); - metaCcfg.setAtomicityMode(TRANSACTIONAL); - metaCcfg.setWriteSynchronizationMode(FULL_SYNC); - - // File system configuration. - FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); - - igfsCfg.setDefaultMode(IgfsMode.PRIMARY); - igfsCfg.setDataCacheName(DATA_CACHE_NAME); - igfsCfg.setMetaCacheName(META_CACHE_NAME); - igfsCfg.setFragmentizerEnabled(false); - igfsCfg.setBlockSize(1024); - - // Ignite configuration. 
- IgniteConfiguration cfg = getConfiguration(name); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(dataCcfg, metaCcfg); - cfg.setFileSystemConfiguration(igfsCfg); - - cfg.setLocalHost("127.0.0.1"); - cfg.setConnectorConfiguration(null); - - cfg.setSystemThreadPoolSize(2); - cfg.setRebalanceThreadPoolSize(1); - cfg.setPublicThreadPoolSize(1); - - return cfg; - } - - /** - * Dimmy affinity mapper. - */ - private static class DummyAffinityMapper extends IgfsGroupDataBlocksKeyMapper { - /** */ - private static final long serialVersionUID = 0L; - - /** Dummy affinity key. */ - private static final Integer KEY = 1; - - /** - * Constructor. - * - * @param grpSize Group size. - */ - public DummyAffinityMapper(int grpSize) { - super(grpSize); - } - - /** {@inheritDoc} */ - @Override public Object affinityKey(Object key) { - return KEY; - } - } -} Index: modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java (revision ) @@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.igfs.IgfsBackupsDualAsyncSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsBackupsDualSyncSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsBackupsPrimarySelfTest; -import org.apache.ignite.internal.processors.igfs.IgfsBlockMessageSystemPoolStarvationSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsCachePerBlockLruEvictionPolicySelfTest; import org.apache.ignite.internal.processors.igfs.IgfsCacheSelfTest; import 
org.apache.ignite.internal.processors.igfs.IgfsClientCacheSelfTest; @@ -126,8 +125,6 @@ suite.addTestSuite(IgfsBackupsPrimarySelfTest.class); suite.addTestSuite(IgfsBackupsDualSyncSelfTest.class); suite.addTestSuite(IgfsBackupsDualAsyncSelfTest.class); - - suite.addTestSuite(IgfsBlockMessageSystemPoolStarvationSelfTest.class); // TODO: Enable when IGFS failover is fixed. //suite.addTestSuite(IgfsBackupFailoverSelfTest.class); \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java (revision ) @@ -50,6 +50,7 @@ import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -712,7 +713,28 @@ * @param id The file id. * @return The new short name for trash directory. */ - static String composeNameForTrash(IgfsPath path, IgniteUuid id) { + public static String composeNameForTrash(IgfsPath path, IgniteUuid id) { return id.toString() + TRASH_NAME_SEPARATOR + path.toString(); + } + + /** + * Read data from some source. Source could be either {@code ByteBuffer} or {@code DataInput}. + * + * @param src Source. + * @param dst Destination. + * @param dstOff Destination offset. + * @param len Length. + * @throws IOException If failed. + * @throws IgniteCheckedException If failed.
+ */ + public static void readData(Object src, byte[] dst, int dstOff, int len) throws IOException, + IgniteCheckedException { + assert src != null; + assert src instanceof ByteBuffer || src instanceof DataInput; + + if (src instanceof ByteBuffer) + ((ByteBuffer)src).get(dst, dstOff, len); + else + ((DataInput)src).readFully(dst, dstOff, len); } } \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java (revision 72df110499962c177857cbeff368528094535f5c) @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.igfs; - -import java.io.Externalizable; -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.Nullable; - -/** - * Block write request acknowledgement message. - */ -public class IgfsAckMessage extends IgfsCommunicationMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** File id. */ - private IgniteUuid fileId; - - /** Request ID to ack. */ - private long id; - - /** Write exception. */ - @GridDirectTransient - private IgniteCheckedException err; - - /** */ - private byte[] errBytes; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public IgfsAckMessage() { - // No-op. - } - - /** - * @param fileId File ID. - * @param id Request ID. - * @param err Error. - */ - public IgfsAckMessage(IgniteUuid fileId, long id, @Nullable IgniteCheckedException err) { - this.fileId = fileId; - this.id = id; - this.err = err; - } - - /** - * @return File ID. - */ - public IgniteUuid fileId() { - return fileId; - } - - /** - * @return Batch ID. - */ - public long id() { - return id; - } - - /** - * @return Error occurred when writing this batch, if any. - */ - public IgniteCheckedException error() { - return err; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - super.prepareMarshal(marsh); - - if (err != null && errBytes == null) - errBytes = marsh.marshal(err); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(marsh, ldr); - - if (errBytes != null && err == null) - err = marsh.unmarshal(errBytes, ldr); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeIgniteUuid("fileId", fileId)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeLong("id", id)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 0: - errBytes = reader.readByteArray("errBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - fileId = reader.readIgniteUuid("fileId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - id = reader.readLong("id"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(IgfsAckMessage.class); - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 64; - } - - /** {@inheritDoc} */ - 
@Override public byte fieldsCount() { - return 3; - } -} \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java (revision 72df110499962c177857cbeff368528094535f5c) @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs; - -import java.io.Externalizable; -import java.nio.ByteBuffer; -import java.util.Map; -import org.apache.ignite.internal.GridDirectMap; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * IGFS write blocks message. 
- */ -public class IgfsBlocksMessage extends IgfsCommunicationMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** File id. */ - private IgniteUuid fileId; - - /** Batch id */ - private long id; - - /** Blocks to store. */ - @GridDirectMap(keyType = IgfsBlockKey.class, valueType = byte[].class) - private Map blocks; - - /** - * Empty constructor required by {@link Externalizable} - */ - public IgfsBlocksMessage() { - // No-op. - } - - /** - * Constructor. - * - * @param fileId File ID. - * @param id Message id. - * @param blocks Blocks to put in cache. - */ - public IgfsBlocksMessage(IgniteUuid fileId, long id, Map blocks) { - this.fileId = fileId; - this.id = id; - this.blocks = blocks; - } - - /** - * @return File id. - */ - public IgniteUuid fileId() { - return fileId; - } - - /** - * @return Batch id. - */ - public long id() { - return id; - } - - /** - * @return Map of blocks to put in cache. - */ - public Map blocks() { - return blocks; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeMap("blocks", blocks, MessageCollectionItemType.MSG, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeIgniteUuid("fileId", fileId)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeLong("id", id)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 0: - blocks = reader.readMap("blocks", MessageCollectionItemType.MSG, MessageCollectionItemType.BYTE_ARR, false); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - fileId = reader.readIgniteUuid("fileId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - id = reader.readLong("id"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(IgfsBlocksMessage.class); - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 66; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 3; - } -} \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- 
modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java (revision ) @@ -198,18 +198,9 @@ * @param data Data to store. * @throws IgniteCheckedException If failed. */ - protected abstract void storeDataBlock(ByteBuffer data) throws IgniteCheckedException, IOException; + protected abstract void storeDataBlocks(Object data, int len) throws IgniteCheckedException, IOException; /** - * Store data blocks in file reading appropriate number of bytes from given data input. - * - * @param in Data input to read from. - * @param len Data length to store. - * @throws IgniteCheckedException If failed. - */ - protected abstract void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException; - - /** * Close callback. It will be called only once in synchronized section. * * @throws IOException If failed. 
@@ -249,7 +240,7 @@ if (flip) buf.flip(); - storeDataBlock(buf); + storeDataBlocks(buf, buf.remaining()); } catch (IgniteCheckedException e) { throw new IOException("Failed to store data into file: " + path, e); \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java (revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java (revision ) @@ -108,9 +108,7 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse; -import org.apache.ignite.internal.processors.igfs.IgfsAckMessage; import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; -import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage; import org.apache.ignite.internal.processors.igfs.IgfsDeleteMessage; import org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange; import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerRequest; @@ -481,18 +479,8 @@ break; - case 64: - msg = new IgfsAckMessage(); - - break; - case 65: msg = new IgfsBlockKey(); - - break; - - case 66: - msg = new IgfsBlocksMessage(); break; Index: modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java 
(revision 72df110499962c177857cbeff368528094535f5c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java (revision ) @@ -129,7 +129,7 @@ * Process the batch. */ @SuppressWarnings("unchecked") - public void run() { + @Override public void run() { Throwable err = null; try { \ No newline at end of file