Uploaded image for project: 'Apache Hudi'
  1. Apache Hudi
  2. HUDI-5253

HoodieMergeOnReadTableInputFormat could have duplicate records issue if it contains delta files while still splittable

    XMLWordPrintableJSON

Details

    Description

      We can also find that sometimes could throw IllegalStateException duplicates key error when we run the CI.

       java.lang.IllegalStateException: Duplicate key {"_hoodie_commit_time": 20221122170106908, "_hoodie_commit_seqno": 20221122170106908_0_48, "_hoodie_record_key": 53c028d3-e4b0-4d6f-a041-f09e418c36b3, "_hoodie_partition_path": 2016/03/15, "_hoodie_file_name": 6a9fff5b-3b75-48ad-85fe-c621c9a2c25d-0, "timestamp": 0, "_row_key": 53c028d3-e4b0-4d6f-a041-f09e418c36b3, "partition_path": 2016/03/15, "rider": rider-20221122170106908, "driver": driver-20221122170106908, "begin_lat": 0.5407076277518825, "begin_lon": 0.39726822192851885, "end_lat": 0.49363027135660975, "end_lon": 0.6482366665027408, "distance_in_meters": -1534272590, "seconds_since_epoch": 6103867871123100710, "weight": 0.38126373, "nation": 7b 62 79 74 65 73 3d 43 61 6e 61 64 61 7d, "current_date": 1970-01-17, "current_ts": 1460315658, "height": 0.093258, "city_to_state": org.apache.hadoop.io.ArrayWritable@1b28684, "fare": org.apache.hadoop.io.ArrayWritable@694b818f, "tip_history": org.apache.hadoop.io.ArrayWritable@63cb1a9a, "_hoodie_is_deleted": false}
      
      	at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
      	at java.util.HashMap.merge(HashMap.java:1254)
      	at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
      	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
      	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
      	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
      	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
      	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
      	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
      	at org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:95)
      	at org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:80)
      	at org.apache.hudi.client.functional.TestHoodieClientOnMergeOnReadStorage.testLogCompactionOnMORTable(TestHoodieClientOnMergeOnReadStorage.java:187)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
      	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
      	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
      	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
      	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
      	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
      	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
      	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
      	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
      	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
      	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
      	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
      	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
      	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
      	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
      	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
      	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
      	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
      	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
      	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
      	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
      	at java.util.ArrayList.forEach(ArrayList.java:1259)
      	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
      	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
      	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
      	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
      	at java.util.ArrayList.forEach(ArrayList.java:1259)
      	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
      	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
      	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
      	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
      	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
      	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
      	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
      	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
      	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
      	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
      	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
      	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
      	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
      

      We can easily to reproduce this in org.apache.hudi.testutils.HoodieMergeOnReadTestUtils#getRecordsUsingInputFormat to allow it create more splits

      FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
      // Add 3 to the inputPaths to create more splits
      InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size() + 3);
      
      for (InputSplit split : splits) {
      // ...
      }
      

      We actually cannot allow a path to be splitable if it contains delta files.

      Attachments

        Issue Links

          Activity

            People

              Bone An Hui An
              Bone An Hui An
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: