CARBONDATA-1814

(Carbon 1.3.0 - Streaming) NullPointerException in spark shell when streaming is started with the table streaming property altered from default (false) to true


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: other
    • Environment: 3 node ant cluster

    Description

      Steps:
      The Spark thrift server is started via spark-submit.
      The user starts spark shell using the command: bin/spark-shell --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --jars /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar

      In the spark shell, the user tries to start streaming on a table whose streaming property has been altered from the default (false) to true.
      scala> import java.io.{File, PrintWriter}
      import java.io.{File, PrintWriter}

      scala> import java.net.ServerSocket
      import java.net.ServerSocket

      scala>

      scala> import org.apache.spark.sql.{CarbonEnv, SparkSession}
      import org.apache.spark.sql.{CarbonEnv, SparkSession}

      scala> import org.apache.spark.sql.hive.CarbonRelation
      import org.apache.spark.sql.hive.CarbonRelation

      scala> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
      import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

      scala>

      scala> import org.apache.carbondata.core.constants.CarbonCommonConstants
      import org.apache.carbondata.core.constants.CarbonCommonConstants

      scala> import org.apache.carbondata.core.util.CarbonProperties
      import org.apache.carbondata.core.util.CarbonProperties

      scala> import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
      import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}

      scala>

      scala> CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
      res0: org.apache.carbondata.core.util.CarbonProperties = org.apache.carbondata.core.util.CarbonProperties@69ee0861

      scala>

      scala> import org.apache.spark.sql.CarbonSession._
      import org.apache.spark.sql.CarbonSession._

      scala>

      scala> val carbonSession = SparkSession.builder().
               appName("StreamExample").
               getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store")
      carbonSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.CarbonSession@6ce365b7

      scala>

      scala> carbonSession.sparkContext.setLogLevel("INFO")

      scala>

      scala> def sql(sql: String) = carbonSession.sql(sql)
      sql: (sql: String)org.apache.spark.sql.DataFrame

      scala>

      scala> def writeSocket(serverSocket: ServerSocket): Thread = {
               val thread = new Thread() {
                 override def run(): Unit = {
                   // wait for a client connection request and accept it
                   val clientSocket = serverSocket.accept()
                   val socketWriter = new PrintWriter(clientSocket.getOutputStream())
                   var index = 0
                   for (_ <- 1 to 1000) {
                     // write 101 records per iteration (0 to 100 inclusive)
                     for (_ <- 0 to 100) {
                       index = index + 1
                       socketWriter.println(index.toString + ",name_" + index
                         + ",city_" + index + "," + (index * 10000.00).toString +
                         ",school_" + index + ":school_" + index + index + "$" + index)
                     }
                     socketWriter.flush()
                     Thread.sleep(2000)
                   }
                   socketWriter.close()
                   System.out.println("Socket closed")
                 }
               }
               thread.start()
               thread
             }
      writeSocket: (serverSocket: java.net.ServerSocket)Thread

      scala> def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, tableName: String, port: Int): Thread = {
               val thread = new Thread() {
                 override def run(): Unit = {
                   var qry: StreamingQuery = null
                   try {
                     val readSocketDF = spark.readStream
                       .format("socket")
                       .option("host", "10.18.98.34")
                       .option("port", port)
                       .load()

                     qry = readSocketDF.writeStream
                       .format("carbondata")
                       .trigger(ProcessingTime("5 seconds"))
                       .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
                       .option("tablePath", tablePath.getPath).option("tableName", tableName)
                       .start()

                     qry.awaitTermination()
                   } catch {
                     case ex: Throwable =>
                       ex.printStackTrace()
                       println("Done reading and writing streaming data")
                   } finally {
                     qry.stop()
                   }
                 }
               }
               thread.start()
               thread
             }
      startStreaming: (spark: org.apache.spark.sql.SparkSession, tablePath: org.apache.carbondata.core.util.path.CarbonTablePath, tableName: String, port: Int)Thread

      scala>

      scala> val streamTableName = "all_datatypes_2048"
      streamTableName: String = all_datatypes_2048

      scala>

      scala>

      scala> sql(s"create table all_datatypes_2048 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('table_blocksize'='2048')")
      res4: org.apache.spark.sql.DataFrame = []

      scala>

      scala> sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/100_olap_C20.csv' INTO table all_datatypes_2048 options ('DELIMITER'=',', 'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')")
      res5: org.apache.spark.sql.DataFrame = []

      scala>

      scala> sql(s"ALTER TABLE all_datatypes_2048 SET TBLPROPERTIES('streaming'='true')")
      res6: org.apache.spark.sql.DataFrame = []
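
      Note: before starting the stream, one way to confirm the ALTER actually took effect is to inspect the table description (a diagnostic sketch; the exact property row labels in the output vary by CarbonData version):

      scala> sql("DESCRIBE FORMATTED all_datatypes_2048").show(200, false)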

      scala>

      scala>

      scala>

      scala> val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore.
               lookupRelation(Some("default"), streamTableName)(carbonSession).asInstanceOf[CarbonRelation].carbonTable
      carbonTable: org.apache.carbondata.core.metadata.schema.table.CarbonTable = org.apache.carbondata.core.metadata.schema.table.CarbonTable@77648a90

      scala>

      scala> val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
      tablePath: org.apache.carbondata.core.util.path.CarbonTablePath = hdfs://hacluster/user/hive/warehouse/carbon.store/default/all_datatypes_2048

      scala>

      scala> val port = 8010
      port: Int = 8010

      scala> val serverSocket = new ServerSocket(port)
      serverSocket: java.net.ServerSocket = ServerSocket[addr=0.0.0.0/0.0.0.0,localport=8010]

      scala> val socketThread = writeSocket(serverSocket)
      socketThread: Thread = Thread[Thread-81,5,main]

      scala> val streamingThread = startStreaming(carbonSession, tablePath, streamTableName, port)

      Issue : NullPointerException in spark shell when streaming is started with the table streaming property altered from default (false) to true. Streaming fails.
      scala> org.apache.carbondata.streaming.CarbonStreamException: Table default.all_datatypes_2048 is not a streaming table
      at org.apache.spark.sql.CarbonSource.createSink(CarbonSource.scala:242)
      at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:274)
      at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
      at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:51)
      Done reading and writing streaming data
      Exception in thread "Thread-82" java.lang.NullPointerException
      at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:59)
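
      Note : the secondary NullPointerException in Thread-82 most likely comes from the finally block of the startStreaming helper above - start() throws the CarbonStreamException before qry is assigned, so qry.stop() dereferences null. A minimal hardening sketch for that block (an observation about the repro script, not the product fix):

      finally {
        // guard against start() having failed before qry was assigned
        if (qry != null) {
          qry.stop()
        }
      }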

      Expected : Streaming should continue successfully, without any failure or exception, after the table streaming property is altered from default (false) to true.
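
      A possible workaround sketch, assuming the failure is specific to the ALTER path: declare the table as a streaming table at create time instead (illustrative short schema, not the full table above):

      scala> sql("CREATE TABLE stream_demo (id INT, name STRING) " +
               "STORED BY 'org.apache.carbondata.format' " +
               "TBLPROPERTIES('streaming'='true', 'table_blocksize'='2048')")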

      Attachments

        Activity


          People

            Assignee: anubhavtarar (anubhav tarar)
            Reporter: chetdb (Chetan Bhat)
            Votes: 0
            Watchers: 1

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 3h 10m
