Flink / FLINK-35668

Exception "java.lang.OutOfMemoryError" thrown when importing data from a MySQL table to StarRocks


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: cdc-3.1.0
    • Fix Version/s: None
    • Component/s: Flink CDC
    • Labels: None
    • Environment:

      flink-1.18.1

      flink-cdc-3.1.0

      MySQL 8.0.33

      StarRocks-3.2.7

    Description

      I have 40 MySQL insert SQL files dumped from a large table with about 100 million records in total; each file is about 100 MB. I restored these files into a MySQL table named "standby_atomic_action" using the mysql CLI in a loop. At the same time, I started a Flink CDC pipeline with scan.startup.mode "initial" to copy the MySQL table data into a StarRocks table. When the Flink task executed the last SQL split to read snapshot data, it returned more than 1 million records; the Flink TaskExecutor then threw "java.lang.OutOfMemoryError" and was terminated.
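      For reference, the pipeline definition looked roughly like the following. This is a hedged sketch following the Flink CDC 3.1 MySQL-to-StarRocks quickstart layout; hostnames, credentials, and the server-id range are placeholders, and scan.startup.mode is the setting mentioned above:

```yaml
# Sketch of a Flink CDC 3.1 pipeline definition (values are placeholders).
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: "***"
  tables: qwgas.standby_atomic_action
  server-id: 5400-5404
  # "initial" takes a full snapshot first, then switches to binlog reading.
  scan.startup.mode: initial

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: "***"

pipeline:
  name: Sync MySQL standby_atomic_action to StarRocks
  parallelism: 4
```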

      I checked the Flink CDC source of method org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils#buildSplitScanQuery in module flink-connector-mysql-cdc. It calls buildSplitQuery with limitSize = -1, so when isLastSplit is true the split SQL is "select * from standby_atomic_action where id>=?" without a LIMIT clause. Meanwhile, the mysql CLI had just committed more than 1 million additional records, far more than the default scan.incremental.snapshot.chunk.size of 8096, so the last split read all of them in one unbounded query.
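      The behavior described above can be illustrated with a minimal sketch. This is NOT the actual Flink CDC code; method and parameter names mirror StatementUtils for illustration, and the chunk size is hard-coded to the 8096 default mentioned above:

```java
// Simplified sketch of the split-scan query construction described in this report.
// For the last split, limitSize is passed as -1, which suppresses the LIMIT clause,
// so every row with id >= splitStart is read in a single unbounded query.
public class SplitQuerySketch {

    static String buildSplitScanQuery(String table, String splitKey, boolean isLastSplit) {
        // -1 means "no limit"; 8096 stands in for the default chunk size.
        int limitSize = isLastSplit ? -1 : 8096;
        return buildSplitQuery(table, splitKey, limitSize);
    }

    static String buildSplitQuery(String table, String splitKey, int limitSize) {
        StringBuilder sql = new StringBuilder("SELECT * FROM ")
                .append(table)
                .append(" WHERE `").append(splitKey).append("` >= ?");
        if (limitSize > 0) {
            sql.append(" LIMIT ").append(limitSize);
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        // Last split: no LIMIT, so rows committed after chunk planning are all included.
        System.out.println(buildSplitScanQuery("`qwgas`.`standby_atomic_action`", "id", true));
        // Intermediate split: bounded by the chunk size.
        System.out.println(buildSplitScanQuery("`qwgas`.`standby_atomic_action`", "id", false));
    }
}
```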

       
      Exception:
      2024-06-21 05:52:35,440 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Snapshot step 1 - Determining low watermark

      {ts_sec=0, file=binlog.000045, pos=488153283, kind=SPECIFIC, gtids=334bec6e-849e-11eb-9157-b8599f49e7f4:1-936486941, 778ec805-a69b-11eb-9250-506b4b02d56e:1-900365321, d6a416c1-a2de-11e9-adcf-506b4b233658:1-166810501, d6bb9ebd-a2de-11e9-936f-506b4bfd5c94:1-80755324, row=0, event=0}

      for split MySqlSnapshotSplit{tableId=qwgas.standby_atomic_action, splitId='qwgas.standby_atomic_action:10569', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[92277940], splitEnd=null, highWatermark=null}
      2024-06-21 05:52:35,440 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Snapshot step 2 - Snapshotting data
      2024-06-21 05:52:35,440 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exporting data from split 'qwgas.standby_atomic_action:10569' of table qwgas.standby_atomic_action
      2024-06-21 05:52:35,440 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - For split 'qwgas.standby_atomic_action:10569' of table qwgas.standby_atomic_action using select statement: 'SELECT * FROM `qwgas`.`standby_atomic_action` WHERE `id` >= ?'
      ...
      ...
      ...
      2024-06-21 05:54:59,970 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 1877835 records for split 'qwgas.standby_atomic_action:10569' after 00:02:24.53
      2024-06-21 05:55:06,410 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 5149 records for split 'qwgas.standby_atomic_action_counter:148' after 00:00:33.897
      2024-06-21 05:55:07,484 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 6646 records for split 'qwgas.standby_atomic_action_counter:147' after 00:00:34.971
      2024-06-21 05:55:10,714 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 4214 records for split 'qwgas.standby_atomic_action_counter:149' after 00:00:37.106
      2024-06-21 05:55:11,784 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 1878137 records for split 'qwgas.standby_atomic_action:10569' after 00:02:36.344
      2024-06-21 05:55:17,187 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 5826 records for split 'qwgas.standby_atomic_action_counter:148' after 00:00:44.674
      2024-06-21 05:55:18,274 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 7150 records for split 'qwgas.standby_atomic_action_counter:147' after 00:00:45.761
      2024-06-21 05:55:21,522 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 4312 records for split 'qwgas.standby_atomic_action_counter:149' after 00:00:47.914
      2024-06-21 05:56:15,500 ERROR com.starrocks.data.load.stream.v2.StreamLoadManagerV2        [] - StarRocks-Sink-Manager error
      java.lang.OutOfMemoryError: Java heap space
      2024-06-21 05:56:15,500 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
      java.lang.OutOfMemoryError: Java heap space
      2024-06-21 05:56:15,500 WARN  org.jboss.netty.channel.socket.nio.AbstractNioSelector       [] - Unexpected exception in the selector loop.
      java.lang.OutOfMemoryError: Java heap space
      2024-06-21 05:56:15,500 WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry    [] - Failed to serialize accumulators for task.
      java.lang.OutOfMemoryError: Java heap space
      2024-06-21 05:56:15,501 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 6112 records for split 'qwgas.standby_atomic_action_counter:148' after 00:01:42.988
      2024-06-21 05:56:15,502 ERROR org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 'flink-pekko.remote.default-remote-dispatcher-16' produced an uncaught exception. Stopping the process...
      java.lang.OutOfMemoryError: Java heap space
      2024-06-21 05:56:15,502 INFO  org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask [] - Exported 7459 records for split 'qwgas.standby_atomic_action_counter:147' after 00:01:42.989
      2024-06-21 05:56:15,503 ERROR org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 'flink-pekko.remote.default-remote-dispatcher-14' produced an uncaught exception. Stopping the process...
      java.lang.OutOfMemoryError: Java heap space
      2024-06-21 05:56:15,776 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - The heartbeat of JobManager with id f357f3610bf2f36204f11d9e81d50b09 timed out.
      2024-06-21 05:56:15,777 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job f7d731948175e4efbc9ee57824148e6b.
      2024-06-21 05:56:15,777 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to fail task externally Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (3/4)#4 (77a0a5b99d79f9f19621258a8015daee_cbc357ccb763df2852fee8c4fc7d55f2_2_4).
      2024-06-21 05:56:15,777 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (3/4)#4 (77a0a5b99d79f9f19621258a8015daee_cbc357ccb763df2852fee8c4fc7d55f2_2_4) switched from RUNNING to FAILED with failure cause:
      org.apache.flink.util.FlinkException: Disconnect from JobManager responsible for f7d731948175e4efbc9ee57824148e6b.
      at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1764) ~[flink-dist-1.18.1.jar:1.18.1]
      at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectAndTryReconnectToJobManager(TaskExecutor.java:1305) ~[flink-dist-1.18.1.jar:1.18.1]
      at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$4300(TaskExecutor.java:188) ~[flink-dist-1.18.1.jar:1.18.1]
      at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$handleJobManagerConnectionLoss$0(TaskExecutor.java:2549) ~[flink-dist-1.18.1.jar:1.18.1]
      at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
      at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.handleJobManagerConnectionLoss(TaskExecutor.java:2547) ~[flink-dist-1.18.1.jar:1.18.1]
      at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.notifyHeartbeatTimeout(TaskExecutor.java:2530) ~[flink-dist-1.18.1.jar:1.18.1]
      at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.run(DefaultHeartbeatMonitor.java:158) ~[flink-dist-1.18.1.jar:1.18.1]
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
      at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
      at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.18.1.jar:1.18.1]
      at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
      at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
      at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
      at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
      at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
      at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
      Caused by: java.util.concurrent.TimeoutException: The heartbeat of JobManager with id f357f3610bf2f36204f11d9e81d50b09 timed out.
      ... 30 more


          People

            Assignee: Unassigned
            Reporter: lockie Lv Luo Gang
