Samza / SAMZA-2167

Should not close the MetadataStore after generating JobModel in ProcessJobFactory.


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2
    • Component/s: None
    • Labels: None

    Description

      Currently in ProcessJobFactory the metadata-store connection is closed after generating the JobModel.

      To read from the coordinator stream only once in the samza-yarn ApplicationMaster, we ended up making the LocalityManager, TaskAssignmentManager, ChangelogStreamManager, etc. read through the MetadataStore. However, after that change, closing the metadata store in ProcessJobFactory after generating the JobModel results in the following exception when the servlet API is hit to query the JobModel:

      org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
      	at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
      	at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
      	at org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
      	at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
      	at org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
      	at org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
      	at org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
      	at org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
      	at org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
      	at org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
      	at org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
      	at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
      	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
      	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
      	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
      	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
      	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
      	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
      	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
      	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
      	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
      	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
      	at org.eclipse.jetty.server.Server.handle(Server.java:497)
      	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
      	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
      	at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
      	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
      	at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.samza.SamzaException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped.
      	at org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
      	at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
      	at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
      	at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
      	at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
      	at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
      	at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
      	at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
      	at org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
      	at org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:483)
      	at org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
      	at org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
      	at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
      	... 25 more
      2019-04-16 12:16:46.090 [qtp220371218-112] HttpChannel [WARN] /
      (same JsonMappingException stack trace as above, logged a second time; omitted)
      2019-04-16 12:16:46.091 [qtp220371218-112] HttpChannel [WARN] Could not send response error 500: org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
      
      

      The above exception causes the local deployment to fail.
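
      The failure mode can be illustrated with a minimal, self-contained sketch. It does not use Samza's classes: InMemoryStore and LazyJobModel are hypothetical stand-ins, assumed here only to mirror what the stack trace shows, namely that the locality getter on the JobModel reads from the store each time it is called. The first method closes the store right after building the model, so a later read fails; the second keeps the store open for as long as the model is served and closes it afterwards.

```java
import java.util.HashMap;
import java.util.Map;

final class MetadataStoreLifecycleSketch {

  /** Hypothetical stand-in for a metadata store; NOT Samza's MetadataStore interface. */
  static final class InMemoryStore implements AutoCloseable {
    private final Map<String, String> data = new HashMap<>();
    private boolean closed = false;

    void put(String key, String value) {
      data.put(key, value);
    }

    /** Reads fail once the store has been closed, like a stopped consumer. */
    Map<String, String> all() {
      if (closed) {
        throw new IllegalStateException("store has been closed");
      }
      return new HashMap<>(data);
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Hypothetical model that reads locality lazily, on every getter call. */
  static final class LazyJobModel {
    private final InMemoryStore store;

    LazyJobModel(InMemoryStore store) {
      this.store = store;
    }

    Map<String, String> getAllContainerLocality() {
      return store.all();
    }
  }

  /** Problematic order: the store is closed right after the model is built. */
  static String serveAfterClose() {
    InMemoryStore store = new InMemoryStore();
    store.put("container-0", "host-a");
    LazyJobModel model = new LazyJobModel(store);
    store.close();
    try {
      return "ok: " + model.getAllContainerLocality();
    } catch (IllegalStateException e) {
      return "error: " + e.getMessage();
    }
  }

  /** Alternative order: the store stays open while the model is served. */
  static String serveThenClose() {
    try (InMemoryStore store = new InMemoryStore()) {
      store.put("container-0", "host-a");
      LazyJobModel model = new LazyJobModel(store);
      return "ok: " + model.getAllContainerLocality();
    }
  }

  public static void main(String[] args) {
    System.out.println(serveAfterClose()); // error: store has been closed
    System.out.println(serveThenClose());  // ok: {container-0=host-a}
  }
}
```

      In this sketch the store's lifetime is tied to the scope that serves the model rather than to the step that builds it, which is the ordering the issue title asks for.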

          People

            Assignee: spvenkat Shanthoosh Venkataraman
            Reporter: spvenkat Shanthoosh Venkataraman
            Votes: 0
            Watchers: 1

              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 0.5h