Details
Type: Sub-task
Status: Resolved
Priority: Blocker
Resolution: Resolved
Fix Version: 1.20.0
Description
In FLIP-451 we added a timeout configuration to AsyncSinkWriter, with a default requestTimeoutMS of 10 minutes and failOnTimeout defaulting to false.
We need to test the new feature at different levels:
- Functional Testing
- Performance Testing
- Regression Testing
Common Utils
The feature affects the abstract AsyncSinkWriter class, so we need a concrete sink implementation for our tests. Any implementation where we can track the delivery of elements is acceptable; an example is:
class DiscardingElementWriter extends AsyncSinkWriter<T, String> {

    SeparateThreadExecutor executor =
            new SeparateThreadExecutor(r -> new Thread(r, "DiscardingElementWriter"));

    public DiscardingElementWriter(
            Sink.InitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<String>> bufferedRequestStates) {
        super(
                (element, context1) -> element.toString(),
                context,
                configuration,
                bufferedRequestStates);
    }

    @Override
    protected long getSizeInBytes(String requestEntry) {
        return requestEntry.length();
    }

    @Override
    protected void submitRequestEntries(
            List<String> requestEntries, ResultHandler<String> resultHandler) {
        executor.execute(
                () -> {
                    long delayMillis = new Random().nextInt(5000);
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ignored) {
                    }
                    for (String entry : requestEntries) {
                        LOG.info("Discarding {} after {} ms", entry, delayMillis);
                    }
                    resultHandler.complete();
                });
    }
}
We will also need a simple Flink job that writes data using the sink:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromSequence(0, 100)
        .map(Object::toString)
        // requestTimeoutMS (default: 10 minutes), failOnTimeout
        .sinkTo(new DiscardingTestAsyncSink<>(600_000L, false));
We can use the smallest values for batch size and in-flight requests to increase the number of requests that are subject to the timeout:
public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {

    private static final Logger LOG = LoggerFactory.getLogger(DiscardingTestAsyncSink.class);

    public DiscardingTestAsyncSink(long requestTimeoutMS, boolean failOnTimeout) {
        super(
                (element, context) -> element.toString(),
                1, // maxBatchSize
                1, // maxInFlightRequests
                10, // maxBufferedRequests
                1000L, // maxBatchSizeInBytes
                100, // maxTimeInBufferMS
                500L, // maxRecordSizeInBytes
                requestTimeoutMS,
                failOnTimeout);
    }

    @Override
    public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
        return new DiscardingElementWriter(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                Collections.emptyList());
    }

    @Override
    public StatefulSinkWriter<T, BufferedRequestState<String>> restoreWriter(
            WriterInitContext context, Collection<BufferedRequestState<String>> recoveredState)
            throws IOException {
        return new DiscardingElementWriter(
                new InitContextWrapper(context),
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(this.getMaxBatchSize())
                        .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                        .setMaxInFlightRequests(this.getMaxInFlightRequests())
                        .setMaxBufferedRequests(this.getMaxBufferedRequests())
                        .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                        .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                        .setFailOnTimeout(this.getFailOnTimeout())
                        .setRequestTimeoutMS(this.getRequestTimeoutMS())
                        .build(),
                recoveredState);
    }
}
Functional tests
These are common tests to verify that the new feature works correctly within Flink jobs.
Test that timed-out requests are retried, ensuring at-least-once semantics
Steps
- Pull and compile the release-1.20 branch.
- Start a Flink cluster from flink-dist: ./start-cluster.sh
- Configure the requestTimeout value in your job so that a sample of requests times out (see the sketch after the hints below).
- Compile and package your test job.
- Open the Flink Dashboard, upload the job JAR, and submit the job.
- Verify from the logs that all elements are delivered and that all elements delivered with a delay greater than the configured timeout are resubmitted.
Hint 1: It is advised to use a timeout close to the simulated delay in the sink so that the number of retried requests stays small enough to track.
Hint 2: It is also advised to use a debugger to inspect timed-out requests on the fly.
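A minimal sketch of such a configuration, reusing the test job from above (the 2-second timeout is illustrative; since the writer above simulates delays of up to 5 seconds, only some requests time out):
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromSequence(0, 100)
        .map(Object::toString)
        // requestTimeoutMS = 2000, failOnTimeout = false: timed-out requests are retried.
        .sinkTo(new DiscardingTestAsyncSink<>(2_000L, false));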
Test that fail on timeout fails the job
Steps
Follow the same steps for setting up the job and cluster, but enable the failOnTimeout flag (see the sketch below).
- Verify that the job fails with a TimeoutException visible in the job logs.
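The only change to the test job is the second constructor argument (a sketch; the timeout value is illustrative):
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromSequence(0, 100)
        .map(Object::toString)
        // failOnTimeout = true: a request exceeding the 2000 ms timeout fails the job with a TimeoutException.
        .sinkTo(new DiscardingTestAsyncSink<>(2_000L, true));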
Test with checkpoints enabled
Steps
Follow the same steps for setting up the job and cluster, but with checkpointing enabled (see the sketch below).
- Verify that checkpoints are taken successfully.
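For example, checkpointing can be enabled on the same test job before submitting it (the 10-second interval is illustrative):
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 10 seconds; the rest of the job is unchanged.
env.enableCheckpointing(10_000L);
env.setParallelism(1);
env.fromSequence(0, 100)
        .map(Object::toString)
        .sinkTo(new DiscardingTestAsyncSink<>(2_000L, false));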
Performance testing
We want to verify that the introduced feature does not affect the performance of the sink; specifically, that it does not introduce unnecessary backpressure.
Steps
Execute the same steps for setting up the job and cluster, but use a DataGen source in the test job to control throughput, and keep the default value of requestTimeoutMS (10 minutes); see the sketch below.
- Run the job until it reaches a stable state.
- Verify that the sink does not introduce backpressure into the job.
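A sketch of such a performance-test job, assuming the DataGeneratorSource from flink-connector-datagen is used to control throughput (the rate and element count are illustrative):
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncSinkPerformanceJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // DataGen source emitting Strings at a controlled rate (10,000 records/second here).
        GeneratorFunction<Long, String> generatorFunction = index -> "element-" + index;
        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(
                        generatorFunction,
                        Long.MAX_VALUE,
                        RateLimiterStrategy.perSecond(10_000),
                        Types.STRING);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
                // Default requestTimeoutMS (10 minutes), failOnTimeout disabled.
                .sinkTo(new DiscardingTestAsyncSink<>(600_000L, false));

        env.execute("AsyncSink performance test");
    }
}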
Regression Testing
We need to verify that the sink does not cause regressions in existing implementers; for example, we want to make sure there is no significant amount of duplicate data caused by timeouts with the default values when using an existing sink.
The following implementers are under community support:
Kinesis, Firehose, DynamoDB, Elasticsearch
It is advisable to test with all of them.
Steps
- Run a simple job that sinks data from a DataGen source to the sink under test (see the Kinesis sketch below for an example).
- Benchmark the throughput to the sink destination.
- Clone the sink connector repo, e.g. https://github.com/apache/flink-connector-aws.
- Update the Flink version in the repo to 1.20-SNAPSHOT.
- Rerun the job and compare the throughput metrics with the benchmark.
- Verify there is no regression between the two cases.
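For example, for the Kinesis implementer, a sketch of such a job using the KinesisStreamsSink builder from flink-connector-aws (the stream name, region, and rate are placeholders):
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KinesisRegressionJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Controlled-throughput DataGen source, as in the performance test above.
        GeneratorFunction<Long, String> generatorFunction = index -> "element-" + index;
        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(
                        generatorFunction,
                        Long.MAX_VALUE,
                        RateLimiterStrategy.perSecond(1_000),
                        Types.STRING);

        Properties clientProperties = new Properties();
        clientProperties.setProperty("aws.region", "us-east-1"); // placeholder region

        // Kinesis sink built with its defaults, so the default requestTimeoutMS applies.
        KinesisStreamsSink<String> sink =
                KinesisStreamsSink.<String>builder()
                        .setKinesisClientProperties(clientProperties)
                        .setStreamName("your-test-stream") // placeholder stream name
                        .setSerializationSchema(new SimpleStringSchema())
                        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen").sinkTo(sink);

        env.execute("Kinesis regression test");
    }
}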