Details
Description
In docs we see that we can use elastic search as event store instead of hbase ( with hbase pio is working as expected but we do not like hbase thus decided to migrate to ES )
but we tried pio 0.14.0, 0.15.0 and different versions of elastic 5.9, 6.8.1 and spark 2.1.3, 2.4.0
all the time train fails because of json4s lib when spark cannot serialise an object from json4s
we also tried to upgrade json4s lib to newest version but it also did not help
so guys we given up since we cannot use elastic search instead of hbase without code changes
here is stack trace we are struggling with: ( pio 0.15.0 with spark 2.4 )
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
at org.apache.predictionio.data.storage.elasticsearch.ESPEvents.delete(ESPEvents.scala:111)
at org.apache.predictionio.core.SelfCleaningDataSource$class.removePEvents(SelfCleaningDataSource.scala:198)
at co.unreel.DataSource.removePEvents(DataSource.scala:13)
at org.apache.predictionio.core.SelfCleaningDataSource$class.wipePEvents(SelfCleaningDataSource.scala:184)
at co.unreel.DataSource.wipePEvents(DataSource.scala:13)
at co.unreel.DataSource.cleanPersistedPEvents(DataSource.scala:39)
at co.unreel.DataSource.readTraining(DataSource.scala:48)
at co.unreel.DataSource.readTraining(DataSource.scala:13)
at org.apache.predictionio.controller.PDataSource.readTrainingBase(PDataSource.scala:40)
at org.apache.predictionio.controller.Engine$.train(Engine.scala:642)
at org.apache.predictionio.controller.Engine.train(Engine.scala:176)
at org.apache.predictionio.workflow.CoreWorkflow$.runTrain(CoreWorkflow.scala:67)
at org.apache.predictionio.workflow.CreateWorkflow$.main(CreateWorkflow.scala:251)
at org.apache.predictionio.workflow.CreateWorkflow.main(CreateWorkflow.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.json4s.ext.IntervalSerializer$$anon$1
Serialization stack:
- object not serializable (class: org.json4s.ext.IntervalSerializer$$anon$1, value: org.json4s.ext.IntervalSerializer$$anon$1@6d9428f3)
- field (class: org.json4s.ext.ClassSerializer, name: t, type: interface org.json4s.ext.ClassType)
- object (class org.json4s.ext.ClassSerializer, ClassSerializer(org.json4s.ext.IntervalSerializer$$anon$1@6d9428f3))
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6106dfb6)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(DurationSerializer, InstantSerializer, DateTimeSerializer, DateMidnightSerializer, ClassSerializer(org.json4s.ext.IntervalSerializer$$anon$1@6d9428f3), ClassSerializer(org.json4s.ext.LocalDateSerializer$$anon$2@7dddfc35), ClassSerializer(org.json4s.ext.LocalTimeSerializer$$anon$3@71316cd7), PeriodSerializer))
- field (class: org.json4s.Formats$$anon$3, name: wCustomSerializers$1, type: class scala.collection.immutable.List)
- object (class org.json4s.Formats$$anon$3, org.json4s.Formats$$anon$3@7a730479)
- field (class: org.apache.predictionio.data.storage.elasticsearch.ESPEvents, name: formats, type: interface org.json4s.Formats)
- object (class org.apache.predictionio.data.storage.elasticsearch.ESPEvents, org.apache.predictionio.data.storage.elasticsearch.ESPEvents@3f45dfec)
- field (class: org.apache.predictionio.data.storage.elasticsearch.ESPEvents$$anonfun$delete$1, name: $outer, type: class org.apache.predictionio.data.storage.elasticsearch.ESPEvents)
- object (class org.apache.predictionio.data.storage.elasticsearch.ESPEvents$$anonfun$delete$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 35 more