SPARK-21202

Equality operator === works, but equalTo throws an error


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: 1.6.1
    • Fix Version/s: None
    • Component/s: Spark Core, SQL
    • Environment: Spark 1.6.1 and Scala 2.10.5 on Windows

    Description

      I raised this question at
      https://stackoverflow.com/questions/44642403/minus-logic-implementation-not-working-with-spark-scala

      === works fine for joining two tables on multiple keys, but equalTo does not work when I use it to join on the second key.

      Both the working and the non-working query are shown below, along with the error message.

      Working Query:

      val ft = spark.sql("select * from " + targetTable)
      val stg = spark.sql("select * from " + stgTable)
      val result = ft.join(stg, columnList.length match {
        case 2 => ft(columnList(0)) equalTo stg(columnList(0)) and
                  ft(columnList(1)) === stg(columnList(1))
        case 1 => ft(columnList(0)) equalTo stg(columnList(0))
      }, "left_outer").where(stg(columnList(0)).isNull)


      Not Working Query:

      val result = ft.join(stg, columnList.length match {
        case 2 => ft(columnList(0)) equalTo stg(columnList(0)) and
                  ft(columnList(1)) equalTo stg(columnList(1))
        case 1 => ft(columnList(0)) equalTo stg(columnList(0))
      }, "left_outer").where(stg(columnList(0)).isNull)

      Error Message:

      column used for filtering the not modified records in target table is : order_id
      Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '((order_id = order_id) && order_product)' due to data type mismatch: differing types in '((order_id = order_id) && order_product)' (boolean and string).;
      at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
      at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
      at scala.collection.AbstractIterator.to(Iterator.scala:1157)
      at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
      at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
      at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
      at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:108)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:119)
      at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
      at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
      at scala.collection.AbstractIterator.to(Iterator.scala:1157)
      at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
      at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
      at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
      at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:57)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
      at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
      at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
      at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
      at org.apache.spark.sql.DataFrame.join(DataFrame.scala:541)
      at com.esi.spark.incremental.Load1.HiveIncrementalAppend1$.mergeTwoTable(HiveIncrementalAppend1.scala:108)
      at com.esi.spark.incremental.Load1.HiveIncrementalAppend1$.main(HiveIncrementalAppend1.scala:46)
      at com.esi.spark.incremental.Load1.HiveIncrementalAppend1.main(HiveIncrementalAppend1.scala)
      17/06/24 08:20:56 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
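
      For reference, a self-contained sketch that reproduces the mismatch on toy data (the object name, column names, and local master are assumptions for illustration, using the Spark 1.6-era SQLContext API):

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.SQLContext

      // Hypothetical minimal reproduction of the precedence problem.
      object EqualToPrecedenceRepro {
        def main(args: Array[String]): Unit = {
          val sc = new SparkContext(
            new SparkConf().setAppName("repro").setMaster("local[*]"))
          val sqlContext = new SQLContext(sc)
          import sqlContext.implicits._

          val ft  = Seq((1, "a"), (2, "b")).toDF("order_id", "order_product")
          val stg = Seq((1, "a")).toDF("order_id", "order_product")

          // Fails analysis: parses as
          //   ((ft("order_id") equalTo stg("order_id")) and ft("order_product"))
          //     equalTo stg("order_product")
          // -> "differing types in '((order_id = order_id) && order_product)'"
          // ft.join(stg, ft("order_id") equalTo stg("order_id") and
          //   ft("order_product") equalTo stg("order_product"), "left_outer")

          // Works: parentheses make `and` combine two boolean Columns.
          val result = ft.join(stg,
            (ft("order_id") equalTo stg("order_id")) and
            (ft("order_product") equalTo stg("order_product")),
            "left_outer").where(stg("order_id").isNull)

          result.show()  // expect only the ft rows missing from stg
          sc.stop()
        }
      }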

          People

            Assignee: Unassigned
            Reporter: nssaravanan303 (saravanan)
            Votes: 0
            Watchers: 1

              Time Tracking

                Original Estimate: 168h
                Remaining Estimate: 168h
                Time Spent: Not Specified