Details
Type: Technical Debt
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.18.0
Fix Version/s: None
Component/s: None
Description
We convert data into the Arrow format through Flink and send it to Doris.
The data conversion looks like this: RowData --> arrow --> doris.
But during testing, we found that the `ArrowSerializer` provided by flink-python spends a lot of time in the `finishCurrentBatch` function.
For 1.4 GB of Parquet files, the overall conversion takes 70 seconds, of which `finishCurrentBatch` accounts for 40 seconds (39 of those spent in `writeBatch`).
So we compared with Spark, where the data conversion looks like this: InternalRow --> arrow --> doris.
Using the same Parquet files, the overall conversion takes only 35 seconds, and `writeBatch` costs only 10 seconds.
In Spark, we use `org.apache.spark.sql.execution.arrow.ArrowWriter` to convert each `InternalRow` into Arrow vectors, and then serialize the vectors into binary through `org.apache.arrow.vector.ipc.ArrowStreamWriter#writeBatch`.
Simplified code:

```java
ArrowWriter arrowWriter = ArrowWriter.create(vectorSchemaRoot);

// --- phase 1: InternalRow to Arrow vectors
while (iterator.hasNext()) {
    arrowWriter.write(iterator.next());
}
arrowWriter.finish();

// --- phase 2: Arrow vectors to binary
ByteArrayOutputStream out = new ByteArrayOutputStream();
ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, out);
writer.writeBatch();
writer.end();

// --- phase 3: get the binary
out.toByteArray();
```
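(As far as I know, `ArrowStreamWriter#writeBatch` starts the stream itself, writing the schema message first, if `start()` has not been called, so the snippet works without an explicit `start()`.)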
In Flink, we use `org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer`. This class is very convenient: it covers not only the conversion of `RowData` to Arrow vectors but also the serialization of the vectors to binary.
Simplified code:
```java
arrowSerializer = new ArrowSerializer(rowType, rowType);
outputStream = new ByteArrayOutputStream();
// Only the write direction is used here, so the input stream is empty.
arrowSerializer.open(new ByteArrayInputStream(new byte[0]), outputStream);

// --- phase 1: RowData to Arrow vectors
while (iterator.hasNext()) {
    RowData rowData = iterator.next();
    arrowSerializer.write(rowData);
}

// --- phase 2: Arrow vectors to binary
arrowSerializer.finishCurrentBatch();

// --- phase 3: get the binary
outputStream.toByteArray();
outputStream.reset();
```
In phases 1 and 3, Flink and Spark take roughly the same time. In phase 2, Spark's `writeBatch` took 10 seconds, while the `writeBatch` inside Flink's `finishCurrentBatch` took 40 seconds.
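For reference, here is a minimal sketch of how phase 2 could be timed in isolation on both paths. It reuses `vectorSchemaRoot`, `out`, `arrowSerializer`, and `outputStream` from the snippets above and assumes phase 1 has already populated the current batch; exception handling is omitted:

```java
// Sketch: time only phase 2 (Arrow vectors -> binary) on each path.
// Assumes the batch was already populated in phase 1 (see snippets above).

// Spark path: serialize the populated vectorSchemaRoot directly.
long sparkStart = System.nanoTime();
ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, out);
writer.writeBatch();
writer.end();
long sparkNanos = System.nanoTime() - sparkStart;

// Flink path: let ArrowSerializer flush the current batch to outputStream.
long flinkStart = System.nanoTime();
arrowSerializer.finishCurrentBatch();
long flinkNanos = System.nanoTime() - flinkStart;

System.out.printf("spark phase 2: %d ms, flink phase 2: %d ms%n",
        sparkNanos / 1_000_000, flinkNanos / 1_000_000);
```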
Is there any Flink-related configuration I am missing? Or did I use something incorrectly on the Flink side?
Looking forward to your reply! Thanks!