Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
While reading the gobblin-core code, I ran into some doubts about the latest watermark and the retry policy.
The retry policy retries the failed work units of the previous job by adding them to the new job's work units in QueryBasedSource.getWorkunits(). However, when commitPolicy is JobCommitPolicy.COMMIT_ON_FULL_SUCCESS and hasFailedRun is true, QueryBasedSource.getLatestWatermarkFromMetadata() returns the lowest low watermark of the previous work units. The partitions that the new job computes from that watermark therefore already cover the ranges of the failed work units, and the retry policy then adds those same work units again. As a result, the failed work units of the previous job run twice in the new job.
Is there anything wrong with my understanding? Thanks.
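To make the scenario concrete, here is a minimal, self-contained simulation of the reading above; it is not Gobblin code. `Range`, `partition`, `nextJobWorkUnits` and `coverage` are hypothetical stand-ins: `partition` plays the role of `Partitioner.getPartitions` using fixed-size, half-open ranges, and the watermark rule assumes COMMIT_ON_FULL_SUCCESS with a failed run, so the next job restarts from the minimum low watermark before the retry policy appends the failed work units.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DuplicateRetryDemo {

  // Hypothetical stand-in for a work unit: a [low, high) watermark range
  // plus whether it failed in the previous job.
  static final class Range {
    final long low;
    final long high;
    final boolean failed;

    Range(long low, long high, boolean failed) {
      this.low = low;
      this.high = high;
      this.failed = failed;
    }

    @Override
    public String toString() {
      return "[" + low + ", " + high + ")";
    }
  }

  // Hypothetical stand-in for Partitioner: splits [start, end) into fixed-size ranges.
  static List<Range> partition(long start, long end, long size) {
    List<Range> out = new ArrayList<>();
    for (long lo = start; lo < end; lo += size) {
      out.add(new Range(lo, Math.min(lo + size, end), false));
    }
    return out;
  }

  // Builds the next job's work units following the reading in this report:
  // COMMIT_ON_FULL_SUCCESS plus a failed run restarts from the minimum low
  // watermark, and the retry policy then appends the failed work units again.
  static List<Range> nextJobWorkUnits(List<Range> previous, long now, long size) {
    List<Long> lows = new ArrayList<>();
    List<Range> retries = new ArrayList<>();
    for (Range r : previous) {
      lows.add(r.low);
      if (r.failed) {
        retries.add(r);
      }
    }
    long latestWatermark = Collections.min(lows);
    List<Range> workUnits = partition(latestWatermark, now, size);
    workUnits.addAll(retries);
    return workUnits;
  }

  // Counts the work units whose range contains the given watermark value.
  static int coverage(List<Range> units, long watermark) {
    int count = 0;
    for (Range r : units) {
      if (r.low <= watermark && watermark < r.high) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    List<Range> previous = List.of(
        new Range(0, 100, false),
        new Range(100, 200, true), // failed in the previous job
        new Range(200, 300, false));
    List<Range> next = nextJobWorkUnits(previous, 400, 100);
    System.out.println(next);                // [[0, 100), [100, 200), [200, 300), [300, 400), [100, 200)]
    System.out.println(coverage(next, 150)); // 2
  }
}
```

With previous ranges [0, 100), [100, 200) (failed) and [200, 300), the next job receives [100, 200) both as a fresh partition and as a retried work unit, so watermark 150 is covered twice.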
``` java
public List<WorkUnit> getWorkunits(SourceState state) {
  ...
  long previousWatermark = this.getLatestWatermarkFromMetadata(state);
  Map<Long, Long> sortedPartitions = Maps.newTreeMap();
  sortedPartitions.putAll(new Partitioner(state).getPartitions(previousWatermark));

  // Use extract table name to create extract
  SourceState partitionState = new SourceState();
  partitionState.addAll(state);
  Extract extract = partitionState.createExtract(tableType, nameSpaceName, extractTableName);

  // Setting current time for the full extract
  if (Boolean.valueOf(state.getProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY))) {
    ...
  }

  for (Entry<Long, Long> entry : sortedPartitions.entrySet()) {
    partitionState.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, entry.getKey());
    partitionState.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, entry.getValue());
    workUnits.add(partitionState.createWorkUnit(extract));
  }
  LOG.info("Total number of work units for the current run: " + workUnits.size());

  List<WorkUnit> previousWorkUnits = this.getPreviousWorkUnitsForRetry(state);
  LOG.info("Total number of incomplete tasks from the previous run: " + previousWorkUnits.size());
  workUnits.addAll(previousWorkUnits);
  ...
}
```
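The excerpt above appends the retried work units to the fresh partitions without checking whether their ranges overlap. A small helper like the following, which is hypothetical and not part of Gobblin, could be used while debugging to confirm the overlap. It takes the partitions as a sorted low-to-high map, the way `sortedPartitions` is built above, and the retried ranges as `{low, high}` pairs; treating all ranges as half-open is an assumption about how the watermarks are interpreted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RetryOverlapCheck {

  // Hypothetical helper (not a Gobblin API): returns the indices of retried
  // work units whose [low, high) range intersects at least one partition in
  // sortedPartitions (low watermark -> high watermark). Ranges are treated as
  // half-open, which is an assumption.
  static List<Integer> overlappingRetries(Map<Long, Long> sortedPartitions, long[][] retries) {
    List<Integer> overlapping = new ArrayList<>();
    for (int i = 0; i < retries.length; i++) {
      long low = retries[i][0];
      long high = retries[i][1];
      for (Map.Entry<Long, Long> partition : sortedPartitions.entrySet()) {
        // Two half-open ranges intersect when each starts before the other ends.
        if (low < partition.getValue() && partition.getKey() < high) {
          overlapping.add(i);
          break;
        }
      }
    }
    return overlapping;
  }

  public static void main(String[] args) {
    // Fresh partitions computed from a previous low watermark of 100.
    Map<Long, Long> sortedPartitions = new TreeMap<>();
    sortedPartitions.put(100L, 250L);
    sortedPartitions.put(250L, 400L);
    // Retried work units from the previous job, as {low, high} pairs.
    long[][] retries = { {100L, 200L}, {400L, 500L} };
    System.out.println(overlappingRetries(sortedPartitions, retries)); // [0]
  }
}
```

Here the retried range [100, 200) is flagged because the fresh partition [100, 250) already covers it, while [400, 500) lies beyond the last partition and is not.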
``` java
private long getLatestWatermarkFromMetadata(SourceState state) {
  LOG.debug("Get latest watermark from the previous run");
  long latestWaterMark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE;

  List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList(state.getPreviousWorkUnitStates());
  List<Long> previousWorkUnitStateHighWatermarks = Lists.newArrayList();
  List<Long> previousWorkUnitLowWatermarks = Lists.newArrayList();

  if (previousWorkUnitStates.isEmpty()) {
    LOG.info("No previous work unit states found; Latest watermark - Default watermark: " + latestWaterMark);
    return latestWaterMark;
  }

  boolean hasFailedRun = false;
  boolean isCommitOnFullSuccess = false;
  boolean isDataProcessedInPreviousRun = false;

  JobCommitPolicy commitPolicy = JobCommitPolicy
      .forName(state.getProp(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, ConfigurationKeys.DEFAULT_JOB_COMMIT_POLICY));
  if (commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
    isCommitOnFullSuccess = true;
  }

  for (WorkUnitState workUnitState : previousWorkUnitStates) {
    long processedRecordCount = 0;
    LOG.info("State of the previous task: " + workUnitState.getId() + ":" + workUnitState.getWorkingState());
    if (workUnitState.getWorkingState() == WorkingState.FAILED
        || workUnitState.getWorkingState() == WorkingState.CANCELLED
        || workUnitState.getWorkingState() == WorkingState.RUNNING
        || workUnitState.getWorkingState() == WorkingState.PENDING) {
      hasFailedRun = true;
    } else {
      processedRecordCount = workUnitState.getPropAsLong(ConfigurationKeys.EXTRACTOR_ROWS_EXPECTED);
      if (processedRecordCount != 0) {
        isDataProcessedInPreviousRun = true;
      }
    }

    LOG.info("Low watermark of the previous task: " + workUnitState.getId() + ":"
        + workUnitState.getWorkunit().getLowWaterMark());
    LOG.info("High watermark of the previous task: " + workUnitState.getId() + ":"
        + workUnitState.getHighWaterMark());
    LOG.info("Record count of the previous task: " + processedRecordCount + "\n");

    // Consider high water mark of the previous work unit, if it is
    // extracted any data
    if (processedRecordCount != 0) {
      ...
    }
    previousWorkUnitLowWatermarks.add(this.getLowWatermarkFromWorkUnit(workUnitState));
  }

  // If commit policy is full and it has failed run, get latest water mark
  // as minimum of low water marks from previous states.
  if (isCommitOnFullSuccess && hasFailedRun) {
    long previousLowWatermark = Collections.min(previousWorkUnitLowWatermarks);
    WorkUnitState previousState = previousWorkUnitStates.get(0);
    ExtractType extractType =
        ExtractType.valueOf(previousState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_EXTRACT_TYPE).toUpperCase());
    // add backup seconds only for snapshot extracts but not for appends
    if (extractType == ExtractType.SNAPSHOT) {
      ...
    } else {
      latestWaterMark = previousLowWatermark;
    }
    LOG.info("Previous job was COMMIT_ON_FULL_SUCCESS but it was failed; Latest watermark - "
        + "Min watermark from WorkUnits: " + latestWaterMark);
  }
  // If commit policy is full and there are no failed tasks or commit
  // policy is partial, get latest water mark as maximum of high water
  // marks from previous tasks.
  else {
    if (isDataProcessedInPreviousRun) {
      ...
    } else {
      latestWaterMark = Collections.min(previousWorkUnitLowWatermarks);
      LOG.info("Previous run was successful but no data found. Latest watermark - Min watermark from WorkUnitStates: "
          + latestWaterMark);
    }
  }
  return latestWaterMark;
}
```
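The decision rules in getLatestWatermarkFromMetadata() can be condensed into the sketch below. It is a simplified model, not the Gobblin implementation: `State` and `PrevUnit` are hypothetical, the snapshot backup seconds are omitted, and the branches missing from the excerpt are assumed to record the high watermarks of units that extracted data and to take their maximum, as the code comments describe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WatermarkSelectionSketch {

  // Hypothetical, simplified stand-in for Gobblin's WorkingState.
  enum State { PENDING, RUNNING, SUCCESSFUL, FAILED, CANCELLED }

  // Hypothetical summary of one previous work unit state.
  static final class PrevUnit {
    final State state;
    final long low;
    final long high;
    final long rows;

    PrevUnit(State state, long low, long high, long rows) {
      this.state = state;
      this.low = low;
      this.high = high;
      this.rows = rows;
    }
  }

  static long latestWatermark(List<PrevUnit> previous, boolean commitOnFullSuccess, long defaultWatermark) {
    if (previous.isEmpty()) {
      return defaultWatermark;
    }
    boolean hasFailedRun = false;
    boolean dataProcessed = false;
    List<Long> highs = new ArrayList<>();
    List<Long> lows = new ArrayList<>();
    for (PrevUnit u : previous) {
      long rows = 0;
      if (u.state == State.FAILED || u.state == State.CANCELLED
          || u.state == State.RUNNING || u.state == State.PENDING) {
        hasFailedRun = true;
      } else {
        rows = u.rows;
        if (rows != 0) {
          dataProcessed = true;
        }
      }
      // Assumption: the elided branch records the high watermark of units that extracted data.
      if (rows != 0) {
        highs.add(u.high);
      }
      // Every unit's low watermark is recorded, including failed ones.
      lows.add(u.low);
    }
    if (commitOnFullSuccess && hasFailedRun) {
      // Restart from the lowest previous low watermark (snapshot backup seconds omitted).
      return Collections.min(lows);
    }
    // Assumption: the elided branch takes the maximum high watermark.
    return dataProcessed ? Collections.max(highs) : Collections.min(lows);
  }

  public static void main(String[] args) {
    List<PrevUnit> previous = List.of(
        new PrevUnit(State.SUCCESSFUL, 0, 100, 10),
        new PrevUnit(State.FAILED, 100, 200, 0),
        new PrevUnit(State.SUCCESSFUL, 200, 300, 5));
    System.out.println(latestWatermark(previous, true, -1));  // 0
    System.out.println(latestWatermark(previous, false, -1)); // 300
  }
}
```

With a failed unit in the middle, the COMMIT_ON_FULL_SUCCESS case returns 0, the lowest low watermark, so the new partitions start before the failed range [100, 200) that the retry policy schedules again; with a partial commit policy the same input returns 300.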
GitHub URL: https://github.com/linkedin/gobblin/issues/899
GitHub Reporter: lamborryan
GitHub Created At: 2016-03-30T07:34:57Z
GitHub Updated At: 2016-05-13T23:12:49Z