Status: Resolved
Resolution: Fixed
Observed a long-lived broken future chain in the current code:
    CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
        ChunkBuffer chunk) throws IOException {
      int effectiveChunkSize = chunk.remaining();
      final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
      final ByteString data = chunk.toByteString(
          bufferPool.byteStringConversion());
      ChecksumData checksumData = checksum.computeChecksum(chunk);
      ChunkInfo chunkInfo = ChunkInfo.newBuilder()
          .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
          .setOffset(offset)
          .setLen(effectiveChunkSize)
          .setChecksumData(checksumData.getProtoBufMessage())
          .build();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Writing chunk {} length {} at offset {}",
            chunkInfo.getChunkName(), effectiveChunkSize, offset);
      }
      try {
        XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
            blockID.get(), data, token, replicationIndex);
        CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
            asyncReply.getResponse();
        future.thenApplyAsync(e -> {  // <-- the new stage is neither held nor returned
          try {
            validateResponse(e);
          } catch (IOException sce) {
            future.completeExceptionally(sce);
          }
          return e;
        }, responseExecutor).exceptionally(e -> {
          String msg = "Failed to write chunk " + chunkInfo.getChunkName() + " "
              + "into block " + blockID;
          LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
          CompletionException ce = new CompletionException(msg, e);
          setIoException(ce);
          throw ce;
        });
        containerBlockData.addChunks(chunkInfo);
        return future;  // <-- returns the future from the middle of the chain, not its last stage
      } catch (IOException | ExecutionException e) {
        throw new IOException(EXCEPTION_MSG + e.toString(), e);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        handleInterruptedException(ex, false);
      }
      return null;
    }
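In the Ratis path, the returned future is not used, so there is no problem.
In the EC path, the returned future is waited on below, so there is a problem: the wait covers only the original response future, not the stage that runs `validateResponse`, so `isFailed` can proceed before validation has finished. The `future.completeExceptionally(sce)` call in the lambda also has no effect, because `future` is already complete by the time the lambda runs.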
    private boolean isFailed(
        ECBlockOutputStream outputStream,
        CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
            chunkWriteResponseFuture) {
      ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto = null;
      try {
        containerCommandResponseProto = chunkWriteResponseFuture != null ?
            chunkWriteResponseFuture.get() :  // <-- wait for the future
            null;
      } catch (InterruptedException e) {
        outputStream.setIoException(e);
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        outputStream.setIoException(e);
      }
      if ((outputStream != null && containerCommandResponseProto != null)
          && (outputStream.getIoException() != null || isStreamFailed(
          containerCommandResponseProto, outputStream))) {
        return true;
      }
      return false;
    }
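The difference between returning the source future and returning the dependent stage can be reproduced in isolation. The following is a minimal sketch, not the Ozone code: `FutureChainSketch`, `validate`, `brokenChain`, and `fixedChain` are hypothetical names, a plain string stands in for the response proto, and the non-async `thenApply` is used so the result is deterministic.

```java
import java.util.concurrent.CompletableFuture;

public class FutureChainSketch {

  // Hypothetical stand-in for validateResponse: rejects anything but "OK".
  public static String validate(String response) {
    if (!"OK".equals(response)) {
      throw new IllegalStateException("bad response: " + response);
    }
    return response;
  }

  // Broken shape: the dependent stage is created but dropped, and the
  // source future is returned, so callers never see validation failures.
  public static CompletableFuture<String> brokenChain(
      CompletableFuture<String> source) {
    source.thenApply(FutureChainSketch::validate);
    return source;
  }

  // Fixed shape: the dependent stage is returned, so a validation failure
  // completes the returned future exceptionally.
  public static CompletableFuture<String> fixedChain(
      CompletableFuture<String> source) {
    return source.thenApply(FutureChainSketch::validate);
  }

  public static void main(String[] args) {
    CompletableFuture<String> broken =
        brokenChain(CompletableFuture.completedFuture("ERROR"));
    CompletableFuture<String> fixed =
        fixedChain(CompletableFuture.completedFuture("ERROR"));

    System.out.println(broken.isCompletedExceptionally()); // prints false
    System.out.println(fixed.isCompletedExceptionally());  // prints true

    // Completing an already completed future is a no-op and returns false.
    System.out.println(
        broken.completeExceptionally(new RuntimeException("late"))); // prints false
  }
}
```

A caller that waits on the result of `brokenChain` gets the raw response back even though validation failed; with `fixedChain` the same wait surfaces the failure.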
Since `validateResponse` already runs in the lambda in the first snippet, `isStreamFailed` is no longer needed here; checking the IoException that has already been set is enough.
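A simplified check along those lines might look like the sketch below. It is illustrative only: `IsFailedSketch` and `StreamStub` are hypothetical stand-ins (the stub models just the IoException slot of the output stream), and the future passed in is assumed to be the validated end of the chain. Unlike the snippet above, the sketch reports a failure whenever an IoException is set, even if no response was obtained; that is an assumption about the intended behavior.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class IsFailedSketch {

  // Hypothetical stand-in for the output stream: only the IoException
  // slot is modelled.
  public static class StreamStub {
    private volatile IOException ioException;

    public void setIoException(Exception e) {
      ioException = (e instanceof IOException)
          ? (IOException) e : new IOException(e);
    }

    public IOException getIoException() {
      return ioException;
    }
  }

  // Waits for the validated future, records any failure on the stream,
  // and then looks only at the stream's IoException.
  public static boolean isFailed(StreamStub stream,
      CompletableFuture<?> validatedFuture) {
    if (stream == null || validatedFuture == null) {
      return false;
    }
    try {
      validatedFuture.get();
    } catch (InterruptedException e) {
      stream.setIoException(e);
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      stream.setIoException(e);
    }
    return stream.getIoException() != null;
  }
}
```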
Note that `executePutBlock` has the correct future chain and calls `validateResponse`, too.