Flink › FLINK-35602 [Umbrella] Test Flink Release 1.20 › FLINK-35697

Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink


Details

    • Type: Sub-task
    • Status: Open
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: 1.20.0
    • Component/s: Connectors / Common
    • Labels: None

    Description

      In FLIP-451 we added a request timeout configuration to AsyncSinkWriter, with a default value of 10 minutes and a default failOnTimeout of false.
      We need to test the new feature on different levels:

      • Functional Testing
      • Performance Testing
      • Regression Testing

      Common Utils

      The feature affects the abstract AsyncSinkWriter class, so we need a concrete sink implementation for our tests. Any implementation that lets us track delivery of elements is acceptable; an example is:

      // Inner class of DiscardingTestAsyncSink<T> (below), so T and LOG are in scope.
      class DiscardingElementWriter extends AsyncSinkWriter<T, String> {
              SeparateThreadExecutor executor =
                      new SeparateThreadExecutor(r -> new Thread(r, "DiscardingElementWriter"));
      
              public DiscardingElementWriter(
                      Sink.InitContext context,
                      AsyncSinkWriterConfiguration configuration,
                      Collection<BufferedRequestState<String>> bufferedRequestStates) {
                  super(
                          (element, context1) -> element.toString(),
                          context,
                          configuration,
                          bufferedRequestStates);
              }
      
              @Override
              protected long getSizeInBytes(String requestEntry) {
                  return requestEntry.length();
              }
      
              @Override
              protected void submitRequestEntries(
                      List<String> requestEntries, ResultHandler<String> resultHandler) {
                  executor.execute(
                          () -> {
                              long delayMillis = new Random().nextInt(5000);
                              try {
                                  Thread.sleep(delayMillis);
                              } catch (InterruptedException ignored) {
                              }
                              for (String entry : requestEntries) {
                                  LOG.info("Discarding {} after {} ms", entry, delayMillis);
                              }
      
                              resultHandler.complete();
                          });
              }
          }
      

      We will also need a simple Flink job that writes data using the sink:

          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setParallelism(1);
          env.fromSequence(0, 100)
                  .map(Object::toString)
                  // requestTimeoutMS and failOnTimeout are required by the constructor
                  // defined below; the values here are examples
                  .sinkTo(new DiscardingTestAsyncSink<>(10_000L, false));
          env.execute("async-sink-timeout-test");
      

      We can use minimal values for batch size and in-flight requests to increase the number of requests that are subject to the timeout:

      public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {
          private static final Logger LOG = LoggerFactory.getLogger(DiscardingTestAsyncSink.class);
      
          public DiscardingTestAsyncSink(long requestTimeoutMS, boolean failOnTimeout) {
              super(
                      (element, context) -> element.toString(),
                    1, // maxBatchSize
                    1, // maxInFlightRequests
                    10, // maxBufferedRequests
                    1000L, // maxBatchSizeInBytes
                    100, // maxTimeInBufferMS
                    500L, // maxRecordSizeInBytes
                      requestTimeoutMS,
                      failOnTimeout);
          }
      
          @Override
          public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
              return new DiscardingElementWriter(
                      new InitContextWrapper(context),
                      AsyncSinkWriterConfiguration.builder()
                              .setMaxBatchSize(this.getMaxBatchSize())
                              .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                              .setMaxInFlightRequests(this.getMaxInFlightRequests())
                              .setMaxBufferedRequests(this.getMaxBufferedRequests())
                              .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                              .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                              .setFailOnTimeout(this.getFailOnTimeout())
                              .setRequestTimeoutMS(this.getRequestTimeoutMS())
                              .build(),
                      Collections.emptyList());
          }
      
          @Override
          public StatefulSinkWriter<T, BufferedRequestState<String>> restoreWriter(
                  WriterInitContext context, Collection<BufferedRequestState<String>> recoveredState)
                  throws IOException {
              return new DiscardingElementWriter(
                      new InitContextWrapper(context),
                      AsyncSinkWriterConfiguration.builder()
                              .setMaxBatchSize(this.getMaxBatchSize())
                              .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
                              .setMaxInFlightRequests(this.getMaxInFlightRequests())
                              .setMaxBufferedRequests(this.getMaxBufferedRequests())
                              .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
                              .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
                              .setFailOnTimeout(this.getFailOnTimeout())
                              .setRequestTimeoutMS(this.getRequestTimeoutMS())
                              .build(),
                      recoveredState);
          }
      }

      Functional tests

      These are common tests to verify the new feature works correctly within Flink jobs.

      Test that timed-out requests are retried (at-least-once semantics)

      Steps

      • Pull and compile the release-1.20 branch.
      • Start a Flink cluster from flink-dist: ./bin/start-cluster.sh
      • Configure the requestTimeout value in your job to sample a number of timed-out requests.
      • Compile and package your test job.
      • Open the Flink Dashboard, upload the job jar, and submit the job.
      • Verify from the logs that all elements are delivered, and that all elements delivered with a delay greater than the configured timeout are resubmitted.

      Hint 1: It is advised to use a timeout close to the simulated delay in the sink, so that the retried requests are not too many to track.
      Hint 2: It is also advised to use the debugger to check timed-out requests on the fly.
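      As a concrete illustration of hint 1: the example writer above sleeps up to 5000 ms, so a request timeout in the same range keeps retries observable without flooding the logs. A hypothetical wiring of the sketch classes above (the values are examples, not recommendations):

      ```java
      // Hypothetical values: the DiscardingElementWriter sketch simulates delays of
      // 0-5000 ms, so a 4000 ms timeout means roughly the slowest fifth of requests
      // time out and are retried -- enough to observe in the logs, easy to track.
      long requestTimeoutMs = 4_000L;  // close to the simulated max delay of 5_000 ms
      boolean failOnTimeout = false;   // retry timed-out requests instead of failing the job

      env.fromSequence(0, 100)
              .map(Object::toString)
              .sinkTo(new DiscardingTestAsyncSink<>(requestTimeoutMs, failOnTimeout));
      ```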

      Test that failOnTimeout fails the job

      Steps

      Follow the same steps for setting up the job and cluster, but enable the failOnTimeout flag.

      • Verify that the job fails with a TimeoutException visible in the job logs.
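      The flag can be enabled through the sketch sink's constructor; the 100 ms value below is a hypothetical choice, far below the writer's simulated delay of up to 5000 ms, so timeouts are guaranteed:

      ```java
      // failOnTimeout = true: a timed-out request should fail the job with a
      // TimeoutException instead of being retried.
      env.fromSequence(0, 100)
              .map(Object::toString)
              .sinkTo(new DiscardingTestAsyncSink<>(100L, true)); // requestTimeoutMS, failOnTimeout
      ```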

      Test with checkpoints enabled

      Steps

      Follow the same steps for setting up the job and cluster, with checkpoints enabled.

      • Verify checkpoints are taken successfully
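      A minimal sketch of the same test job with checkpointing enabled; the interval is an arbitrary example, and StreamExecutionEnvironment.enableCheckpointing is the standard Flink API for it:

      ```java
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.enableCheckpointing(10_000L); // checkpoint every 10 seconds (example interval)
      env.setParallelism(1);
      env.fromSequence(0, 100)
              .map(Object::toString)
              .sinkTo(new DiscardingTestAsyncSink<>(4_000L, false)); // hypothetical timeout config
      env.execute("async-sink-timeout-checkpoint-test");
      ```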

      Performance testing

      We want to verify the introduced feature doesn't affect the performance of the sink; specifically, that it doesn't introduce unnecessary backpressure.

      Steps

      Execute the same steps for setting up the job and cluster, but use a DataGen source in the test job to control throughput, and keep the default requestTimeoutMS of 10 minutes.

      • Run the job until it reaches a stable state.
      • Verify the sink doesn't introduce backpressure into the job.
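      One way to control throughput is the DataGeneratorSource from flink-connector-datagen; a sketch, assuming the test sink from above (the rate is an example value):

      ```java
      // Unbounded generator at a fixed rate, so any backpressure observed is
      // attributable to the sink rather than to the source.
      DataGeneratorSource<String> source =
              new DataGeneratorSource<>(
                      index -> "record-" + index,            // GeneratorFunction<Long, String>
                      Long.MAX_VALUE,                        // effectively unbounded
                      RateLimiterStrategy.perSecond(10_000), // example target throughput
                      Types.STRING);

      env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
              // default requestTimeoutMS of 10 minutes, failOnTimeout = false
              .sinkTo(new DiscardingTestAsyncSink<>(600_000L, false));
      ```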

      Regression Testing

      We need to verify the sink doesn't cause regressions for existing implementers; for example, we want to make sure there is no significant volume of duplicate data due to timeouts with default values when using an existing sink.

      We have the following implementers under community support: Kinesis, Firehose, DynamoDB, Elasticsearch.

      It is advisable to test with all of them.

      Steps

      • Run a simple job that sinks data from a DataGen source to the sink under test.
      • Benchmark the throughput to the sink destination.
      • Clone the sink connector repo, e.g. https://github.com/apache/flink-connector-aws
      • Update the Flink version in the repo to 1.20-SNAPSHOT.
      • Rerun the job and compare the throughput metrics with the benchmark.
      • Verify there is no regression between the two cases.
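      Updating the Flink version usually means changing the flink.version property in the connector repo's parent pom (the property name is assumed here to follow the Flink connector convention; check the repo's pom.xml):

      ```xml
      <!-- flink-connector-aws/pom.xml: point the connector build at the
           release candidate, then rebuild with `mvn clean package -DskipTests`. -->
      <properties>
        <flink.version>1.20-SNAPSHOT</flink.version>
      </properties>
      ```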

      People

          • Assignee: Unassigned
          • Reporter: Ahmed Hamdy (chalixar)