Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
-
None
-
None
Description
Currently in ProcessJobFactory the metadata-store connection is closed after generating the JobModel.
To read from coordinator stream only once in samza-yarn ApplicationMaster, we ended up making the LocalityManager, TaskAssignmentManager, ChanglogStreamManager etc. However, after the above change closing the metadata store in ProcessJobFactory after generating the JobModel, results in the following exception when the servlet API is hit for querying the JobModel:
org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"]) at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218) at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183) at org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140) at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158) at org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112) at org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610) at org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256) at org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575) at org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081) at org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39) at org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32) at javax.servlet.http.HttpServlet.service(HttpServlet.java:687) at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587) at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061) at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) at org.eclipse.jetty.server.Server.handle(Server.java:497) at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310) at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245) at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.samza.SamzaException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. at org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311) at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87) at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58) at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154) at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148) at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98) at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74) at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63) at org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123) at org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483) at org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418) at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150) ... 25 more 2019-04-16 12:16:46.090 [qtp220371218-112] HttpChannel [WARN] / org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"]) at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218) at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183) at org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140) at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158) at org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112) at org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610) at org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256) at org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575) at org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081) at org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39) at org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32) at javax.servlet.http.HttpServlet.service(HttpServlet.java:687) at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587) at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061) at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) at org.eclipse.jetty.server.Server.handle(Server.java:497) at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310) at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245) at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.samza.SamzaException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. at org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311) at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87) at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58) at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154) at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148) at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98) at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74) at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63) at org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123) at org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483) at org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418) at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150) ... 25 more 2019-04-16 12:16:46.091 [qtp220371218-112] HttpChannel [WARN] Could not send response error 500: org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
The above exception causes the local deployment to fail.
Attachments
Issue Links
- links to