  1. Ignite
  2. IGNITE-16176

Configurable request timeouts in KafkaToIgniteCdcStreamerApplier and IgniteToKafkaCdcStreamer


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • None
    • 2.13
    • extensions
    • Docs Required, Release Notes Required

    Description

      Currently, KafkaToIgniteCdcStreamerApplier[1] and IgniteToKafkaCdcStreamer[2] perform Kafka requests with hard-coded timeouts, each taken from its own DFLT_REQ_TIMEOUT constant (3 and 5 seconds, respectively):

      KafkaToIgniteCdcStreamerApplier
          /** */
          public static final int DFLT_REQ_TIMEOUT = 3;
      
          ...
      
          private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
              ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
      
              if (log.isDebugEnabled()) {
                  log.debug(
                      "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
                  );
              }
      
              apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
      
              cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
          }
      
      IgniteToKafkaCdcStreamer
          /** Default kafka request timeout in seconds. */
          public static final int DFLT_REQ_TIMEOUT = 5;
          
          ...
      
          @Override public boolean onEvents(Iterator<CdcEvent> evts) {
              List<Future<RecordMetadata>> futs = new ArrayList<>();
      
          ...
      
              if (!futs.isEmpty()) {
                  try {
                      for (Future<RecordMetadata> fut : futs)
                          fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
      
                      msgsSnt.add(futs.size());
      
                      lastMsgTs.value(System.currentTimeMillis());
                  }
      

      The timeout for requests to Kafka should be configurable in both classes.

      1. https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java#L203
      2. https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java#L197
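
      A minimal sketch of how the requested setting might look, assuming a single timeout value in milliseconds shared by the poll, commit, and send-acknowledgement calls. The class KafkaRequestTimeout, its method names, and the 3000 ms default are hypothetical and chosen only for illustration; they are not the actual Ignite extensions API. Plain java.util.concurrent futures stand in for the producer futures so the example runs without a Kafka dependency.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical holder for a configurable Kafka request timeout (illustration only). */
class KafkaRequestTimeout {
    /** Default timeout in milliseconds. The value is an assumption made for this sketch. */
    static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;

    /** Currently configured timeout in milliseconds. */
    private long timeoutMs = DFLT_KAFKA_REQ_TIMEOUT;

    /** Sets the timeout, rejecting non-positive values early. */
    KafkaRequestTimeout setTimeout(long timeoutMs) {
        if (timeoutMs <= 0)
            throw new IllegalArgumentException("Timeout must be positive: " + timeoutMs);

        this.timeoutMs = timeoutMs;

        return this;
    }

    /** Returns the configured timeout in milliseconds. */
    long timeoutMs() {
        return timeoutMs;
    }

    /** Value that could be passed to calls accepting a Duration, such as poll or commitSync. */
    Duration asDuration() {
        return Duration.ofMillis(timeoutMs);
    }

    /** Waits for every future, applying the configured timeout to each one separately. */
    <T> void awaitAll(List<? extends Future<T>> futs)
        throws InterruptedException, ExecutionException, TimeoutException {
        for (Future<T> fut : futs)
            fut.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

      With such a holder, the applier could call cnsmr.poll(timeout.asDuration()) and cnsmr.commitSync(timeout.asDuration()), and the streamer could wait on its send futures through awaitAll instead of reading DFLT_REQ_TIMEOUT directly. As in the original loop, the timeout applies to each future individually, not to the batch as a whole.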

            People

              PetrovMikhail Mikhail Petrov
              shishkovilja Ilya Shishkov