Falcon / FALCON-1807

Late Rerun is rerunning as soon as the data is available rather than waiting for the delay.

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Not A Bug
    • Affects Version/s: 0.9
    • Fix Version/s: None
    • Component/s: rerun
    • Labels: None

    Description

      Ideally, late rerun runs the instance if and when late data becomes available within the late-rerun window. This is not happening currently.


        Activity

          Pallavi Rao added a comment:

          pragya.mittal, can you please attach the entity definitions you used?

          Pragya Mittal added a comment:

          Process definition:

          <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
          <process name="ProcessLateRerunTest-agregator" xmlns="uri:falcon:process:0.1">
              <clusters>
                  <cluster name="ProcessLateRerunTest-corp">
                      <validity start="2016-02-02T09:13Z" end="2016-02-02T09:43Z"/>
                  </cluster>
              </clusters>
              <parallel>2</parallel>
              <order>FIFO</order>
              <frequency>minutes(5)</frequency>
              <timezone>UTC</timezone>
              <inputs>
                  <input name="inputData" feed="ProcessLateRerunTest-raaw-logs" start="now(0,-1)" end="now(0,0)"/>
              </inputs>
              <outputs>
                  <output name="outputData" feed="ProcessLateRerunTest-agregated" instance="now(0,0)"/>
              </outputs>
              <properties>
                  <property name="queueName" value="default"/>
              </properties>
              <workflow path="/tmp/falcon-regression/ProcessLateRerunTest/aggregator"/>
              <retry policy="periodic" delay="minutes(10)" attempts="3"/>
              <late-process policy="periodic" delay="minutes(4)">
                  <late-input input="inputData" workflow-path="/tmp/falcon-regression/ProcessLateRerunTest/aggregator"/>
              </late-process>
              <ACL owner="pragya" group="dataqa" permission="*"/>
          </process>
          
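As a worked illustration of this process definition, the Java sketch below enumerates the instance nominal times implied by the validity window (2016-02-02T09:13Z to 09:43Z) and `<frequency>minutes(5)</frequency>`. It assumes the validity end is exclusive, as for an Oozie coordinator's end time. `ProcessInstances` and `nominalTimes` are hypothetical names for illustration, not Falcon APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class ProcessInstances {
    // Hypothetical helper: nominal times from validity start (inclusive) to end
    // (exclusive, assumed to match Oozie coordinator semantics), one per frequency.
    static List<Instant> nominalTimes(Instant start, Instant end, Duration frequency) {
        List<Instant> times = new ArrayList<>();
        for (Instant t = start; t.isBefore(end); t = t.plus(frequency)) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        // Values from the process definition: validity 09:13Z to 09:43Z, minutes(5).
        Instant start = Instant.parse("2016-02-02T09:13:00Z");
        Instant end = Instant.parse("2016-02-02T09:43:00Z");
        for (Instant t : nominalTimes(start, end, Duration.ofMinutes(5))) {
            System.out.println(t);
        }
    }
}
```

Under that assumption it prints six timestamps, 2016-02-02T09:13:00Z through 2016-02-02T09:38:00Z, one per process instance.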

          sandeep samudrala added a comment:

          pragya.mittal: Can you also attach the feed definitions and workflow.xml used for the above case?
          Pragya Mittal added a comment:

          Attaching all required definitions:
          Process:

          <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
          <process name="ProcessLateRerunTest-agregator-coord16-bb54f97c" xmlns="uri:falcon:process:0.1">
              <clusters>
                  <cluster name="ProcessLateRerunTest-corp-bf31e225">
                      <validity start="2016-02-03T06:48Z" end="2016-02-03T07:18Z"/>
                  </cluster>
              </clusters>
              <parallel>2</parallel>
              <order>FIFO</order>
              <frequency>minutes(5)</frequency>
              <timezone>UTC</timezone>
              <inputs>
                  <input name="inputData" feed="ProcessLateRerunTest-raaw-logs16-3d7c1e49" start="now(0,-1)" end="now(0,0)"/>
              </inputs>
              <outputs>
                  <output name="outputData" feed="ProcessLateRerunTest-agregated-logs16-975a0d4c" instance="now(0,0)"/>
              </outputs>
              <properties>
                  <property name="queueName" value="default"/>
              </properties>
              <workflow path="/tmp/falcon-regression/ProcessLateRerunTest/aggregator"/>
              <retry policy="periodic" delay="minutes(10)" attempts="3"/>
              <late-process policy="periodic" delay="minutes(4)">
                  <late-input input="inputData" workflow-path="/tmp/falcon-regression/ProcessLateRerunTest/aggregator"/>
              </late-process>
              <ACL owner="pragya" group="dataqa" permission="*"/>
          </process>
          

          Feed1 (process output):

          <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
          <feed name="ProcessLateRerunTest-agregated-logs16-975a0d4c" description="clicks log" xmlns="uri:falcon:feed:0.1">
              <frequency>minutes(5)</frequency>
              <timezone>UTC</timezone>
              <late-arrival cut-off="hours(6)"/>
              <clusters>
                  <cluster name="ProcessLateRerunTest-corp-bf31e225" type="source">
                      <validity start="2009-01-01T01:00Z" end="2099-12-31T23:59Z"/>
                      <retention limit="months(6)" action="delete"/>
                  </cluster>
              </clusters>
              <locations>
                  <location type="data" path="/tmp/falcon-regression/ProcessLateRerunTest/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
                  <location type="stats" path="/projects/falcon/clicksStats"/>
                  <location type="meta" path="/projects/falcon/clicksMetaData"/>
              </locations>
              <ACL owner="pragya" group="dataqa" permission="*"/>
              <schema location="/schema/clicks" provider="protobuf"/>
              <properties>
                  <property name="field5" value="value1"/>
                  <property name="field6" value="value2"/>
              </properties>
          </feed>
          

          Feed2 (process input):

          <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
          <feed name="ProcessLateRerunTest-raaw-logs16-3d7c1e49" description="clicks log" xmlns="uri:falcon:feed:0.1">
              <frequency>minutes(1)</frequency>
              <timezone>UTC</timezone>
              <late-arrival cut-off="hours(6)"/>
              <clusters>
                  <cluster name="ProcessLateRerunTest-corp-bf31e225" type="source">
                      <validity start="2009-01-01T00:00Z" end="2099-12-31T23:59Z"/>
                      <retention limit="months(6)" action="delete"/>
                  </cluster>
              </clusters>
              <locations>
                  <location type="data" path="/tmp/falcon-regression/ProcessLateRerunTest/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
                  <location type="stats" path="/projects/falcon/clicksStats"/>
                  <location type="meta" path="/projects/falcon/clicksMetaData"/>
              </locations>
              <ACL owner="pragya" group="dataqa" permission="*"/>
              <schema location="/schema/clicks" provider="protobuf"/>
              <properties>
                  <property name="field3" value="value1"/>
                  <property name="field4" value="value2"/>
              </properties>
          </feed>
          

          Workflow:

          <workflow-app xmlns="uri:oozie:workflow:0.2" name="aggregator-wf">
              <start to="aggregator"/>
              <action name="aggregator">
                  <map-reduce>
                      <job-tracker>${jobTracker}</job-tracker>
                      <name-node>${nameNode}</name-node>
                      <prepare>
                          <delete path="${outputData}"/>
                      </prepare>
                      <configuration>
                          <property>
                              <name>mapred.job.queue.name</name>
                              <value>${queueName}</value>
                          </property>
                          <property>
                              <name>mapred.mapper.class</name>
                              <value>org.apache.hadoop.mapred.lib.IdentityMapper</value>
                          </property>
                          <property>
                              <name>mapred.reducer.class</name>
                              <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
                          </property>
                          <property>
                              <name>mapred.map.tasks</name>
                              <value>1</value>
                          </property>
                          <property>
                              <name>mapred.input.dir</name>
                              <value>${inputData}</value>
                          </property>
                          <property>
                              <name>mapred.output.dir</name>
                              <value>${outputData}</value>
                          </property>
                      </configuration>
                  </map-reduce>
                  <ok to="end"/>
                  <error to="fail"/>
              </action>
              <kill name="fail">
                  <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
              </kill>
              <end name="end"/>
          </workflow-app>
          
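The process input `start="now(0,-1)" end="now(0,0)"` over the 1-minute Feed2 selects feed instances relative to each process instance's nominal time. The minimal Java sketch below resolves the input directories for the first instance of the second process definition (nominal time 2016-02-03T06:48Z). It assumes `now(h,m)` means the nominal time shifted by h hours and m minutes, that both ends of the range are inclusive, and that feed instances fall on whole minutes (Feed2's validity starts at 2009-01-01T00:00Z). `InputInstances` and `inputPaths` are hypothetical helpers, not Falcon code.

```java
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class InputInstances {
    // Data location of Feed2, with ${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}
    // rendered as zero-padded UTC fields.
    static final String BASE = "/tmp/falcon-regression/ProcessLateRerunTest/input/";
    static final DateTimeFormatter PATH = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm");

    // Hypothetical helper: assumes now(h, m) = nominal time + h hours + m minutes and
    // that every feed instance from start to end (both inclusive) is an input.
    static List<String> inputPaths(ZonedDateTime nominal, int startH, int startM,
                                   int endH, int endM, Duration feedFrequency) {
        ZonedDateTime start = nominal.plusHours(startH).plusMinutes(startM);
        ZonedDateTime end = nominal.plusHours(endH).plusMinutes(endM);
        List<String> paths = new ArrayList<>();
        for (ZonedDateTime t = start; !t.isAfter(end); t = t.plus(feedFrequency)) {
            paths.add(BASE + t.format(PATH));
        }
        return paths;
    }

    public static void main(String[] args) {
        // First instance of the second process: nominal time 2016-02-03T06:48Z (UTC).
        ZonedDateTime nominal = ZonedDateTime.of(2016, 2, 3, 6, 48, 0, 0, ZoneOffset.UTC);
        // start="now(0,-1)" end="now(0,0)", Feed2 frequency minutes(1)
        for (String p : inputPaths(nominal, 0, -1, 0, 0, Duration.ofMinutes(1))) {
            System.out.println(p);
        }
    }
}
```

For that instance the sketch yields the `.../input/2016/02/03/06/47` and `.../input/2016/02/03/06/48` directories, i.e. the `inputData` instances that the `<late-input input="inputData">` element refers to.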

          sandeep samudrala added a comment:

          Further debugging has shown that messages are continuously being enqueued to and dequeued from the delayed ActiveMQ queue, resulting in a huge number of messages being transmitted.
          This is caused by the way getDelay in RerunEvent is calculated: each time, it uses the nominal time as the reference point when accounting for the time already taken in processing.
          But once the current time moves far beyond the instance's nominal time, the computed delay is always negative, so the event is enqueued and dequeued instantly.
          sandeep samudrala added a comment:

          // RerunEvent.getDelay: remaining delay = (msgInsertTime + delayInMilliSec) - now.
          // Once the current time passes msgInsertTime + delayInMilliSec, the result is negative.
          public long getDelay(TimeUnit unit) {
              return unit.convert((msgInsertTime - System.currentTimeMillis())
                      + delayInMilliSec, TimeUnit.MILLISECONDS);
          }
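To illustrate the analysis above, the following Java sketch uses `java.util.concurrent.DelayQueue` as a local stand-in for the delayed ActiveMQ queue, together with a hypothetical `FixedReferenceEvent` whose `getDelay` copies the formula from the snippet. When the reference time plus the delay is already in the past, the delay is negative, so every re-enqueue of the same event is immediately due again; with a reference time equal to the current time, the event waits the full delay. This only illustrates the reported behavior; it is not Falcon's RerunEvent or its queue implementation, and `countImmediateRedeliveries` is a hypothetical helper.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class RerunDelayDemo {
    // Hypothetical stand-in for RerunEvent: getDelay uses the same formula as the
    // snippet above, measured from a fixed reference time (msgInsertTime).
    static class FixedReferenceEvent implements Delayed {
        final long msgInsertTime;   // reference time, epoch millis
        final long delayInMilliSec; // configured delay, millis

        FixedReferenceEvent(long msgInsertTime, long delayInMilliSec) {
            this.msgInsertTime = msgInsertTime;
            this.delayInMilliSec = delayInMilliSec;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert((msgInsertTime - System.currentTimeMillis())
                    + delayInMilliSec, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                    other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    // Re-enqueues the same event `attempts` times and counts how often a
    // non-blocking poll() hands it straight back (i.e. its delay was <= 0).
    static int countImmediateRedeliveries(FixedReferenceEvent event, int attempts) {
        DelayQueue<FixedReferenceEvent> queue = new DelayQueue<>();
        int immediate = 0;
        for (int i = 0; i < attempts; i++) {
            queue.add(event);
            if (queue.poll() == event) {
                immediate++;
            } else {
                queue.clear(); // not due yet: discard so each attempt starts empty
            }
        }
        return immediate;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long delay = TimeUnit.MINUTES.toMillis(4); // late-process delay="minutes(4)"
        // Reference time an hour in the past, like a nominal time the clock has left behind.
        FixedReferenceEvent stale = new FixedReferenceEvent(now - TimeUnit.HOURS.toMillis(1), delay);
        // Reference time equal to the current time: the full delay still applies.
        FixedReferenceEvent fresh = new FixedReferenceEvent(now, delay);

        System.out.println("stale delay (ms): " + stale.getDelay(TimeUnit.MILLISECONDS));
        System.out.println("stale redeliveries: " + countImmediateRedeliveries(stale, 1000)); // 1000
        System.out.println("fresh redeliveries: " + countImmediateRedeliveries(fresh, 1000)); // 0
    }
}
```

With a reference time one hour old and a 4-minute delay, the stale event is handed back on every one of the 1000 attempts, which mirrors the continuous enqueue/dequeue traffic described above, while the fresh event is never due during the run.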
          ASF GitHub Bot added a comment:

          GitHub user sandeepSamudrala opened a pull request:

          https://github.com/apache/falcon/pull/25

          FALCON-1807. Late Rerun is rerunning as soon as the data is available…

          … rather than waiting for the delay.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sandeepSamudrala/falcon 0.9

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/falcon/pull/25.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #25


          commit 801c33b2bb4864f45590dca735312e3d7abd4d5e
          Author: sandeep <sandysmdl@gmail.com>
          Date: 2016-02-04T10:08:48Z

          FALCON-1807. Late Rerun is rerunning as soon as the data is available rather than waiting for the delay.


          Pragya Mittal added a comment:

          This happens when ActiveMQ is deployed explicitly by the user.

          ASF GitHub Bot added a comment:

          GitHub user sandeepSamudrala closed the pull request at:

          https://github.com/apache/falcon/pull/25


          People

            sandeep samudrala
            Pragya Mittal
            Votes: 0
            Watchers: 4
