Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.0
    • Component/s: None
    • Labels:
      None

      Description

      Elasticsearch is likely to be a common output datastore for Samza so it would be good to have a Producer (and possibly Consumer) as part of the core project.

      Elasticsearch organises data into indexes, each of which can contain multiple types of documents. Each document has an id that identifies it and a source, which is its actual data. These map well onto concepts in Samza.

      Elasticsearch also has mappings, which define how it indexes the documents pushed to it. I don't think the Samza System should be concerned with this.

      (index, type) -> stream

      id -> key

      source -> message

      The main one needing to be agreed upon is how the index and type are defined as a stream. We've started by simply joining them with a / as they would in the elasticsearch REST api and using that as the stream name.
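      As a rough illustration of the naming scheme above, the sketch below joins and splits a stream name of the form index/type. The class name EsStreamName and its methods are hypothetical and are not taken from the patch; the validation rules are an assumption.

```java
import java.util.Objects;

/** Hypothetical helper (not from the patch): a stream name of the form "index/type". */
final class EsStreamName {
    final String index;
    final String type;

    private EsStreamName(String index, String type) {
        this.index = index;
        this.type = type;
    }

    /** Joins index and type with a '/', mirroring the REST API path layout. */
    static String join(String index, String type) {
        return index + "/" + type;
    }

    /** Splits on the '/' separator and rejects names that do not have exactly two non-empty parts. */
    static EsStreamName parse(String streamName) {
        Objects.requireNonNull(streamName, "streamName");
        String[] parts = streamName.split("/", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                "Expected <index>/<type> but got: " + streamName);
        }
        return new EsStreamName(parts[0], parts[1]);
    }
}
```

      For example, EsStreamName.parse("logs/event") would yield index "logs" and type "event", while a name without a separator is rejected.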

      The Elasticsearch Java client can deal with the source being any of a variety of types that can be represented as JSON (Object, Map, byte[]). We could just pass objects to the Producer, or use the Samza JSON serde to handle that (or maybe both). We're currently passing the message object through and assuming the client can deal with it.

      Elasticsearch also has the ability to bulk index documents, so combining this with correctly flushing the Producer can get good performance.

      Finally, we'd need to think about how this can be configured. The Elasticsearch Java client has two different transports, each with its own configuration. Currently we are only using the TransportClient, we should probably make it configurable or maybe not initially.

      1. 0001-SAMZA-654-Added-ElasticsearchSystemProducer-and-Fact.patch
        11 kB
        Dan Harvey
      2. ElasticsearchSystemProducer.patch
        59 kB
        Dan Harvey
      3. ElasticsearchSystemProducer.patch
        59 kB
        Dan Harvey
      4. ElasticsearchSystemProducer.patch
        56 kB
        Dan Harvey
      5. ElasticsearchSystemProducer.patch
        28 kB
        Dan Harvey

        Issue Links

          Activity

          danharvey Dan Harvey added a comment -

          I've added the patch for the initial version we're using. I'm yet to add tests so it would be good to get feedback and opinions on the implementation before we finish it off. Review board link is here: https://reviews.apache.org/r/33297/

          jghoman Jakob Homan added a comment -

          The main one needing to be agreed upon is how the index and type are defined as a stream. We've started by simply joining them with a / as they would in the elasticsearch REST api and using that as the stream name.

          Does it make sense to make this a pluggable class that would allow the user to specify how to combine the index, type and ID as they wish, and provide the slash implementation as a default? Should the type and/or ID be mapped to the partition field? (No elasticsearch experience here, so these questions may be naive. Just going off of what I've read from this tutorial: http://joelabrahamsson.com/elasticsearch-101/)

          Currently we are only using the TransportClient, we should probably make it configurable or maybe not initially.

          If it's easy to make pluggable, let's go with that.

          jghoman Jakob Homan added a comment -

          Do we need to consider rate limiting or throttling? How likely is it that a large Samza job can pound an elasticsearch cluster into oblivion?

          danharvey Dan Harvey added a comment -

          Yes, it would make sense to make that pluggable. I've made an IndexRequestFactory that maps (source, envelope) -> IndexRequest; by default it behaves as described above, which should work for most cases.

          I think the partition key maps to a routing key that you can set at index time to specify the shard the document ends up in. We don't use that, but I've added it to the default factory anyway.
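          The mapping described above can be sketched with simplified stand-in types. Everything here is hypothetical: EnvelopeSketch, IndexRequestSketch and the factory names are illustrations only, not the Samza or Elasticsearch classes, and the real IndexRequestFactory in the patch may have a different signature.

```java
/** Hypothetical stand-in for an outgoing message envelope. */
final class EnvelopeSketch {
    final String stream;
    final Object key;
    final Object partitionKey;
    final Object message;

    EnvelopeSketch(String stream, Object key, Object partitionKey, Object message) {
        this.stream = stream;
        this.key = key;
        this.partitionKey = partitionKey;
        this.message = message;
    }
}

/** Hypothetical stand-in for an index request. */
final class IndexRequestSketch {
    final String index;
    final String type;
    final String id;      // null means "let the datastore assign an id"
    final String routing; // null means "use default routing"
    final Object source;

    IndexRequestSketch(String index, String type, String id, String routing, Object source) {
        this.index = index;
        this.type = type;
        this.id = id;
        this.routing = routing;
        this.source = source;
    }
}

/** The pluggable piece: users could supply their own mapping. */
interface IndexRequestFactorySketch {
    IndexRequestSketch getIndexRequest(EnvelopeSketch envelope);
}

/** Default mapping: stream "index/type", key -> id, partition key -> routing, message -> source. */
final class DefaultIndexRequestFactorySketch implements IndexRequestFactorySketch {
    @Override
    public IndexRequestSketch getIndexRequest(EnvelopeSketch envelope) {
        String[] parts = envelope.stream.split("/", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                "Expected <index>/<type> but got: " + envelope.stream);
        }
        String id = envelope.key == null ? null : envelope.key.toString();
        String routing = envelope.partitionKey == null ? null : envelope.partitionKey.toString();
        return new IndexRequestSketch(parts[0], parts[1], id, routing, envelope.message);
    }
}
```

          Keeping the mapping behind an interface means a job with a different naming scheme only has to replace the factory, not the producer.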

          For throttling, the current implementation uses Elasticsearch's Java BulkProcessor, which by default batches documents into groups of 1000 or 15 MB, whichever comes first, and then sends each batch to one of the nodes in a round-robin fashion. I think it can only have one batch in flight at a time, so it should limit itself if the batches start to get slow. Writes in Elasticsearch are "cheaper" as they are not committed to the index straight away but put on an internal change log and committed every second or so. We should also expose the BulkProcessor settings so users can change them to control how fast it indexes.

          Do you think we would need any more specific throttling? It might be better to wait until someone tests this and finds improvements to add rather than guessing. We've not had any issues on our side yet.

          danharvey Dan Harvey added a comment -

          I've updated Review Board with some changes. Should I attach a patch every time too?

          I've got the pluggable config to add for the IndexRequestFactory and a ClientFactory and a few smaller changes left from your comments.

          closeuris Yan Fang added a comment -

          should I attach a patch every time too

          yes, thanks. That helps people test the patch.

          danharvey Dan Harvey added a comment -

          New patch attached and RB updated.

          There is now a pluggable client and index request factory. You can configure the bulk processor, and it's set to no concurrent requests, so flush() is now blocking and send() will block every time it hits any of the defined limits (1000 docs / 15 MB by default).
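          The blocking behaviour described above can be sketched as a small batcher that flushes on the caller's thread once a document-count or byte-size limit is reached. BoundedBatcher is a hypothetical stand-in written for illustration; it is not the Elasticsearch BulkProcessor and does not reproduce its API, and the limits are constructor arguments rather than any library default.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of count- and size-bounded batching with synchronous flushes. */
final class BoundedBatcher {
    private final int maxActions;
    private final long maxBytes;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    BoundedBatcher(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Buffers a document; flushes on the caller's thread when either limit is reached. */
    void add(byte[] doc) {
        pending.add(doc);
        pendingBytes += doc.length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Sends whatever is buffered and returns only once the batch has been handed off. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sendBatch(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
        flushCount++;
    }

    /** Placeholder for the blocking network call; a slow call here delays later add() calls. */
    private void sendBatch(List<byte[]> batch) {
        // Intentionally empty in this sketch.
    }

    int pendingSize() {
        return pending.size();
    }

    int flushCount() {
        return flushCount;
    }
}
```

          Because the send happens inside add(), a slow cluster slows the producer down directly, which is the self-limiting effect discussed earlier in the thread.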

          I still need to think over the serialisation part.

          If this looks good I should probably look at adding some tests and documentation for the settings.

          closeuris Yan Fang added a comment -

          Hi Dan Harvey, it looks good to me.

          • Navina left a comment in the RB.
          • for the configs, could we also follow the same convention as Kafka, using something like systems.system-name.settings.foo.foo?
          • a few Javadocs for each class
          • could you add a web page for the Elasticsearch Producer in the Documentation and add the configurations to the Configuration table (page)? (as you mentioned)
          • unit tests (as you mentioned)
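
          To illustrate the configuration convention suggested in the list above, the sketch below pulls out every key under a systems.<name>.settings. prefix and strips the prefix. PrefixedSettings and its subset method are hypothetical helpers operating on a plain Map; they are not Samza's Config API, and the key names used with them are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: extracts per-system settings from a flat key/value config. */
final class PrefixedSettings {
    private PrefixedSettings() {
    }

    /** Returns the entries under "systems.<systemName>.settings." with that prefix removed. */
    static Map<String, String> subset(Map<String, String> config, String systemName) {
        String prefix = "systems." + systemName + ".settings.";
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

          With a config containing systems.es.settings.foo.foo=bar, subset(config, "es") would return a map holding foo.foo=bar, which could then be handed to the client unchanged.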

          Thank you.

          danharvey Dan Harvey added a comment -

          Added a new patch and updated RB.

          Cleaned up the serialisation and configuration, and added Javadocs and some unit tests.

          I could possibly also add some integration tests with an in-JVM Elasticsearch node/cluster. These tests would be slower, so I'm not sure if they should go in the same place or elsewhere?

          closeuris Yan Fang added a comment -

          Applied the patch to master; it failed to compile:

          Execution failed for task ':samza-elasticsearch:compileJava'.
          > Compilation failed; see the compiler error output for details.
          
          * Try:
          Run with --info or --debug option to get more log output.
          
          * Exception is:
          org.gradle.api.tasks.TaskExecutionException: Execution failed for task ':samza-elasticsearch:compileJava'.
          	at org.gradle.api.internal.tasks.execution.ExecuteActionsTaskExecuter.executeActions(ExecuteActionsTaskExecuter.java:69)
          	at org.gradle.api.internal.tasks.execution.ExecuteActionsTaskExecuter.execute(ExecuteActionsTaskExecuter.java:46)
          	at org.gradle.api.internal.tasks.execution.PostExecutionAnalysisTaskExecuter.execute(PostExecutionAnalysisTaskExecuter.java:35)
          	at org.gradle.api.internal.tasks.execution.SkipUpToDateTaskExecuter.execute(SkipUpToDateTaskExecuter.java:64)
          	at org.gradle.api.internal.tasks.execution.ValidatingTaskExecuter.execute(ValidatingTaskExecuter.java:58)
          	at org.gradle.api.internal.tasks.execution.SkipEmptySourceFilesTaskExecuter.execute(SkipEmptySourceFilesTaskExecuter.java:42)
          	at org.gradle.api.internal.tasks.execution.SkipTaskWithNoActionsExecuter.execute(SkipTaskWithNoActionsExecuter.java:52)
          	at org.gradle.api.internal.tasks.execution.SkipOnlyIfTaskExecuter.execute(SkipOnlyIfTaskExecuter.java:53)
          	at org.gradle.api.internal.tasks.execution.ExecuteAtMostOnceTaskExecuter.execute(ExecuteAtMostOnceTaskExecuter.java:43)
          	at org.gradle.api.internal.AbstractTask.executeWithoutThrowingTaskFailure(AbstractTask.java:296)
          	at org.gradle.execution.taskgraph.AbstractTaskPlanExecutor$TaskExecutorWorker.executeTask(AbstractTaskPlanExecutor.java:79)
          	at org.gradle.execution.taskgraph.AbstractTaskPlanExecutor$TaskExecutorWorker.processTask(AbstractTaskPlanExecutor.java:63)
          	at org.gradle.execution.taskgraph.AbstractTaskPlanExecutor$TaskExecutorWorker.run(AbstractTaskPlanExecutor.java:51)
          	at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor.process(DefaultTaskPlanExecutor.java:23)
          	at org.gradle.execution.taskgraph.DefaultTaskGraphExecuter.execute(DefaultTaskGraphExecuter.java:86)
          	at org.gradle.execution.SelectedTaskExecutionAction.execute(SelectedTaskExecutionAction.java:29)
          	at org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:61)
          	at org.gradle.execution.DefaultBuildExecuter.access$200(DefaultBuildExecuter.java:23)
          	at org.gradle.execution.DefaultBuildExecuter$2.proceed(DefaultBuildExecuter.java:67)
          	at org.gradle.execution.DryRunBuildExecutionAction.execute(DryRunBuildExecutionAction.java:32)
          	at org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:61)
          	at org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:54)
          	at org.gradle.initialization.DefaultGradleLauncher.doBuildStages(DefaultGradleLauncher.java:148)
          	at org.gradle.initialization.DefaultGradleLauncher.doBuild(DefaultGradleLauncher.java:105)
          	at org.gradle.initialization.DefaultGradleLauncher.run(DefaultGradleLauncher.java:85)
          	at org.gradle.launcher.exec.InProcessBuildActionExecuter$DefaultBuildController.run(InProcessBuildActionExecuter.java:81)
          	at org.gradle.launcher.cli.ExecuteBuildAction.run(ExecuteBuildAction.java:33)
          	at org.gradle.launcher.cli.ExecuteBuildAction.run(ExecuteBuildAction.java:24)
          	at org.gradle.launcher.exec.InProcessBuildActionExecuter.execute(InProcessBuildActionExecuter.java:39)
          	at org.gradle.launcher.daemon.server.exec.ExecuteBuild.doBuild(ExecuteBuild.java:45)
          	at org.gradle.launcher.daemon.server.exec.BuildCommandOnly.execute(BuildCommandOnly.java:34)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.WatchForDisconnection.execute(WatchForDisconnection.java:35)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.ResetDeprecationLogger.execute(ResetDeprecationLogger.java:24)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.StartStopIfBuildAndStop.execute(StartStopIfBuildAndStop.java:33)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.ReturnResult.execute(ReturnResult.java:34)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.ForwardClientInput$2.call(ForwardClientInput.java:71)
          	at org.gradle.launcher.daemon.server.exec.ForwardClientInput$2.call(ForwardClientInput.java:69)
          	at org.gradle.util.Swapper.swap(Swapper.java:38)
          	at org.gradle.launcher.daemon.server.exec.ForwardClientInput.execute(ForwardClientInput.java:69)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.LogToClient.doBuild(LogToClient.java:60)
          	at org.gradle.launcher.daemon.server.exec.BuildCommandOnly.execute(BuildCommandOnly.java:34)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.EstablishBuildEnvironment.doBuild(EstablishBuildEnvironment.java:60)
          	at org.gradle.launcher.daemon.server.exec.BuildCommandOnly.execute(BuildCommandOnly.java:34)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.StartBuildOrRespondWithBusy$1.run(StartBuildOrRespondWithBusy.java:45)
          	at org.gradle.launcher.daemon.server.DaemonStateCoordinator.runCommand(DaemonStateCoordinator.java:184)
          	at org.gradle.launcher.daemon.server.exec.StartBuildOrRespondWithBusy.doBuild(StartBuildOrRespondWithBusy.java:49)
          	at org.gradle.launcher.daemon.server.exec.BuildCommandOnly.execute(BuildCommandOnly.java:34)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.HandleStop.execute(HandleStop.java:30)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.DaemonHygieneAction.execute(DaemonHygieneAction.java:39)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.CatchAndForwardDaemonFailure.execute(CatchAndForwardDaemonFailure.java:32)
          	at org.gradle.launcher.daemon.server.exec.DaemonCommandExecution.proceed(DaemonCommandExecution.java:125)
          	at org.gradle.launcher.daemon.server.exec.DefaultDaemonCommandExecuter.executeCommand(DefaultDaemonCommandExecuter.java:51)
          	at org.gradle.launcher.daemon.server.DefaultIncomingConnectionHandler$ConnectionWorker.handleCommand(DefaultIncomingConnectionHandler.java:155)
          	at org.gradle.launcher.daemon.server.DefaultIncomingConnectionHandler$ConnectionWorker.receiveAndHandleCommand(DefaultIncomingConnectionHandler.java:128)
          	at org.gradle.launcher.daemon.server.DefaultIncomingConnectionHandler$ConnectionWorker.run(DefaultIncomingConnectionHandler.java:116)
          	at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
          Caused by: org.gradle.api.internal.tasks.compile.CompilationFailedException: Compilation failed; see the compiler error output for details.
          	at org.gradle.api.internal.tasks.compile.jdk6.Jdk6JavaCompiler.execute(Jdk6JavaCompiler.java:47)
          	at org.gradle.api.internal.tasks.compile.jdk6.Jdk6JavaCompiler.execute(Jdk6JavaCompiler.java:38)
          	at org.gradle.api.internal.tasks.compile.NormalizingJavaCompiler.delegateAndHandleErrors(NormalizingJavaCompiler.java:96)
          	at org.gradle.api.internal.tasks.compile.NormalizingJavaCompiler.execute(NormalizingJavaCompiler.java:49)
          	at org.gradle.api.internal.tasks.compile.NormalizingJavaCompiler.execute(NormalizingJavaCompiler.java:35)
          	at org.gradle.api.internal.tasks.compile.DelegatingJavaCompiler.execute(DelegatingJavaCompiler.java:29)
          	at org.gradle.api.internal.tasks.compile.DelegatingJavaCompiler.execute(DelegatingJavaCompiler.java:20)
          	at org.gradle.api.internal.tasks.compile.CleaningJavaCompilerSupport.execute(CleaningJavaCompilerSupport.java:33)
          	at org.gradle.api.internal.tasks.compile.CleaningJavaCompilerSupport.execute(CleaningJavaCompilerSupport.java:24)
          	at org.gradle.api.tasks.compile.JavaCompile.performCompilation(JavaCompile.java:87)
          	at org.gradle.api.tasks.compile.JavaCompile.compile(JavaCompile.java:65)
          	at org.gradle.api.tasks.compile.JavaCompile.compile(JavaCompile.java:53)
          	at org.gradle.internal.reflect.JavaMethod.invoke(JavaMethod.java:63)
          	at org.gradle.api.internal.project.taskfactory.AnnotationProcessingTaskFactory$IncrementalTaskAction.doExecute(AnnotationProcessingTaskFactory.java:235)
          	at org.gradle.api.internal.project.taskfactory.AnnotationProcessingTaskFactory$StandardTaskAction.execute(AnnotationProcessingTaskFactory.java:211)
          	at org.gradle.api.internal.project.taskfactory.AnnotationProcessingTaskFactory$IncrementalTaskAction.execute(AnnotationProcessingTaskFactory.java:222)
          	at org.gradle.api.internal.project.taskfactory.AnnotationProcessingTaskFactory$StandardTaskAction.execute(AnnotationProcessingTaskFactory.java:200)
          	at org.gradle.api.internal.tasks.execution.ExecuteActionsTaskExecuter.executeAction(ExecuteActionsTaskExecuter.java:80)
          	at org.gradle.api.internal.tasks.execution.ExecuteActionsTaskExecuter.executeActions(ExecuteActionsTaskExecuter.java:61)
          	... 66 more
          
          
          BUILD FAILED
          
          
          closeuris Yan Fang added a comment -

          OK. It is caused by:

          samza/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java:34: error: ElasticsearchSystemAdmin is not abstract and does not override abstract method createCoordinatorStream(String) in SystemAdmin
          public class ElasticsearchSystemAdmin implements SystemAdmin {
                 ^
          1 error
          
          
          closeuris Yan Fang added a comment -

          A few nits in the RB. One suggestion: can we put all the config in one place, ElasticConfig, rather than spreading it across all the factories? Currently it's a little difficult to manage. I think we can pass the ElasticConfig to the get method of the factories, instead of constructing each factory with the config.
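          The suggestion above could look roughly like the sketch below. It is illustrative only and is not the code from the patch: `ElasticConfig` is modelled as a thin wrapper over a map, and `ClientFactory`, `TransportStyleClientFactory`, `FakeClient`, and the key names are hypothetical stand-ins so the example runs without Samza or Elasticsearch on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class ElasticConfigSketch {

    /** Hypothetical wrapper that owns every Elasticsearch-related setting. */
    static final class ElasticConfig {
        private final Map<String, String> values;

        ElasticConfig(Map<String, String> values) {
            this.values = new HashMap<>(values);
        }

        String get(String key, String defaultValue) {
            String raw = values.get(key);
            return raw == null ? defaultValue : raw;
        }

        int getInt(String key, int defaultValue) {
            String raw = values.get(key);
            return raw == null ? defaultValue : Integer.parseInt(raw);
        }
    }

    /** Stand-in for a real Elasticsearch client; it only records its settings. */
    static final class FakeClient {
        final String host;
        final int port;

        FakeClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Factories hold no config of their own; it is passed to the get method. */
    interface ClientFactory {
        FakeClient getClient(ElasticConfig config);
    }

    static final class TransportStyleClientFactory implements ClientFactory {
        @Override
        public FakeClient getClient(ElasticConfig config) {
            // Key names and defaults here are assumptions made for this sketch.
            String host = config.get("client.transport.host", "localhost");
            int port = config.getInt("client.transport.port", 9300);
            return new FakeClient(host, port);
        }
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("client.transport.host", "es-1.example");
        FakeClient client = new TransportStyleClientFactory().getClient(new ElasticConfig(raw));
        System.out.println(client.host + ":" + client.port);
    }
}
```

          With this shape, all key names and defaults are read through one class, and a factory can be instantiated reflectively with a no-argument constructor.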

          closeuris Yan Fang added a comment -

          "I could possibly also add some integration tests with an in-JVM Elasticsearch node/cluster. These tests would be slower so I'm not sure if they should go in the same place or elsewhere?"

          These can go in the integration test suite, which is not triggered when we run ./gradlew clean build, so the slowness should not affect the regular build. See the existing tests at https://github.com/apache/samza/tree/master/samza-test/src/main/python/tests for reference. If you think it's a little too much for this ticket, we can split it out into a new ticket for the integration tests. What do you think? Thank you.

          danharvey Dan Harvey added a comment -

          I've updated the config as you suggested and it's cleaner now. I've added a new patch and updated the RB.

          I agree that there should be a separate ticket for the integration tests as that will take more time for me to figure out. I've created SAMZA-705 for that.

          metacret Jae Hyeon Bae added a comment -

          What about using a RestClient instead of the native Java client? I have quite often seen the native Java client cause serious problems when maintaining an ES cluster, such as version mismatches in both the JVM and ES. https://github.com/elastic/elasticsearch-hadoop has a RestClient implementation, and we can take a look at its Storm Bolt implementation, which should be similar to a Samza SystemProducer.

          danharvey Dan Harvey added a comment -

          I've not benchmarked this, but the native Java client should perform a lot better as it talks directly to the Elasticsearch nodes rather than via the REST interface that's built on top. I've personally never had the version mismatch issue as I've always been able to manage the versions of the server and client.

          That said, I can see that there might be cases where people would prefer to use Elasticsearch via HTTP for access control, load balancing, etc.

          It might be worth either adding something on top of this or making a separate samza-elasticsearch-rest module so users can pick whichever suits them better.

          closeuris Yan Fang added a comment -

          Hi Dan Harvey, thank you very much for the update. I am reviewing it. There is a build error:

          Users/yanfang/Develep/Samza/0613/samza/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java:34: error: ElasticsearchSystemAdmin is not abstract and does not override abstract method createCoordinatorStream(String) in SystemAdmin
          public class ElasticsearchSystemAdmin implements SystemAdmin {
          ^
          1 error
          :samza-elasticsearch:compileJava FAILED

          This is caused by a change we made to the SystemAdmin API. Sorry for the inconvenience. Can you fix it? Thanks.

          closeuris Yan Fang added a comment - - edited

          Hi Dan Harvey, the RB LGTM. I would like to test it with the master branch before shipping. The current patch is based on 0.9.0; could you create a patch for the master branch? Thank you.

          theduderog Roger Hoover added a comment -

          I took Dan Harvey's patch and added metrics. The code is here (https://github.com/Quantiply/rico/tree/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch).

          Either someone can take it and include it with this code, or I'll submit it as a patch once this is committed.

          Thanks,

          Roger

          danharvey Dan Harvey added a comment -

          Sorry for the delay, I've rebased this against the latest master. I made createCoordinatorStream throw UnsupportedOperationException, as with the other methods that are not supported via Elasticsearch.
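          A minimal sketch of that fix, for readers hitting the same compile error. The `SystemAdmin` interface below is a simplified stand-in declaring only the method named in the compiler message (the real Samza interface has more methods), and the exception message is illustrative rather than taken from the patch.

```java
public class CoordinatorStreamSketch {

    /** Simplified stand-in for Samza's SystemAdmin; only the method from the error is shown. */
    interface SystemAdmin {
        void createCoordinatorStream(String streamName);
    }

    /** Stand-in for the admin class in the patch: it rejects the unsupported operation. */
    static final class ElasticsearchSystemAdmin implements SystemAdmin {
        @Override
        public void createCoordinatorStream(String streamName) {
            // Implementing the method satisfies the compiler; calling it fails fast.
            throw new UnsupportedOperationException(
                "Elasticsearch does not support coordinator streams: " + streamName);
        }
    }

    public static void main(String[] args) {
        SystemAdmin admin = new ElasticsearchSystemAdmin();
        try {
            admin.createCoordinatorStream("example");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

          Overriding the new abstract method is what resolves the "is not abstract and does not override abstract method" error; throwing keeps a misconfigured job from silently trying to use Elasticsearch for coordination.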

          I think this is all ready to be merged now.

          Roger Hoover, as this ticket has been fully reviewed, I think it would be best to get this in and then create a new ticket to add the metrics to it?

          theduderog Roger Hoover added a comment - - edited

          Dan Harvey, no problem on putting the metrics in a separate ticket. I'll submit a JIRA when this is merged.

          BTW, thank you very much for contributing this producer. It's great. Just starting to use it now.

          closeuris Yan Fang added a comment -

          +1 . Dan Harvey, thank you so much for this contribution. Committed to the master branch.

          Roger Hoover, feel free to create a ticket to add the metrics. Thank you.

          theduderog Roger Hoover added a comment - - edited

          Created SAMZA-733

          estezz Leo Woessner added a comment -

          How do I configure the ElasticSearchProducer?

          navina Navina Ramesh added a comment -

          ElasticSearchProducer was recently added to the Samza repository. An example job and a tutorial should be available once SAMZA-740 is resolved.
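          In the meantime, a job configuration might look something like the fragment below. This is a hedged sketch, not an authoritative reference: the system name `es`, the host, and the numeric values are placeholders, and the key names are given as they appear (to the best of my recollection) in the Samza configuration reference for Elasticsearch output systems, so check them against the documentation for your Samza version.

```properties
# "es" is an arbitrary system name chosen for this example.
systems.es.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory

# Use the TransportClient discussed in this ticket; host and port are placeholders.
systems.es.client.factory=org.apache.samza.system.elasticsearch.client.TransportClientFactory
systems.es.client.transport.host=localhost
systems.es.client.transport.port=9300

# Optional bulk-indexing tuning; the values here are illustrative.
systems.es.bulk.flush.max.actions=1000
systems.es.bulk.flush.interval.ms=5000
```

          A task would then send to a stream on the `es` system whose name is the index and type joined with a `/`, as proposed in the description of this ticket.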


            People

            • Assignee:
              danharvey Dan Harvey
              Reporter:
              danharvey Dan Harvey

                Development