Details
- Improvement
- Status: Resolved
- Minor
- Resolution: Fixed
- None
- Docs Required, Release Notes Required
Description
Currently, KafkaToIgniteCdcStreamerApplier [1] and IgniteToKafkaCdcStreamer [2] perform Kafka requests with a hard-coded timeout, DFLT_REQ_TIMEOUT (3 and 5 seconds, respectively):
KafkaToIgniteCdcStreamerApplier
    /** */
    public static final int DFLT_REQ_TIMEOUT = 3;

    ...

    private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
        ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));

        if (log.isDebugEnabled()) {
            log.debug(
                "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
            );
        }

        apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));

        cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
    }
IgniteToKafkaCdcStreamer
    /** Default kafka request timeout in seconds. */
    public static final int DFLT_REQ_TIMEOUT = 5;

    ...

    @Override public boolean onEvents(Iterator<CdcEvent> evts) {
        List<Future<RecordMetadata>> futs = new ArrayList<>();

        ...

        if (!futs.isEmpty()) {
            try {
                for (Future<RecordMetadata> fut : futs)
                    fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);

                msgsSnt.add(futs.size());

                lastMsgTs.value(System.currentTimeMillis());
            }
The timeout for requests to Kafka should be configurable in both classes.
[1] https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java#L203
[2] https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java#L197
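One possible shape for the requested change is sketched below. It is only an illustration of replacing the hard-coded constant with a settable field. The class name ConfigurableTimeoutSketch, the constant DFLT_KAFKA_REQ_TIMEOUT, the setter and getter names, the choice of milliseconds as the unit, and the 3000 ms default are all assumptions made for this example and are not taken from the ignite-extensions code. To stay self-contained, the sketch waits on plain java.util.concurrent.Future objects instead of real Kafka producer futures.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch, not the actual streamer class. */
final class ConfigurableTimeoutSketch {
    /** Assumed default request timeout in milliseconds (illustrative value). */
    public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;

    /** Request timeout in milliseconds; replaces the hard-coded constant. */
    private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;

    /** Sets the request timeout; the setter name is an assumption. */
    public ConfigurableTimeoutSketch setKafkaRequestTimeout(long kafkaReqTimeout) {
        if (kafkaReqTimeout <= 0)
            throw new IllegalArgumentException("Timeout must be positive: " + kafkaReqTimeout);

        this.kafkaReqTimeout = kafkaReqTimeout;

        return this;
    }

    /** @return Configured request timeout in milliseconds. */
    public long getKafkaRequestTimeout() {
        return kafkaReqTimeout;
    }

    /**
     * Waits for every pending request using the configured timeout,
     * mirroring the fut.get(...) loop quoted from onEvents above.
     *
     * @return Number of completed requests.
     */
    public int awaitAll(List<? extends Future<?>> futs) {
        try {
            for (Future<?> fut : futs)
                fut.get(kafkaReqTimeout, TimeUnit.MILLISECONDS);

            return futs.size();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();

            throw new IllegalStateException("Interrupted while waiting for request", e);
        }
        catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Request failed or timed out", e);
        }
    }
}
```

With such a field in place, the same value could also be passed to cnsmr.poll(Duration.ofMillis(...)) and cnsmr.commitSync(Duration.ofMillis(...)) on the consumer side, so a single setting would cover both directions.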
Issue Links
- fixes
  - IGNITE-16586 Provide named parameters for Cdc streamers (Resolved)
- is cloned by
  - IGNITE-16586 Provide named parameters for Cdc streamers (Resolved)
  - IGNITE-16664 Add support of named parameters to IgniteToIgniteCdcStreamer (Resolved)