  1. Apache Gobblin
  2. GOBBLIN-114

getLatestWatermarkFromMetadata and retry

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved

    Description

      While reading the gobblin-core code, I ran into some doubts about how the latest watermark interacts with the retry policy.

      The retry policy retries the failed work units of the previous job by adding them to the new job's work units in QueryBasedSource.getWorkunits(). However, when commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS and hasFailedRun is true, QueryBasedSource.getLatestWatermarkFromMetadata returns the lowest low watermark of the previous work units, so the newly created work units already cover the range of the failed work units. As a result, the failed work units of the previous job would run twice in the new job.

      Is there anything wrong with my understanding? Thanks.

      ``` java
      public List<WorkUnit> getWorkunits(SourceState state) {
        ...
        long previousWatermark = this.getLatestWatermarkFromMetadata(state);

        Map<Long, Long> sortedPartitions = Maps.newTreeMap();
        sortedPartitions.putAll(new Partitioner(state).getPartitions(previousWatermark));

        // Use extract table name to create extract
        SourceState partitionState = new SourceState();
        partitionState.addAll(state);
        Extract extract = partitionState.createExtract(tableType, nameSpaceName, extractTableName);

        // Setting current time for the full extract
        if (Boolean.valueOf(state.getProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY))) {
          extract.setFullTrue(System.currentTimeMillis());
        }

        // One new work unit per partition, starting from previousWatermark
        for (Entry<Long, Long> entry : sortedPartitions.entrySet()) {
          partitionState.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, entry.getKey());
          partitionState.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, entry.getValue());
          workUnits.add(partitionState.createWorkUnit(extract));
        }

        LOG.info("Total number of work units for the current run: " + workUnits.size());

        // Incomplete work units from the previous run are appended for retry
        List<WorkUnit> previousWorkUnits = this.getPreviousWorkUnitsForRetry(state);
        LOG.info("Total number of incomplete tasks from the previous run: " + previousWorkUnits.size());
        workUnits.addAll(previousWorkUnits);
        ...
      }
      ```
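
To make the concern concrete, here is a minimal toy model of the flow above. It is not Gobblin code: `RetryOverlapSketch`, `partition`, `planNextRun`, and `countDuplicates` are hypothetical names, a work unit is reduced to a (low, high) watermark pair, and the partitioner is a fixed-width splitter. It only assumes the behavior described in this issue, namely that new partitions start at the previous watermark and that failed work units are appended unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class RetryOverlapSketch {

  /** Toy partitioner: splits [from, to] into consecutive (low, high) pairs of the given width. */
  public static List<Entry<Long, Long>> partition(long from, long to, long width) {
    List<Entry<Long, Long>> ranges = new ArrayList<>();
    for (long low = from; low < to; low += width) {
      ranges.add(Map.entry(low, Math.min(low + width, to)));
    }
    return ranges;
  }

  /** Mimics the flow in the issue: new partitions from the watermark, plus retried work units. */
  public static List<Entry<Long, Long>> planNextRun(
      long previousWatermark, long now, long width, List<Entry<Long, Long>> failedPrevious) {
    List<Entry<Long, Long>> workUnits = new ArrayList<>(partition(previousWatermark, now, width));
    workUnits.addAll(failedPrevious);
    return workUnits;
  }

  /** Counts how many planned ranges are exact repeats of another planned range. */
  public static long countDuplicates(List<Entry<Long, Long>> plan) {
    return plan.size() - plan.stream().distinct().count();
  }

  public static void main(String[] args) {
    // Previous run: (0, 10) succeeded and (10, 20) failed.
    List<Entry<Long, Long>> failed = List.of(Map.entry(10L, 20L));
    // Assumed: the watermark falls back to the minimum low watermark of the previous run, i.e. 0.
    List<Entry<Long, Long>> plan = planNextRun(0L, 30L, 10L, failed);
    System.out.println("duplicates: " + countDuplicates(plan)); // 1 in this toy scenario
  }
}
```

In this toy scenario the range (10, 20) is planned twice: once as a fresh partition and once as a retried work unit. Whether real Gobblin partitions line up this exactly depends on the partitioner configuration, so the sketch shows the shape of the overlap rather than a reproduction.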

      ``` java
      private long getLatestWatermarkFromMetadata(SourceState state) {
        LOG.debug("Get latest watermark from the previous run");
        long latestWaterMark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE;

        List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList(state.getPreviousWorkUnitStates());
        List<Long> previousWorkUnitStateHighWatermarks = Lists.newArrayList();
        List<Long> previousWorkUnitLowWatermarks = Lists.newArrayList();

        if (previousWorkUnitStates.isEmpty()) {
          LOG.info("No previous work unit states found; Latest watermark - Default watermark: " + latestWaterMark);
          return latestWaterMark;
        }

        boolean hasFailedRun = false;
        boolean isCommitOnFullSuccess = false;
        boolean isDataProcessedInPreviousRun = false;

        JobCommitPolicy commitPolicy = JobCommitPolicy
            .forName(state.getProp(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, ConfigurationKeys.DEFAULT_JOB_COMMIT_POLICY));
        if (commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
          isCommitOnFullSuccess = true;
        }

        for (WorkUnitState workUnitState : previousWorkUnitStates) {
          long processedRecordCount = 0;
          LOG.info("State of the previous task: " + workUnitState.getId() + ":" + workUnitState.getWorkingState());
          // Any task that did not finish counts as a failed run
          if (workUnitState.getWorkingState() == WorkingState.FAILED
              || workUnitState.getWorkingState() == WorkingState.CANCELLED
              || workUnitState.getWorkingState() == WorkingState.RUNNING
              || workUnitState.getWorkingState() == WorkingState.PENDING) {
            hasFailedRun = true;
          } else {
            processedRecordCount = workUnitState.getPropAsLong(ConfigurationKeys.EXTRACTOR_ROWS_EXPECTED);
            if (processedRecordCount != 0) {
              isDataProcessedInPreviousRun = true;
            }
          }

          LOG.info("Low watermark of the previous task: " + workUnitState.getId() + ":"
              + workUnitState.getWorkunit().getLowWaterMark());
          LOG.info("High watermark of the previous task: " + workUnitState.getId() + ":"
              + workUnitState.getHighWaterMark());
          LOG.info("Record count of the previous task: " + processedRecordCount + "\n");

          // Consider the high water mark of the previous work unit only if it extracted any data
          if (processedRecordCount != 0) {
            previousWorkUnitStateHighWatermarks.add(workUnitState.getHighWaterMark());
          }

          previousWorkUnitLowWatermarks.add(this.getLowWatermarkFromWorkUnit(workUnitState));
        }

        if (isCommitOnFullSuccess && hasFailedRun) {
          // If commit policy is full and it has a failed run, get the latest water mark
          // as the minimum of low water marks from previous states.
          long previousLowWatermark = Collections.min(previousWorkUnitLowWatermarks);

          WorkUnitState previousState = previousWorkUnitStates.get(0);
          ExtractType extractType =
              ExtractType.valueOf(previousState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_EXTRACT_TYPE).toUpperCase());

          // add backup seconds only for snapshot extracts but not for appends
          if (extractType == ExtractType.SNAPSHOT) {
            int backupSecs = previousState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_LOW_WATERMARK_BACKUP_SECS, 0);
            String watermarkType = previousState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_WATERMARK_TYPE);
            latestWaterMark = this.addBackedUpSeconds(previousLowWatermark, backupSecs, watermarkType);
          } else {
            latestWaterMark = previousLowWatermark;
          }

          LOG.info("Previous job was COMMIT_ON_FULL_SUCCESS but it was failed; Latest watermark - "
              + "Min watermark from WorkUnits: " + latestWaterMark);
        } else {
          // If commit policy is full and there are no failed tasks, or commit policy is partial,
          // get the latest water mark as the maximum of high water marks from previous tasks.
          if (isDataProcessedInPreviousRun) {
            latestWaterMark = Collections.max(previousWorkUnitStateHighWatermarks);
            LOG.info("Previous run was successful. Latest watermark - Max watermark from WorkUnitStates: "
                + latestWaterMark);
          } else {
            latestWaterMark = Collections.min(previousWorkUnitLowWatermarks);
            LOG.info("Previous run was successful but no data found. Latest watermark - "
                + "Min watermark from WorkUnitStates: " + latestWaterMark);
          }
        }

        return latestWaterMark;
      }
      ```
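
The selection rule in the method above can be condensed into a small standalone sketch, which may help when reasoning about which watermark the next run starts from. This is a simplification under stated assumptions, not the Gobblin implementation: `WatermarkChoiceSketch` and `Prev` are hypothetical names, each previous work unit is reduced to four fields, and the backup-seconds adjustment for snapshot extracts is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WatermarkChoiceSketch {

  /** Hypothetical, simplified view of one previous work unit state. */
  public static final class Prev {
    final long low;
    final long high;
    final boolean failed; // stands in for FAILED, CANCELLED, RUNNING or PENDING
    final long rows;      // rows extracted; only consulted when the unit did not fail

    public Prev(long low, long high, boolean failed, long rows) {
      this.low = low;
      this.high = high;
      this.failed = failed;
      this.rows = rows;
    }
  }

  /** Condensed version of the rule quoted in the issue (snapshot backup seconds omitted). */
  public static long latestWatermark(List<Prev> previous, boolean commitOnFullSuccess, long defaultWatermark) {
    if (previous.isEmpty()) {
      return defaultWatermark;
    }
    boolean hasFailedRun = false;
    List<Long> lows = new ArrayList<>();
    List<Long> highsWithData = new ArrayList<>();
    for (Prev p : previous) {
      long processed = 0;
      if (p.failed) {
        hasFailedRun = true;
      } else {
        processed = p.rows;
      }
      if (processed != 0) {
        highsWithData.add(p.high);
      }
      lows.add(p.low);
    }
    if (commitOnFullSuccess && hasFailedRun) {
      // Full-success policy with a failure: restart from the minimum low watermark.
      return Collections.min(lows);
    }
    // Otherwise: maximum high watermark among units that extracted data, else minimum low watermark.
    return highsWithData.isEmpty() ? Collections.min(lows) : Collections.max(highsWithData);
  }

  public static void main(String[] args) {
    List<Prev> previous = List.of(new Prev(0, 10, false, 5), new Prev(10, 20, true, 0));
    System.out.println(latestWatermark(previous, true, -1));  // 0: restart from the lowest low watermark
    System.out.println(latestWatermark(previous, false, -1)); // 10: resume from the highest committed high watermark
  }
}
```

With the full-success policy and one failed unit, the sketch returns 0, so the next run's partitions start below the failed unit's range. That is the situation in which appending the same failed unit for retry would process its range a second time.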

      Github Url : https://github.com/linkedin/gobblin/issues/899
      Github Reporter : lamborryan
      Github Created At : 2016-03-30T07:34:57Z
      Github Updated At : 2016-05-13T23:12:49Z

      People

        Assignee: Unassigned
        Reporter: Abhishek Tiwari (abti)