Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Unresolved
Description
Hello everyone, I am trying to send Hudi metrics to Graphite from a Spark Streaming job, but the method that sends the metrics throws java.lang.IllegalArgumentException after the first micro-batch:
20/05/06 11:49:25 ERROR Metrics: Failed to send metrics:
java.lang.IllegalArgumentException: A metric named kafka_hudi.finalize.duration already exists
    at org.apache.hudi.com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:97)
    at org.apache.hudi.metrics.Metrics.registerGauge(Metrics.java:83)
    at org.apache.hudi.metrics.HoodieMetrics.updateFinalizeWriteMetrics(HoodieMetrics.java:177)
    at org.apache.hudi.HoodieWriteClient.lambda$finalizeWrite$14(HoodieWriteClient.java:1233)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
    at org.apache.hudi.HoodieWriteClient.finalizeWrite(HoodieWriteClient.java:1231)
    at org.apache.hudi.HoodieWriteClient.commit(HoodieWriteClient.java:497)
    at org.apache.hudi.HoodieWriteClient.commit(HoodieWriteClient.java:479)
    at org.apache.hudi.HoodieWriteClient.commit(HoodieWriteClient.java:470)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:152)
    at org.apache.hudi.HoodieStreamingSink$$anonfun$1$$anonfun$2.apply(HoodieStreamingSink.scala:51)
    at org.apache.hudi.HoodieStreamingSink$$anonfun$1$$anonfun$2.apply(HoodieStreamingSink.scala:51)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.hudi.HoodieStreamingSink$$anonfun$1.apply(HoodieStreamingSink.scala:50)
    at org.apache.hudi.HoodieStreamingSink$$anonfun$1.apply(HoodieStreamingSink.scala:50)
    at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:114)
    at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:49)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Here is my config object for Hudi:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieMetricsConfig, HoodieWriteConfig}

// hudiTableName is defined elsewhere in the job.
val hudiOptions = Map[String, String](
  // Table and record layout
  HoodieWriteConfig.TABLE_NAME -> hudiTableName,
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "key",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "dt",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
  // Write behaviour
  DataSourceWriteOptions.OPERATION_OPT_KEY -> "insert",
  DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY -> "true",
  HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP -> "1",
  // Graphite metrics reporting
  HoodieMetricsConfig.METRICS_ON -> "true",
  HoodieMetricsConfig.METRICS_REPORTER_TYPE -> "GRAPHITE",
  HoodieMetricsConfig.GRAPHITE_SERVER_HOST -> "localhost",
  HoodieMetricsConfig.GRAPHITE_SERVER_PORT -> "4756",
  HoodieMetricsConfig.GRAPHITE_METRIC_PREFIX -> "hudi"
)
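The exception above is consistent with a gauge being registered under the same name on every commit: the first micro-batch registers kafka_hudi.finalize.duration, and the second one tries to register it again in the same JVM. The sketch below is only an illustration of that failure mode and of one possible guard (register once, then update the stored value). StrictRegistry and ReusableGauges are hypothetical stand-ins written for this example, not Hudi or Codahale classes, and this is not necessarily how Hudi handles it.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for a registry that rejects duplicate names,
// mirroring the "already exists" behaviour seen in the stack trace.
final class StrictRegistry {
  private val gauges = new ConcurrentHashMap[String, () => Long]()

  def register(name: String, gauge: () => Long): Unit =
    if (gauges.putIfAbsent(name, gauge) != null)
      throw new IllegalArgumentException(s"A metric named $name already exists")

  def value(name: String): Option[Long] =
    Option(gauges.get(name)).map(_.apply())
}

// Hypothetical guard: register a gauge the first time a name is seen,
// and only update its backing value on later calls.
final class ReusableGauges(registry: StrictRegistry) {
  private val cells = new ConcurrentHashMap[String, AtomicLong]()

  def set(name: String, v: Long): Unit = {
    val fresh = new AtomicLong(v)
    val existing = cells.putIfAbsent(name, fresh)
    if (existing == null) registry.register(name, () => fresh.get())
    else existing.set(v)
  }
}

object GaugeDemo {
  def main(args: Array[String]): Unit = {
    val registry = new StrictRegistry
    val gauges = new ReusableGauges(registry)
    gauges.set("kafka_hudi.finalize.duration", 120L) // first micro-batch
    gauges.set("kafka_hudi.finalize.duration", 95L)  // second micro-batch, no exception
    println(registry.value("kafka_hudi.finalize.duration")) // prints Some(95)
  }
}
```

Calling registry.register directly with the same name twice reproduces the IllegalArgumentException, which matches the point where the trace fails in Metrics.registerGauge.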
Environment Description
- Hudi version: 0.5.0
- Spark version : 2.4.4
- Hive version : 2.3.6
- Hadoop version : Amazon 2.8.5 (emr-5.29.0)
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : No