PredictionIO (Retired) · PIO-213

Elasticsearch as event data storage does not work


Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 0.12.1, 0.14.0, 0.15.0
    • Fix Version/s: None
    • Component/s: Build, Core, Documentation
    • Labels: None

    Description

      The docs say that Elasticsearch can be used as the event store instead of HBase. (With HBase, PIO works as expected, but we would rather not run HBase, so we decided to migrate to ES.) We tried PIO 0.14.0 and 0.15.0 with different Elasticsearch versions (5.9, 6.8.1) and Spark versions (2.1.3, 2.4.0).
      Every time, training fails in the json4s library because Spark cannot serialize a json4s object. Upgrading json4s to the newest version did not help either.
      So we have given up: we cannot use Elasticsearch instead of HBase without code changes.

      Here is the stack trace we are struggling with (PIO 0.15.0 with Spark 2.4):

      Exception in thread "main" org.apache.spark.SparkException: Task not serializable
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:934)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
      at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
      at org.apache.predictionio.data.storage.elasticsearch.ESPEvents.delete(ESPEvents.scala:111)
      at org.apache.predictionio.core.SelfCleaningDataSource$class.removePEvents(SelfCleaningDataSource.scala:198)
      at co.unreel.DataSource.removePEvents(DataSource.scala:13)
      at org.apache.predictionio.core.SelfCleaningDataSource$class.wipePEvents(SelfCleaningDataSource.scala:184)
      at co.unreel.DataSource.wipePEvents(DataSource.scala:13)
      at co.unreel.DataSource.cleanPersistedPEvents(DataSource.scala:39)
      at co.unreel.DataSource.readTraining(DataSource.scala:48)
      at co.unreel.DataSource.readTraining(DataSource.scala:13)
      at org.apache.predictionio.controller.PDataSource.readTrainingBase(PDataSource.scala:40)
      at org.apache.predictionio.controller.Engine$.train(Engine.scala:642)
      at org.apache.predictionio.controller.Engine.train(Engine.scala:176)
      at org.apache.predictionio.workflow.CoreWorkflow$.runTrain(CoreWorkflow.scala:67)
      at org.apache.predictionio.workflow.CreateWorkflow$.main(CreateWorkflow.scala:251)
      at org.apache.predictionio.workflow.CreateWorkflow.main(CreateWorkflow.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
      at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
      at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
      at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
      at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
      at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      Caused by: java.io.NotSerializableException: org.json4s.ext.IntervalSerializer$$anon$1
      Serialization stack:
      • object not serializable (class: org.json4s.ext.IntervalSerializer$$anon$1, value: org.json4s.ext.IntervalSerializer$$anon$1@6d9428f3)
      • field (class: org.json4s.ext.ClassSerializer, name: t, type: interface org.json4s.ext.ClassType)
      • object (class org.json4s.ext.ClassSerializer, ClassSerializer(org.json4s.ext.IntervalSerializer$$anon$1@6d9428f3))
      • writeObject data (class: scala.collection.immutable.List$SerializationProxy)
      • object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6106dfb6)
      • writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
      • object (class scala.collection.immutable.$colon$colon, List(DurationSerializer, InstantSerializer, DateTimeSerializer, DateMidnightSerializer, ClassSerializer(org.json4s.ext.IntervalSerializer$$anon$1@6d9428f3), ClassSerializer(org.json4s.ext.LocalDateSerializer$$anon$2@7dddfc35), ClassSerializer(org.json4s.ext.LocalTimeSerializer$$anon$3@71316cd7), PeriodSerializer))
      • field (class: org.json4s.Formats$$anon$3, name: wCustomSerializers$1, type: class scala.collection.immutable.List)
      • object (class org.json4s.Formats$$anon$3, org.json4s.Formats$$anon$3@7a730479)
      • field (class: org.apache.predictionio.data.storage.elasticsearch.ESPEvents, name: formats, type: interface org.json4s.Formats)
      • object (class org.apache.predictionio.data.storage.elasticsearch.ESPEvents, org.apache.predictionio.data.storage.elasticsearch.ESPEvents@3f45dfec)
      • field (class: org.apache.predictionio.data.storage.elasticsearch.ESPEvents$$anonfun$delete$1, name: $outer, type: class org.apache.predictionio.data.storage.elasticsearch.ESPEvents)
      • object (class org.apache.predictionio.data.storage.elasticsearch.ESPEvents$$anonfun$delete$1, <function1>)
      at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
      ... 35 more
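
      The serialization stack above points at the `formats` field of `ESPEvents`: the `foreachPartition` closure in `delete` references it, so Spark must serialize the whole `ESPEvents` instance (`$outer`), and the json4s anonymous serializers inside `Formats` are not `java.io.Serializable`. A minimal sketch of that failure shape, and of one common Spark workaround (`@transient lazy val`), using a hypothetical `CustomFormats` stand-in rather than the real json4s types:

      ```scala
      import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

      // Hypothetical stand-in for a json4s Formats with custom serializers:
      // like org.json4s.ext.IntervalSerializer$$anon$1 in the trace, it does
      // not implement java.io.Serializable.
      class CustomFormats {
        def render(id: String): String = s"event:$id"
      }

      // Mirrors the failing shape in ESPEvents: the closure references an
      // instance field, so serializing the closure serializes $outer, which
      // drags the non-serializable formats field along with it.
      class BrokenEvents extends Serializable {
        val formats = new CustomFormats
        def deleteClosure: String => String = { id => formats.render(id) }
      }

      // Workaround shape: @transient keeps the field out of the serialized
      // form, and lazy val rebuilds it on first use after deserialization
      // on each executor.
      class FixedEvents extends Serializable {
        @transient lazy val formats = new CustomFormats
        def deleteClosure: String => String = { id => formats.render(id) }
      }

      // The same check Spark's ClosureCleaner performs before shipping a
      // task: Java-serialize the closure and see whether it succeeds.
      def serializes(obj: AnyRef): Boolean =
        try {
          val out = new ObjectOutputStream(new ByteArrayOutputStream())
          out.writeObject(obj)
          out.close()
          true
        } catch {
          case _: NotSerializableException => false
        }
      ```

      With this sketch, `serializes((new BrokenEvents).deleteClosure)` fails the way `pio train` does, while the `FixedEvents` variant serializes cleanly; the equivalent one-line change in `ESPEvents.scala` would be one plausible code-level fix, which is why this does not seem workable without patching PredictionIO.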


      People

        Assignee: Unassigned
        Reporter: eduard_sanin (eduard)

