FLINK-16543

Support setting schedule mode by config for Blink planner in batch mode

Details

    Description

      Currently, the Blink planner is bound to use the LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST schedule mode in batch mode. This is hard-coded in the ExecutorUtils.setBatchProperties method:

      public static void setBatchProperties(StreamGraph streamGraph, TableConfig tableConfig) {
      	streamGraph.getStreamNodes().forEach(
      			sn -> sn.setResources(ResourceSpec.UNKNOWN, ResourceSpec.UNKNOWN));
      	streamGraph.setChaining(true);
      	streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
      	streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
      	streamGraph.setStateBackend(null);
      	if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
      		throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
      	}
      	if (ExecutorUtils.isShuffleModeAllBatch(tableConfig)) {
      		streamGraph.setBlockingConnectionsBetweenChains(true);
      	}
      }
      

      However, in use cases where execution time is short, especially OLAP use cases, LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST might not be the best choice, as it causes data to be spilled to disk when shuffling. In such use cases, the EAGER schedule mode with the PIPELINED shuffle mode is preferred.

      Currently, the shuffle mode can be set through the table.exec.shuffle-mode table config. We would like to add another config option to change the schedule mode for the Blink planner in batch mode.
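
      A minimal sketch of what the proposed lookup could look like, assuming the new option is read from a string-keyed map. The key name table.exec.schedule-mode, the class ScheduleModeOption, and the method resolve are hypothetical and do not come from this issue or from Flink. The enum is a local stand-in that mirrors the names of Flink's ScheduleMode values so the example runs without Flink on the classpath. The default keeps the currently hard-coded mode, so existing batch jobs would behave as before.

```java
import java.util.Locale;
import java.util.Map;

/** Sketch only: resolves a schedule mode from options instead of hard-coding it. */
final class ScheduleModeOption {

    /** Local stand-in mirroring the names of Flink's ScheduleMode values; not the Flink class. */
    enum ScheduleMode {
        LAZY_FROM_SOURCES,
        LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
        EAGER
    }

    /** Hypothetical option key; the issue does not name the new config. */
    static final String KEY = "table.exec.schedule-mode";

    /** Default matches the mode that setBatchProperties hard-codes today. */
    static final ScheduleMode DEFAULT =
            ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST;

    private ScheduleModeOption() {}

    /** Returns the configured mode, or DEFAULT when the key is absent or blank. */
    static ScheduleMode resolve(Map<String, String> options) {
        String raw = options.get(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT;
        }
        // Accept lower-case and dash-separated spellings such as "eager".
        String normalized = raw.trim().toUpperCase(Locale.ROOT).replace('-', '_');
        try {
            return ScheduleMode.valueOf(normalized);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported value '" + raw + "' for " + KEY, e);
        }
    }
}
```

      With something like this in place, setBatchProperties could pass the resolved value to streamGraph.setScheduleMode instead of the constant. An OLAP deployment would then set the hypothetical key to eager alongside a pipelined table.exec.shuffle-mode.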

            People

              Assignee: Unassigned
              Reporter: Caizhi Weng (TsReaper)
              Votes: 0
              Watchers: 5
