Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-2857

Kafka Source/Channel/Sink does not restore default values when live update config

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.8.0
    • Component/s: Channel
    • Labels:
      None

      Description

      Been using the following config:

      tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
      tier1.channels.channel1.capacity = 10000
      tier1.channels.channel1.transactionCapacity = 10000
      tier1.channels.channel1.brokerList = 10.0.0.64:9092,10.0.0.65:9092,10.0.0.66:9092
      tier1.channels.channel1.topic = flume.aggregator.channel
      tier1.channels.channel1.zookeeperConnect = 10.0.0.64:2181
      tier1.channels.channel1.kafka.producer.type=async
      tier1.channels.channel1.kafka.batch.num.messages=200
      

      If I remove the producer.type and batch.num.messages (i.e. restore them to default values) when doing a live update config the new values do not take effect.

      1. FLUME-2857.patch
        9 kB
        Tristan Stevens

        Issue Links

          Activity

          Hide
          tmgstev Tristan Stevens added a comment -
          Show
          tmgstev Tristan Stevens added a comment - cc Bessenyei Balázs Donát
          Hide
          tmgstev Tristan Stevens added a comment -

          I actually believe this isn't specific to the Kafka Channel and applies to all live config updates in Flume when removing values and therefore expecting them to go back to defaults.

          Show
          tmgstev Tristan Stevens added a comment - I actually believe this isn't specific to the Kafka Channel and applies to all live config updates in Flume when removing values and therefore expecting them to go back to defaults.
          Hide
          bessbd Bessenyei Balázs Donát added a comment -

          Tristan Stevens: thank you for the bug report

          Would you like to provide a patch for it or would you like me to assign the ticket to myself and start working on it?

          Show
          bessbd Bessenyei Balázs Donát added a comment - Tristan Stevens : thank you for the bug report Would you like to provide a patch for it or would you like me to assign the ticket to myself and start working on it?
          Hide
          tmgstev Tristan Stevens added a comment -

          Bessenyei Balázs Donát Please take this one - hopefully it's a simple fix

          Show
          tmgstev Tristan Stevens added a comment - Bessenyei Balázs Donát Please take this one - hopefully it's a simple fix
          Hide
          tmgstev Tristan Stevens added a comment -

          Bessenyei Balázs Donát I think I have a fix for this - do you mind if I take it back?

          Show
          tmgstev Tristan Stevens added a comment - Bessenyei Balázs Donát I think I have a fix for this - do you mind if I take it back?
          Hide
          bessbd Bessenyei Balázs Donát added a comment -

          Tristan Stevens: awesome!

          Please, go ahead and reassign

          Show
          bessbd Bessenyei Balázs Donát added a comment - Tristan Stevens : awesome! Please, go ahead and reassign
          Hide
          tmgstev Tristan Stevens added a comment -

          Patch attached - it's a simple fix and I've added tests that definitely failed before and definitely pass now. Bessenyei Balázs Donát Do you think it needs a review board?

          Show
          tmgstev Tristan Stevens added a comment - Patch attached - it's a simple fix and I've added tests that definitely failed before and definitely pass now. Bessenyei Balázs Donát Do you think it needs a review board?
          Hide
          bessbd Bessenyei Balázs Donát added a comment -

          +1, LGTM

          I've checked: it doesn't seem to break anything and the tests seem to be correct, too.

          Side-note: we should check other sources/channels/sinks for such problems

          Show
          bessbd Bessenyei Balázs Donát added a comment - +1, LGTM I've checked: it doesn't seem to break anything and the tests seem to be correct, too. Side-note: we should check other sources/channels/sinks for such problems
          Hide
          tmgstev Tristan Stevens added a comment -

          Thanks Bessenyei Balázs Donát

          As a quick test, I've just inspected the following sources/sinks/channels:

          :~/Git/flume$ find . -name '*.java' | xargs grep -C3 'putAll' | grep getSubProperties
          ./flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java- Map<String, String> sysPropsNew = context.getSubProperties(
          ./flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java: producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
          ./flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java: consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
          ./flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java- Map<String, String> params = context.getSubProperties(
          ./flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java: elasticSearchClientContext.putAll(context.getSubProperties(CLIENT_PREFIX));
          ./flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java: serializerContext.putAll(context.getSubProperties(SERIALIZER_PREFIX));
          ./flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java- context.getSubProperties(INDEX_NAME_BUILDER_PREFIX));
          ./flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java: serializerContext.putAll(context.getSubProperties(
          ./flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java: serializerContext.putAll(context.getSubProperties(
          ./flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java: kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
          ./flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java: kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));

          All of those above seem to be okay (i.e. they are preceded by a re-initialisation). I would think that as most things use Context.get

          {type}

          (String name, [default-value]) we should be okay.

          Bessenyei Balázs Donát - thanks for pointing me in the right direction on this one - much appreciated.

          Show
          tmgstev Tristan Stevens added a comment - Thanks Bessenyei Balázs Donát As a quick test, I've just inspected the following sources/sinks/channels: :~/Git/flume$ find . -name '*.java' | xargs grep -C3 'putAll' | grep getSubProperties ./flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java- Map<String, String> sysPropsNew = context.getSubProperties( ./flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java: producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX)); ./flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java: consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX)); ./flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java- Map<String, String> params = context.getSubProperties( ./flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java: elasticSearchClientContext.putAll(context.getSubProperties(CLIENT_PREFIX)); ./flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java: serializerContext.putAll(context.getSubProperties(SERIALIZER_PREFIX)); ./flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java- context.getSubProperties(INDEX_NAME_BUILDER_PREFIX)); ./flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java: serializerContext.putAll(context.getSubProperties( ./flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java: serializerContext.putAll(context.getSubProperties( ./flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java: kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX)); ./flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java: kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX)); All of those above seem to be okay (i.e. they are preceded by a re-initialisation). I would think that as most things use Context.get {type} (String name, [default-value] ) we should be okay. Bessenyei Balázs Donát - thanks for pointing me in the right direction on this one - much appreciated.
          Hide
          bessbd Bessenyei Balázs Donát added a comment - - edited

          Tristan Stevens: I'll update the issue title according to the patch you've submitted to include (Kafka-) Source & Sink, too.

          Show
          bessbd Bessenyei Balázs Donát added a comment - - edited Tristan Stevens : I'll update the issue title according to the patch you've submitted to include (Kafka-) Source & Sink, too.
          Hide
          bessbd Bessenyei Balázs Donát added a comment -

          Tristan Stevens: The changes seem all fine, but when I was doing a last check, I've come across an issue:
          When I'm running the test for KafkaSource in debug mode, it seems to throw an exception in the middle and never run the asserts. (Which is silent, because there is a throws Exception on the method.)

          The problem seems to be around

          String topic = findUnusedTopic();
          kafkaServer.createTopic(topic, 1);

          I think, removing the parts of the test method that are not directly testing the current change would help.

          Can you please make a few changes to the patch? (Or let me know if I'm wrong.)

          Thank you

          PS. maybe a reviewboard request could help to discuss the code

          Show
          bessbd Bessenyei Balázs Donát added a comment - Tristan Stevens : The changes seem all fine, but when I was doing a last check, I've come across an issue: When I'm running the test for KafkaSource in debug mode, it seems to throw an exception in the middle and never run the asserts. (Which is silent, because there is a throws Exception on the method.) The problem seems to be around String topic = findUnusedTopic(); kafkaServer.createTopic(topic, 1); I think, removing the parts of the test method that are not directly testing the current change would help. Can you please make a few changes to the patch? (Or let me know if I'm wrong.) Thank you PS. maybe a reviewboard request could help to discuss the code
          Hide
          denes Denes Arvay added a comment -

          Actually it's not silent, the test fails due to the exception.

          kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
          
          	at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:77)
          	at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:236)
          	at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
          	at org.apache.flume.source.kafka.KafkaSourceEmbeddedKafka.createTopic(KafkaSourceEmbeddedKafka.java:134)
          	at org.apache.flume.source.kafka.TestKafkaSource.checkDefaultSettingsOnRestart(TestKafkaSource.java:645)
          	at org.apache.flume.source.kafka.TestKafkaSource.testDefaultSettingsOnReConfigure(TestKafkaSource.java:632)
          	...
          

          I agree with Donat, I also think that there is no need for starting the whole kafka source, calling two configure}}s with different context should be enough, as the context's properties are copied to the {{kafkaProps in KafkaSource, which is checked by the test. We'd hit two birds with one stone by removing the unneeded lines: the test would be simpler and the above mentioned issue wouldn't come up.

          Show
          denes Denes Arvay added a comment - Actually it's not silent, the test fails due to the exception. kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0 at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:77) at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:236) at kafka.admin.AdminUtils.createTopic(AdminUtils.scala) at org.apache.flume.source.kafka.KafkaSourceEmbeddedKafka.createTopic(KafkaSourceEmbeddedKafka.java:134) at org.apache.flume.source.kafka.TestKafkaSource.checkDefaultSettingsOnRestart(TestKafkaSource.java:645) at org.apache.flume.source.kafka.TestKafkaSource.testDefaultSettingsOnReConfigure(TestKafkaSource.java:632) ... I agree with Donat, I also think that there is no need for starting the whole kafka source, calling two configure}}s with different context should be enough, as the context's properties are copied to the {{kafkaProps in KafkaSource , which is checked by the test. We'd hit two birds with one stone by removing the unneeded lines: the test would be simpler and the above mentioned issue wouldn't come up.
          Hide
          tmgstev Tristan Stevens added a comment -

          +1 to not needing to start the source... however the test shouldn't be failing for that reason, and it doesn't fail for me. I'll remove the code, but there may be some flakiness that needs investigating. I'll raise a review board with an updated patch

          Show
          tmgstev Tristan Stevens added a comment - +1 to not needing to start the source... however the test shouldn't be failing for that reason, and it doesn't fail for me. I'll remove the code, but there may be some flakiness that needs investigating. I'll raise a review board with an updated patch
          Hide
          tmgstev Tristan Stevens added a comment -

          Posted original and revised patch to review board. Link attached.

          Show
          tmgstev Tristan Stevens added a comment - Posted original and revised patch to review board. Link attached.
          Hide
          bessbd Bessenyei Balázs Donát added a comment -

          LGTM, +1

          I'm about to commit this

          Show
          bessbd Bessenyei Balázs Donát added a comment - LGTM, +1 I'm about to commit this
          Hide
          jira-bot ASF subversion and git services added a comment -

          Commit 14fb4d84fd0e100253ca947bc96810c242e7a82b in flume's branch refs/heads/trunk from Tristan Stevens
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=14fb4d8 ]

          FLUME-2857. Make Kafka Source/Channel/Sink restore default values when live updating config

          This commit changes Kafka Channel, Sink and Source to fix an error where
          sub-configurations aren't tolerant of the configure() method being called more
          than once (as happens when a Live Config Update happens).

          Reviewers: Denes Arvay, Attila Simon, Bessenyei Balázs Donát

          (Tristan Stevens via Bessenyei Balázs Donát)

          Show
          jira-bot ASF subversion and git services added a comment - Commit 14fb4d84fd0e100253ca947bc96810c242e7a82b in flume's branch refs/heads/trunk from Tristan Stevens [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=14fb4d8 ] FLUME-2857 . Make Kafka Source/Channel/Sink restore default values when live updating config This commit changes Kafka Channel, Sink and Source to fix an error where sub-configurations aren't tolerant of the configure() method being called more than once (as happens when a Live Config Update happens). Reviewers: Denes Arvay, Attila Simon, Bessenyei Balázs Donát (Tristan Stevens via Bessenyei Balázs Donát)
          Hide
          bessbd Bessenyei Balázs Donát added a comment -

          Tristan Stevens: thank you for the patch!

          Denes Arvay, Attila Simon: thank you for the reviews!

          Show
          bessbd Bessenyei Balázs Donát added a comment - Tristan Stevens : thank you for the patch! Denes Arvay , Attila Simon : thank you for the reviews!
          Hide
          hudson Hudson added a comment -

          UNSTABLE: Integrated in Jenkins build Flume-trunk-hbase-1 #225 (See https://builds.apache.org/job/Flume-trunk-hbase-1/225/)
          FLUME-2857. Make Kafka Source/Channel/Sink restore default values when (bessbd: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=14fb4d84fd0e100253ca947bc96810c242e7a82b)

          • (edit) flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
          • (edit) flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
          • (edit) flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
          • (edit) flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
          • (edit) flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
          • (edit) flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
          Show
          hudson Hudson added a comment - UNSTABLE: Integrated in Jenkins build Flume-trunk-hbase-1 #225 (See https://builds.apache.org/job/Flume-trunk-hbase-1/225/ ) FLUME-2857 . Make Kafka Source/Channel/Sink restore default values when (bessbd: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=14fb4d84fd0e100253ca947bc96810c242e7a82b ) (edit) flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java (edit) flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java (edit) flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java (edit) flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java (edit) flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java (edit) flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java

            People

            • Assignee:
              tmgstev Tristan Stevens
              Reporter:
              tmgstev Tristan Stevens
            • Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development