FLINK-29242

Read timeout when closing the write channel [flink-gs-fs-hadoop]


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.15.0
    • Fix Version/s: None
    • Component/s: FileSystems
    • Labels: None
    • Environment: Flink 1.15, JDK 1.8

    Description

      See GSBlobStorageImpl:

      @Override
      public int write(byte[] content, int start, int length) throws IOException {
          LOGGER.trace("Writing {} bytes to blob {}", length, blobIdentifier);
          Preconditions.checkNotNull(content);
          Preconditions.checkArgument(start >= 0);
          Preconditions.checkArgument(length >= 0);
      
          ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length);
          int written = writeChannel.write(byteBuffer);
          LOGGER.trace("Wrote {} bytes to blob {}", written, blobIdentifier);
          return written;
      }
      
      @Override
      public void close() throws IOException {
          LOGGER.trace("Closing write channel to blob {}", blobIdentifier);
          writeChannel.close();
      } 

      When I write data to Google Cloud Storage via flink-gs-fs-hadoop, the service
      always fails with read timeout exceptions, which can be reproduced within a very
      short time of task execution. I traced the code and found that the timeout always
      occurs when the writeChannel.close() call is executed. I tried retrying by
      modifying the source code, but it did not solve the problem; the timeout is 20s,
      and the checkpoint fails whenever this problem occurs.

      I also tried changing the chunk size, but it did not help. With this component, I
      cannot write data to GCS via Flink.
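
      For reference, the retry I tried around the close() call was roughly of the
      following form. This is a minimal sketch only: the CloseRetry helper, the Closer
      interface, and the attempt/backoff values are hypothetical placeholders, not the
      exact change I made, and it is not guaranteed that calling close() again after a
      timed-out finalize is safe on the GCS side.

      import java.io.IOException;
      import java.net.SocketTimeoutException;

      /** Hypothetical helper that retries a close() call a few times on read timeouts. */
      final class CloseRetry {

          /** The close call to retry, e.g. writeChannel::close. */
          interface Closer {
              void close() throws IOException;
          }

          /** Retries closer.close() on read timeouts; assumes maxAttempts >= 1. */
          static void closeWithRetry(Closer closer, int maxAttempts, long backoffMillis)
                  throws IOException {
              IOException last = null;
              for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                  try {
                      closer.close();
                      return; // close succeeded
                  } catch (IOException e) {
                      last = e;
                      boolean readTimeout =
                              e instanceof SocketTimeoutException
                                      || e.getCause() instanceof SocketTimeoutException;
                      if (!readTimeout) {
                          throw e; // only retry likely-transient read timeouts
                      }
                      if (attempt < maxAttempts) {
                          try {
                              Thread.sleep(backoffMillis * attempt); // linear backoff
                          } catch (InterruptedException ie) {
                              Thread.currentThread().interrupt();
                              throw e;
                          }
                      }
                  }
              }
              throw last; // every attempt timed out
          }
      }

      Retrying in this way (by modifying the connector source) did not resolve the
      timeout for me.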

       

      As a side note, I also see 503 Service Unavailable errors when the write channel is
      created. This happens less often than the read timeout, but it should also be
      checked:

      @Override
      public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) {
          LOGGER.trace("Creating writeable blob for identifier {}", blobIdentifier);
          Preconditions.checkNotNull(blobIdentifier);
      
          BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
          com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
          return new WriteChannel(blobIdentifier, writeChannel);
      }
      
      @Override
      public GSBlobStorage.WriteChannel writeBlob(
              GSBlobIdentifier blobIdentifier, MemorySize chunkSize) {
          LOGGER.trace(
                  "Creating writeable blob for identifier {} with chunk size {}",
                  blobIdentifier,
                  chunkSize);
          Preconditions.checkNotNull(blobIdentifier);
          Preconditions.checkArgument(chunkSize.getBytes() > 0);
      
          BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
          com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
          writeChannel.setChunkSize((int) chunkSize.getBytes());
          return new WriteChannel(blobIdentifier, writeChannel);
      } 
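
      For the 503s, it may be worth checking whether retry settings on the underlying
      google-cloud-storage client help. Below is a minimal sketch, assuming the standard
      google-cloud-storage Java client API. Note that this is not a flink-gs-fs-hadoop
      option: the connector builds its Storage client internally, so applying this would
      mean changing the connector code, and the retry values below are hypothetical.

      import com.google.api.gax.retrying.RetrySettings;
      import com.google.cloud.storage.Storage;
      import com.google.cloud.storage.StorageOptions;
      import org.threeten.bp.Duration;

      public class GcsRetryExample {
          public static void main(String[] args) {
              // Retry transient errors (such as 503) with exponential backoff.
              RetrySettings retrySettings =
                      RetrySettings.newBuilder()
                              .setMaxAttempts(6)
                              .setInitialRetryDelay(Duration.ofMillis(500))
                              .setRetryDelayMultiplier(2.0)
                              .setMaxRetryDelay(Duration.ofSeconds(30))
                              .setTotalTimeout(Duration.ofMinutes(5))
                              .build();

              Storage storage =
                      StorageOptions.newBuilder()
                              .setRetrySettings(retrySettings)
                              .build()
                              .getService();

              // storage.writer(blobInfo) calls made through this client would then
              // retry transient failures according to the settings above.
          }
      }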

       


People

    • Assignee: Unassigned
    • Reporter: Jian Zheng (j.zheng)
    • Votes: 0
    • Watchers: 3
