SAMZA-479

Make StreamAppender pluggable for different log formats

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0
    • Fix Version/s: 0.9.0
    • Component/s: None
    • Labels:
      None

      Description

      Currently, StreamAppender accepts only String and sends every log as a String. It would be useful for StreamAppender to accept and send other formats, such as Avro or JSON. The idea is to move the encoding of the LoggingEvent into a serde.

      1. SAMZA-479.2.patch
        16 kB
        Yan Fang
      2. SAMZA-479.3.patch
        15 kB
        Yan Fang
      3. SAMZA-479.4.patch
        16 kB
        Yan Fang
      4. SAMZA-479.patch
        13 kB
        Yan Fang

          Activity

          closeuris Yan Fang added a comment -

          I have been thinking this over. Because all Samza logs are currently in string format, adding a log in a new format (e.g. JSON) means the system needs two serdes. Technically, we can apply different serdes according to the package name specified in the log4j properties file. One concern: we would probably also want to publish each new serde's output to a different stream (otherwise two different formats end up in one stream, which I think is a mess), so there will be two or more output streams for the logs. Is that still acceptable? There will also be one restriction: all logs under one package name must have the same format. Is that acceptable as well? Am I thinking in the right direction?

          criccomini Chris Riccomini added a comment -

          the system needs to have two serdes

          Why does the system need to have two serdes?

          otherwise, two different formats will be in one stream, which I think is a mess-up

          Agreed.

          And there will be one restriction, all the logs under one package name should have the same format. Is that acceptable as well?

          Yes, I think so.

          Am I thinking in the right direction?

          Hard to say. I'm having trouble figuring out exactly what you're saying, and what the problem is. Can you provide an example?

          closeuris Yan Fang added a comment -

          Chris Riccomini, let me take one step back.

          1. This ticket was extracted from your comment (3) and Martin Kleppmann's comment in RB.

          2. So my understanding is that users may use logging, such as

          Map<String, Object> jsonObject = new HashMap<String, Object>();
          jsonObject.put("foo", anyObject);
          log.info(jsonObject);
          

          instead of simply

          log.info("string");
          

          Right?

          3. To encode the JSON, we need a serde, like the ones we have for messages and keys, e.g. *.msg.serde=json.

          4.

          Why does the system need to have two serdes?

          This is because Samza itself logs strings, so whenever a user adds a JSON logging format, there will be a string serde plus a JSON serde. I was assuming there was only one appender. On second thought, I think different serdes should go to different appenders, so it will look like

          <appender name="string-serde-StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender"></appender>
          
          <appender name="json-serde-StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender"></appender>
          
          <root> <appender-ref ref="string-serde-StreamAppender"/> </root>
          
          <logger name="com.json.log.package"><appender-ref ref="json-serde-StreamAppender" /></logger>
          
          

          Is this what you had in mind? With this setup, I think each stream will only have one serde. My previous statement was not correct.

          criccomini Chris Riccomini added a comment -

          Ahh, no, I think Martin and I might not have been quite clear. We don't want to change the log.info interface. That should remain string. The part of the code that we are talking about is StreamAppender.append:

            protected void append(LoggingEvent event) {
              if (!recursiveCall.get()) {
                try {
                  recursiveCall.set(true);
                  OutgoingMessageEnvelope outgoingMessageEnvelope =
                      new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), subAppend(event).getBytes("UTF-8"));
                  systemProducer.send(SOURCE, outgoingMessageEnvelope);
                } catch (UnsupportedEncodingException e) {
                  throw new SamzaException("can not send the log messages", e);
                } finally {
                  recursiveCall.set(false);
                }
              }
            }
          

          This method takes in a full LoggingEvent, and sends the OutgoingMessageEnvelope's value as a string. If we were to send the value as a LoggingEvent instead, we'd make it easier for developers to provide other Serdes for LoggingEvent (e.g. one that uses Avro, Protobuf, JSON, etc), rather than just forcing a string serde.

          One complexity in doing this is that the subAppend method currently refers to this.layout:

            private String subAppend(LoggingEvent event) {
              if (this.layout == null) {
                return event.getRenderedMessage();
              } else {
                return this.layout.format(event).trim();
              }
            }
          

          If the OutgoingMessageEnvelope were to send the LoggingEvent, then the Serde that encodes the LoggingEvent wouldn't be able to easily get access to this.layout.format(). I'm not quite sure if there's a good way to fix that. This part is probably the most important piece of the logic, though, as it's what allows us to set custom log-line formats (to include things like container name, timestamp, class, etc) in the log4j.xml.

          closeuris Yan Fang added a comment -

          I see.

          1. So basically what you mean is that, we want to see the logs like (in json format)

          {
            "logger":"org.apache.samza.system",
            "timestamp":"1439976540689",
            "level":"WARN",
            "containerName":"samza-container-1",
            "message":"foo messages here",
            "throwable":"java.Exception"
          }
          

          right?

          2. If 1 is right,

          If the OutgoingMessageEnvelope were to send the LoggingEvent, then the Serde that encodes the LoggingEvent wouldn't be able to easily get access to this.layout.format().

          I think that to get this to work, we need to either write a new "Serde" for LoggingEvent or write a new layout class.

          1) Write a new "Serde": we let OutgoingMessageEnvelope accept the LoggingEvent and do all the processing in the "Serde" class. But this Serde class will be totally different from what we have now, because it needs to process the LoggingEvent, transform it into JSON, and then into byte[]. It is true that we will not use any layout class.

          2) Write a new layout class: the new layout.format returns a JSON object (or another kind of object). This is no longer the original layout class, because the original layout.format() always returns a String, while what we actually want is a different kind of object. So the new layout class will not be compatible with other appenders, and we cannot use any existing layout classes with StreamAppender either. subAppend will then return a JSON object, and our current jsonSerde will work in this case.

          I am leaning toward approach 2. In fact, both approaches get rid of the layout class. Is this acceptable? When users are using String, we can still use the layout class. There are some existing JSON layouts and Avro layouts, but I think none of them works in our situation.

          Any thoughts about this?

          criccomini Chris Riccomini added a comment -

          What about a variation of (2), where we construct a new LoggingEvent? Something like this:

            private LoggingEvent subLog(LoggingEvent le) {
              return new LoggingEvent(
                le.getFQNOfLoggerClass(), 
                le.getLogger(), 
                le.getTimeStamp(), 
                le.getLevel(), 
                subAppend(le), 
                le.getThreadName(),
                le.getThrowableInformation(), 
                le.getNDC(), 
                le.getLocationInformation(), 
                le.getProperties());
            }
          

          Then we can call:

                      new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), subLog(event));
          

          This approach would allow us to use any Serde<LoggingEvent>. I think that this approach is cleaner, overall. Although it's not as friendly for JsonSerde, it's friendlier for non JSON serdes (e.g. Avro, Protobuf, etc). In JSON's case, we'd just write a LoggingEventJsonSerde that extends JSON serde, and converts LoggingEvent to a Map before calling JsonSerde.toBytes.
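          The JSON case described above (convert the event to a Map, then encode the Map) can be sketched as follows. This is only an illustration under stated assumptions: DemoEvent is a hypothetical stand-in for the few LoggingEvent fields used here, and the small hand-written JSON writer stands in for JsonSerde.toBytes so that the sketch compiles without log4j or Samza on the classpath. It is not the code that was committed for this ticket.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the LoggingEvent fields used in this sketch.
final class DemoEvent {
  final String loggerName;
  final long timestamp;
  final String level;
  final String message;

  DemoEvent(String loggerName, long timestamp, String level, String message) {
    this.loggerName = loggerName;
    this.timestamp = timestamp;
    this.level = level;
    this.message = message;
  }
}

// Hypothetical serde: flattens the event into a Map, then writes the Map as JSON.
final class DemoLoggingEventJsonSerde {
  // Step 1: convert the event to a Map (insertion order is kept for readable output).
  static Map<String, Object> toMap(DemoEvent e) {
    Map<String, Object> m = new LinkedHashMap<String, Object>();
    m.put("logger", e.loggerName);
    m.put("timestamp", e.timestamp);
    m.put("level", e.level);
    m.put("message", e.message);
    return m;
  }

  // Step 2: encode the Map. A real implementation would delegate to a JSON library;
  // this writer handles only numbers and non-null strings.
  static byte[] toBytes(DemoEvent e) {
    StringBuilder sb = new StringBuilder("{");
    for (Map.Entry<String, Object> entry : toMap(e).entrySet()) {
      if (sb.length() > 1) {
        sb.append(',');
      }
      sb.append('"').append(escape(entry.getKey())).append("\":");
      Object value = entry.getValue();
      if (value instanceof Number) {
        sb.append(value);
      } else {
        sb.append('"').append(escape(String.valueOf(value))).append('"');
      }
    }
    return sb.append('}').toString().getBytes(StandardCharsets.UTF_8);
  }

  // Escapes backslashes and double quotes only; control characters are not handled.
  private static String escape(String s) {
    return s.replace("\\", "\\\\").replace("\"", "\\\"");
  }
}
```

          Keeping the Map conversion separate from the byte encoding mirrors the split suggested in the comment: only toMap knows about the event type, so the encoding step could be swapped for another format.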

          closeuris Yan Fang added a comment (edited) -

          1. Is the only difference between the new LoggingEvent and the original LoggingEvent the "subAppend" part, which gives the log message its format?

          2.

          This approach would allow us to use any Serde<LoggingEvent>

          This confused me a little. Which one do you mean?

          • Serde<LoggingEvent> is more like an interface; LoggingEventAvroSerde then implements it, the log is sent Avro-encoded, and the receiving side decodes it as Avro.
          • Or are you thinking of directly serializing the LoggingEvent, so that the receiving side (such as ELK) deserializes it back into a LoggingEvent?
          criccomini Chris Riccomini added a comment -

          1. Is the only difference between new LoggingEvent and the original LoggingEvent "subAppend" part, which gives the logging messages format?

          Yea, that was my intention.

          Which one do you mean?

          I meant your first bullet point.

          class LoggingEventAvroSerde implements Serde<LoggingEvent> {
            public byte[] toBytes(LoggingEvent e) { /* encode as Avro record */ }
            public LoggingEvent fromBytes(byte[] bytes) { /* construct logging event from Avro record */ }
          }
          

          It's up to the receiving side how they want to handle it. I think out of the box, we shouldn't worry about Avro. We'll definitely need a LoggingEventStringSerde, which should just extend StringSerde, and use LoggingEvent.getMessage. LoggingEventJsonSerde, which could (de)encode the LoggingEvent to/from a HashMap, would be nice to have as well, but not required.
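          A minimal sketch of the string case described above, written against stand-in types so that it compiles without log4j or Samza on the classpath. SketchSerde and SketchLoggingEvent are hypothetical placeholders for Samza's Serde<T> and log4j's LoggingEvent, and the sketch encodes the message directly rather than extending StringSerde; the class that was actually committed may differ.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Samza's Serde<T> interface.
interface SketchSerde<T> {
  byte[] toBytes(T obj);
  T fromBytes(byte[] bytes);
}

// Hypothetical stand-in for log4j's LoggingEvent; only the message is modelled.
final class SketchLoggingEvent {
  private final Object message;

  SketchLoggingEvent(Object message) {
    this.message = message;
  }

  Object getMessage() {
    return message;
  }
}

// Encodes only the (already formatted) message as UTF-8 bytes.
final class SketchLoggingEventStringSerde implements SketchSerde<SketchLoggingEvent> {
  @Override
  public byte[] toBytes(SketchLoggingEvent event) {
    if (event == null || event.getMessage() == null) {
      return null;
    }
    return event.getMessage().toString().getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public SketchLoggingEvent fromBytes(byte[] bytes) {
    if (bytes == null) {
      return null;
    }
    // Only the message survives the round trip; logger, level, etc. are not encoded.
    return new SketchLoggingEvent(new String(bytes, StandardCharsets.UTF_8));
  }
}
```

          Because only the message is written, the receiving side sees exactly the string that the layout produced, which matches what the appender sent before this change.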

          closeuris Yan Fang added a comment -

          OK. That makes sense. I will implement LoggingEventStringSerde and LoggingEventJsonSerde as the first step. Thank you.

          closeuris Yan Fang added a comment -

          RB: https://reviews.apache.org/r/31034/

          Want to make sure this is what we discussed.

          Only added LoggingEventStringSerde so far. Prefer to add LoggingEventJsonSerde in another ticket.

          Thank you.

          closeuris Yan Fang added a comment -

          Updated according to Chris Riccomini's comments in RB.

          https://reviews.apache.org/r/31034/

          Thank you.

          closeuris Yan Fang added a comment -

          Updated according to Chris Riccomini's comments.

          1. Changed from "" to null
          2. Cleaned up the method

          https://reviews.apache.org/r/31034/

          Thank you.

          criccomini Chris Riccomini added a comment -

          +1

          1. Typo in throw new SamzaException("Can not find serializers class. Please specify serializers.registry.s%.class property");. The %s is reversed.
          2. Please attach patch to JIRA.

          Feel free to commit without another round of RB.
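          To illustrate the typo in item 1: with the placeholder written as %s, the serde name can be substituted into the property key, whereas the reversed s% is not a %s placeholder and the name would never be filled in. The sketch below is an assumption about how the message might be built; SerdeClassKeyDemo and its members are hypothetical names, not the code under review.

```java
// Hypothetical helper showing the corrected placeholder.
final class SerdeClassKeyDemo {
  // Pattern for the property named in the review comment; %s is the serde name.
  static final String SERDE_CLASS_KEY = "serializers.registry.%s.class";

  // Builds the error message with the serde name substituted into the key.
  static String missingSerdeMessage(String serdeName) {
    return "Can not find serializers class. Please specify "
        + String.format(SERDE_CLASS_KEY, serdeName) + " property";
  }
}
```

          Keeping the pattern in a single constant means the key is spelled in one place only.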

          criccomini Chris Riccomini added a comment -

          Re: (1) above, could probably just use the SerdeConfig constant, rather than hard-coding the config param.

          closeuris Yan Fang added a comment -

          Now uses SystemConfig/StreamConfig instead of the hard-coded property.

          criccomini Chris Riccomini added a comment -

          +1

          closeuris Yan Fang added a comment -

          Committed. Thank you.


            People

            • Assignee:
              closeuris Yan Fang
              Reporter:
              closeuris Yan Fang