Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
Description
Both the Sink and the OutputFormat use the same logic: they enforce maxConcurrentRequests with a semaphore, acquiring a permit when a write request begins and releasing it when the request ends. At close time, the flush waits for all in-flight requests to be processed by acquiring all of the semaphore's permits.
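The permit accounting described above can be sketched as follows. This is a minimal illustration, not the connector's actual code: the class name `RequestThrottle`, its method names, and the `timeoutMillis` parameter are assumptions made for the example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of semaphore-based request throttling (names are assumptions). */
class RequestThrottle {
    private final int maxConcurrentRequests;
    private final long timeoutMillis;
    private final Semaphore permits;

    RequestThrottle(int maxConcurrentRequests, long timeoutMillis) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.timeoutMillis = timeoutMillis;
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    /** Called before a write is sent: takes one permit or times out. */
    void beginRequest() throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("No permit available after " + timeoutMillis + " ms");
        }
    }

    /** Called when a write completes (successfully or not): returns the permit. */
    void endRequest() {
        permits.release();
    }

    /** Waits for all in-flight writes by acquiring every permit, then returns them. */
    void flush() throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(maxConcurrentRequests, timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("In-flight requests did not finish after " + timeoutMillis + " ms");
        }
        permits.release(maxConcurrentRequests);
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

In this sketch, a write that times out in `beginRequest()` leaves all permits held, so a later `flush()` has to wait for the full timeout again before failing.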
If a write fails because maxConcurrentRequests requests are still in flight after the wait period, a TimeoutException is thrown while trying to acquire a permit from the semaphore. Flink then catches this exception and closes the sink/format. During the flush on close, it tries to acquire permits from the semaphore once again, which leads to another wait period and another TimeoutException.
Ideally, we should wait only once and get only one TimeoutException.
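One possible way to wait only once is to remember that a timeout has already been reported and skip the second wait at close time. The sketch below is only an illustration of that idea under stated assumptions, not a proposed patch: the class `SingleWaitThrottle`, the `timedOut` flag, and the method names are all hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: skip the flush wait on close once a timeout has been reported. */
class SingleWaitThrottle {
    private final int maxConcurrentRequests;
    private final long timeoutMillis;
    private final Semaphore permits;
    // Set when any permit acquisition has timed out (assumed flag, not in the original code).
    private volatile boolean timedOut;

    SingleWaitThrottle(int maxConcurrentRequests, long timeoutMillis) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.timeoutMillis = timeoutMillis;
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    void beginRequest() throws InterruptedException, TimeoutException {
        awaitPermits(1);
    }

    void endRequest() {
        permits.release();
    }

    /** Flushes on close, unless a write already timed out and reported the failure. */
    void close() throws InterruptedException, TimeoutException {
        if (timedOut) {
            return; // the failure was already surfaced; do not wait a second time
        }
        awaitPermits(maxConcurrentRequests);
        permits.release(maxConcurrentRequests);
    }

    boolean hasTimedOut() {
        return timedOut;
    }

    private void awaitPermits(int n) throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(n, timeoutMillis, TimeUnit.MILLISECONDS)) {
            timedOut = true;
            throw new TimeoutException("Could not acquire " + n + " permit(s) after " + timeoutMillis + " ms");
        }
    }
}
```

The trade-off in this sketch is that `close()` no longer waits for requests still in flight after a timeout; whether that is acceptable depends on how the sink/format is expected to behave on failure.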