Apache Twill / TWILL-199

Get next offset and handle offset error in KafkaConsumer.MessageCallback

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.0
    • Component/s: core
    • Labels: None

      Description

      The method void onReceived(Iterator<FetchedMessage> messages) in KafkaConsumer.MessageCallback can be made more flexible by changing it to Long onReceived(Iterator<FetchedMessage> messages), so that it can provide additional functionality:
      1. To return the next offset to be fetched
      2. To handle a nonexistent-offset or offset-mismatch error and take action on it

      This method will return null for backward compatibility when it doesn't need to provide the next offset.
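To make the proposed return value concrete, here is a minimal sketch of a fetch loop that honors it. It runs over an in-memory list rather than Kafka; the `Message` type, the `run` helper, and the batch size are assumptions made for illustration and are not part of Twill.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative fetch loop; a stand-in, not Twill's consumer implementation. */
final class NextOffsetLoopSketch {

  /** Hypothetical message: a payload plus the offset of the message after it. */
  static final class Message {
    final String payload;
    final long nextOffset;

    Message(String payload, long nextOffset) {
      this.payload = payload;
      this.nextOffset = nextOffset;
    }
  }

  /** Callback shape from the description: null means "no explicit next offset". */
  interface Callback {
    Long onReceived(Iterator<Message> messages);
  }

  /** Consumes an in-memory log and returns every offset a fetch started from. */
  static List<Long> run(List<String> log, int batchSize, Callback callback) {
    List<Long> fetchOffsets = new ArrayList<>();
    long offset = 0;
    while (offset < log.size()) {
      fetchOffsets.add(offset);
      long end = Math.min(log.size(), offset + batchSize);
      List<Message> batch = new ArrayList<>();
      for (long o = offset; o < end; o++) {
        batch.add(new Message(log.get((int) o), o + 1));
      }
      Long next = callback.onReceived(batch.iterator());
      // null keeps the old behavior: continue right after the fetched batch.
      long resume = (next == null) ? end : next;
      if (resume <= offset) {
        // This sketch refuses to loop forever; a real consumer could retry instead.
        throw new IllegalStateException("callback did not advance past offset " + offset);
      }
      offset = resume;
    }
    return fetchOffsets;
  }
}
```

A callback that returns null behaves like the old void method, while one that returns the first message's next offset plus one makes the loop skip every other message.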

      In a concrete implementation, an instance of a new interface, KafkaOffsetProvider, can be added as a member of KafkaConsumer.MessageCallback to handle offset errors and provide the next offset. KafkaOffsetProvider also has methods that provide the following functionality:
      1. To fetch the earliest/latest offset in Kafka
      2. To find the offset of the message in Kafka whose timestamp equals a given timestamp
      For backward compatibility, if no KafkaOffsetProvider instance is provided, the member defaults to null and none of its methods are called.
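The issue does not give the KafkaOffsetProvider method signatures, so the following is only a guess at what such an interface could look like, with an in-memory implementation to exercise it. All method names, the use of `OptionalLong`, the meaning of "latest" (the offset the next published message would get), and the clamping policy in `onOffsetError` are assumptions.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical shape for the offset provider described above; every name is assumed. */
interface OffsetProviderSketch {
  long earliestOffset();

  long latestOffset();

  /** Offset of the message whose timestamp equals the given one, if any. */
  OptionalLong offsetForTimestamp(long timestamp);

  /** Picks a usable offset when the requested one does not exist. */
  long onOffsetError(long requestedOffset);
}

/** In-memory implementation, used only to exercise the interface. */
final class InMemoryOffsetProvider implements OffsetProviderSketch {
  private final TreeMap<Long, Long> timestampsByOffset = new TreeMap<>();

  void append(long offset, long timestamp) {
    timestampsByOffset.put(offset, timestamp);
  }

  @Override
  public long earliestOffset() {
    return timestampsByOffset.isEmpty() ? 0L : timestampsByOffset.firstKey();
  }

  @Override
  public long latestOffset() {
    // Assumed to mean the offset the next published message would receive.
    return timestampsByOffset.isEmpty() ? 0L : timestampsByOffset.lastKey() + 1;
  }

  @Override
  public OptionalLong offsetForTimestamp(long timestamp) {
    for (Map.Entry<Long, Long> entry : timestampsByOffset.entrySet()) {
      if (entry.getValue() == timestamp) {
        return OptionalLong.of(entry.getKey());
      }
    }
    return OptionalLong.empty();
  }

  @Override
  public long onOffsetError(long requestedOffset) {
    // Assumed policy: clamp the request into the range that still exists.
    return Math.max(earliestOffset(), Math.min(requestedOffset, latestOffset()));
  }
}
```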

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/twill/pull/16

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on the issue:

          https://github.com/apache/twill/pull/16

          Thank you! Just squashed.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on the issue:

          https://github.com/apache/twill/pull/16

          LGTM. Please squash the commits to one.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r95203552

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          I think the `FetchedMessage` doesn't carry the "current" offset, but the next message offset, right? Meaning one cannot keep consuming the same message (e.g. there is some failure and it needs to retry on the same message).
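One way to support such a retry when a message exposes only its next offset is to track where the message currently being processed can be fetched from: `startOffset` for the first message, and the previous message's next offset after that. The sketch below assumes a simplified `Message` type and a `Predicate` handler, neither of which is Twill's API.

```java
import java.util.Iterator;
import java.util.function.Predicate;

/** Sketch: resume from the failed message when only next offsets are known. */
final class RetryOffsetSketch {

  /** Hypothetical message exposing only its payload and next offset. */
  static final class Message {
    final String payload;
    final long nextOffset;

    Message(String payload, long nextOffset) {
      this.payload = payload;
      this.nextOffset = nextOffset;
    }
  }

  /** Returns the offset to fetch next; handler returns false when processing fails. */
  static long consume(long startOffset, Iterator<Message> messages, Predicate<String> handler) {
    long resume = startOffset; // fetch position of the message about to be handled
    while (messages.hasNext()) {
      Message message = messages.next();
      if (!handler.test(message.payload)) {
        return resume; // ask for the failed message again
      }
      resume = message.nextOffset;
    }
    return resume;
  }
}
```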

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r95000005

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          Then `startOffset` is not necessary here? `onReceived` can just keep the first message's offset by itself and the caller should guarantee that no empty iterator is passed to `onReceived` as in `SimpleKafkaConsumer`. Or should we still keep `startOffset` to allow empty iterator to be passed in?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94918807

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          Wrap it with a peeking iterator from Guava.
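Guava offers this through `Iterators.peekingIterator(...)` and `PeekingIterator.peek()`. To stay dependency-free, the sketch below hand-rolls a minimal equivalent; it is a stand-in for the Guava class, not the code from the pull request.

```java
import java.util.Iterator;

/** Minimal hand-rolled peeking iterator, standing in for Guava's PeekingIterator. */
final class PeekingSketch {

  static final class Peeking<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private boolean hasPeeked;
    private T peeked;

    Peeking(Iterator<T> delegate) {
      this.delegate = delegate;
    }

    /** Returns the next element without consuming it. */
    T peek() {
      if (!hasPeeked) {
        peeked = delegate.next(); // throws NoSuchElementException when empty
        hasPeeked = true;
      }
      return peeked;
    }

    @Override
    public boolean hasNext() {
      return hasPeeked || delegate.hasNext();
    }

    @Override
    public T next() {
      if (!hasPeeked) {
        return delegate.next();
      }
      T result = peeked;
      hasPeeked = false;
      peeked = null;
      return result;
    }
  }
}
```

With such a wrapper, the caller can read the first message to derive the start offset and still hand the callback an iterator that begins at that same message.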

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94915705

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          How can I get the offset of the first message in this case? `message.getNextOffset()--`?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94914909

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          Not exactly. E.g. let's say the offset submitted to Kafka for fetching is "0", but the message with offset "0" is already gone, so the first message actually fetched has a larger offset.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94914331

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          Yes, the current implementation follows this at line 454. Is it just the caller's responsibility to honor this?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94914034

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          The description yes, but I think you need to change the implementation to honor it.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94913855

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          I see. So keep the original description of `startOffset`?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94913560

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          I understand the behavior, but is it a good API? As you see, it is quite difficult to explain, and hence to document correctly. I feel it's much easier to explain and to use if we say the `startOffset` is the offset of the first message in the given iterator.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94912977

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          Yes, I think one should be able to use `-1L` to fetch an upcoming new message, which also aligns with the usage of `-1L` in Kafka's `SimpleConsumer`. If the latest message is read, and it's the first message in the iterator, one can just specify `startOffset` as the latest message offset if one doesn't want to skip it. If the latest message is not the first message in the iterator and its previous message was consumed, `onReceived` will still return the latest message offset.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94911854

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          Is it? Or shall it return the offset of the first message being read? Otherwise, e.g. if `-1L` was specified, and the latest message was read but not consumed by the `onReceived` method: if it returns `-1L`, then that particular message won't get fetched again if there is a newer message available. Is that the intention?

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94874821

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          If one specified `-1L` or `-2L`, but no message is consumed in the method, then `-1L` or `-2L` still has to be returned

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94874547

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          or you change the code in a way that this statement is correct.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94861374

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          How about "The default offset to return as the offset to fetch next message if no message is consumed in this method"

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94858965

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
          --- End diff --

          Is this true? If someone specified `-1L` or `-2L` as the offset, would the `startOffset` here be `-1L` or `-2L` or be the actual offset (which won't be negative) of the first message in the iterator?
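The distinction raised here is between a sentinel passed in by the caller and the real offset a fetch ends up starting at. A minimal sketch of resolving the sentinel before any callback sees it follows; `-1` (latest) and `-2` (earliest) are the values Kafka's offset request uses, while the `resolve` helper and its parameters are assumptions for illustration.

```java
/** Sketch: turn the special offsets into real ones before invoking a callback. */
final class SpecialOffsetSketch {
  static final long LATEST = -1L;   // sentinel for "start at the end of the log"
  static final long EARLIEST = -2L; // sentinel for "start at the oldest message"

  /**
   * Returns a non-negative offset to fetch from.
   * earliest and logEnd are assumed to come from an offset lookup against the broker.
   */
  static long resolve(long requested, long earliest, long logEnd) {
    if (requested == LATEST) {
      return logEnd;
    }
    if (requested == EARLIEST) {
      return earliest;
    }
    if (requested < 0) {
      throw new IllegalArgumentException("unknown special offset: " + requested);
    }
    return requested;
  }
}
```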

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94817038

          --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
          @@ -189,6 +194,49 @@ public void finished() {
               }

               @Test
          +    public void testKafkaClientSkipNext() throws Exception {
          +      String topic = "testClient";
          +      // Publish 30 messages with indecies the same as offsets within the range 0 - 29
          +      Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
          +      t1.start();
          +      t1.join();
          +      Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
          +      t2.start();
          +      t2.join();
          +      Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
          +      t3.start();
          +      t3.join();
          +
          +      final CountDownLatch stopLatch = new CountDownLatch(1);
          +      final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
          +      Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(
          +        new KafkaConsumer.MessageCallback() {
          +          @Override
          +          public long onReceived(long startOffset, Iterator<FetchedMessage> messages) {
          +            if (messages.hasNext()) {
          +              offsetQueue.offer(startOffset);
          +              FetchedMessage message = messages.next();
          +              LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
          +              return message.getNextOffset() + 1;
          +            }
          +            return startOffset;
          +          }
          +
          +          @Override
          +          public void finished() {
          +            stopLatch.countDown();
          +          }
          +        });
          +      // 15 messages should be in the queue since onReceived returns `message.getNextOffset() + 1` as next offset to read
          +      for (int i = 0; i < 30; i += 2) {
          +        Assert.assertTrue(i == offsetQueue.poll(60, TimeUnit.SECONDS));
          +      }
          +      Assert.assertEquals(0, offsetQueue.size());
          --- End diff --

          Should do an `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` instead.
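The likely reasoning behind this suggestion is that `size()` only samples the queue at one instant, whereas `poll` with a timeout also catches an element that another thread adds shortly afterwards. The sketch below shows the idea with plain `java.util.concurrent` types; the `staysEmpty` and `offerLater` helpers are hypothetical and not part of the test in the pull request.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch: checking that a queue stays empty, rather than that it is empty right now. */
final class QuietQueueCheck {

  /** True if nothing shows up on the queue within the given wait. */
  static <T> boolean staysEmpty(BlockingQueue<T> queue, long timeout, TimeUnit unit) {
    try {
      return queue.poll(timeout, unit) == null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
  }

  /** Helper that simulates a late producer: offers a value after a delay. */
  static void offerLater(final BlockingQueue<Long> queue, final long value, final long delayMillis) {
    Thread producer = new Thread(() -> {
      try {
        Thread.sleep(delayMillis);
        queue.offer(value);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.setDaemon(true);
    producer.start();
  }
}
```

A `size() == 0` check made right after `offerLater` is called would most likely pass even though an unexpected element is on its way; `staysEmpty` with a timeout longer than the delay reports it.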

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94691443

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -33,14 +33,16 @@
               /**
                * Invoked when new messages is available.
          +     * @param startOffset Offset of the current message to be consumed.
          --- End diff --

          This is not exactly clear. The `startOffset` is the offset used to fetch the messages, right?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94687750

          --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
          @@ -35,12 +35,14 @@
                * Invoked when new messages is available.
                * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
                * and across different invocation.
          +     * @param startOffset Offset of the current message to be consumed.
          +     * @return The offset of the message to be fetched next.
                */
          -    void onReceived(Iterator<FetchedMessage> messages);
          +    long onReceived(Iterator<FetchedMessage> messages, long startOffset);
          --- End diff --

          Can you reverse the order of the parameters? Logically it fetches from the `startOffset` to produce the `Iterator`.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94688058

          --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
               final AtomicBoolean stopped = new AtomicBoolean();
               return new MessageCallback() {
                 @Override
          -      public void onReceived(final Iterator<FetchedMessage> messages) {
          +      public long onReceived(final Iterator<FetchedMessage> messages, final long startOffset) {
                   if (stopped.get()) {
          -          return;
          +          return startOffset;
                   }
          -        Futures.getUnchecked(executor.submit(new Runnable() {
          +        return Futures.getUnchecked(executor.submit(new Callable<Long>() {
          +          long nextOffset = startOffset;
          --- End diff --

          You don't need this local variable. If the consumer is stopped, it always returns the `startOffset` inside the callable. Otherwise, the callable returns whatever is returned by the `callback` delegate.
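The shape this comment describes can be sketched without the extra variable as follows. This is not the code from the pull request: it uses a plain `Future.get()` in place of Guava's `Futures.getUnchecked`, a simplified `Callback` interface over `String` messages, and the parameter order requested elsewhere in the review; all of these are assumptions.

```java
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of a callback wrapper that runs the delegate on an executor. */
final class WrapCallbackSketch {

  /** Simplified stand-in for the message callback. */
  interface Callback {
    long onReceived(long startOffset, Iterator<String> messages);
  }

  static Callback wrap(final Callback delegate, final ExecutorService executor,
                       final AtomicBoolean stopped) {
    return (startOffset, messages) -> {
      if (stopped.get()) {
        return startOffset; // nothing consumed, so fetch from the same place
      }
      // The callable yields startOffset when stopped, else the delegate's answer.
      Callable<Long> task = () ->
          stopped.get() ? startOffset : delegate.onReceived(startOffset, messages);
      try {
        return executor.submit(task).get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return startOffset;
      } catch (ExecutionException e) {
        throw new IllegalStateException(e.getCause());
      }
    };
  }
}
```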

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94688950

          — Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java —
          @@ -170,11 +172,57 @@ public void testKafkaClient() throws Exception {
          Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
          .MessageCallback() {
          @Override

          • public void onReceived(Iterator<FetchedMessage> messages) {
            + public long onReceived(Iterator<FetchedMessage> messages, long startOffset) {
            + long nextOffset = startOffset;
            while (messages.hasNext()) {
            - LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
            + FetchedMessage message = messages.next();
            + LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
            latch.countDown();
            }
            + return nextOffset;
            + }
            +
            + @Override
            + public void finished() {
            + stopLatch.countDown();
            + }
            + });
            +
            + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
            + cancel.cancel();
            + Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
            + }
            +
            + @Test
            + public void testKafkaClientSkipNext() throws Exception {
            + String topic = "testClient";
            + // Publish 30 messages with indecies the same as offsets within the range 0 - 29
            + Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
            + t1.start();
            + t1.join();
            + Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
            + t2.start();
            + t2.join();
            + Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
            + t3.start();
            + t3.join();
            +
            + // 15 messages will be counted since onReceived returns `message.getNextOffset() + 1` as next offset to read

              • End diff –

          I don't think the test is correct. You published 30 messages in three message sets, hence the `onReceived` method will be called three times. The first call gets messages 0-9 and you return 11. The second call gets 11-19, and you return 21. The last call gets 21-29. So in total there will be more than 15 messages.
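The count can be checked with a small simulation. This is only a sketch under the assumptions stated in the comment: each fetch delivers the messages from the requested offset to the end of the message set that contains it, offsets equal message indices, and the callback returns the last message's `getNextOffset() + 1`. `SkipNextSimulation` and `countDelivered` are hypothetical names, and no Kafka client is involved.

```java
final class SkipNextSimulation {

  // Simulates the fetch loop under the assumptions above and returns the
  // number of messages handed to the callback.
  static int countDelivered(int totalMessages, int setSize) {
    int delivered = 0;
    long offset = 0;
    while (offset < totalMessages) {
      // One fetch delivers from `offset` to the end of the message set containing it.
      long setEnd = Math.min(totalMessages, (offset / setSize + 1) * setSize);
      long lastNextOffset = offset;
      for (long o = offset; o < setEnd; o++) {
        delivered++;
        lastNextOffset = o + 1; // what getNextOffset() would report for message `o`
      }
      // The callback under test returns getNextOffset() + 1, skipping one message.
      offset = lastNextOffset + 1;
    }
    return delivered;
  }

  public static void main(String[] args) {
    System.out.println(countDelivered(30, 10));
  }
}
```

For 30 messages in sets of 10, the fetches start at offsets 0, 11 and 21, so the total comes to 10 + 9 + 9 = 28 messages rather than 15.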

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94658662

          — Diff: twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java —
          @@ -32,7 +32,7 @@
          Cancellable announce(String serviceName, int port);

          /**

          • * Registers an endpoint that could be discovered by external party with a payload
            + * Registers an endpoint that could be discovered by external party with a payload.
              • End diff –

          checkstyle fix

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94561460

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          • public void onReceived(final Iterator<FetchedMessage> messages) {
            + public long onReceived(final Iterator<FetchedMessage> messages) {
            if (stopped.get()) {
            - return;
            + return Long.MIN_VALUE;
            }
          • Futures.getUnchecked(executor.submit(new Runnable() {
            + return Futures.getUnchecked(executor.submit(new Callable<Long>() {
            + long nextOffset = Long.MIN_VALUE;
            @Override
          • public void run() {
            + public Long call() {
            if (stopped.get()) {
            - return;
            + return nextOffset;
            }
          • callback.onReceived(messages);
            + nextOffset = callback.onReceived(messages);
              • End diff –

          I was assuming that this `onReceived` can be called multiple times. Therefore, at line 286 when stopped is set to true, messages processed in previous calls of `onReceived` should be skipped.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94561337

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          • public void onReceived(final Iterator<FetchedMessage> messages) {
            + public long onReceived(final Iterator<FetchedMessage> messages) {
            if (stopped.get()) {
          • return;
            + return Long.MIN_VALUE;
              • End diff –

          Using `AtomicLong` as a mutable parameter is not good. In general, immutability gives a cleaner API.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94559673

          — Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java —
          @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
          Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
          .MessageCallback() {
          @Override

          • public void onReceived(Iterator<FetchedMessage> messages) {
            + public long onReceived(Iterator<FetchedMessage> messages) {
            + long nextOffset = Long.MIN_VALUE;
            + while (messages.hasNext()) {
            + FetchedMessage message = messages.next();
            + LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
            + latch.countDown();
            + }
            + return nextOffset;
            + }
            +
            + @Override
            + public void finished() {
            + stopLatch.countDown();
            + }
            + });
            +
            + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
            + cancel.cancel();
            + Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
            + }
            +
            + @Test
            + public void testKafkaClientReadFromIdx() throws Exception {
            + String topic = "testClient";
            +
            + // Publish 30 messages with indecies the same as offsets within the range 0 - 29
            + Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
            + t1.start();
            + t1.join();
            + Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
            + t2.start();
            + t2.join();
            + Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
            + t3.start();
            + t3.join();
            +
            + final int startIdx = 15;
            + final CountDownLatch latch = new CountDownLatch(30 - startIdx);
            + final CountDownLatch stopLatch = new CountDownLatch(1);
            + final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
            + // Creater a consumer
            + final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer();
            + Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
            + .MessageCallback() {
            + long minOffset = -2; // earliest msg
            + long maxOffset = -1; // latest msg
            + @Override
            + // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset

              • End diff –

          Right, since the offsets and indices of the messages are known, it can directly jump to the desired idx without binary search.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94558645

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -68,7 +69,7 @@
          /**

          • A {@link KafkaConsumer}

            implementation using the scala kafka api.
            */
            -final class SimpleKafkaConsumer implements KafkaConsumer {
            +public final class SimpleKafkaConsumer implements KafkaConsumer {

              • End diff –

          Yes, it's an implementation detail. I was just wondering about it for future work.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94558457

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          • public void onReceived(final Iterator<FetchedMessage> messages) {
            + public long onReceived(final Iterator<FetchedMessage> messages) {
            if (stopped.get()) {
          • return;
            + return Long.MIN_VALUE;
              • End diff –

          Maybe pass an `AtomicLong` to this method, holding the initial offset, that can then be set to the next offset? I just realized this is probably an implementation detail of `SimpleKafkaConsumer` and not general enough to be done in the interface.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94557551

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -68,7 +69,7 @@
          /**

          • A {@link KafkaConsumer}

            implementation using the scala kafka api.
            */
            -final class SimpleKafkaConsumer implements KafkaConsumer {
            +public final class SimpleKafkaConsumer implements KafkaConsumer {

              • End diff –

          It's independent of this PR, isn't it?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94557480

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          • public void onReceived(final Iterator<FetchedMessage> messages) {
            + public long onReceived(final Iterator<FetchedMessage> messages) {
            if (stopped.get()) {
          • return;
            + return Long.MIN_VALUE;
              • End diff –

          Why? The offset passed to this method is the offset that was used for fetching, and hence for creating the iterator. The offset returned, on the other hand, governs the offset to use for the next fetch.
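A rough sketch of that contract, using an in-memory list in place of a Kafka partition. `FetchLoopSketch`, its nested `Callback` interface and the `batchSize` parameter are all hypothetical; the real loop lives inside `SimpleKafkaConsumer` and is not reproduced here.

```java
import java.util.Iterator;
import java.util.List;

final class FetchLoopSketch {

  // Hypothetical stand-in for KafkaConsumer.MessageCallback.
  interface Callback {
    long onReceived(Iterator<String> messages, long startOffset);
  }

  // Runs fetch rounds over an in-memory log until the offset reaches the end,
  // and returns the final offset.
  static long consume(List<String> log, int batchSize, Callback callback) {
    long offset = 0;
    while (offset < log.size()) {
      int from = (int) offset;
      int to = Math.min(log.size(), from + batchSize);
      // `offset` is the input: it was used to build this iterator.
      long next = callback.onReceived(log.subList(from, to).iterator(), offset);
      // Guard for this sketch only: stop instead of refetching forever when
      // the callback reports no forward progress.
      if (next <= offset) {
        break;
      }
      // The return value is the output: it decides where the next fetch starts.
      offset = next;
    }
    return offset;
  }
}
```

Because the offset comes in as a plain `long` and goes out as the return value, the callback never has to mutate shared state to influence the next fetch.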

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94556442

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          • public void onReceived(final Iterator<FetchedMessage> messages) {
            + public long onReceived(final Iterator<FetchedMessage> messages) {
            if (stopped.get()) {
          • return;
            + return Long.MIN_VALUE;
              • End diff –

          If the offset is passed to `onReceived`, it seems that there's no need to return an offset? Because now the returned offset is used to set the offset outside of the `onReceived` method.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94555234

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -68,7 +69,7 @@
          /**

          • A {@link KafkaConsumer}

            implementation using the scala kafka api.
            */
            -final class SimpleKafkaConsumer implements KafkaConsumer {
            +public final class SimpleKafkaConsumer implements KafkaConsumer {

              • End diff –

          Right, this can be done in the unit test. To phrase my question better, what would be a better way to perform the same logic as `getLastOffset` outside of `SimpleKafkaConsumer`? Maybe expose the parameter passed to the `SimpleKafkaConsumer` constructor in `ZKKafkaClientService`?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on the issue:

          https://github.com/apache/twill/pull/16

          There are checkstyle failures; please fix them as well.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94551913

          — Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java —
          @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
          Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
          .MessageCallback() {
          @Override

          • public void onReceived(Iterator<FetchedMessage> messages) {
            + public long onReceived(Iterator<FetchedMessage> messages) {
            + long nextOffset = Long.MIN_VALUE;
            + while (messages.hasNext()) {
            + FetchedMessage message = messages.next();
            + LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
            + latch.countDown();
            + }
            + return nextOffset;
            + }
            +
            + @Override
            + public void finished() {
            + stopLatch.countDown();
            + }
            + });
            +
            + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
            + cancel.cancel();
            + Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
            + }
            +
            + @Test
            + public void testKafkaClientReadFromIdx() throws Exception {
            + String topic = "testClient";
            +
            + // Publish 30 messages with indecies the same as offsets within the range 0 - 29
            + Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
            + t1.start();
            + t1.join();
            + Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
            + t2.start();
            + t2.join();
            + Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
            + t3.start();
            + t3.join();
            +
            + final int startIdx = 15;
            + final CountDownLatch latch = new CountDownLatch(30 - startIdx);
            + final CountDownLatch stopLatch = new CountDownLatch(1);
            + final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
            + // Creater a consumer
            + final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer();
            + Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
            + .MessageCallback() {
            + long minOffset = -2; // earliest msg
            + long maxOffset = -1; // latest msg
            + @Override
            + // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset

              • End diff –

          Seems like this unit-test is unnecessarily complicated. All you want to test is that the offset being returned from the `onReceived` method is being honored, right?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94551779

          — Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java —
          @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
          Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
          .MessageCallback() {
          @Override

            - public void onReceived(Iterator<FetchedMessage> messages) {
            + public long onReceived(Iterator<FetchedMessage> messages) {
            + long nextOffset = Long.MIN_VALUE;
            + while (messages.hasNext()) {
            + FetchedMessage message = messages.next();
            + LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
            + latch.countDown();
            + }
            + return nextOffset;
            + }
            +
            + @Override
            + public void finished() {
            + stopLatch.countDown();
            + }
            + });
            +
            + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
            + cancel.cancel();
            + Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
            + }
            +
            + @Test
            + public void testKafkaClientReadFromIdx() throws Exception {
            + String topic = "testClient";
            +
            + // Publish 30 messages with indecies the same as offsets within the range 0 - 29
            + Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
            + t1.start();
            + t1.join();
            + Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
            + t2.start();
            + t2.join();
            + Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
            + t3.start();
            + t3.join();
            +
            + final int startIdx = 15;
            + final CountDownLatch latch = new CountDownLatch(30 - startIdx);
            + final CountDownLatch stopLatch = new CountDownLatch(1);
            + final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
            + // Creater a consumer
            + final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer();
            + Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
            + .MessageCallback() {
            + long minOffset = -2; // earliest msg
            + long maxOffset = -1; // latest msg
            + @Override
            + // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset
            + // to fetch message until the matching message is found.
            + public long onReceived(Iterator<FetchedMessage> messages) {
            + while (messages.hasNext()) {
            + FetchedMessage currentMsg = messages.next();
            + long currentOffset = currentMsg.getNextOffset() - 1;
            + String decodedMsg = Charsets.UTF_8.decode(currentMsg.getPayload()).toString();
            + LOG.info(decodedMsg);
            + int currentIdx = Integer.valueOf(decodedMsg.split(" ")[0]);
            + LOG.info("Current offset = {}, currentIdx = {}. minOffset = {}", currentOffset, currentIdx, minOffset);
            + if (currentIdx == startIdx) {
            + if (offsetQueue.size() == 0) {
            + offsetQueue.offer(currentOffset);
            + LOG.info("currentOffset = {} matches startIdx {}", currentOffset, startIdx);
            + }
            + return currentOffset;
            + }
            + // If minOffset and maxOffset still have their initial values, set the minOffset to currentOffset and return
            + // the offset of the last received message
            + if (minOffset == -2 && maxOffset == -1) {
            + minOffset = currentOffset;
            + LOG.info("minOffset = {}, return maxOffset = {}", minOffset, maxOffset);
            + // Returns the offset of the last received messages. Cannot return -1 because -1 will be translated as
            + // the next offset after the last received message
            + return consumer.getLastOffset(currentMsg.getTopicPartition(), -1) - 1;
            + }
            + if (maxOffset == -1) {
            + maxOffset = currentOffset;
            + }

            + LOG.info("minOffset = {}, maxOffset = {}", minOffset, maxOffset);
            + // If minOffset > maxOffset, the startIdx cannot be found in the current range of offset.
            + // Restore the initial values of minOffset and maxOffset and read from the beginning again
            + if (minOffset > maxOffset) {
            + minOffset = -2;
            + maxOffset = -1;
            + LOG.info("minOffset > maxOffset, return minOffset = {}", minOffset);
            + return minOffset;
            + }
            + if (currentIdx > startIdx) {
            + maxOffset = currentOffset - 1;
            + long newOffset = minOffset + (maxOffset - minOffset)/2;
            + LOG.info("currentIdx > startIdx, return newOffset {}", newOffset);
            + return newOffset;
            + }
            + if (currentIdx < startIdx) {
            + minOffset = currentOffset + 1;
            + long newOffset = minOffset + (maxOffset - minOffset)/2;
            + LOG.info("currentIdx < startIdx, return newOffset {}", newOffset);
            + return newOffset;
            + }
            + }
            + return Long.MIN_VALUE;
            + }
            +
            + @Override
            + public void finished() {
            + //no-op
            + }

            + });
            +
            + long startOffset = offsetQueue.poll(360, TimeUnit.SECONDS);
            + initCancel.cancel();
            +
            + Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, startOffset).consume(new KafkaConsumer
            + .MessageCallback() {

              • End diff –

          This is an awkward line break.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94551691

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          - public void onReceived(final Iterator<FetchedMessage> messages) {
          + public long onReceived(final Iterator<FetchedMessage> messages) {
          if (stopped.get()) {
          - return;
          + return Long.MIN_VALUE;
          }
          - Futures.getUnchecked(executor.submit(new Runnable() {
          + return Futures.getUnchecked(executor.submit(new Callable<Long>() {
          + long nextOffset = Long.MIN_VALUE;
          @Override
          - public void run() {
          + public Long call() {
          if (stopped.get()) {
          - return;
          + return nextOffset;
          }
          - callback.onReceived(messages);
          + nextOffset = callback.onReceived(messages);
              • End diff –

          With the current code logic, line 286 always returns `Long.MIN_VALUE`.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94551508

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          - public void onReceived(final Iterator<FetchedMessage> messages) {
          + public long onReceived(final Iterator<FetchedMessage> messages) {
          if (stopped.get()) {
          - return;
          + return Long.MIN_VALUE;
              • End diff –

          Using a special value is an anti-pattern. Since you are already changing the `MessageCallback` API, it is probably better to pass the offset used for the fetch call to the `onReceived` method.
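
          A minimal sketch of the shape suggested here, not the code from the pull request: the fetch offset is passed into the callback, so "keep the current offset" is expressed by returning that same offset instead of a sentinel such as `Long.MIN_VALUE`. `SketchMessage`, `SketchCallback` and `OffsetCallbackSketch` are hypothetical stand-ins for illustration and are not Twill classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a fetched message; only what the sketch needs.
final class SketchMessage {
  final long offset;
  final String payload;

  SketchMessage(long offset, String payload) {
    this.offset = offset;
    this.payload = payload;
  }

  // Offset of the message that follows this one.
  long getNextOffset() {
    return offset + 1;
  }
}

// Hypothetical callback shape: the offset used for the fetch is an argument,
// so no special return value is needed to signal "no progress".
interface SketchCallback {
  long onReceived(Iterator<SketchMessage> messages, long startOffset);
}

final class OffsetCallbackSketch {
  // Consumes every message and reports the offset to fetch next.
  static final SketchCallback CONSUME_ALL = new SketchCallback() {
    @Override
    public long onReceived(Iterator<SketchMessage> messages, long startOffset) {
      long nextOffset = startOffset; // unchanged if nothing is consumed
      while (messages.hasNext()) {
        nextOffset = messages.next().getNextOffset();
      }
      return nextOffset;
    }
  };

  public static void main(String[] args) {
    List<SketchMessage> batch =
        Arrays.asList(new SketchMessage(5, "a"), new SketchMessage(6, "b"));
    // Two messages consumed: the next fetch starts after offset 6.
    System.out.println(CONSUME_ALL.onReceived(batch.iterator(), 5L));
    // Nothing consumed: the same start offset comes back.
    System.out.println(
        CONSUME_ALL.onReceived(Collections.<SketchMessage>emptyIterator(), 5L));
  }
}
```

          With this shape a callback that processes nothing simply hands back `startOffset`, so callers never have to compare against a magic constant.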

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94551352

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -68,7 +69,7 @@
          /**
          * A {@link KafkaConsumer} implementation using the scala kafka api.
          */
          -final class SimpleKafkaConsumer implements KafkaConsumer {
          +public final class SimpleKafkaConsumer implements KafkaConsumer {

              • End diff –

          That's not a good reason to make this class public. Why does the unit test have to use this method? Can't the unit test just return some offset that it knows of?
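
          A rough sketch of what such a test could look like, assuming a toy in-memory log rather than a real Kafka broker: the callback returns an offset the test already knows to exist, and the check is only that the fetch loop honors it. `KnownOffsetTestSketch`, its `Callback` interface and `consume` are hypothetical names for illustration; none of them are Twill APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class KnownOffsetTestSketch {
  // Hypothetical callback: sees one message and its offset,
  // and returns the offset that should be fetched next.
  interface Callback {
    long onReceived(String message, long offset);
  }

  // Toy fetch loop over an in-memory log. It always fetches the offset
  // returned by the callback, and stops when that offset is out of range
  // or after maxFetches iterations.
  static List<String> consume(List<String> log, long startOffset,
                              int maxFetches, Callback callback) {
    List<String> seen = new ArrayList<>();
    long offset = startOffset;
    for (int i = 0; i < maxFetches && offset >= 0 && offset < log.size(); i++) {
      String message = log.get((int) offset);
      seen.add(message);
      offset = callback.onReceived(message, offset);
    }
    return seen;
  }

  public static void main(String[] args) {
    List<String> log = Arrays.asList("m0", "m1", "m2", "m3", "m4", "m5");
    // After the first message, jump to offset 4, which the test knows exists.
    List<String> seen = consume(log, 0, 10, new Callback() {
      @Override
      public long onReceived(String message, long offset) {
        return offset == 0 ? 4 : offset + 1;
      }
    });
    // Messages at offsets 1 to 3 are skipped if the returned offset is honored.
    System.out.println(seen);
  }
}
```

          Because the test chooses the offset itself, nothing like `getLastOffset` has to be reachable from the test, and the consumer implementation can stay package-private.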

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94522513

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          - public void onReceived(final Iterator<FetchedMessage> messages) {
          + public long onReceived(final Iterator<FetchedMessage> messages) {
          if (stopped.get()) {
          - return;
          + return Long.MIN_VALUE;
              • End diff –

          In this case, no message is processed, so the offset should remain unchanged to start reading from the current offset again next time.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94522256

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -68,7 +69,7 @@
          /**
          * A {@link KafkaConsumer} implementation using the scala kafka api.
          */
          -final class SimpleKafkaConsumer implements KafkaConsumer {
          +public final class SimpleKafkaConsumer implements KafkaConsumer {

              • End diff –

          I wanted to use the `getLastOffset` method in the test. I haven't figured out a way to move the logic in `getLastOffset` elsewhere without exposing the `SimpleKafkaConsumer` class, because `getLastOffset` uses a lot of private members of `SimpleKafkaConsumer`.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94521832

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          - public void onReceived(final Iterator<FetchedMessage> messages) {
          + public long onReceived(final Iterator<FetchedMessage> messages) {
          if (stopped.get()) {
          - return;
          + return Long.MIN_VALUE;
          }
          - Futures.getUnchecked(executor.submit(new Runnable() {
          + return Futures.getUnchecked(executor.submit(new Callable<Long>() {
          + long nextOffset = Long.MIN_VALUE;
          @Override
          - public void run() {
          + public Long call() {
          if (stopped.get()) {
          - return;
          + return nextOffset;
          }
          - callback.onReceived(messages);
          + nextOffset = callback.onReceived(messages);
              • End diff –

          Because in line 286, the stored `nextOffset` should be returned

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94521210

          — Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java —
          @@ -206,9 +206,12 @@ private LogMessageCallback(Iterable<LogHandler> logHandlers) {
          }

          @Override

          - public void onReceived(Iterator<FetchedMessage> messages) {
          + public long onReceived(Iterator<FetchedMessage> messages) {
          + long nextOffset = Long.MIN_VALUE;
              • End diff –

          Why is it the min value?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94521320

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          - public void onReceived(final Iterator<FetchedMessage> messages) {
          + public long onReceived(final Iterator<FetchedMessage> messages) {
          if (stopped.get()) {
          - return;
          + return Long.MIN_VALUE;
              • End diff –

          Why min value?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94521420

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -450,7 +453,10 @@ private boolean sleepIfEmpty(ByteBufferMessageSet messages) {
          private void invokeCallback(ByteBufferMessageSet messages, AtomicLong offset) {
          long savedOffset = offset.get();
          try {

          - callback.onReceived(createFetchedMessages(messages, offset));
          + Long nextOffset = callback.onReceived(createFetchedMessages(messages, offset));
              • End diff –

          Why use `Long`? Shouldn't it be `long`?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94521381

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
          final AtomicBoolean stopped = new AtomicBoolean();
          return new MessageCallback() {
          @Override

          - public void onReceived(final Iterator<FetchedMessage> messages) {
          + public long onReceived(final Iterator<FetchedMessage> messages) {
          if (stopped.get()) {
          - return;
          + return Long.MIN_VALUE;
          }
          - Futures.getUnchecked(executor.submit(new Runnable() {
          + return Futures.getUnchecked(executor.submit(new Callable<Long>() {
          + long nextOffset = Long.MIN_VALUE;
          @Override
          - public void run() {
          + public Long call() {
          if (stopped.get()) {
          - return;
          + return nextOffset;
          }
          - callback.onReceived(messages);
          + nextOffset = callback.onReceived(messages);
              • End diff –

          Just `return callback.onReceived(messages)`. No need to store it to a local variable.
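
          As an illustration of this suggestion, a simplified sketch under stated assumptions: the `Callable` returns the callback's result directly and keeps no `nextOffset` field. It uses plain `Future.get()` from the JDK in place of Guava's `Futures.getUnchecked`, passes the start offset in rather than using a sentinel, and `DirectReturnSketch` and `OffsetCallback` are hypothetical names, not Twill classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

final class DirectReturnSketch {
  // Hypothetical simplified callback: takes the fetch offset, returns the next one.
  interface OffsetCallback {
    long onReceived(long startOffset);
  }

  // Runs the callback on the executor and returns its result directly.
  // When stopped, the start offset is returned so the position is unchanged.
  static long invoke(ExecutorService executor, final AtomicBoolean stopped,
                     final OffsetCallback callback, final long startOffset) {
    if (stopped.get()) {
      return startOffset;
    }
    try {
      return executor.submit(new Callable<Long>() {
        @Override
        public Long call() {
          if (stopped.get()) {
            return startOffset;
          }
          // No intermediate field: the callback's value is the task's value.
          return callback.onReceived(startOffset);
        }
      }).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      OffsetCallback skipTen = new OffsetCallback() {
        @Override
        public long onReceived(long startOffset) {
          return startOffset + 10;
        }
      };
      System.out.println(invoke(executor, new AtomicBoolean(false), skipTen, 5L));
      System.out.println(invoke(executor, new AtomicBoolean(true), skipTen, 5L));
    } finally {
      executor.shutdown();
    }
  }
}
```

          Returning the value from `call()` also avoids the question raised about line 286: there is no stored field that could still hold its initial value when the early-return branch runs.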

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94521515

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java —
          @@ -35,8 +35,10 @@

          * Invoked when new messages is available.
          * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
          * and across different invocation.
          + * @return The offset of the message to be fetched next. Returns {@code Long.MIN_VALUE} to keep the current offset

              • End diff –

          In what case is it unchanged?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r94521288

          — Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java —
          @@ -68,7 +69,7 @@
          /**
          * A {@link KafkaConsumer} implementation using the scala kafka api.
          */
          -final class SimpleKafkaConsumer implements KafkaConsumer {
          +public final class SimpleKafkaConsumer implements KafkaConsumer {

              • End diff –

          Why make this public? This is an internal implementation of `KafkaConsumer`, which shouldn't be public.

          githubbot ASF GitHub Bot added a comment -

          Github user albertshau commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r90149679

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java —
          @@ -0,0 +1,36 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.twill.kafka.client;
          +
          +/**
          + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
          + * not met, the method will return the next offset to continue searching for the message meeting this condition.
          + */
          +public interface KafkaOffsetProvider {
          — End diff –

          It doesn't look like this interface belongs in Twill.

          githubbot ASF GitHub Bot added a comment -

          Github user albertshau commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r90152039

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java —
          @@ -35,8 +35,10 @@
           * Invoked when new messages is available.
           * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
           * and across different invocation.
          + * @return A long larger than zero as the offset to restart fetching messages when offset error is caught,
          — End diff –

          What happens if the returned offset is out of bounds? The expected behavior in such circumstances should be documented. Looking at the implementation, it seems like we read from the earliest offset.
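
          To make the question concrete, here is a minimal sketch of the fallback the reviewer describes: an offset outside the known range is replaced by the earliest offset. This only illustrates the reviewer's reading of the implementation and is not Twill's actual code; the class `OffsetBounds`, the method `orEarliest`, and its parameters are hypothetical names.

```java
/** Hypothetical helper, not part of Twill: illustrates an "out of bounds falls back to earliest" rule. */
final class OffsetBounds {

  private OffsetBounds() { }

  /**
   * Returns the requested offset if it lies within [earliest, latest],
   * otherwise the earliest offset. Here "latest" is assumed to be the
   * position of the next message to be written, which is a valid fetch position.
   */
  static long orEarliest(long requested, long earliest, long latest) {
    boolean inRange = requested >= earliest && requested <= latest;
    return inRange ? requested : earliest;
  }
}
```

          Whatever rule is chosen, stating it in the `@return` Javadoc would let callers know what to expect from a stale or overshooting offset.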

          githubbot ASF GitHub Bot added a comment -

          Github user albertshau commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r90151909

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java —
          @@ -35,8 +35,10 @@
           * Invoked when new messages is available.
           * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
           * and across different invocation.
          + * @return A long larger than zero as the offset to restart fetching messages when offset error is caught,
          + * {@code -1} if the error cannot be resolved. Returns {@code 0} if no need to restart fetching.
           */
          - void onReceived(Iterator<FetchedMessage> messages);
          + long onReceived(Iterator<FetchedMessage> messages);
          — End diff –

          -1 for latest and -2 for earliest seems to make sense. 0 is a valid Kafka offset, though I suppose, practically speaking, any use of 0 could be replaced by -2.

          It is unclear what an 'error' means. Does it mean the consumer should stop consuming? I think we can just leave that out.
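
          The sentinel convention suggested above could look roughly like the sketch below. It is an illustration only: `OffsetCallbackSketch`, its nested `Message` type, and the `resolve` helper are hypothetical stand-ins and not Twill's `KafkaConsumer.MessageCallback` or `FetchedMessage`. Returning `LATEST` for an empty batch is an arbitrary choice made for the example.

```java
import java.util.Iterator;

/** Hypothetical sketch of a callback whose return value tells the consumer where to fetch next. */
final class OffsetCallbackSketch {

  // Sentinels following the review suggestion: -1 for latest, -2 for earliest.
  static final long LATEST = -1L;
  static final long EARLIEST = -2L;

  /** Minimal stand-in for a fetched message; not Twill's FetchedMessage. */
  static final class Message {
    final long offset;

    Message(long offset) {
      this.offset = offset;
    }
  }

  private OffsetCallbackSketch() { }

  /**
   * Consumes the batch and returns the offset following the last message seen.
   * For an empty batch this sketch returns LATEST (an assumption, not a documented rule).
   */
  static long onReceived(Iterator<Message> messages) {
    long next = LATEST;
    while (messages.hasNext()) {
      next = messages.next().offset + 1;
    }
    return next;
  }

  /** Maps a callback result to a concrete offset, given the partition's earliest and latest offsets. */
  static long resolve(long requested, long earliest, long latest) {
    if (requested == LATEST) {
      return latest;
    }
    if (requested == EARLIEST) {
      return earliest;
    }
    return requested;
  }
}
```

          With negative sentinels, 0 stays available as an ordinary offset, which is the concern raised in the comment.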

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r89195623

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java —
          @@ -0,0 +1,36 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.twill.kafka.client;
          +
          +/**
          + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
          + * not met, the method will return the next offset to continue searching for the message meeting this condition.
          + */
          +public interface KafkaOffsetProvider {
          +
          + /**
          + * Check whether a message meets a given condition. If the condition is not met, return the next offset to
          — End diff –

          Right, how about changing this to an abstract class instead?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r89185930

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java —
          @@ -0,0 +1,36 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.twill.kafka.client;
          +
          +/**
          + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
          + * not met, the method will return the next offset to continue searching for the message meeting this condition.
          + */
          +public interface KafkaOffsetProvider {
          +
          + /**
          + * Check whether a message meets a given condition. If the condition is not met, return the next offset to
          — End diff –

          This does not seem like a clean contract. This is an interface, so why do we have to assume something is passed in the constructor of the implementation class? It seems very confusing.

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r89042464

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java —
          @@ -0,0 +1,36 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.twill.kafka.client;
          +
          +/**
          + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
          + * not met, the method will return the next offset to continue searching for the message meeting this condition.
          + */
          +public interface KafkaOffsetProvider {
          — End diff –

          It will be implemented outside of Twill. The objects will be passed to the constructor of `MessageCallback`

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r89042335

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java —
          @@ -0,0 +1,36 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.twill.kafka.client;
          +
          +/**
          + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
          + * not met, the method will return the next offset to continue searching for the message meeting this condition.
          + */
          +public interface KafkaOffsetProvider {
          +
          + /**
          + * Check whether a message meets a given condition. If the condition is not met, return the next offset to
          — End diff –

          It will be provided in the constructor. For instance, the timestamp will be passed as a parameter in the constructor. Other parameters can also be passed instead, and they can be used in the `getCandidateOffset` method.
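
          As a rough illustration of this idea, the sketch below passes a target timestamp to the constructor and uses it in `getCandidateOffset`, following the return convention in the quoted Javadoc (0 when the condition is met, a positive next offset otherwise, -2 when no match can be found). The class `TimestampOffsetProviderSketch` and its nested `Message` type are hypothetical stand-ins, not Twill classes. The sketch assumes a linear scan over messages whose timestamps never decrease as offsets grow.

```java
/** Hypothetical sketch: the search condition (a target timestamp) is supplied through the constructor. */
final class TimestampOffsetProviderSketch {

  // Sentinel for "no message meeting the condition can be found", as in the quoted Javadoc.
  static final long EARLIEST = -2L;

  /** Minimal stand-in for a fetched message with a timestamp; not Twill's FetchedMessage. */
  static final class Message {
    final long offset;
    final long timestamp;

    Message(long offset, long timestamp) {
      this.offset = offset;
      this.timestamp = timestamp;
    }
  }

  private final long targetTimestamp;

  TimestampOffsetProviderSketch(long targetTimestamp) {
    this.targetTimestamp = targetTimestamp;
  }

  /**
   * Returns 0 if the message's timestamp equals the target, EARLIEST if the
   * target has already been passed (no match possible under the ordering
   * assumption), and otherwise the next offset to check.
   */
  long getCandidateOffset(Message message) {
    if (message.timestamp == targetTimestamp) {
      return 0L;
    }
    if (message.timestamp > targetTimestamp) {
      return EARLIEST;
    }
    return message.offset + 1;
  }
}
```

          The condition never appears in the method signature, which is the contract concern raised elsewhere in this review.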

          githubbot ASF GitHub Bot added a comment -

          Github user maochf commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r89042093

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java —
          @@ -0,0 +1,36 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.twill.kafka.client;
          +
          +/**
          + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
          + * not met, the method will return the next offset to continue searching for the message meeting this condition.
          + */
          +public interface KafkaOffsetProvider {
          +
          + /**
          + * Check whether a message meets a given condition. If the condition is not met, return the next offset to
          + * continue searching for the message meeting this condition.
          + * @param message {@link FetchedMessage} to check.
          + * @return A {@code long} larger than zero as the next offset to continue searching for the message meeting the
          + * given condition if the current message doesn't meet the condition. Return {code 0} if the current
          + * message meets the given condition. Return the earliest offset {@code -2} if no message meeting the
          + * condition can be found.
          + */
          + public long getCandidateOffset(FetchedMessage message);
          — End diff –

          I meant that the returned offset is that of the next message to be checked; that message is not guaranteed to satisfy the given condition.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r89038561

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java —
          @@ -0,0 +1,36 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.twill.kafka.client;
          +
          +/**
          + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
          + * not met, the method will return the next offset to continue searching for the message meeting this condition.
          + */
          +public interface KafkaOffsetProvider {
          — End diff –

          How is this interface being used? I don't see any usage of this class.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r89038468

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java —
          @@ -0,0 +1,36 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.twill.kafka.client;
          +
          +/**
          + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
          + * not met, the method will return the next offset to continue searching for the message meeting this condition.
          + */
          +public interface KafkaOffsetProvider {
          +
          + /**
          + * Check whether a message meets a given condition. If the condition is not met, return the next offset to
          — End diff –

          I don't see any condition. How is the condition provided?
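
One way such a condition could be supplied, purely as an illustration: pass it to the provider when it is constructed. The sketch below is not the interface from the pull request, whose body is cut off in the diff above. The names `SketchOffsetProvider`, `nextOffset`, `FOUND`, `TimestampOffsetProvider` and `OffsetSearchSketch` are all hypothetical, and the "timestamp is at least the target" condition is an assumption chosen only to make the example concrete.

```java
// Hypothetical provider shape; the method name, parameters and FOUND sentinel are
// assumptions, since the interface body is cut off in the diff.
interface SketchOffsetProvider {
  long FOUND = -1L;

  // Returns FOUND if the message satisfies the condition, else the next offset to try.
  long nextOffset(long offset, long timestampMillis);
}

// Supplies the condition through the constructor: "timestamp is at least targetMillis".
final class TimestampOffsetProvider implements SketchOffsetProvider {
  private final long targetMillis;

  TimestampOffsetProvider(long targetMillis) {
    this.targetMillis = targetMillis;
  }

  @Override
  public long nextOffset(long offset, long timestampMillis) {
    return timestampMillis >= targetMillis ? FOUND : offset + 1;
  }
}

final class OffsetSearchSketch {
  // Scans a toy log where the array index plays the role of the offset.
  // Returns the first offset meeting the condition, or -1 if the log is exhausted.
  static long search(long[] timestamps, SketchOffsetProvider provider) {
    long offset = 0;
    while (offset < timestamps.length) {
      long next = provider.nextOffset(offset, timestamps[(int) offset]);
      if (next == SketchOffsetProvider.FOUND) {
        return offset;
      }
      offset = next;
    }
    return -1L;
  }

  public static void main(String[] args) {
    long[] log = {10L, 20L, 30L, 40L};
    System.out.println(search(log, new TimestampOffsetProvider(30L)));
  }
}
```

With the condition held by the implementation, the interface itself stays free of condition-specific parameters, at the cost of needing one provider instance per search target.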

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/16#discussion_r89038384

          — Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java —
          @@ -35,8 +35,10 @@
           * Invoked when new messages is available.
           * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
           * and across different invocation.
          + * @return A long larger than zero as the offset to restart fetching messages when offset error is caught,
          + * {@code -1} if the error cannot be resolved. Returns {@code 0} if no need to restart fetching.
           */
          - void onReceived(Iterator<FetchedMessage> messages);
          + long onReceived(Iterator<FetchedMessage> messages);
          — End diff –

          How would the callback indicate that fetching should restart from the earliest or the latest offset?
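
To make the return convention in the Javadoc above concrete, here is a minimal sketch of a callback that follows it: a positive value requests a restart at that offset, `-1` reports an unresolvable offset error, and `0` means no restart is needed. `SketchMessage`, `SketchCallback` and `OffsetCallbackSketch` are hypothetical stand-ins, not Twill classes, and the "offsets must be contiguous" check is an assumption used only to trigger each case.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Twill's FetchedMessage; only the offset is modelled.
final class SketchMessage {
  final long offset;

  SketchMessage(long offset) {
    this.offset = offset;
  }
}

// Mirrors the signature proposed in the diff; this is not the real Twill interface.
interface SketchCallback {
  long onReceived(Iterator<SketchMessage> messages);
}

final class OffsetCallbackSketch {
  static final long NO_RESTART = 0L;   // Javadoc: no need to restart fetching
  static final long UNRESOLVED = -1L;  // Javadoc: the offset error cannot be resolved

  // Hypothetical callback that expects contiguous offsets starting at expectedOffset.
  static SketchCallback expecting(final long expectedOffset) {
    return messages -> {
      long next = expectedOffset;
      while (messages.hasNext()) {
        long actual = messages.next().offset;
        if (actual != next) {
          // Offset mismatch: request a restart at the offset we wanted, but only a
          // positive value can be requested because 0 already means "no restart".
          return next > 0 ? next : UNRESOLVED;
        }
        next++;
      }
      return NO_RESTART;
    };
  }

  // Convenience helper for trying the callback against a list of offsets.
  static long run(long expectedOffset, long... offsets) {
    List<SketchMessage> batch = new ArrayList<>();
    for (long offset : offsets) {
      batch.add(new SketchMessage(offset));
    }
    return expecting(expectedOffset).onReceived(batch.iterator());
  }

  public static void main(String[] args) {
    System.out.println(run(5, 5, 6, 7));
    System.out.println(run(5, 5, 7));
    System.out.println(run(0, 3));
  }
}
```

The sketch also shows why the question matters: with `0` and `-1` already taken and every positive value read as a literal offset, this convention leaves no value for "earliest" or "latest", and offset `0` itself cannot be requested.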

          githubbot ASF GitHub Bot added a comment -

          GitHub user maochf opened a pull request:

          https://github.com/apache/twill/pull/16

          TWILL-199 Handle offset error and return next offset in KafkaConsumer.MessageCallback

          https://issues.apache.org/jira/browse/TWILL-199

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/maochf/twill feature/find-correct-offset

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/twill/pull/16.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #16


          commit 6e5dc84793a4eb35a1a0b10a36ed5a00b5275414
          Author: Chengfeng <mao@cask.co>
          Date: 2016-11-22T03:04:52Z

          add KafkaOffsetProvider interface and return long in MessageCallback#onReceived



            People

            • Assignee: Chengfeng Mao (cmao)
            • Reporter: Chengfeng Mao (cmao)
            • Votes: 0
            • Watchers: 2
