SAMZA-310

Publish container logs to a SystemStream

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7.0
    • Fix Version/s: 0.9.0
    • Component/s: container
    • Labels: None

      Description

      At the moment, it's a bit awkward to get to a Samza job's logs: assuming you're running on YARN, you have to navigate around the YARN web interface, and you can only see one container's logs at a time.

      Given that Samza is all about streams, it would make sense for the logs generated by Samza jobs to also be sent to a stream. There, they could be indexed with Kibana, consumed by an exception-tracking system, etc.

      Notes:

      • The serde for encoding logs into a suitable wire format should be pluggable. There can be a default implementation that uses JSON, analogous to MetricsSnapshotSerdeFactory for metrics, but organisations that already have a standardised in-house encoding for logs should be able to use it (see the sketch after this list).
      • Should this be at the level of Slf4j or Log4j? Currently the log configuration for YARN jobs uses Log4j, which has the advantage that any frameworks/libraries that use Log4j but not Slf4j appear in the logs. However, Samza itself currently only depends on Slf4j. If we tie this feature to Log4j, it would somewhat defeat the purpose of using Slf4j.
      • Do we need to consider partitioning? Perhaps we can use the container name as partitioning key, so that the ordering of logs from each container is preserved.
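
      For the first note, a default JSON serde could be as small as the sketch below (a rough illustration only: the class name JsonLogSerde is hypothetical, it assumes the log event has already been flattened into a map of fields, and it assumes Samza's Serde interface with toBytes/fromBytes). An organisation with a standardised in-house encoding would plug in its own implementation instead.

          import java.util.Map;

          import org.apache.samza.SamzaException;
          import org.apache.samza.serializers.Serde;
          import org.codehaus.jackson.map.ObjectMapper;

          // Hypothetical default log serde: encodes a log event (flattened
          // into a map of fields) as a JSON blob, analogous to what the
          // metrics snapshot serde does for metrics.
          public class JsonLogSerde implements Serde<Map<String, Object>> {
            private final ObjectMapper mapper = new ObjectMapper();

            public byte[] toBytes(Map<String, Object> logEvent) {
              try {
                return mapper.writeValueAsBytes(logEvent);
              } catch (Exception e) {
                throw new SamzaException("Failed to serialize log event", e);
              }
            }

            @SuppressWarnings("unchecked")
            public Map<String, Object> fromBytes(byte[] bytes) {
              try {
                return mapper.readValue(bytes, Map.class);
              } catch (Exception e) {
                throw new SamzaException("Failed to deserialize log event", e);
              }
            }
          }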
      Attachments

      1. SAMZA-310.1.patch (15 kB, Yan Fang)
      2. SAMZA-310.2.patch (23 kB, Yan Fang)
      3. SAMZA-310.4.patch (27 kB, Yan Fang)
      4. SAMZA-310.5.patch (27 kB, Yan Fang)
      5. SAMZA-310.6.patch (29 kB, Yan Fang)
      6. SAMZA-310.7.patch (29 kB, Yan Fang)
      7. SAMZA-310.patch (20 kB, Yan Fang)


          Activity

          Chris Riccomini added a comment -

          Could we just use KafkaLog4jAppender.scala for this? We might need to extend it slightly to tag the log message with the container (or some other YARN/Samza-specific data).

          One con to the Log4j (or SLF4J) approach is that it doesn't give us GC logs (or any other random text file written to the logs directory in YARN), which are usually pretty useful. One counterargument is that the GC info is already exposed (to some extent) through our JVM metrics class.

          Do we need to consider partitioning? Perhaps we can use the container name as partitioning key, so that the ordering of logs from each container is preserved.

          Yea, we should do this. Without a partition key, we'd lose ordering. The container name will make sure things are always lined up properly.

          The serde for encoding logs into a suitable wire format should be pluggable.

          Yea, the Kafka appender isn't pluggable, I think. I agree that well-structured JSON blobs for the logs are way more useful. This is, in effect, what Logstash does. It's not that uncommon to go full ELK, though. That said, Logstash always just struck me as a Samza job waiting to happen. Perhaps it's better to just well-form it from the beginning, as you suggest.

          Another idea: perhaps we could expose this log information through a dashboard (SAMZA-300), in some way.

          An alternative implementation idea that I've been toying with is to write a little log-tailing daemon. This lightweight daemon would take a config that included a set of files/directories to tail, and a set of topics to send the lines to. This could be a stand-alone daemon or a contrib to Kafka; I'd always thought of it as stand-alone. For Samza, we could then install it on every node where there's an NM running, and configure it to tail all userlog directories and forward all files to appropriate Kafka topics. This would have the advantage that we'd get gc.logs (and other logs) in our streams. It'd also be generally useful as a Flume replacement: it could tail syslog, Apache logs, etc. It could also potentially be useful for shipping Map/Reduce logs to Kafka. I think YARN, in general, is struggling with a solid story on how to deal with app logs.

          Yan Fang added a comment -

          Spent some time looking at KafkaLog4jAppender.scala. I think it works fine for our use case if we do not consider GC logs. A few changes we need to make:
          1) add partition information when sending messages to the broker in KafkaLog4jAppender.scala
          2) create the topic beforehand with an appropriate partition count
          3) tag container information onto the log (for partitioning)

          We could make it optional; the way to trigger it is by changing log4j.xml and the properties file. One thing that bugs me is that the log4j topic name is defined in log4j.xml; Samza does not seem to have this information beforehand, so it cannot create the topic in, say, the AM, unless it reads log4j.xml.

          One benefit is that it could easily fit into ELK with logstash-kafka.

          Chris Riccomini added a comment - edited

          Your approach seems like it should work.

          add partition information when sending messages to the broker in KafkaLog4jAppender.scala

          What we really need is a way to specify a key for the message. For us, the key should be the ApplicationMaster, or Container ID (taskGroup). This would guarantee that, for a given container, all log messages will end up fully ordered in a single partition (mixed in with log messages from other containers). If a user just wants a single container's logs, they can just grep.

          tag container information onto the log

          You might want to have a look at Log4j's MDC. The MDC allows us to statically set/update properties, which can then be accessed via the ConversionPattern in the log4j.xml file.

          This would allow us to set/update the MDC to include things like containerName, taskName, etc. You could then include them in the logs at your convenience via %X{containerName}, %X{taskName}, and so on.
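
          For instance, a minimal sketch (assuming Log4j 1.x's MDC API; the property names and values are just examples):

              import org.apache.log4j.Logger;
              import org.apache.log4j.MDC;

              public class MdcExample {
                private static final Logger log = Logger.getLogger(MdcExample.class);

                public static void main(String[] args) {
                  // Set once, e.g. at container start time.
                  MDC.put("containerName", "samza-container-0");
                  MDC.put("taskName", "Partition 0");

                  // With ConversionPattern "%X{containerName} %X{taskName} [%p] %m%n",
                  // this line comes out prefixed with the container and task names.
                  log.info("hello from the container");
                }
              }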

          One thing that bugs me is that the log4j topic name is defined in log4j.xml; Samza does not seem to have this information beforehand, so it cannot create the topic in, say, the AM, unless it reads log4j.xml.

          This should be OK for now. Sending the log message will trigger a topic creation with the cluster defaults (replication, partition count, retention, etc) for the topic. We can always patch KafkaLog4jAppender.scala (or extend it) to add this feature if we really needed it, but I don't think that we do.

          This JIRA suffers from the same troubles that I'm having with SAMZA-350. Ideally, I'd like to have this work without depending on log4j in samza-core. The only injection point that I can think of right now to manage things like setting the MDC to update the taskName when we process a message is via the TaskLifecycleListener. This would mean that, to use this feature, you'd have to (1) include the samza-log4j package on your classpath and (2) define some Log4JTaskLifecycleListener in your configs. Does that seem tolerable to people?

          Yan Fang added a comment -

          Thanks for pointing out the MDC.

          Ideally, I'd like to have this work without depending on log4j in samza-core.

          slf4j has the MDC, but grizzled.slf4j does not...

          The only injection point that I can think of right now to manage things like setting the MDC to update the taskName when we process a message is via the TaskLifecycleListener.

          Can we just set up the MDC at container start time, instead of via the TaskLifecycleListener? Since the goal of assigning the AM/container ID information is to provide a key for the logs, this information can be retrieved when the container starts.

          Chris Riccomini added a comment -

          slf4j has the MDC, but grizzled.slf4j does not...

          Doh! I forgot SLF4J had an MDC as well. Given that Grizzled doesn't, let's just directly access SLF4J's. We'll have to add SLF4J as an explicit compile-time dependency, but that's fine, since Grizzled is already pulling it in transitively.

          In fact, I'd like to remove Grizzled at some point anyway. We can implement our own (as Kafka does) in a single class, so I don't think that it provides much benefit. I've opened SAMZA-361 for this.

          Can we just set up the MDC at container start time, instead of via the TaskLifecycleListener? Since the goal of assigning the AM/container ID information is to provide a key for the logs, this information can be retrieved when the container starts.

          +1 to this. Great idea. Your original idea about setting the TaskName (it used to be the partition) every time we enter a TaskInstance's code to process/window/send/commit/close would also be useful. One thing to watch out for here is the performance impact of setting the MDC. My hope is that it isn't doing anything expensive for thread safety, and is just a map update. We should run TestSamzaContainerPerformance and verify that we're still getting good performance after this change.

          Yan Fang added a comment -

          Working on this ticket. Planning to put the modified KafkaLog4jAppender.scala into samza-log4j. I realize this project is pure Java now, so do you think I should add the Scala nature or just go with Java? Also, is this the right place to put this appender?

          Yan Fang added a comment -

          I think going with Java keeps this project neat.

          Chris Riccomini added a comment -

          I realize this project is pure Java now, so do you think I should add the Scala nature or just go with Java?

          Yep, that's what I was thinking. Given that the Log4j stuff is pretty isolated, I thought it'd be a good spot to start moving toward a pure-Java Samza. There was some discussion on the dev list about moving toward Java 8 and away from Scala, so I thought the Log4j module was a good place to start.

          Also, is this the right place to put this appender?

          Yep, this was the intent.

          Yan Fang added a comment -

          But I am still not sure where I should put this class; samza-log4j and samza-kafka both make sense. If I put it into samza-log4j, I will need to introduce a Kafka dependency; if I put it into samza-kafka, I do not need to introduce any new dependency. What do you think?

          Chris Riccomini added a comment -

          Hmm, yea. You raise a good point. It seems cleaner to put it in samza-kafka, since Kafka is by far a heavier dependency than log4j.

          Chris Riccomini added a comment -

          The other way to implement this is not to depend directly on Kafka. Instead, you can instantiate a SystemProducer, and use that to send messages.

          This approach seems a little more generalizable, since you could have a SystemProducer for any system, and thus log streams that get sent to any system. It also breaks the kafka dependency, which means it could go into samza-log4j.
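
          A rough sketch of that shape (how the appender would get hold of a Config is deliberately hand-waved here, and the system/stream names are illustrative):

              import org.apache.log4j.AppenderSkeleton;
              import org.apache.log4j.spi.LoggingEvent;
              import org.apache.samza.config.Config;
              import org.apache.samza.metrics.MetricsRegistryMap;
              import org.apache.samza.system.OutgoingMessageEnvelope;
              import org.apache.samza.system.SystemFactory;
              import org.apache.samza.system.SystemProducer;
              import org.apache.samza.system.SystemStream;

              // Sketch only: a Log4j appender backed by a generic SystemProducer.
              public class SystemStreamAppender extends AppenderSkeleton {
                private SystemProducer producer;
                private SystemStream systemStream;
                private String key;

                // Where config/factory/containerName come from is the open question.
                public void setup(Config config, SystemFactory factory, String containerName) {
                  systemStream = new SystemStream("kafka", "__samza-logs");
                  producer = factory.getProducer("kafka", config, new MetricsRegistryMap());
                  producer.register("log4j");
                  producer.start();
                  key = containerName; // keeps one container's logs ordered in one partition
                }

                protected void append(LoggingEvent event) {
                  producer.send("log4j",
                      new OutgoingMessageEnvelope(systemStream, key, layout.format(event)));
                }

                public void close() {
                  if (producer != null) producer.stop();
                }

                public boolean requiresLayout() {
                  return true;
                }
              }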

          Chinmay Soman added a comment -

          Correct me if I'm wrong, but if we use a SystemProducer instead of a Kafka producer, won't we lose those logs (KafkaSystemProducer.scala) from the context of the unified log stream?

          Although I'm not sure if we necessarily care about these logs.

          Chris Riccomini added a comment -

          won't we lose those logs (KafkaSystemProducer.scala) from the context of the unified log stream?

          What's the unified log stream? I think I might be confused.

          Using the SystemProducer is kind of tricky anyway. We'd need access to the job's Config object from inside the appender. I don't think that we can get this easily, but I'm not too familiar with how constructors work with the Log4J appender.

          Chinmay Soman added a comment -

          What's the unified log stream? I think I might be confused.

          Oops, apologies for not being clear. I meant the Kafka topic to which the logs are written. To me it looks like a way of getting all the logs in one place (hence "unified").

          Chris Riccomini added a comment -

          Ah, yea. You're right, we'd lose all logs before the Kafka appender is instantiated. This would be an argument for keeping the appender as light and stable as possible. Seems to me like a straight Kafka producer is the way to go. We'll have to put it in the samza-kafka package in that case.

          Yan Fang added a comment -

          Yea, accessing the job's Config from the appender does not seem straightforward. In addition, since we will put the partition information, which is Kafka-specific, into the appender, it may not be that useful to have a general SystemProducer here. I agree with putting it in the samza-kafka package for our use case.

          Yan Fang added a comment -

          Working on this. I am a little confused about how to partition the logs:

          1. What is the relation between "taskId", "taskName" and "containerId"?
          2. If my understanding is correct, one "containerId" may contain a few "taskId"s and "taskName"s?
          3. What do the "taskId" and "taskName" look like?

          If 2) is true, it seems we can only partition the logs based on "containerId", because we set the MDC once per container and cannot set it for each task. However, the "containerId" changes if a container fails and a new one is brought up.

          Chris Riccomini added a comment -

          Oof, yea, this is confusing.

          2. If my understanding is correct, one "containerId" may contain a few "taskId"s and "taskName"s?

          Correct. The relation between taskId and taskName is 1:1. The taskId is just an implementation detail of the YARN AM at this point. We should only need to expose taskName. I'm hoping to clean some of this up as part of SAMZA-348 when we refactor the AM into a generic job coordinator.

          If 2) is true, it seems we can only partition the logs based on "containerId", because we set the MDC once per container and cannot set it for each task.

          Technically, we MIGHT be able to update the MDC with a taskName every time we drop into the TaskInstance code, but then the question is what we do with the container-level logs. In this situation you'd end up with all tasks writing to different partitions, as well as the container. This would make it impossible to reconstruct a single container's logs in time order, which is the most useful thing, IMO.

          However, the "containerId" changes if a container fails and a new one is brought up.

          I think this is OK. A single container's logs will be fully ordered within a single partition. If the container restarts, the new logs might end up on another partition. This is effectively how logs work now with YARN, as well.

          Yan Fang added a comment -

          1) Aha, after exploring the code, I find the relation is actually containerId:taskId:taskName = 1:1:N, i.e. containerId == taskId. See the log below:

           2014-09-25 18:58:24 SamzaAppMasterTaskManager [INFO] Claimed SSP taskNames Map(Partition 0 -> Set(SystemStreamPartition [kafka, wikipedia-raw, 0], SystemStreamPartition [kafka, wikipedia-raw2, 0]), Partition 1 -> Set(SystemStreamPartition [kafka, wikipedia-raw2, 1])) for container ID 0
           2014-09-25 18:58:24 SamzaAppMasterTaskManager [INFO] Task ID 0 using command bin/run-container.sh
           2014-09-25 18:58:24 SamzaAppMasterTaskManager [INFO] Task ID 0 using env Map(
          

          I think part of the confusion comes from the YarnConfig class, where

          val TASK_COUNT = "yarn.container.count"
          

          And then in SamzaAppMasterTaskManager.scala, we use the task ID as the container ID:

           info("Claimed SSP taskNames %s for container ID %s" format (sspTaskNames, taskId))
          

          2) Another confusion is that our container ID (0, 1, 2) is different from YARN's containerId, which is something like container_1411696645653_0001_01_000002, though they share the same name...

          I think those two should be cleaned up in SAMZA-348. This is really confusing.

          3) After some testing, I realize that whenever the AM restarts a container, the taskId remains the same (this totally makes sense, though I actually do not know how YARN does this). So I think we can use the taskId (one container only has one taskId) as the partition key for the logs, instead of YARN's containerId, which is not easy to map to a particular partition.

          Chris Riccomini added a comment -

          This is really confusing.

          Agreed. Sorry about this. The task/container stuff is really old (2+ years), and we hadn't solidified the naming scheme at that point. Clearly, I don't remember it either.

          So I think we can use the taskId.

          Yea, I think you're right. If the taskId is static between restarts, then it is the right candidate for use as a partition key.

          Yan Fang added a comment -

          RB: https://reviews.apache.org/r/26695/

          1. Modified based on the KafkaLog4jAppender of the Kafka project.
          2. Use the task ID as the key for the partition.
          3. Do not use the MDC after all. Instead, set an environment variable for the task ID and use it directly as the key for the logs. If we used the MDC, we would still need to set the task ID as an environment variable to pass it from the AM to the containers, so it is simpler to use the environment variable directly.
          4. Need to specify the number of partitions and the topic name in log4j.xml.
          Example of log4j configuration:

          <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
          <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
            <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
               <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
               <param name="DatePattern" value="'.'yyyy-MM-dd" />
               <layout class="org.apache.log4j.PatternLayout">
                <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
               </layout>
            </appender>
            <appender name="KafkaAppender" class="org.apache.samza.util.KafkaLog4jAppender">
              <param name="Topic" value="log4j-test-2"/>
              <param name="BrokerList" value="localhost:9092"/>
              <param name="Zookeeper" value="localhost:2181" />
          <!-- this number should be (the number of containers) + 1; the extra 1 is for the AM -->
              <param name="PartitionNumber" value="3" />
               <layout class="org.apache.log4j.PatternLayout">
                <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
               </layout>
            </appender>
            <logger name="org.apache.samza">
              <level value="info" />
              <appender-ref ref="KafkaAppender" />
            </logger>
            <root>
              <priority value="debug" />
              <appender-ref ref="RollingAppender"/>
            </root>
          </log4j:configuration>
          

          Tested locally. Will test on a cluster and add documentation.

          Yan Fang added a comment -

          RB: https://reviews.apache.org/r/26695/

          1. Added a createTopicIfNotExist method.
          2. Added docs.

          Chris Riccomini added a comment -

          Do not use the MDC after all. Instead, set an environment variable for the task ID and use it directly as the key for the logs. If we used the MDC, we would still need to set the task ID as an environment variable to pass it from the AM to the containers, so it is simpler to use the environment variable directly.

          I think we should use the MDC. There are three reasons for this:

          1. Using environment variables everywhere feels kind of like a global variable. This is just a matter of personal taste, but it bugs me.
          2. If we use the MDC, I think we don't need to specify the partition number and topic name in log4j.xml (see below).
          3. If we use the MDC, the ConversionPattern can refer to variables such as job name, task ID, etc. in the log lines. This lets you print things like %X{jobName} and %X{taskId}, which could be useful.

          Need to specify the number of partitions and the topic name in log4j.xml.

          If we use the MDC, I think the SamzaContainer (and the Samza AM) can set the MDC with good defaults here. The AM knows the total number of containers (config.getTaskCount), and both the AM and the SamzaContainer know the job name (config.getName). If the AM were to pass the container count (sadly, named task count right now) via an environment variable, then the SamzaContainer could set all of this via the MDC, and the appender could use the proper partition count and have a sane default topic name (e.g. __samza-<job name>-logs, or something).
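
          Inside the appender, the defaulting could look something like this sketch (SAMZA_JOB_NAME and SAMZA_CONTAINER_COUNT are made-up environment variable names, purely for illustration):

              // Hypothetical defaulting logic inside the appender.
              public class AppenderDefaults {
                public static void main(String[] args) {
                  String userTopic = null; // whatever was set in log4j.xml, if anything
                  String jobName = System.getenv("SAMZA_JOB_NAME");
                  int containerCount = Integer.parseInt(System.getenv("SAMZA_CONTAINER_COUNT"));

                  String topic = (userTopic != null) ? userTopic : "__samza-" + jobName + "-logs";
                  int partitions = containerCount + 1; // + 1 for the AM's own logs

                  System.out.println("topic=" + topic + ", partitions=" + partitions);
                }
              }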

          This whole task/container mix-up is really confusing. I've opened up SAMZA-433 to track renaming TASK_ID to CONTAINER_ID everywhere and updating YarnConfig accordingly.

          Yan Fang added a comment -

          If we use the MDC, the ConversionPattern can refer to variables such as job name, task ID, etc. in the log lines.

          Yes, agreed. This is the big benefit the MDC provides.

          have a sane default topic name (e.g. __samza-<job name>-logs, or something)

          Yeah, it's good to have a default topic. I would also like to allow users to set their own topic name. Maybe they want to publish logs from different jobs to the same topic, though it could be rare.

          The AM knows the total number of containers (config.getTaskCount), and both the AM and the SamzaContainer know the job name (config.getName). If the AM were to pass the container count (sadly, named task count right now) via an environment variable

          Do you mean we pass the environment variables CONTAINER_COUNT and CONTAINER_ID (TASK_ID)? CONTAINER_COUNT seems a little awkward because it has nothing to do with the "environment", though it's really straightforward. Maybe we encode CONTAINER_ID in a format like "0/7, 1/7, 2/7", where the numerator is the ID and the denominator is the container count? However, this approach makes CONTAINER_ID less intuitive and leaves the appender to parse CONTAINER_ID to get the total count.

          When we use the MDC, we want to make this optional from a performance perspective, right? (I haven't done a performance test because I did not use the MDC; a simple performance test is now needed.) So my questions are:

          1) What is an appropriate property name? Something like job.enable.mdc=true?
          2) How do we pass this property? a) We read the config in the AM and set an environment variable (again, sadly, an environment variable...) to let the containers know they should set the MDC. b) The AM and the containers all read the config. But then the job.* naming convention is not appropriate.

          Chris Riccomini added a comment -

          Yeah, it's good to have a default topic. I would also like to allow users to set their own topic name. Maybe they want to publish logs from different jobs to the same topic, though it could be rare.

          Totally. I think we could keep the config, but I just want it to "do the right thing" in cases where users don't manually specify a topic.

          Do you mean we pass the environment variables CONTAINER_COUNT and CONTAINER_ID (TASK_ID)?

          That's what I was thinking.

          CONTAINER_COUNT seems a little awkward because it has nothing to do with the "environment", though it's really straightforward.

          Agreed. IMO, the best fix for this is to have it be part of what the AM exposes in the HTTP JSON server discussed in SAMZA-348. In the meantime, though, I think the environment variable is really the only mechanism that we have to pass data between the AM and its containers.

          When we use the MDC, we want to make this optional from a performance perspective, right?

          I don't think so. I think it should just be always on. These MDC settings only need to be set at container start time, right? From then on, it should be all reads. It looks like SLF4J's MDC put/get calls just wrap Log4j directly, which is a put on a thread-local map. If we wanted to really optimize this, we could have the Kafka appender grab the MDC values once, and then never call the MDC again (to avoid thread locals).

          Also, as far as the performance test goes, you can run TestSamzaContainerPerformance.scala if you're worried about performance.

          Martin Kleppmann added a comment -

          Nice work – I'm glad to see this is happening!

          However, I'm not really keen on how the Kafka configuration has to be duplicated in the log4j config as well as the job config. As the log4j config will be baked into the zip file that is deployed to a cluster, any change of the log4j config would require the job package to be rebuilt. It would also make life difficult for anyone who wants to integrate with an external configuration management system (e.g. LinkedIn has a centralised config system for things like Kafka/ZK hostnames, and it deliberately separates deployable artifacts from their configuration).

          I understand the problem that the log appender can't access the job config, but to me that doesn't feel like a good enough reason to burden users with duplicating config. Could we arrange it so that the AM passes any necessary parameters to containers via environment variables or Java system properties? Then users only need to include them once, in the job config.

          Or perhaps the Log4j appender could just use the job config as passed in the environment variable, like the container does when it starts up: val config = new MapConfig(JsonConfigSerializer.fromJson(System.getenv(ShellCommandConfig.ENV_CONFIG))). It's not beautiful to parse the config twice, but I think it's better than asking the user to configure things in two different places.
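
          In the appender, that would be something along these lines (a sketch in plain Java, assuming ShellCommandConfig.ENV_CONFIG resolves to the SAMZA_CONFIG environment variable, and parsing the JSON with Jackson directly):

              import java.util.Map;

              import org.apache.samza.config.Config;
              import org.apache.samza.config.MapConfig;
              import org.codehaus.jackson.map.ObjectMapper;

              public class AppenderConfigReader {
                // Parse the job config a second time from the environment, the
                // same JSON blob the container itself parses at startup.
                @SuppressWarnings("unchecked")
                public static Config fromEnvironment() throws Exception {
                  String json = System.getenv("SAMZA_CONFIG");
                  Map<String, String> map = new ObjectMapper().readValue(json, Map.class);
                  return new MapConfig(map);
                }
              }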

          Yan Fang added a comment -

          However, I'm not really keen on how the Kafka configuration has to be duplicated in the log4j config as well as the job config.

          I agree; this is not user-friendly. But when users use different systems for the job and for logging, they have to specify the Kafka information in log4j.xml. I assume this is rare, though; a more common use case is using the same Kafka cluster for the Samza job and for logging. So I will try to have the log4j appender read the config.

          Yan Fang added a comment -

          On second thought, there are a few problems when we get the Kafka information from the config:

          1) The AM also triggers the KafkaLog4jAppender. So before it reads the config, it does not have any knowledge of the config at all. Basically, this means log4j.xml would be read even before the config file is read.

          2) When containers read the config from the environment variable, they cannot tell which system to produce messages to.
          a) We cannot read "systems.kafka.producer.metadata.broker.list", because "kafka" may not be the system name even though it is a Kafka system.
          b) We cannot read "*.producer.metadata.broker.list", because if any other system uses the same property name, we will get an unexpected result.
          c) If there are two Kafka systems configured in the config file, we cannot determine which one to use for logging.

          Yan Fang added a comment -

          I think we could keep the config, but I just want it to "do the right thing" in cases where users don't manually specify a topic

          For a similar reason to (1) in my previous comment: the AM does not have knowledge of the config when it starts, so we cannot include the job name when creating the topic. The process in the AM is: initialize -> trigger log -> create topic -> send logs -> start reading config. So we cannot set the default topic to "__samza-<job name>-logs".

          Martin Kleppmann added a comment -

          The AM also triggers the KafkaLog4jAppender. So before it reads the config, it does not have any knowledge of the config at all. Basically, this means log4j.xml would be read even before the config file is read.

          I believe the AM also receives the config in a SAMZA_CONFIG environment variable, does it not? I don't understand the problem here.

          When containers read the config from the environment variable, they cannot tell which system to produce messages to.

          We could introduce a new property in the configuration that tells the Log4j appender which system to use, e.g. job.log.system=kafka (analogous to task.checkpoint.system) or job.log.stream=kafka.my-log-topic (analogous to stores.*.changelog). Also, since this setup would have full access to the config, we could use a regular SystemProducer rather than tying the Log4j appender to Kafka.
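
          For example, something like this in the job properties (both property names are hypothetical, not final):

              # which system the Log4j appender should send to
              job.log.system=kafka
              # or, naming the stream explicitly:
              job.log.stream=kafka.my-log-topic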

          Yan Fang added a comment -

          I believe the AM also receives the config in a SAMZA_CONFIG environment variable, does it not? I don't understand the problem here.

          Oh, yes! I misunderstood that part. Now it should work.

          Also, since this setup would have full access to the config, we could use a regular SystemProducer rather than tying the Log4j appender to Kafka.

          Then it becomes a little tricky. Like Chinmay mentioned in a previous comment, we will lose the logs from inside the SystemProducer (such as KafkaSystemProducer).

          criccomini Chris Riccomini added a comment -

          Like Chinmay mentions in the previous comment, we will lose the logs in the SystemProducer (such as KafkaSystemProducer).

          This might be OK if the trade-off is that we can use this appender for any system. In addition, they're not totally lost--they're still on disk (assuming you still have a file-based appender).

          One question that I have is whether this would introduce an infinite loop. If SystemProducer.send() calls info(), which triggers a SystemProducer.send(), then it seems like we'd have an infinite loop. :/

          criccomini Chris Riccomini added a comment -

          Thinking on this more, I advocate for cleaning up what we've got with the Kafka appender (Martin's feedback--the config stuff). If we want to pursue a generic SystemProducer approach, it's pretty much a re-write. This is fine, but I think it should go in a separate ticket. A functional Kafka appender is pretty useful, and we can roll it into the 0.8 release.

          Thoughts?

          closeuris Yan Fang added a comment -

          If SystemProducer.send() calls info(), which triggers a SystemProducer.send(), then it seems like we'd have an infinite loop. :/

          Haha, yes, this is true. Even Kafka's own appender has the same infinite-loop potential. The ways to get around this problem are: 1) exclude the producer's classes in the log config, or 2) set their log level higher so that the info() calls are never handled.
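
          As an illustration of workaround (2), the level can also be raised programmatically with the Log4j 1.x API; a minimal sketch, where the logger name "kafka.producer" is just an example:

          import org.apache.log4j.Level;
          import org.apache.log4j.Logger;

          public class SuppressProducerLogging {
            public static void main(String[] args) {
              // Raise the level of the producer's own loggers so their info()
              // calls never re-enter the stream appender.
              // The logger name "kafka.producer" is illustrative.
              Logger.getLogger("kafka.producer").setLevel(Level.WARN);
            }
          }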

          I think it should go in a separate ticket. A functional Kafka appender is pretty useful, and we can roll it into the 0.8 release.

          I agree on this. I am almost done with the current version and just need some tests. I'd prefer to have a separate ticket for a generic SystemProducer appender.

          criccomini Chris Riccomini added a comment -

          Even in Kafka, it will have the infinite loop too.

          At least with Kafka, we know the packages/classes to suppress. Can this be done programmatically? With SystemProducer, since it's pluggable, it's kind of hard to know what needs to be suppressed--the user configuring the producer would be the only one who knows. How is this handled in the open-source Kafka appender?

          martinkl Martin Kleppmann added a comment -

          I think it should go in a separate ticket. A functional Kafka appender is pretty useful, and we can roll it into the 0.8 release.

          Fine with me, as long as we don't mind breaking compatibility in future when we release a revamped log appender.

          Even in Kafka, it will have the infinite loop too.

          On another system, where I solved the same problem in the past, I took the following approach: for the duration of the call to SystemProducer.send, set a thread-local variable which allows you to detect if it's being called recursively. That avoids the need to hard-code package names to filter out. Code would look roughly like this:

          import org.apache.log4j.AppenderSkeleton;
          import org.apache.log4j.spi.LoggingEvent;

          public class Appender extends AppenderSkeleton {
            // True while this thread is inside append(), so any logging
            // triggered by the producer itself is detected and skipped.
            private final ThreadLocal<Boolean> recursiveCall =
              new ThreadLocal<Boolean>() {
                @Override protected Boolean initialValue() { return false; }
              };

            @Override
            protected void append(LoggingEvent event) {
              if (!recursiveCall.get()) {
                try {
                  recursiveCall.set(true);
                  // do the actual logging, e.g. SystemProducer.send()
                } finally {
                  recursiveCall.set(false);
                }
              }
            }

            @Override
            public void close() {}

            @Override
            public boolean requiresLayout() { return false; }
          }
          
          criccomini Chris Riccomini added a comment -

          Fine with me, as long as we don't mind breaking compatibility in future when we release a revamped log appender.

          Yea, I was thinking about this more last night. I think the compatibility issue is going to be a problem for us. I wouldn't feel comfortable rolling this out knowing we'll have to modify everyone's log4j files in a few months to update their appender. It seems like it might be better to "do the right thing" here. Unfortunately, that means changing the appender pretty significantly.

          for the duration of the call to SystemProducer.send, set a thread-local variable which allows you to detect if it's being called recursively.

          This is a great solution. I'm slightly worried about introducing a thread-local variable in the logging path. We've been bitten pretty horribly by ThreadLocal performance in Avro in the past. This concern isn't enough for me to say that we shouldn't do it, though. This solution is quite elegant.

          So it sounds like we'd need to:

          1. Move the appender to samza-log4j.
          2. Get config out of environment variable.
          3. Convert appender to use SystemProducer instead of Kafka's Producer.
          4. Add a thread local variable check in append to eliminate infinite loops.
          5. Write some tests.
          6. Update the docs.

          Yan Fang, I'm sorry about all the churn here. If you're burnt out on this ticket, just say the word.

          martinkl Martin Kleppmann added a comment -

          It seems like it might be better to "do the right thing" here.

          +1. Sorry Yan, we should have put more thought into it before you started implementing it.

          Move the appender to samza-log4j.

          Minor point: perhaps the log appender should be part of samza-yarn? Is the fact that the config appears in an environment variable called SAMZA_CONFIG quite specific to our use of YARN, and is the move of config to an HTTP endpoint (SAMZA-438) YARN-specific? OTOH, if other cluster managers (Mesos, SAMZA-375) are going to use the same config-passing mechanism, then samza-log4j is probably the right place.

          criccomini Chris Riccomini added a comment -

          Is the fact that the config appears in an environment variable called SAMZA_CONFIG quite specific to our use of YARN, and is the move of config to a HTTP endpoint (SAMZA-438) YARN-specific

          Nope, it's not YARN-specific. SAMZA_CONFIG is used by YARN, ProcessJob, and ThreadJob. Mesos would use it as well. Same deal for SAMZA-438.

          if other cluster managers (Mesos, SAMZA-375) are going to use the same config-passing mechanism, then samza-log4j is probably the right place

          The plan is to have a single config-passing scheme abstracted into a single job coordinator, which all cluster managers will use. I think samza-log4j is the right place for it. After SAMZA-438, the config passing will be via HTTP/JSON rather than an environment variable, but the appender will be able to get the URL via an environment variable and grab the HTTP/JSON config that way, so the pattern will remain the same.
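
          As a rough sketch of that pattern (the environment variable name and class here are hypothetical, not Samza's actual names):

          import java.io.InputStream;
          import java.net.URL;
          import java.util.Scanner;

          public class ConfigFetch {
            // Read the coordinator URL from an environment variable, then fetch
            // the JSON config over HTTP. "CONFIG_URL" is a hypothetical name.
            public static String fetchConfigJson() throws Exception {
              String url = System.getenv("CONFIG_URL");
              try (InputStream in = new URL(url).openStream();
                   Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
                return scanner.hasNext() ? scanner.next() : "";
              }
            }
          }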

          closeuris Yan Fang added a comment -

          No problem for me. It's always better to think the feature through than to introduce something buggy.

          I think the compatibility issue is going to be a problem for us. I wouldn't feel comfortable rolling this out, and knowing we'll have to modify everyone's log4j files in a few months to update their appender.

          What is the exact problem here? I am kind of lost. IMO, users could still use the KafkaLog4jAppender even when we have a new appender that uses SystemProducer to send messages. Why would they need to modify their log4j files?

          criccomini Chris Riccomini added a comment -

          IMO, users can still use the KafkaLog4jAppender even when we have a new appender which uses SystemProducer to send messages

          If we have a SystemProducer appender, then there's no need for the Kafka-specific one, I think. So I was assuming we'd end up removing the Kafka one. That removal would break the log4j.xml files that people have. It'd be nice not to have to keep the Kafka one around just for backwards compatibility, if we can avoid all of this and just write the SystemProducer one instead.

          closeuris Yan Fang added a comment -

          Yeah, that makes sense. Thank you.

          criccomini Chris Riccomini added a comment -

          Oops, pressed 'i' on a search screen, and this ticket got assigned to me. Assigning back to Yan (unless you don't want it anymore, Yan).

          cpsoman Chinmay Soman added a comment -

          Hey Yan,

          Lemme know if you need any help with this. If there's any way to break this up into smaller pieces where others could help - lemme know. I'd be more than happy to contribute.

          closeuris Yan Fang added a comment -

          Hi Chinmay Soman, thanks. I think I am getting there. I just did not have enough time to finish it. Will try to get it done by Thursday, giving us some time to review.

          cpsoman Chinmay Soman added a comment -

          Yan Fang, no rush at all! Take your time.

          closeuris Yan Fang added a comment -

          New RB: https://reviews.apache.org/r/28035/

          1. Use SystemProducer instead of Kafka's producer
          2. Move to the samza-log4j project
          3. Get config from the environment variable (AM) or the URL (containers)
          4. Add the ThreadLocal check to eliminate the infinite loop

          The new implementation has at least one limitation:
          Since it's a general implementation, if the user chooses Kafka as the system, they have to create the topic with the appropriate partitions manually.

          TODO:
          update the "task.log4j.system" in configuration.html

          Thanks.

          closeuris Yan Fang added a comment -

          Updated the patch https://reviews.apache.org/r/28035/ according to Chris' comments.

          Thank you.

          The only issue I am still thinking about is how, when the user is using Kafka, we can automatically create the topic with the correct partitions for them. Actually the coordinatorStream, changeLogStream and checkpointStream all have this need, but for them it is solved at the SystemAdmin level. Since the SystemProducerAppender only uses the SystemProducer, it cannot be solved the same way.

          martinkl Martin Kleppmann added a comment -

          Nice work! I added some comments on the RB. I also tried adding this appender to hello-samza, and running the wikipedia-feed job. It partially worked:

          • AM logs appeared in Kafka, except that the first 35 or so lines of log were missing. Do you know why that would be? Is there any way we can capture logs right from the start (buffering them if necessary if the producer is not yet connected)?
          • When I looked at the output of kafka-console-consumer, the lines of the AM logs appeared in a different order from how they appeared in the regular log file. Any idea why? I thought using the container name as message key should make all the messages go to the same partition, and thus preserve their order.
          • Logs from the other container (non-AM) did not appear in Kafka. The following error appeared on the container's stdout:
          log4j:ERROR Could not create an Appender. Reported error follows.
          org.apache.samza.SamzaException: can not read the config from {"config":{"systems.kafka.consumer.zookeeper.connect":"localhost:2181/","systems.kafka.samza.factory":"org.apache.samza.system.kafka.KafkaSystemFactory","systems.wikipedia.port":"6667","task.inputs":"wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews","systems.wikipedia.host":"irc.wikimedia.org","systems.kafka.producer.producer.type":"sync","yarn.package.path":"file:///Users/martin/dev/samza/hello-samza/target/hello-samza-0.8.0-dist.tar.gz","job.factory.class":"org.apache.samza.job.yarn.YarnJobFactory","systems.kafka.producer.metadata.broker.list":"localhost:9092","task.class":"samza.examples.wikipedia.task.WikipediaFeedStreamTask","systems.kafka.producer.batch.num.messages":"1","systems.kafka.samza.msg.serde":"json","job.name":"wikipedia-feed","serializers.registry.json.class":"org.apache.samza.serializers.JsonSerdeFactory","task.log4j.system":"kafka","systems.wikipedia.samza.factory":"samza.examples.wikipedia.system.WikipediaSystemFactory"},"containers":{"0":{"container-id":0,"tasks":{"Partition 0":{"task-name":"Partition 0","system-stream-partitions":[{"system":"wikipedia","partition":0,"stream":"#en.wikinews"},{"system":"wikipedia","partition":0,"stream":"#en.wiktionary"},{"system":"wikipedia","partition":0,"stream":"#en.wikipedia"}],"changelog-partition":0}}}}}
          	at org.apache.samza.logging.log4j.SystemProducerAppender.getConfig(SystemProducerAppender.java:185)
          	at org.apache.samza.logging.log4j.SystemProducerAppender.activateOptions(SystemProducerAppender.java:82)
          	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
          	at org.apache.log4j.xml.DOMConfigurator.parseAppender(DOMConfigurator.java:295)
          	at org.apache.log4j.xml.DOMConfigurator.findAppenderByName(DOMConfigurator.java:176)
          	at org.apache.log4j.xml.DOMConfigurator.findAppenderByReference(DOMConfigurator.java:191)
          	at org.apache.log4j.xml.DOMConfigurator.parseChildrenOfLoggerElement(DOMConfigurator.java:523)
          	at org.apache.log4j.xml.DOMConfigurator.parseRoot(DOMConfigurator.java:492)
          	at org.apache.log4j.xml.DOMConfigurator.parse(DOMConfigurator.java:1001)
          	at org.apache.log4j.xml.DOMConfigurator.doConfigure(DOMConfigurator.java:867)
          	at org.apache.log4j.xml.DOMConfigurator.doConfigure(DOMConfigurator.java:773)
          	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:483)
          	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
          	at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
          	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:253)
          	at org.apache.samza.util.Logging$class.logger(Logging.scala:27)
          	at org.apache.samza.metrics.JmxServer.logger$lzycompute(JmxServer.scala:41)
          	at org.apache.samza.metrics.JmxServer.logger(JmxServer.scala:41)
          	at org.apache.samza.util.Logging$class.info(Logging.scala:54)
          	at org.apache.samza.metrics.JmxServer.info(JmxServer.scala:41)
          	at org.apache.samza.metrics.JmxServer.<init>(JmxServer.scala:73)
          	at org.apache.samza.metrics.JmxServer.<init>(JmxServer.scala:44)
          	at org.apache.samza.container.SamzaContainer$.safeMain$default$1(SamzaContainer.scala:72)
          	at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:69)
          	at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
          Caused by: org.codehaus.jackson.map.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token
           at [Source: N/A; line: -1, column: -1]
          	at org.codehaus.jackson.map.JsonMappingException.from(JsonMappingException.java:163)
          	at org.codehaus.jackson.map.deser.StdDeserializationContext.mappingException(StdDeserializationContext.java:198)
          	at org.codehaus.jackson.map.deser.StdDeserializer$StringDeserializer.deserialize(StdDeserializer.java:671)
          	at org.codehaus.jackson.map.deser.StdDeserializer$StringDeserializer.deserialize(StdDeserializer.java:640)
          	at org.codehaus.jackson.map.deser.MapDeserializer._readAndBind(MapDeserializer.java:235)
          	at org.codehaus.jackson.map.deser.MapDeserializer.deserialize(MapDeserializer.java:165)
          	at org.codehaus.jackson.map.deser.MapDeserializer.deserialize(MapDeserializer.java:25)
          	at org.codehaus.jackson.map.ObjectMapper._readValue(ObjectMapper.java:2376)
          	at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1747)
          	at org.apache.samza.serializers.model.SamzaObjectMapper$ConfigDeserializer.deserialize(SamzaObjectMapper.java:104)
          	at org.apache.samza.serializers.model.SamzaObjectMapper$ConfigDeserializer.deserialize(SamzaObjectMapper.java:99)
          	at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2395)
          	at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1595)
          	at org.apache.samza.logging.log4j.SystemProducerAppender.getConfig(SystemProducerAppender.java:182)
          	... 24 more
          
          martinkl Martin Kleppmann added a comment -

          how can we automatically create the topic with correct partitions for them

          As you say, other streams have the same issue. As it stands, the topic will get auto-created with the broker's default number of partitions, which I think is OK. If the user wants a specific number of partitions, they can create the topic manually. As I understand it, the number of partitions shouldn't affect the correctness of the logging.
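
          For reference, the broker-side defaults that govern auto-creation live in Kafka's server.properties; a sketch with example values:

          # server.properties (example values)
          # Allow topics to be auto-created on first use:
          auto.create.topics.enable=true
          # Default partition count for auto-created topics:
          num.partitions=1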

          closeuris Yan Fang added a comment -

          Thank you, Martin.

          Made changes according to Martin's comments in the RB. The same RB: https://reviews.apache.org/r/28035/

          AM logs appeared in Kafka, except that the first 35 or so lines of log were missing. Do you know why that would be? Is there any way we can capture logs right from the start (buffering them if necessary if the producer is not yet connected)?

          I looked into it a little. All the missing logs are the ones emitted during activateOptions(). The tricky part is that the appender's "protected void append(LoggingEvent event)" is not called during "activateOptions()", even if we log inside it. Similarly, we miss the logs emitted while the SystemProducer is being initialized.

          As discussed, if we cannot figure out a better way, this seems acceptable, because the logs are not lost as long as users also specify other log appenders.

          When I looked at the output of kafka-console-consumer, the lines of the AM logs appeared in a different order from how they appeared in the regular log file. Any idea why? I thought using the container name as message key should make all the messages go to the same partition, and thus preserve their order.

          Could you check it now? I do not see the same problem. I only see the AM missing some logs, due to (1).

          Logs from the other container (non-AM) did not appear in Kafka. The following error appeared on the container's stdout:

          Fixed this.

          criccomini Chris Riccomini added a comment -

          Commented on RB. Other notes:

          1. It looks like we're appending newlines for every log entry.
          2. It would be nice to be able to customize the log topic name, but we can open that as a separate ticket.
          3. We'll have to allow the OutgoingMessageEnvelope send to be overridable so we don't force people to use String logging (we should give them the raw LoggingEvent, so they can encode to JSON, Protobuf, etc). This, again, can be a separate ticket.
          4. It would be nice if you set the MDC with the container name for both the SamzaContainer and the AM, so we can include the container name in the log4j output format (see the sketch at the end of this comment).

          Re (1), using kafka-console-consumer shows:

          ...
          2014-11-24 10:59:07 SamzaContainer$ [INFO] Got metrics reporters: Set()
          
          2014-11-24 10:59:07 SamzaContainer$ [INFO] Got checkpoint manager: null
          
          2014-11-24 10:59:07 OffsetManager$ [INFO] No default offset for SystemStream [system=wikipedia, stream=#en.wikinews] defined. Using upcoming.
          
          2014-11-24 10:59:07 OffsetManager$ [INFO] No default offset for SystemStream [system=wikipedia, stream=#en.wikipedia] defined. Using upcoming.
          ...
          

          Recommend doing a trim().
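
          Re (4), a minimal sketch of the MDC idea using the Log4j 1.x API (the key and value here are illustrative, not the names the patch must use):

          import org.apache.log4j.MDC;

          public class MdcExample {
            public static void main(String[] args) {
              // Populate the MDC early in container startup so every subsequent
              // log line can carry the container name.
              MDC.put("containerName", "samza-container-0");

              // A PatternLayout can then reference the key, e.g.:
              //   %X{containerName} %d %c [%p] %m%n
            }
          }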

          criccomini Chris Riccomini added a comment -

          Also, at first glance, things appeared to be in order. There was some intermingling, but I think it came from SamzaContainer and the AM logging to the same partition. To confirm this, it'd be useful to have the MDC set for container name, so we could log the container name as part of the log line (see (4) above).

          closeuris Yan Fang added a comment - edited

          It would be nice to be able to customize the log topic name, but we can open that as a separate ticket.

          Currently, users can set the StreamName in log4j.xml to define the topic name.

          closeuris Yan Fang added a comment -

          Updated the RB based on Chris' and Chinmay's comments. https://reviews.apache.org/r/28035/

          It looks like we're appending newlines for every log entry.

          yes.

          We'll have to allow the OutgoingMessageEnvelope send to be overrideable so we don't force people to use String logging (we should give them raw LoggingEvent, so they can encode to JSON, Protobuf, etc). This, again, can be a separate ticket.

          Yeah, Martin mentioned the same issue in his comment. I will open a separate ticket for this.

          It would be nice if you set MDC for container name for both SamzaContainer and the AM, so we can log the container in the log4j format.

          done.

          things appeared to be in order

          It's interesting. Across all my test runs (20+), I actually saw just one instance where the order in the stream appender differed from the order in the file appender. The following two logs were reversed.

          samza-application-master 2014-11-24 12:06:54 SamzaAppMaster$ [INFO] got node manager http port: 8042
          
          samza-application-master 2014-11-24 12:06:54 SamzaAppMaster$ [INFO] got node manager port: 57744
          
          criccomini Chris Riccomini added a comment -

          It looks like we're appending newlines for every log entry.

          Can we trim() the lines?

          criccomini Chris Riccomini added a comment - edited

          The following logs are reversed.

          Hmm, that's odd. This seems a bit scary. Given that it's the same sender (the AM), they should be in order. I can't think of a spot where there'd be a race condition. The partition key should be the same, the source should be the same, and there should be only one AM running with a single thread. The Kafka producer should serialize the output, and log4j should not re-order (AppenderSkeleton doesn't seem to do anything bad).

          criccomini Chris Riccomini added a comment -

          Were you using sync or async producer in this test?

          closeuris Yan Fang added a comment - edited

          I was using the sync producer. Yeah, it's a little odd.

          criccomini Chris Riccomini added a comment -

          I think I'm +1 on this, provided that you add a trim() to the log line.

          I've run it several times, and haven't seen any out-of-order messaging. If it's consistently reproducible, can you investigate post-commit? (Or file a follow-on?) I just can't trigger this issue.

          criccomini Chris Riccomini added a comment -

          Also, please post the patch to JIRA before committing.

          criccomini Chris Riccomini added a comment -

          Ack... one other thought: it'd be nice if the MDC contained the job name and job id as well.

          I'm coming at this from an ELK perspective, where we might have multiple job logs intermingled in a single stream. If so, we'd want job name/job id fields in the log lines.

          closeuris Yan Fang added a comment -

          Tried a few more tests. Cannot trigger it again... Will put it into a separate ticket to keep exploring.

          Added trim() and put jobName and jobId into the MDC. Posted to RB: https://reviews.apache.org/r/28035/

          If it's OK, I will commit.

          Thanks.

          closeuris Yan Fang added a comment -

          Added two tickets: one for exploring the ordering of logs (SAMZA-478) and one for making the log format pluggable (SAMZA-479).

          criccomini Chris Riccomini added a comment -

          Noticed this:

          [] 2014-11-25 08:39:10 JmxServer [INFO] According to InetAddress.getLocalHost.getHostName we are criccomi-mn
          [] 2014-11-25 08:39:10 JmxServer [INFO] Started JmxServer registry port=61713 server port=61714 url=service:jmx:rmi://localhost:61714/jndi/rmi://localhost:61713/jmxrmi
          [] 2014-11-25 08:39:10 JmxServer [INFO] If you are tunneling, you might want to try JmxServer registry port=61713 server port=61714 url=service:jmx:rmi://criccomi-mn:61714/jndi/rmi://criccomi-mn:61713/jmxrmi
          

          It seems we have a chicken-and-egg problem between the log lines in the JmxServer and the MDC data. Recommend moving the JmxServer creation after the putMDC calls in SamzaContainer. This is what we do in SamzaAppMaster.

          Sorry for so many iterations, Yan Fang, the patch looks great. Just being thorough.

          closeuris Yan Fang added a comment -

          Oh, yes, I did not notice the JmxServer creation in the constructor. I think putting the MDC calls before the JmxServer creation makes more sense.

          Updated to the same RB. https://reviews.apache.org/r/28035

          Thank you.

          criccomini Chris Riccomini added a comment -

          I'd prefer having the job name/job id/container name all set before the JMX server starts. The motivation is that we'll want to filter ELK logs based on job id and job name, and we'll have the same issue if we don't set them before we start the JMX server.

          I propose changing the safeMain parameter to () => JmxServer, and calling it after the putMDC job name/job ID block. This also lets you shrink the try/catch, since we only need the try/catch after the JmxServer is created.

          closeuris Yan Fang added a comment -

          https://reviews.apache.org/r/28035/

          Create the JmxServer after putMDC jobName, jobId and containerId.

          Shrinking the try/catch makes the safeMain method untestable, because it uses the environment variable, so I had to delete one unit test. This is reported in SAMZA-387. At the very least, maybe we should make the environment variables mockable.

          criccomini Chris Riccomini added a comment -

          +1 Thanks for all the work! We're going to start using this with ELK very soon.

          closeuris Yan Fang added a comment -

          Added the null check as I mentioned in the RB.

          closeuris Yan Fang added a comment -

          Thank you for all the help! Committed to master. Looking forward to seeing how it works with ELK.


            People

            • Assignee: closeuris Yan Fang
            • Reporter: martinkl Martin Kleppmann
            • Votes: 1
            • Watchers: 5
