Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.15.0
-
None
-
None
-
flink version: 1.15
jdk: 1.8
Description
Detail
See in GSBlobStorageImpl
@Override public int write(byte[] content, int start, int length) throws IOException { LOGGER.trace("Writing {} bytes to blob {}", length, blobIdentifier); Preconditions.checkNotNull(content); Preconditions.checkArgument(start >= 0); Preconditions.checkArgument(length >= 0); ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length); int written = writeChannel.write(byteBuffer); LOGGER.trace("Wrote {} bytes to blob {}", written, blobIdentifier); return written; } @Override public void close() throws IOException { LOGGER.trace("Closing write channel to blob {}", blobIdentifier); writeChannel.close(); }
when I write data into google cloud storage by flink-gs-fs-haddoop.
The service always has read time out exceptions, which can be reproduced in a very short time of task execution.
I tried to trace the code and found that it always occurs when the writeChannel Close code is executed. I tried retrying by modifying the source code but it didn't solve the problem, the timeout is 20s and the checkpoint will fail if this problem occurs.
I tried to change the chunk size but found no help, with this component, I can't write data to gcs via flink.
By the way, I found that 503 service unavailable occurs when create writeChannel. This problem occurs less often than Read time out, but it needs to be checked
@Override public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) { LOGGER.trace("Creating writeable blob for identifier {}", blobIdentifier); Preconditions.checkNotNull(blobIdentifier); BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build(); com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo); return new WriteChannel(blobIdentifier, writeChannel); } @Override public GSBlobStorage.WriteChannel writeBlob( GSBlobIdentifier blobIdentifier, MemorySize chunkSize) { LOGGER.trace( "Creating writeable blob for identifier {} with chunk size {}", blobIdentifier, chunkSize); Preconditions.checkNotNull(blobIdentifier); Preconditions.checkArgument(chunkSize.getBytes() > 0); BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build(); com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo); writeChannel.setChunkSize((int) chunkSize.getBytes()); return new WriteChannel(blobIdentifier, writeChannel); }