FLINK-33821

ArrowSerializer$finishCurrentBatch consumes too much time

Details

    • Type: Technical Debt
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.18.0
    • Fix Version/s: None
    • Component/s: API / Python
    • Labels: None

    Description

      We convert data into Arrow format in Flink and send it to Doris.
      The conversion path is: RowData --> arrow --> doris.
      During testing, we found that the `ArrowSerializer` provided by flink-python spends a lot of time in the `finishCurrentBatch` function.
      For a total of 1.4 GB of Parquet files, the overall conversion takes 70 seconds, of which `finishCurrentBatch` accounts for 40 seconds (the `writeBatch` call inside `finishCurrentBatch` alone takes 39 seconds).

       

      For comparison, we ran the same conversion in Spark: InternalRow --> arrow --> doris.
      With the same Parquet files, the overall conversion takes only 35 seconds, and `writeBatch` takes only 10 seconds.

       

      In Spark, we use `org.apache.spark.sql.execution.arrow.ArrowWriter` to convert `InternalRow` into Arrow vectors, and then serialize the vectors to binary with `org.apache.arrow.vector.ipc.ArrowStreamWriter$writeBatch`.
      Simplified code:

       

                  ArrowWriter arrowWriter = ArrowWriter.create(vectorSchemaRoot);
      
                  // --- phase1: InternalRow to arrowVector
                  while (....) {
                      arrowWriter.write(iterator.next());
                  }
                  arrowWriter.finish();
      
                  // --- phase2: arrowVector to binary
                  ByteArrayOutputStream out = new ByteArrayOutputStream();
                  ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, out);
                  writer.writeBatch();
                  writer.end();
      
                  // --- phase3: get binary
                  out.toByteArray(); 

       

      In Flink, we use `org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer`. This class is convenient: it covers both the conversion of RowData to Arrow vectors and the serialization of those vectors to binary.
      Simplified code:

                  arrowSerializer = new ArrowSerializer(rowType, rowType);
                  outputStream = new ByteArrayOutputStream();
                  arrowSerializer.open(new ByteArrayInputStream(new byte[0]), outputStream);
      
                  // --- phase1: RowData to arrowVector
                  while(....) {
                      arrowSerializer.write(rowData);
                  }
      
                  // --- phase2: arrowVector to binary
                  arrowSerializer.finishCurrentBatch();
      
                  // --- phase3: get binary
                  outputStream.toByteArray();
                  outputStream.reset(); 

      In phase 1 and phase 3, Flink and Spark take basically the same time. In phase 2, Spark's `writeBatch` takes 10 seconds, while Flink's `finishCurrentBatch` takes 40 seconds, 39 of them inside `writeBatch`.
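
      For anyone who wants to reproduce the per-phase numbers, the following is a minimal sketch of how each phase could be timed separately. `PhaseTimer` and `CheckedAction` are hypothetical helpers written for this illustration; they are not part of Flink, Spark, or Arrow. The workload in `main` is a stand-in that only fills and copies a byte buffer, so the timings it prints say nothing about Arrow itself.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink, Spark, or Arrow): accumulates wall-clock time per named phase. */
final class PhaseTimer {

    /** Like Supplier, but allowed to throw checked exceptions such as IOException. */
    interface CheckedAction<T> {
        T run() throws Exception;
    }

    // LinkedHashMap keeps the phases in first-seen order for reporting.
    private final Map<String, Long> nanosByPhase = new LinkedHashMap<>();

    /** Runs the action, adds its elapsed time to the named phase, and returns its result. */
    <T> T time(String phase, CheckedAction<T> action) {
        long start = System.nanoTime();
        try {
            return action.run();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException("phase failed: " + phase, e);
        } finally {
            // Recorded even when the action throws, so failed phases still show up.
            nanosByPhase.merge(phase, System.nanoTime() - start, Long::sum);
        }
    }

    /** Total nanoseconds recorded for a phase, or 0 if the phase never ran. */
    long nanos(String phase) {
        return nanosByPhase.getOrDefault(phase, 0L);
    }

    /** Fraction of all recorded time that was spent in the given phase. */
    double share(String phase) {
        long total = nanosByPhase.values().stream().mapToLong(Long::longValue).sum();
        return total == 0 ? 0.0 : (double) nanos(phase) / total;
    }

    Iterable<String> phases() {
        return nanosByPhase.keySet();
    }

    public static void main(String[] args) {
        PhaseTimer timer = new PhaseTimer();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] batch = new byte[1 << 20];

        // Stand-in workload. In the real job the three lambdas would wrap
        // arrowSerializer.write(rowData), arrowSerializer.finishCurrentBatch(),
        // and outputStream.toByteArray() / outputStream.reset() respectively.
        for (int i = 0; i < 50; i++) {
            final int seed = i;
            timer.time("phase1", () -> { Arrays.fill(batch, (byte) seed); return null; });
            timer.time("phase2", () -> { out.write(batch, 0, batch.length); return null; });
            timer.time("phase3", () -> { byte[] bytes = out.toByteArray(); out.reset(); return bytes.length; });
        }

        for (String phase : timer.phases()) {
            System.out.printf("%s: %.1f ms (%.0f%% of measured time)%n",
                    phase, timer.nanos(phase) / 1e6, 100 * timer.share(phase));
        }
    }
}
```

      Accumulating per phase across batches, rather than timing a single call, matches how the totals above were reported (for example, 40 seconds in `finishCurrentBatch` over the whole 1.4 GB input).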

      Is there a Flink configuration that I am missing? Or am I using `ArrowSerializer` incorrectly somewhere?

       

      Looking forward to your reply! Thanks!

          People

            Assignee: Unassigned
            Reporter: wenchi wuwenchi