Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
-
None
Description
1. Problem description:
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY song_id, user_id, FLOOR(proc_time TO day)
      ORDER BY proc_time ASC
    ) AS row_num
  FROM tableA
  WHERE cmd = 1 AND user_id > 0
)
WHERE row_num <= 10
Currently, the deduplication operator relies on the Flink State TTL mechanism. By default, this mechanism cleans up an expired state entry only when that entry is accessed again. In our case, the state key includes the FLOOR(proc_time TO day) timestamp. For example, if today is December 28th, new keys in the Flink state include December 28th. Once the date rolls over to December 29th, keys for new records include December 29th, and the December 28th keys are never accessed again. Because they are never accessed, the State TTL mechanism never cleans them up, so the Flink state grows indefinitely.
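The growth pattern described above can be reproduced with a small toy model. This is a sketch only, not Flink code: the class LazyTtlDemo, the store LazyTtlStore, and the key layout "song|user|day" are hypothetical names chosen for illustration, and the store simply mimics "expired entries are removed only when their own key is accessed again".

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of access-only TTL cleanup. Hypothetical names; this is not Flink code. */
class LazyTtlDemo {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Maps key -> last-update timestamp; an entry is dropped only when its own key is touched again. */
    static final class LazyTtlStore {
        private final Map<String, Long> lastUpdate = new HashMap<>();
        private final long ttlMs;

        LazyTtlStore(long ttlMs) {
            this.ttlMs = ttlMs;
        }

        void touch(String key, long now) {
            Long previous = lastUpdate.get(key);
            if (previous != null && now - previous >= ttlMs) {
                // The expired entry is noticed only because this exact key was accessed.
                lastUpdate.remove(key);
            }
            lastUpdate.put(key, now);
        }

        int size() {
            return lastUpdate.size();
        }
    }

    /** Feeds usersPerDay records per day; the key embeds the day, as the PARTITION BY clause does. */
    static int entriesAfter(int days, int usersPerDay) {
        LazyTtlStore store = new LazyTtlStore(DAY_MS);
        for (int day = 0; day < days; day++) {
            long now = day * DAY_MS;
            for (int user = 0; user < usersPerDay; user++) {
                store.touch("song-1|user-" + user + "|day-" + day, now);
            }
        }
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println(entriesAfter(1, 100));  // prints 100
        System.out.println(entriesAfter(30, 100)); // prints 3000
    }
}
```

Because every day produces a fresh set of keys, no old key is ever touched again, and the entry count in this model grows linearly with the number of days even though the TTL is one day.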
2021-02-25 06:49:25,593 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: hadoop02.tcd.com/9.44.33.8:60899
2021-02-25 06:49:25,593 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@hadoop02.tcd.com:60899] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@hadoop02.tcd.com:60899]] Caused by: [java.net.ConnectException: Connection refused: hadoop02.tcd.com/9.44.33.8:60899]
2021-02-25 06:49:32,762 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker container_e26_1614150721877_0021_01_000004 is terminated. Diagnostics: [2021-02-25 06:49:31.879]Container [pid=24324,containerID=container_e26_1614150721877_0021_01_000004] is running 103702528B beyond the 'PHYSICAL' memory limit. Current usage: 4.1 GB of 4 GB physical memory used; 6.3 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_e26_1614150721877_0021_01_000004 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 24551 24324 24324 24324 (java) 1130639 94955 6799687680 1073522 /usr/java/jdk1.8.0_131/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager.rpc.address=hadoop02.tcd.com -Djobmanager.memory.jvm-overhead.min=201326592b -Dpipeline.classpaths= -Dtaskmanager.resource-id=container_e26_1614150721877_0021_01_000004 -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b -Dexecution.target=embedded -Dweb.tmpdir=/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8 -Dinternal.taskmanager.resource-id.metadata=hadoop03.tcd.com:8041 -Djobmanager.rpc.port=54474 -Dpipeline.jars=file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar -Drest.address=hadoop02.tcd.com -Djobmanager.memory.jvm-metaspace.size=268435456b -Djobmanager.memory.heap.size=1073741824b -Djobmanager.memory.jvm-overhead.max=201326592b
|- 24324 24315 24324 24324 (bash) 1 0 11046912 372 /bin/bash -c /usr/java/jdk1.8.0_131/bin/java -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1530082070b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager.rpc.address='hadoop02.tcd.com' -Djobmanager.memory.jvm-overhead.min='201326592b' -Dpipeline.classpaths='' -Dtaskmanager.resource-id='container_e26_1614150721877_0021_01_000004' -Dweb.port='0' -Djobmanager.memory.off-heap.size='134217728b' -Dexecution.target='embedded' -Dweb.tmpdir='/tmp/flink-web-30926c2a-4700-49a4-89d6-6b8485785ae8' -Dinternal.taskmanager.resource-id.metadata='hadoop03.tcd.com:8041' -Djobmanager.rpc.port='54474' -Dpipeline.jars='file:/data1/yarn/nm/usercache/user_00/appcache/application_1614150721877_0021/container_e26_1614150721877_0021_01_000001/unipro-interval-index-stat-v2-1.0-SNAPSHOT.jar' -Drest.address='hadoop02.tcd.com' -Djobmanager.memory.jvm-metaspace.size='268435456b' -Djobmanager.memory.heap.size='1073741824b' -Djobmanager.memory.jvm-overhead.max='201326592b' 1> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.out 2> /data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_000004/taskmanager.err
[2021-02-25 06:49:31.896]Container killed on request. Exit code is 143
[2021-02-25 06:49:31.908]Container exited with a non-zero exit code 143.
2. Solution:
2.1 The Flink State TTL mechanism now provides the cleanupFullSnapshot and cleanupInRocksdbCompactFilter methods, which clean up old state even if it is never accessed again.
- cleanupFullSnapshot: Filters out expired state while a full snapshot is taken, so the snapshot no longer carries old state.
- cleanupInRocksdbCompactFilter: Allows specifying the queryTimeAfterNumEntries parameter. This parameter determines after how many state entries the current timestamp should be updated. When RocksDB performs compaction operations in the background, it uses the current timestamp to determine whether a state is expired and filters out those expired keys and values. If the queryTimeAfterNumEntries value is set low, it will speed up the state cleanup process. However, since Flink calls RocksDB code via JNI, frequent calls can incur significant overhead.
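The trade-off around queryTimeAfterNumEntries can be illustrated with a small toy model of a background sweep. This is a sketch under stated assumptions, not RocksDB or Flink code: the class CompactionFilterDemo, the sweep and sampleState helpers, and the fixed clock are all hypothetical, and the clock-query counter only stands in for the cost of refreshing the timestamp across JNI.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Toy model of a TTL compaction filter. Hypothetical names; not RocksDB or Flink code. */
class CompactionFilterDemo {
    /** Outcome of one sweep: entries dropped, and how often the clock was queried. */
    static final class SweepResult {
        final int removed;
        final int clockQueries;

        SweepResult(int removed, int clockQueries) {
            this.removed = removed;
            this.clockQueries = clockQueries;
        }
    }

    /** Visits every entry without any key access and drops those older than ttlMs. */
    static SweepResult sweep(Map<String, Long> lastUpdate, long ttlMs,
                             long queryTimeAfterNumEntries, LongSupplier clock) {
        int removed = 0;
        int clockQueries = 0;
        long cachedNow = 0;
        long sinceRefresh = queryTimeAfterNumEntries; // forces a clock query on the first entry
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (sinceRefresh >= queryTimeAfterNumEntries) {
                cachedNow = clock.getAsLong(); // stands in for the costly timestamp refresh
                clockQueries++;
                sinceRefresh = 0;
            }
            sinceRefresh++;
            if (cachedNow - entry.getValue() >= ttlMs) {
                it.remove();
                removed++;
            }
        }
        return new SweepResult(removed, clockQueries);
    }

    /** 1000 keys written at t=0 (old day) and 1000 keys written 25 hours later (new day). */
    static Map<String, Long> sampleState() {
        Map<String, Long> state = new LinkedHashMap<>();
        for (int i = 0; i < 1000; i++) state.put("day-28|user-" + i, 0L);
        for (int i = 0; i < 1000; i++) state.put("day-29|user-" + i, 90_000_000L);
        return state;
    }

    public static void main(String[] args) {
        long ttlMs = 86_400_000L;             // one day
        LongSupplier clock = () -> 100_000_000L; // fixed "now" for a deterministic demo
        SweepResult coarse = sweep(sampleState(), ttlMs, 1000, clock);
        SweepResult fine = sweep(sampleState(), ttlMs, 10, clock);
        System.out.println(coarse.removed + " removed, " + coarse.clockQueries + " clock queries");
        System.out.println(fine.removed + " removed, " + fine.clockQueries + " clock queries");
    }
}
```

In this model both sweeps drop the same 1000 stale entries, but the lower queryTimeAfterNumEntries value refreshes the timestamp 200 times instead of 2. With a moving clock, the more frequent refresh would also detect expiry sooner, which is the speed-versus-overhead trade-off described above.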
2.2 Add the RocksDB state cleanup configuration to the Rank operators.