Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Not A Bug
Affects Version: 0.9
Fix Version: None
Component: None
Description
Ideally, late rerun should run the instance if and when the data becomes available within the late-rerun window. This is not happening currently.
Activity
Process definition:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<process name="ProcessLateRerunTest-agregator" xmlns="uri:falcon:process:0.1">
    <clusters>
        <cluster name="ProcessLateRerunTest-corp">
            <validity start="2016-02-02T09:13Z" end="2016-02-02T09:43Z"/>
        </cluster>
    </clusters>
    <parallel>2</parallel>
    <order>FIFO</order>
    <frequency>minutes(5)</frequency>
    <timezone>UTC</timezone>
    <inputs>
        <input name="inputData" feed="ProcessLateRerunTest-raaw-logs" start="now(0,-1)" end="now(0,0)"/>
    </inputs>
    <outputs>
        <output name="outputData" feed="ProcessLateRerunTest-agregated" instance="now(0,0)"/>
    </outputs>
    <properties>
        <property name="queueName" value="default"/>
    </properties>
    <workflow path="/tmp/falcon-regression/ProcessLateRerunTest/aggregator"/>
    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
    <late-process policy="periodic" delay="minutes(4)">
        <late-input input="inputData" workflow-path="/tmp/falcon-regression/ProcessLateRerunTest/aggregator"/>
    </late-process>
    <ACL owner="pragya" group="dataqa" permission="*"/>
</process>
pragya.mittal: Can you also attach the feed definitions and the workflow.xml used for the above case.
Attaching all required definitions:
Process:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<process name="ProcessLateRerunTest-agregator-coord16-bb54f97c" xmlns="uri:falcon:process:0.1">
    <clusters>
        <cluster name="ProcessLateRerunTest-corp-bf31e225">
            <validity start="2016-02-03T06:48Z" end="2016-02-03T07:18Z"/>
        </cluster>
    </clusters>
    <parallel>2</parallel>
    <order>FIFO</order>
    <frequency>minutes(5)</frequency>
    <timezone>UTC</timezone>
    <inputs>
        <input name="inputData" feed="ProcessLateRerunTest-raaw-logs16-3d7c1e49" start="now(0,-1)" end="now(0,0)"/>
    </inputs>
    <outputs>
        <output name="outputData" feed="ProcessLateRerunTest-agregated-logs16-975a0d4c" instance="now(0,0)"/>
    </outputs>
    <properties>
        <property name="queueName" value="default"/>
    </properties>
    <workflow path="/tmp/falcon-regression/ProcessLateRerunTest/aggregator"/>
    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
    <late-process policy="periodic" delay="minutes(4)">
        <late-input input="inputData" workflow-path="/tmp/falcon-regression/ProcessLateRerunTest/aggregator"/>
    </late-process>
    <ACL owner="pragya" group="dataqa" permission="*"/>
</process>
Feed1:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<feed name="ProcessLateRerunTest-agregated-logs16-975a0d4c" description="clicks log" xmlns="uri:falcon:feed:0.1">
    <frequency>minutes(5)</frequency>
    <timezone>UTC</timezone>
    <late-arrival cut-off="hours(6)"/>
    <clusters>
        <cluster name="ProcessLateRerunTest-corp-bf31e225" type="source">
            <validity start="2009-01-01T01:00Z" end="2099-12-31T23:59Z"/>
            <retention limit="months(6)" action="delete"/>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/tmp/falcon-regression/ProcessLateRerunTest/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
        <location type="stats" path="/projects/falcon/clicksStats"/>
        <location type="meta" path="/projects/falcon/clicksMetaData"/>
    </locations>
    <ACL owner="pragya" group="dataqa" permission="*"/>
    <schema location="/schema/clicks" provider="protobuf"/>
    <properties>
        <property name="field5" value="value1"/>
        <property name="field6" value="value2"/>
    </properties>
</feed>
Feed2:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<feed name="ProcessLateRerunTest-raaw-logs16-3d7c1e49" description="clicks log" xmlns="uri:falcon:feed:0.1">
    <frequency>minutes(1)</frequency>
    <timezone>UTC</timezone>
    <late-arrival cut-off="hours(6)"/>
    <clusters>
        <cluster name="ProcessLateRerunTest-corp-bf31e225" type="source">
            <validity start="2009-01-01T00:00Z" end="2099-12-31T23:59Z"/>
            <retention limit="months(6)" action="delete"/>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/tmp/falcon-regression/ProcessLateRerunTest/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
        <location type="stats" path="/projects/falcon/clicksStats"/>
        <location type="meta" path="/projects/falcon/clicksMetaData"/>
    </locations>
    <ACL owner="pragya" group="dataqa" permission="*"/>
    <schema location="/schema/clicks" provider="protobuf"/>
    <properties>
        <property name="field3" value="value1"/>
        <property name="field4" value="value2"/>
    </properties>
</feed>
Workflow:
<workflow-app xmlns="uri:oozie:workflow:0.2" name="aggregator-wf">
    <start to="aggregator"/>
    <action name="aggregator">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputData}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.mapred.lib.IdentityMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputData}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputData}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
Further debugging has shown that messages are continuously being enqueued to and dequeued from the delay queue/ActiveMQ, resulting in a huge number of messages being transmitted.
All of this is happening because of the way getDelay in RerunEvent is calculated: it uses the nominal time as the reference on every evaluation when accounting for the time taken in processing.
Once the current time moves far past the nominal time, the delay calculation always comes out negative, so the event is enqueued (released) instantly.
public long getDelay(TimeUnit unit) {
    return unit.convert((msgInsertTime - System.currentTimeMillis()) + delayInMilliSec,
            TimeUnit.MILLISECONDS);
}
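To illustrate the churn described above, here is a minimal, self-contained sketch (class and variable names are illustrative, not Falcon's): a Delayed element whose getDelay() is computed against a fixed past reference time reports a negative delay once "now" has moved past reference + delay, so java.util.concurrent.DelayQueue considers it expired and hands it straight back to the consumer, which re-enqueues it, repeating the cycle without ever waiting out the configured late delay.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustrative event mirroring the shape of the getDelay calculation above:
// delay = (fixed reference time - now) + configured delay.
class FixedReferenceEvent implements Delayed {
    private final long referenceTimeMs; // fixed reference, e.g. an old nominal/insert time
    private final long delayMs;         // configured late-rerun delay

    FixedReferenceEvent(long referenceTimeMs, long delayMs) {
        this.referenceTimeMs = referenceTimeMs;
        this.delayMs = delayMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Goes negative as soon as now > referenceTimeMs + delayMs.
        return unit.convert((referenceTimeMs - System.currentTimeMillis()) + delayMs,
                TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                other.getDelay(TimeUnit.MILLISECONDS));
    }
}

public class NegativeDelayDemo {
    public static void main(String[] args) {
        // Reference time one hour in the past, configured delay of 4 minutes
        // (matching the late-process delay in the process definition above).
        long reference = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
        FixedReferenceEvent event =
                new FixedReferenceEvent(reference, TimeUnit.MINUTES.toMillis(4));

        System.out.println("computed delay (ms): " + event.getDelay(TimeUnit.MILLISECONDS));

        DelayQueue<FixedReferenceEvent> queue = new DelayQueue<>();
        queue.put(event);
        // poll() returns the element immediately because its delay is already negative;
        // a consumer that re-enqueues it on failure will loop without any real wait.
        FixedReferenceEvent taken = queue.poll();
        System.out.println("dequeued immediately: " + (taken == event));
    }
}
```

This matches the reported symptom: the negative delay makes every enqueue instantly eligible for dequeue, producing the continuous message traffic seen on ActiveMQ.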
GitHub user sandeepSamudrala opened a pull request:
https://github.com/apache/falcon/pull/25
FALCON-1807. Late Rerun is rerunning as soon as the data is available rather than waiting for the delay.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sandeepSamudrala/falcon 0.9
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/falcon/pull/25.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #25
commit 801c33b2bb4864f45590dca735312e3d7abd4d5e
Author: sandeep <sandysmdl@gmail.com>
Date: 2016-02-04T10:08:48Z
FALCON-1807. Late Rerun is rerunning as soon as the data is available rather than waiting for the delay.
pragya.mittal, can you please attach the entity definitions you used?