Spark / SPARK-23337

withWatermark raises an exception on struct objects


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.2.1
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Environment: Linux Ubuntu, Spark on standalone mode

    Description

      Hi,

       

      When I pass a nested field (a field inside a struct, here _source.createTime) from a JSON file as the event-time column to the withWatermark method, I get an exception (see below).

      Everything else works flawlessly with the nested field.

       

      Works (top-level column):

      Dataset<Row> jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime", "10 seconds").toDF();

       

      JSON structure:

      root
       |-- _id: string (nullable = true)
       |-- _index: string (nullable = true)
       |-- _score: long (nullable = true)
       |-- myTime: timestamp (nullable = true)
      ..

      Does not work (nested field):

      Dataset<Row> jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime", "10 seconds").toDF();

       

      JSON structure:

       

      root
       |-- _id: string (nullable = true)
       |-- _index: string (nullable = true)
       |-- _score: long (nullable = true)
       |-- _source: struct (nullable = true)
       |    |-- createTime: timestamp (nullable = true)
      ..
       
      Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
      'EventTimeWatermark '_source.createTime, interval 10 seconds
      +- Deduplicate [_id#0], true
       +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
      at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
       at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
       at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
       at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
       at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
       at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
       at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
       at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:796)
       at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:674)
       at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
       at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
       at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
       at scala.collection.immutable.List.foldLeft(List.scala:84)
       at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
       at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
       at scala.collection.immutable.List.foreach(List.scala:381)
       at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
       at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
       at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
       at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:165)
       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171)
       at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62)
       at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889)
       at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:569)
       at my.package.hpstatistics.Importer.importViaSparkStreaming(Importer.java:278)
       at my.package.hpstatistics.Main.main(Main.java:80)
      Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
      Failed to copy node.
      Is otherCopyArgs specified correctly for EventTimeWatermark.
      Exception message: argument type mismatch
      ctor: public org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark(org.apache.spark.sql.catalyst.expressions.Attribute,org.apache.spark.unsafe.types.CalendarInterval,org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)?
      types: class org.apache.spark.sql.catalyst.expressions.Alias, class org.apache.spark.unsafe.types.CalendarInterval, class org.apache.spark.sql.catalyst.plans.logical.Deduplicate
      args: _source#3.createTime AS createTime#12, interval 10 seconds, Deduplicate [_id#0], true
      +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
      , tree:
      'EventTimeWatermark '_source.createTime, interval 10 seconds
      +- Deduplicate [_id#0], true
       +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:415)
       at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:385)
       at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
       ... 29 more
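
One way to read the "Caused by" part of the trace: the EventTimeWatermark constructor is declared with an Attribute as its first argument, but during analysis the nested reference was turned into an Alias (_source#3.createTime AS createTime#12), so the reflective copy of the node is rejected. The sketch below reproduces that kind of failure in plain Java. All class names in it (Expression, AttributeRef, AliasExpr, WatermarkNode, WatermarkCopyDemo) are hypothetical stand-ins and not Spark classes; it only illustrates the mechanism under that assumption and is not Spark's actual code.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-ins for Catalyst expression types; these are NOT Spark classes.
interface Expression {}

// Plays the role of a resolved top-level column reference (Catalyst: Attribute).
final class AttributeRef implements Expression {
    final String name;
    AttributeRef(String name) { this.name = name; }
}

// Plays the role of a renamed expression such as "_source.createTime AS createTime" (Catalyst: Alias).
final class AliasExpr implements Expression {
    final String name;
    AliasExpr(String name) { this.name = name; }
}

// Plays the role of EventTimeWatermark: its constructor accepts only an AttributeRef.
final class WatermarkNode {
    final AttributeRef eventTime;
    WatermarkNode(AttributeRef eventTime) { this.eventTime = eventTime; }
}

class WatermarkCopyDemo {
    // Rebuilds a WatermarkNode through reflection, similar in spirit to a tree-node copy.
    // Returns null when the copy succeeds, otherwise the text of the rejection.
    static String tryCopy(Expression newEventTime) {
        try {
            Constructor<WatermarkNode> ctor =
                WatermarkNode.class.getDeclaredConstructor(AttributeRef.class);
            ctor.newInstance(newEventTime);
            return null;
        } catch (IllegalArgumentException e) {
            // Thrown by reflection when the runtime argument type does not fit the parameter type.
            return e.toString();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // A plain attribute fits the constructor, so the copy goes through.
        System.out.println("attribute: " + tryCopy(new AttributeRef("myTime")));
        // An alias is an Expression but not an AttributeRef, so the copy is rejected.
        System.out.println("alias:     " + tryCopy(new AliasExpr("createTime")));
    }
}
```

If this reading is right, one workaround worth trying (not verified against 2.2.1 here) is to promote the nested field to a top-level column first, for example with withColumn("createTime", col("_source.createTime")), and then pass "createTime" to withWatermark, so that the watermark column resolves to a plain attribute.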
      

       


            People

              Assignee: Unassigned
              Reporter: Aydin Kocas (aydinchavez)