Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
None
Description
The problem
Flink data skipping does not work, and the following warning appears in the log:
7799 [main] WARN org.apache.hudi.source.FileIndex [] - Read column stats for data skipping error
org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Error occurs when executing map
...
Caused by: java.lang.ClassNotFoundException: Class org.apache.hudi.common.fs.inline.InLineFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2329) ~[hadoop-common-2.10.1.jar:?]
    ...
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_345]
The solution
This problem was already fixed by https://issues.apache.org/jira/browse/HUDI-3763, but that patch does not fix it for Flink jobs.
We should use InLineFileSystem.class.getClassLoader() instead of Thread.currentThread().getContextClassLoader(), because the method lookupRecords(keys, fullKey) is called from a common ForkJoinPool worker thread, which may carry the wrong context class loader.
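A minimal, self-contained sketch of why the proposed change helps. It does not use Hudi, Hadoop, or Flink. The nested class `Marker` is a hypothetical stand-in for InLineFileSystem, and `isolated` (a URLClassLoader whose parent is the bootstrap loader) is a hypothetical stand-in for a context class loader that cannot see Hudi classes. A worker thread receives that loader as its context class loader and resolves the class by name with an explicit loader, as Hadoop's Configuration does with Class.forName. It tries once with the context class loader and once with the class's own defining loader.

```java
import java.net.URL;
import java.net.URLClassLoader;

final class ClassLoaderLookupDemo {
    // Hypothetical stand-in for InLineFileSystem: a class defined by the
    // application class loader.
    static final class Marker {}

    /** Returns true if {@code name} can be resolved through {@code loader}. */
    static boolean canLoad(String name, ClassLoader loader) {
        try {
            Class.forName(name, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Runs both lookups on a thread whose context class loader is {@code contextLoader}. */
    static boolean[] lookupOnWorker(ClassLoader contextLoader) throws InterruptedException {
        String name = Marker.class.getName();
        boolean[] result = new boolean[2];
        Thread worker = new Thread(() -> {
            // Current approach: trust whatever context loader this thread carries.
            result[0] = canLoad(name, Thread.currentThread().getContextClassLoader());
            // Proposed approach: use the loader that defined the class itself.
            result[1] = canLoad(name, Marker.class.getClassLoader());
        });
        worker.setContextClassLoader(contextLoader);
        worker.start();
        worker.join(); // join() makes the worker's writes to result visible here
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical "wrong" context loader: its parent is the bootstrap
        // loader, so application classes are invisible to it.
        ClassLoader isolated = new URLClassLoader(new URL[0], null);
        boolean[] r = lookupOnWorker(isolated);
        System.out.println("context class loader finds Marker: " + r[0]);          // false
        System.out.println("Marker.class.getClassLoader() finds Marker: " + r[1]); // true
    }
}
```

A class's defining loader can always resolve that class, no matter which thread runs the lookup. The context class loader depends on how the current thread was created.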
When the problem occurs
It is an intermittent bug that is hard to reproduce. Below is the sequence of events that leads to the error. I can attach the Flink job if necessary.
- Flink runs the job on a MiniCluster.
- A TemporaryClassLoaderContext holding the SubmoduleClassLoader is passed to the Akka RpcSystem:
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils:345
    // akka internally caches the context class loader
    // make sure it uses the plugin class loader
    try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
- As a result, all Akka threads have the SubmoduleClassLoader as their context class loader (a per-thread setting).
- An Akka thread creates a CompletableFuture:
org.apache.flink.runtime.rpc.akka.SupervisorActor:206
    return Patterns.ask(
                    supervisor,
                    createStartAkkaRpcActorMessage(propsFactory, endpointId),
                    RpcUtils.INF_DURATION)
            .toCompletableFuture()
            .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
            .join();
- This triggers the creation of a worker thread in the common ForkJoinPool:
java.util.concurrent.ForkJoinPool:1485
    try {
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
- The common-pool worker thread inherits the SubmoduleClassLoader as its context class loader because it was created from an Akka thread.
- That worker creates another worker, so two workers now carry the wrong class loader.
- The common ForkJoinPool can then consist of, for example:
  - 2 workers with the SubmoduleClassLoader
  - 8 workers with the AppClassLoader
- Hudi uses Java parallel streams, whose tasks also run on the common ForkJoinPool:
org.apache.hudi.metadata.HoodieBackedTableMetadata:160
    return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : engineContext.parallelize(partitionFileSlices))
            .flatMap((SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
- lookupRecords is called from a common-pool worker. With 2 of the 10 workers carrying the SubmoduleClassLoader, there is a 20% chance that Thread.currentThread().getContextClassLoader() returns the wrong class loader:
org.apache.hudi.common.table.log.block.HoodieHFileDataBlock:197
    inlineConf.setClassLoader(Thread.currentThread().getContextClassLoader());
- Class.forName in Hadoop's Configuration throws ClassNotFoundException:
org.apache.hadoop.conf.Configuration:2362
    clazz = Class.forName(name, true, classLoader);
- FileIndex logs the warning and returns null, so data skipping does not take effect:
org.apache.hudi.source.FileIndex:272
    } catch (Throwable throwable) {
        LOG.warn("Read column stats for data skipping error", throwable);
        return null;
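The inheritance step in the sequence above can be reproduced with plain threads. This sketch does not use Flink, Akka, or ForkJoinPool. `isolated` is a hypothetical stand-in for the SubmoduleClassLoader, the root thread plays the Akka thread, and the two threads it spawns in turn play the two misconfigured common-pool workers. It relies only on the documented java.lang.Thread behavior that a new thread starts with its creator's context class loader. How ForkJoinPool sets up common-pool workers differs between JDK versions, so the sketch deliberately does not depend on it.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ContextLoaderInheritanceDemo {
    /**
     * Starts a chain of threads, each created by the previous one, and records
     * every thread's context class loader in creation order.
     */
    static List<ClassLoader> spawnChain(ClassLoader rootLoader, int depth) throws InterruptedException {
        List<ClassLoader> seen = Collections.synchronizedList(new ArrayList<>());
        Thread root = new Thread(() -> spawn(depth, seen));
        // Analogous to TemporaryClassLoaderContext on the Akka thread.
        root.setContextClassLoader(rootLoader);
        root.start();
        root.join();
        return seen;
    }

    private static void spawn(int remaining, List<ClassLoader> seen) {
        seen.add(Thread.currentThread().getContextClassLoader());
        if (remaining == 0) {
            return;
        }
        // The child is created on this thread, so it copies this thread's context class loader.
        Thread child = new Thread(() -> spawn(remaining - 1, seen));
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader isolated = new URLClassLoader(new URL[0], null); // hypothetical SubmoduleClassLoader
        List<ClassLoader> loaders = spawnChain(isolated, 2);
        for (int i = 0; i < loaders.size(); i++) {
            // Prints "true" for all three generations: root thread, child, and grandchild.
            System.out.println("generation " + i + " uses isolated loader: " + (loaders.get(i) == isolated));
        }
    }
}
```

Nothing here resets the context class loader, so every descendant of the first thread keeps it. This is why resolving InLineFileSystem through its own defining loader avoids the problem.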