  1. Apache Hudi
  2. HUDI-5151

Flink data skipping doesn't work with ClassNotFoundException of InLineFileSystem

Details

    Description

      The problem

      Flink data skipping does not work, and the following warning appears in the log:

      7799 [main] WARN  org.apache.hudi.source.FileIndex [] - Read column stats for data skipping error
      org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Error occurs when executing map
      	...
      Caused by: java.lang.ClassNotFoundException: Class org.apache.hudi.common.fs.inline.InLineFileSystem not found
      	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2329) ~[hadoop-common-2.10.1.jar:?]
      	...
      	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_345]
      

      The solution

      This problem was already addressed by https://issues.apache.org/jira/browse/HUDI-3763,
      but that patch does not fix the Flink job.

      We should use InLineFileSystem.class.getClassLoader() instead of Thread.currentThread().getContextClassLoader(), because the method lookupRecords(keys, fullKey) is called from a commonForkJoinPool worker thread, which may carry the wrong contextClassLoader.
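
      The difference between the two lookups can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not the Hudi patch itself: ClassLoaderLookupDemo is a hypothetical class standing in for InLineFileSystem, and an empty URLClassLoader with no parent stands in for a context class loader that cannot see the application classes.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical demo class; it stands in for InLineFileSystem and is not part of Hudi.
class ClassLoaderLookupDemo {

    /** Returns true if the given loader can resolve the named class. */
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, true, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /**
     * Runs both lookups on a thread whose context class loader cannot see
     * application classes. result[0] is the context class loader lookup,
     * result[1] is the lookup through the class's own (defining) loader.
     */
    static boolean[] lookupFromForeignThread() {
        // No URLs and a null parent: only bootstrap classes are visible.
        ClassLoader isolated = new URLClassLoader(new URL[0], null);
        String name = ClassLoaderLookupDemo.class.getName();
        boolean[] result = new boolean[2];
        Thread worker = new Thread(() -> {
            result[0] = canLoad(name, Thread.currentThread().getContextClassLoader());
            result[1] = canLoad(name, ClassLoaderLookupDemo.class.getClassLoader());
        });
        worker.setContextClassLoader(isolated);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = lookupFromForeignThread();
        System.out.println("context class loader finds class:  " + r[0]);
        System.out.println("defining class loader finds class: " + r[1]);
    }
}
```

      The lookup through the class's own loader does not depend on which thread happens to run the code, which is why it is the safer choice inside a shared pool.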

      When the problem occurs

      It is an intermittent bug that is hard to reproduce. Below is the sequence of events that leads to the error. I can attach the Flink job if necessary.

      1. Flink runs the job with a MiniCluster
      2. A TemporaryClassLoaderContext containing SubmoduleClassLoader is passed to the akka RpcSystem
        org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils:345
        // akka internally caches the context class loader
        // make sure it uses the plugin class loader
        try (TemporaryClassLoaderContext ignored =
                TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
        
      3. All akka threads have SubmoduleClassLoader as their contextClassLoader
      4. An akka thread creates a CompletableFuture
        org.apache.flink.runtime.rpc.akka.SupervisorActor:206
        return Patterns.ask(
                        supervisor,
                        createStartAkkaRpcActorMessage(propsFactory, endpointId),
                        RpcUtils.INF_DURATION)
                .toCompletableFuture()
                .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
                .join();
        
      5. This triggers the creation of a worker thread in commonForkJoinPool
        java.util.concurrent.ForkJoinPool:1485
        try {
            if (fac != null && (wt = fac.newThread(this)) != null) {
                wt.start();
        
      6. The commonForkJoinPool worker thread inherits SubmoduleClassLoader as its contextClassLoader because it is created from an akka thread
      7. That commonForkJoinPool worker creates another worker. Now two workers hold the wrong class loader
      8. As a result, commonForkJoinPool may end up consisting of, for example:
        2 workers with SubmoduleClassLoader
        8 workers with AppClassLoader
      9. Hudi uses Java parallel streams, which also run their tasks on commonForkJoinPool
        org.apache.hudi.metadata.HoodieBackedTableMetadata:160
        return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : engineContext.parallelize(partitionFileSlices))
            .flatMap((SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
        
      10. lookupRecords is called from a commonForkJoinPool worker
        With 2 of 10 workers affected, there is a 20% chance that Thread.currentThread().getContextClassLoader() returns the wrong class loader
        org.apache.hudi.common.table.log.block.HoodieHFileDataBlock:197
        inlineConf.setClassLoader(Thread.currentThread().getContextClassLoader());
        
      11. Class.forName in Hadoop's Configuration throws ClassNotFoundException
        org.apache.hadoop.conf.Configuration:2362
        clazz = Class.forName(name, true, classLoader);
        
      12. FileIndex logs the warning, and data skipping does not take effect
        org.apache.hudi.source.FileIndex:272
        } catch (Throwable throwable) {
          LOG.warn("Read column stats for data skipping error", throwable);
          return null;
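
      Steps 3 to 7 rest on the fact that a new thread takes over the context class loader of the thread that creates it. The sketch below shows this with plain java.lang.Thread objects only; the parent thread stands in for the akka thread and the child for the pool worker. ContextClassLoaderInheritanceDemo and its method names are hypothetical and exist only for this illustration; it does not start Flink, akka, or a ForkJoinPool.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; it only illustrates context class loader inheritance.
class ContextClassLoaderInheritanceDemo {

    /**
     * Starts a parent thread with the given context class loader, lets it
     * create a child thread, and returns the loader the child observes.
     */
    static ClassLoader loaderSeenByChild(ClassLoader parentContextLoader) {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread parent = new Thread(() -> {
            // The child never calls setContextClassLoader itself.
            Thread child = new Thread(
                    () -> seen.set(Thread.currentThread().getContextClassLoader()));
            child.start();
            joinQuietly(child);
        });
        parent.setContextClassLoader(parentContextLoader);
        parent.start();
        joinQuietly(parent);
        return seen.get();
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Marker loader playing the role of SubmoduleClassLoader.
        ClassLoader marker = new URLClassLoader(new URL[0], null);
        System.out.println("child inherited parent's loader: "
                + (loaderSeenByChild(marker) == marker));
    }
}
```

      Once a long-lived worker has picked up a loader this way, it keeps that loader for every later task, including tasks from unrelated code such as the parallel stream in step 9.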
        

            People

              trushev Alexander Trushev