PHOENIX-6559

spark connector access to SmallintArray / UnsignedSmallintArray columns


Details

    • Solves PHOENIX-6559, including two test cases: can convert arrays of Short type in Phoenix schema, and can save arrays of Short type back to Phoenix.
    • Patch

    Description

      We have some tables defined with SMALLINT ARRAY[] columns that are not read correctly through the Spark connector.

      It seems that the Spark data type is incorrectly inferred by the connector as an array of integers, ArrayType(IntegerType), instead of ArrayType(ShortType).

      An example table:

      CREATE TABLE IF NOT EXISTS AEIDEV.ARRAY_TABLE (ID BIGINT NOT NULL PRIMARY KEY, COL1 SMALLINT ARRAY[]);
      UPSERT INTO AEIDEV.ARRAY_TABLE VALUES (1, ARRAY[-32678,-9876,-234,-1]);
      UPSERT INTO AEIDEV.ARRAY_TABLE VALUES (2, ARRAY[0,8,9,10]);
      UPSERT INTO AEIDEV.ARRAY_TABLE VALUES (3, ARRAY[123,1234,12345,32767]);

      Reading the values from Spark returns wrong results:

      scala> val df = spark.sqlContext.read.format("org.apache.phoenix.spark").option("table","AEIDEV.ARRAY_TABLE").option("zkUrl","ithdp1101.cern.ch:2181").load
      df: org.apache.spark.sql.DataFrame = [ID: bigint, COL1: array<int>]

      scala> df.show
      +---+--------------------+
      | ID|                COL1|
      +---+--------------------+
      |  1|[-647200678, -234...|
      |  2|[524288, 655369, ...|
      |  3|[80871547, 214743...|
      +---+--------------------+

      scala> df.collect
      res3: Array[org.apache.spark.sql.Row] = Array([1,WrappedArray(-647200678, -234, 0, 0)], [2,WrappedArray(524288, 655369, 0, 0)], [3,WrappedArray(80871547, 2147430457, 0, 0)])

      We have identified the problem in the SparkSchemaUtil class and applied the small patch attached to this report (a sketch of the relevant change follows the output below). After this, the data type is correctly inferred and the results are correct:

      scala> val df = spark.sqlContext.read.format("org.apache.phoenix.spark").option("table","AEIDEV.ARRAY_TABLE").option("zkUrl","ithdp1101.cern.ch:2181").load
      df: org.apache.spark.sql.DataFrame = [ID: bigint, COL1: array<smallint>]

      scala> df.show
      +---+--------------------+
      | ID|                COL1|
      +---+--------------------+
      |  1|[-32678, -9876, -...|
      |  2|       [0, 8, 9, 10]|
      |  3|[123, 1234, 12345...|
      +---+--------------------+

      scala> df.collect
      res1: Array[org.apache.spark.sql.Row] = Array([1,WrappedArray(-32678, -9876, -234, -1)], [2,WrappedArray(0, 8, 9, 10)], [3,WrappedArray(123, 1234, 12345, 32767)])
      
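      For reference, a sketch of the essential change (the authoritative version is in the attached PHOENIX-6559.master.v1.patch; the match arm shown here paraphrases SparkSchemaUtil's Phoenix-to-Catalyst type mapping and may differ slightly from the actual code):

      // In SparkSchemaUtil's type mapping (Scala), the SMALLINT array types
      // previously fell into the integer-array mapping:
      case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] =>
        ArrayType(ShortType, containsNull = true) // was: ArrayType(IntegerType, containsNull = true)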


      We can provide more information and submit a merge request if needed.
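      As an illustration of the two test cases mentioned in the summary, a round-trip check along these lines should pass with the patch applied (a sketch only; the table and zkUrl are the ones from the example above, and the exact containsNull flag is an assumption):

      import org.apache.spark.sql.types.{ArrayType, ShortType}

      // Read: with the patch, COL1 is inferred as array<smallint>.
      val df = spark.sqlContext.read
        .format("org.apache.phoenix.spark")
        .option("table", "AEIDEV.ARRAY_TABLE")
        .option("zkUrl", "ithdp1101.cern.ch:2181")
        .load()
      assert(df.schema("COL1").dataType == ArrayType(ShortType, containsNull = true))

      // Write: save arrays of Short type back through the connector.
      // The phoenix-spark writer expects SaveMode.Overwrite, which issues UPSERTs.
      df.write
        .format("org.apache.phoenix.spark")
        .mode("overwrite")
        .option("table", "AEIDEV.ARRAY_TABLE")
        .option("zkUrl", "ithdp1101.cern.ch:2181")
        .save()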


      Attachments

        1. PHOENIX-6559.master.v1.patch (8 kB, Alvaro Fernandez)

      People

        Assignee: Alvaro Fernandez (alferca)
        Reporter: Alvaro Fernandez (alferca)