Details
- Type: Bug
- Status: Open
- Priority: Critical
- Resolution: Unresolved
- Affects Version/s: 0.14.0, 0.14.1
- Fix Version/s: None
- Environment: Java 8, Spark 3.x, Hadoop 3.3.x
Description
What is the problem?
Writing to Hudi tables with RECORD_INDEX enabled from a Spark job on a Kerberos-secured Hadoop cluster fails once the job exceeds 24h of lifetime (the HDFS delegation token has been renewed and the original one has expired) with the following error:
24/01/17 09:55:19 WARN TaskSetManager: Lost task 0.0 in stage 104.0 (TID 244) (host1.apache.org executor 1): org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Error occurs when executing map
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
    at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
    at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
    at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map
    at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
    at java.util.stream.AbstractTask.compute(AbstractTask.java:327)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to scan metadata
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:165)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:155)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:175)
    at org.apache.hudi.common.table.HoodieTableMetaClient.getActiveTimeline(HoodieTableMetaClient.java:355)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1264)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:473)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:429)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:414)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:291)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
    at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
    ... 12 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Token for real user: , can't be found in cache
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1612)
    at org.apache.hadoop.ipc.Client.call(Client.java:1558)
    at org.apache.hadoop.ipc.Client.call(Client.java:1455)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
    at com.sun.proxy.$Proxy40.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:688)
    at sun.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
    at com.sun.proxy.$Proxy41.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1702)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1686)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1100)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:147)
    at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1175)
    at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1172)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1182)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:597)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:596)
    at org.apache.hudi.common.table.HoodieTableMetaClient.scanFiles(HoodieTableMetaClient.java:552)
    at org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:645)
    at org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:628)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:163)
    ... 22 more
Affected versions:
Hudi 0.14.0 and 0.14.1
Spark 3.4.1 (likely all other versions as well)
Expected behavior:
Since the delegation token is renewed internally by Spark, the job should not fail; it should keep writing to the Hudi table without errors, exactly as it does before the token expires.
Steps to reproduce:
On a Kerberos-secured Hadoop cluster run the following snippet:
# start spark-shell in secure mode with keytab and a short spark.security.credentials.renewalRatio
# period so we don't have to wait hours for dt renewal.
spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
  --conf spark.security.credentials.renewalRatio=0.001 \
  --num-executors 1 \
  --executor-memory 2g \
  --keytab user.keytab \
  --principal user@APACHE.ORG
# Comments:
# dt renewed every 86sec
# keytab is needed to do dt renewal

# run the snippet within spark-shell
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame

// keep the login token because we will cancel it later
val token = org.apache.hadoop.security.UserGroupInformation.getLoginUser.getCredentials.getToken(
  new org.apache.hadoop.io.Text(
    org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration).getCanonicalServiceName))

// helper function to create a dataframe
def getDataFrame(data: Seq[Row]): DataFrame = {
  // Define the schema for the DataFrame
  val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  ))
  // Create a DataFrame from the schema and data
  val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
  df
}

// helper function to write df to hudi
def write(dataFrame: DataFrame): Unit = {
  dataFrame.
    write.
    format("org.apache.hudi").
    option("hoodie.table.name", "test").
    option("hoodie.datasource.write.precombine.field", "age").
    option("hoodie.datasource.write.recordkey.field", "id").
    option("hoodie.datasource.write.partitionpath.field", "").
    option("hoodie.datasource.hive_sync.support_timestamp", "true").
    option("hoodie.datasource.hive_sync.enable", "false").
    option("hoodie.datasource.write.reconcile.schema", "true").
    option("hoodie.write.markers.type", "DIRECT").
    option("hoodie.schema.on.read.enable", "true").
    option("hoodie.datasource.write.reconcile.schema", "true").
    option("hoodie.metadata.enable", "true").
    option("hoodie.index.type", "RECORD_INDEX").
    option("hoodie.metadata.record.index.enable", "true").
    option("hoodie.datasource.write.operation", "UPSERT").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
    option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").
    mode(org.apache.spark.sql.SaveMode.Append).
    save("/tmp/hudi_test")
}

val data1 = Seq(
  Row(1, "Alice", 25),
  Row(2, "Bob", 30),
  Row(3, "Charlie", 22)
)
val df1 = getDataFrame(data1);

val data2 = Seq(
  Row(1, "Alice", 26),
  Row(2, "Bob", 30),
  Row(3, "Charlie", 23)
)
val df2 = getDataFrame(data2);

// write for the first time; this will be successful because the token is still valid
write(df1)

// wait for renewal so we can 'safely' cancel the previous token
// (wait time = day ms * spark.security.credentials.renewalRatio)
java.lang.Thread.sleep((86400000 * spark.conf.get("spark.security.credentials.renewalRatio").toFloat).toLong)

// cancel the token
token.cancel(spark.sparkContext.hadoopConfiguration)

// this should fail some tasks on executors and the whole job with
// "Token for real user: , can't be found in cache"
write(df1)
// if not, run the write again
write(df2)
// and again - not all tasks may fail, as this depends on which thread is used for handling meta access
write(df1)
Root cause analysis:
Record Index lookup is done via HoodieBackedTableMetadata.getRecordsByKeys, which uses a HoodieLocalEngineContext on the Spark executors.
HoodieLocalEngineContext leverages java.util.stream to process its input data in parallel (by calling parallel() on the pipeline), as sketched below.
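For illustration, here is a stripped-down sketch of this pattern (generic code, not the actual Hudi method; the class name LocalEngineMapSketch is made up, and the real implementation appears in the diff at the end of this report):

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Generic sketch of the HoodieLocalEngineContext.map(...) pattern: the input list is
// processed through a parallel java.util.stream pipeline on JVM-local threads, not
// through a Spark RDD.
public class LocalEngineMapSketch {
  public static <I, O> List<O> map(List<I> data, Function<I, O> func, int parallelism) {
    // parallelism is effectively ignored here; parallel() always uses the common ForkJoinPool.
    return data.stream().parallel().map(func).collect(Collectors.toList());
  }
}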
Parallelism in java.util.stream is implemented on top of a ForkJoinPool (the common pool).
Once this pool is initialized, it reuses the same worker threads for all subsequent calls.
This is the key to understanding why, even though the HDFS delegation token is renewed by Spark, some actions executed in HoodieLocalEngineContext still use the old/initial token that the Spark executor was started with. The sketch below illustrates the thread reuse.
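A minimal, self-contained sketch (not Hudi code; the class name is made up) showing that parallel streams run on the shared ForkJoinPool common-pool workers, which are created lazily on first use and then reused across calls:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Parallel streams execute on ForkJoinPool.commonPool(); its worker threads are created
// once and then reused for every later parallel pipeline in the same JVM.
public class CommonPoolReuseDemo {
  static List<String> workerNames() {
    return IntStream.range(0, 100).boxed()
        .parallel()
        .map(i -> Thread.currentThread().getName())
        .distinct()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Both invocations report the same ForkJoinPool.commonPool-worker-* names (plus the
    // calling thread), so whatever context those workers were created with sticks around.
    System.out.println("first call:  " + workerNames());
    System.out.println("second call: " + workerNames());
  }
}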
The ForkJoinPool appears to be initialized at the beginning of the Spark executor's lifetime, outside of any Java access-control-context privileged action (i.e., not within a UserGroupInformation.doAs call). Its threads therefore only carry the login user's credentials/tokens, read from the token files made available to each YARN executor during startup; these are valid for 24h, so everything works for the first 24h.
The normal Spark task thread pool, by contrast, runs tasks within a Java access-control-context privileged action (as part of doAs): https://github.com/apache/spark/blob/4a69ce68172dc4bae230e933f30081414ddb50cd/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L413C1-L485C4. So in the single-threaded execution of a Spark task, the thread has access to the tokens available within the Java access control context (returned by UserGroupInformation.getCurrentUser()).
Finally, Spark currently renews the tokens only within the Java security context: https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L138, so credentials held outside of that access control context remain unchanged - this applies to the ForkJoinPool threads. The sketch below illustrates the difference.
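A minimal, hedged sketch of the mechanism (not Hudi or Spark code; the class name UgiForkJoinDemo and the user name "renewed-user" are made up for illustration): the common pool is warmed up outside of doAs, mirroring an executor that initializes it at startup, and work later stolen by those pre-existing workers does not see the doAs user.

import java.security.PrivilegedExceptionAction;
import java.util.stream.IntStream;

import org.apache.hadoop.security.UserGroupInformation;

public class UgiForkJoinDemo {
  public static void main(String[] args) throws Exception {
    // Warm up the common ForkJoinPool outside of any doAs call, the way a Spark executor would.
    IntStream.range(0, 1_000).parallel().forEach(i -> Math.sqrt(i));

    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("renewed-user");
    ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
      // The calling thread runs inside doAs and reports "renewed-user".
      System.out.println("caller sees: " + UserGroupInformation.getCurrentUser().getUserName());

      IntStream.range(0, 16).parallel().forEach(i -> {
        try {
          // Elements handled by the pre-existing common-pool workers typically report the
          // process login user rather than "renewed-user".
          System.out.println(Thread.currentThread().getName() + " sees: "
              + UserGroupInformation.getCurrentUser().getUserName());
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
      return null;
    });
  }
}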
To conclude, because of the way Spark renews delegation tokens on executors, using parallel java.util.stream pipelines in code that runs on executors is unsafe and will cause failures once the initial token has expired. In general, HoodieLocalEngineContext is not delegation-token safe.
It is worth noting that the delegation-token issue applies only to secured (Kerberized) YARN clusters.
Potential solutions/workarounds:
The following approaches have been tried and are confirmed to solve the problem:
1) Remove all calls to parallel() on java.util.stream in the functions of HoodieLocalEngineContext. To fix the index lookup alone, removing it from the map function is sufficient.
2) Propagate the renewed credentials/tokens to the ForkJoinPool threads by updating the login user's credentials:
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 5239490816d..ca78e6d7529 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.common.engine;
 
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
@@ -34,7 +38,11 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -56,8 +64,17 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrap
  */
 public final class HoodieLocalEngineContext extends HoodieEngineContext {
 
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
   public HoodieLocalEngineContext(Configuration conf) {
     this(conf, new LocalTaskContextSupplier());
+    try {
+      LOG.debug("Creating HoodieLocalEngineContext ugi tokens: "
+          + UserGroupInformation.getCurrentUser().getCredentials().getAllTokens() + " config="
+          + conf.toString());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public HoodieLocalEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) {
@@ -81,12 +98,15 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
 
   @Override
   public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    checkAndPropagateTokensRenewal();
     return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
+    //return data.stream().map(throwingMapWrapper(func)).collect(toList());
   }
 
   @Override
   public <I, K, V> List<V> mapToPairAndReduceByKey(
       List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    checkAndPropagateTokensRenewal();
     return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
         .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
         .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
@@ -97,6 +117,7 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
   public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
       Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
       SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    checkAndPropagateTokensRenewal();
     return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
         .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
         .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
@@ -107,6 +128,7 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
   @Override
   public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    checkAndPropagateTokensRenewal();
     return data.stream().parallel()
         .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
         .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
@@ -116,6 +138,7 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
 
   @Override
   public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
+    checkAndPropagateTokensRenewal();
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
   }
 
@@ -170,4 +193,42 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
   public void cancelAllJobs() {
     // no operation for now
   }
+
+  private void checkAndPropagateTokensRenewal() {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+
+    try {
+      UserGroupInformation login = UserGroupInformation.getLoginUser();
+      UserGroupInformation current = UserGroupInformation.getCurrentUser();
+      LOG.debug("Login tokens:" + login.getCredentials().getAllTokens());
+      LOG.debug("Current tokens:" + current.getCredentials().getAllTokens());
+
+      if (!compareCredentials(current.getCredentials(), login.getCredentials())) {
+        LOG.debug("Login and current are different, updating the tokens");
+        login.addCredentials(current.getCredentials());
+        LOG.debug(("Updated login tokens: " + login.getCredentials().getAllTokens()));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static boolean compareCredentials(Credentials credentials1, Credentials credentials2) {
+    // Get all the tokens from the first Credentials instance
+    Collection<Token<? extends TokenIdentifier>> tokens1 = credentials1.getAllTokens();
+
+    // Get all the tokens from the second Credentials instance
+    Collection<Token<? extends TokenIdentifier>> tokens2 = credentials2.getAllTokens();
+
+    // Check if the number of tokens is the same
+    if (tokens1.size() != tokens2.size()) {
+      return false;
+    }
+
+    // All tokens are the same
+    return tokens2.containsAll(tokens1);
+  }
 }
Issue Links
- is caused by: HUDI-1479 Replace FSUtils.getAllPartitionPaths() with HoodieTableMetadata#getAllPartitionPaths() (Resolved)