  1. Griffin
  2. GRIFFIN-359

DirectKafkaInputDStream has not been initialized when recovering from checkpoint if the streaming connector uses a spark-sql rule


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.5.0
    • Fix Version/s: None
    • Component/s: Measure Module
    • Labels: None

    Description

      I tried to use a spark-sql rule to check streaming data, but after restarting the job, Spark reports the following exception:

      21/03/30 02:01:16 ERROR Application$: process run error: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@4576f160 has not been initialized
      org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@4576f160 has not been initialized
          at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
          at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:90)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
          at scala.Option.orElse(Option.scala:289)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
          at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
          at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
          at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
          at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
          at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
          at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
          at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
          at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
          at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
          at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
          at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
          at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
          at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
          at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
          at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
          at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
          at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
          at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
          at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$run$1.apply$mcZ$sp(StreamingDQApp.scala:116)
          at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$run$1.apply(StreamingDQApp.scala:76)
          at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$run$1.apply(StreamingDQApp.scala:76)
          at scala.util.Try$.apply(Try.scala:192)
          at org.apache.griffin.measure.launch.streaming.StreamingDQApp.run(StreamingDQApp.scala:76)
          at org.apache.griffin.measure.Application$.main(Application.scala:92)
          at org.apache.griffin.measure.Application.main(Application.scala)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)

       

      A Spark issue that reports the same problem gives the cause: Spark Streaming does not support recovering the SQL configuration (sqlconf) from a checkpoint. Issue: https://issues.apache.org/jira/browse/SPARK-6770

      Official solution: http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
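
      For reference, the pattern described in that section of the Spark guide can be sketched roughly as follows. This is only an illustration, not Griffin's code: the object names, the checkpoint path, and the socket source (standing in for the Kafka stream) are hypothetical, and the master URL is assumed to come from spark-submit. The idea is to obtain the SparkSession lazily inside foreachRDD instead of capturing it in the checkpointed DStream graph, and to build the whole graph inside the factory passed to StreamingContext.getOrCreate.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Lazily instantiated singleton, following the Spark Streaming guide.
// The session is not stored in the checkpointed DStream graph; it is
// looked up (or rebuilt after a restart) when a batch actually runs.
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(conf: SparkConf): SparkSession = synchronized {
    if (instance == null) {
      instance = SparkSession.builder.config(conf).getOrCreate()
    }
    instance
  }
}

object CheckpointSafeSqlSketch {
  // Hypothetical checkpoint location, for illustration only.
  val checkpointDir = "hdfs:///tmp/checkpoint-sketch"

  // All DStream setup lives in this factory so that
  // StreamingContext.getOrCreate can restore the graph from the checkpoint.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpoint-safe-sql-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    // Stand-in source (assumption); the job in this report reads from Kafka.
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD { rdd =>
      // Get the session from the RDD's own SparkContext at batch time.
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      rdd.toDF("value").createOrReplaceTempView("source")
      spark.sql("select count(1) as cnt from source").show()
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Creates a fresh context on first run, restores it from the checkpoint afterwards.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

      Whether the same change is sufficient for Griffin's streaming connector has not been verified here; the sketch only shows the shape of the workaround that the guide recommends.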

      My DQConfig:

      {
        "name": "comments_lt_0",
        "process.type": "STREAMING",
        "data.sources": [
          {
            "name": "source",
            "connector": {
              "type": "KAFKA",
              "version": "1.0",
              "config": {
                "kafka.config": {
                  "bootstrap.servers": "10.18.255.117:9092",
                  "group.id": "group1",
                  "auto.offset.reset": "smallest",
                  "auto.commit.enable": "false"
                },
                "topics": "wdm_apiData",
                "key.type": "java.lang.String",
                "value.type": "java.lang.String"
              },
              "pre.proc": [
                {
                  "dsl.type": "df-ops",
                  "in.dataframe.name": "this",
                  "out.dataframe.name": "s1",
                  "rule": "from_json"
                },
                {
                  "dsl.type": "spark-sql",
                  "out.dataframe.name": "this",
                  "rule": "select pubcode, doc_url, from s1"
                }
              ]
            },
            "cache": {
              "file.path": "hdfs:///griffin/streaming/dump/source",
              "info.path": "source",
              "ready.time.interval": "10s",
              "ready.time.delay": "0",
              "time.range": [ "0", "0" ]
            }
          }
        ],
        "evaluate.rule": {
          "rules": [
            {
              "dsl.type": "spark-sql",
              "rule": "select count(1) as comments_cnt from source where comments < 0",
              "out": [{ "type": "metric", "name": "prof" }]
            },
            {
              "dsl.type": "spark-sql",
              "rule": "select pubcode, duc_url from source where dcomments < 0",
              "out": [{ "type": "record", "name": "comments_lt_0", "flatten": "array" }]
            }
          ]
        },
        "sinks": [ "consoleSink", "elasticSink", "hdfsSink" ]
      }


          People

            Assignee: Unassigned
            Reporter: yang xuejun (yxj1141)
            Votes: 0
            Watchers: 1
