  1. Apache Storm
  2. STORM-1073

SequenceFileBolt can end up in an unrecoverable state

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.0.0
    • Component/s: storm-hdfs
    • Labels: None

      Description

      SequenceFileBolt has the same issues that HdfsBolt had in STORM-969. This is also an opportunity to refactor AbstractHdfsBolt so that these changes are incorporated efficiently:

      AbstractHdfsBolt should define a concrete execute method and abstract methods for:

      • writing a tuple
      • synchronizing file output


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user dossett opened a pull request:

          https://github.com/apache/storm/pull/767

          STORM-1073: Refactor AbstractHdfsBolt

          STORM-969 changed the execution of HdfsBolt to be more robust to errors. This change moves that logic up to AbstractHdfsBolt and adds two new abstract methods to AbstractHdfsBolt: writeTuple and syncTuples. In other words, AbstractHdfsBolt now contains all of the error handling and retry logic for writing to HDFS, and its subclasses specify how to write specific content to HDFS.
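The split described above is essentially the template method pattern. The following is a minimal sketch of that shape, not the code from the pull request: the class names `AbstractBatchWriter` and `FlakyMemoryWriter` are hypothetical, tuples are reduced to plain strings, and the sync policy is reduced to a boolean flag so that the example runs without Storm or HDFS on the classpath.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractHdfsBolt: it owns batching, retry,
// and ack/fail bookkeeping, and leaves the actual I/O to subclasses.
abstract class AbstractBatchWriter {
    private final List<String> batch = new ArrayList<>();
    private final int retryCount;
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    AbstractBatchWriter(int retryCount) {
        this.retryCount = retryCount;
    }

    // Subclasses decide how one record is written and how output is synced.
    protected abstract void writeTuple(String tuple) throws IOException;
    protected abstract void syncTuples() throws IOException;

    // Concrete and final: the error handling is written once, here.
    public final void execute(String tuple, boolean syncNow) {
        boolean forceSync = false;
        try {
            writeTuple(tuple);
            batch.add(tuple);
        } catch (IOException e) {
            // A failed write fails that tuple and forces a sync of earlier data.
            failed.add(tuple);
            forceSync = true;
        }
        if ((syncNow || forceSync) && !batch.isEmpty()) {
            syncWithRetry();
        }
    }

    // Try the sync up to retryCount times, then fail the batch and give up.
    private void syncWithRetry() {
        IOException last = null;
        for (int attempt = 1; attempt <= retryCount; attempt++) {
            try {
                syncTuples();
                acked.addAll(batch);
                batch.clear();
                return;
            } catch (IOException e) {
                last = e;
            }
        }
        failed.addAll(batch);
        batch.clear();
        throw new RuntimeException("Sync failed [" + retryCount + "] times.", last);
    }
}

// Hypothetical subclass: writes to memory and fails only its first sync.
class FlakyMemoryWriter extends AbstractBatchWriter {
    final StringBuilder out = new StringBuilder();
    private int syncCalls = 0;

    FlakyMemoryWriter(int retryCount) {
        super(retryCount);
    }

    @Override
    protected void writeTuple(String tuple) throws IOException {
        if (tuple.isEmpty()) {
            throw new IOException("empty record");
        }
        out.append(tuple).append('\n');
    }

    @Override
    protected void syncTuples() throws IOException {
        if (++syncCalls == 1) {
            throw new IOException("transient failure");
        }
    }
}
```

With a retry count of 3, a transient sync failure is absorbed and the batch is still acked; with a retry count of 1 the same failure fails the batch and surfaces as a RuntimeException.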

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/dossett/storm HdfsBolt

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/storm/pull/767.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #767


          commit 375d70a4134ba93be9ff8040334d63551542ca78
          Author: Aaron Dossett <aaron.dossett@target.com>
          Date: 2015-09-29T02:30:14Z

          STORM-1073: Refactor AbstractHdfsBolt


          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-145260302

          @arunmahadevan please review this

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on a diff in the pull request:

          https://github.com/apache/storm/pull/767#discussion_r41111208

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java —
          @@ -34,13 +37,11 @@
          import org.slf4j.LoggerFactory;

          import java.io.IOException;
          -import java.util.ArrayList;
          -import java.util.Map;
          -import java.util.Timer;
          -import java.util.TimerTask;
          +import java.util.*;
          — End diff –

          Better not to use wildcard imports.

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on a diff in the pull request:

          https://github.com/apache/storm/pull/767#discussion_r41111274

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java —
          @@ -89,6 +89,11 @@ public SequenceFileBolt withCompressionType(SequenceFile.CompressionType compres
          return this;
          }

          + public SequenceFileBolt withTickTupleIntervalSeconds(int interval) {
          + this.tickTupleInterval = interval;
          + return this;
          + }
          +
          — End diff –

          Add an option to set the retry count as well in SequenceFileBolt.
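The request above amounts to one more chained setter next to the tick-interval one. A minimal sketch follows, assuming a hypothetical class `SketchSequenceFileBolt`, a hypothetical method name `withRetryCount`, and an arbitrary default of 3; none of these are taken from the pull request.

```java
// Hypothetical stand-in for a bolt configured through chained "with..." setters.
class SketchSequenceFileBolt {
    private int tickTupleInterval = 0; // seconds; 0 means "not set explicitly"
    private int fileRetryCount = 3;    // assumed default for this sketch only

    SketchSequenceFileBolt withTickTupleIntervalSeconds(int interval) {
        this.tickTupleInterval = interval;
        return this;
    }

    // Hypothetical setter name; returns this so calls can be chained.
    SketchSequenceFileBolt withRetryCount(int fileRetryCount) {
        this.fileRetryCount = fileRetryCount;
        return this;
    }

    int getTickTupleInterval() {
        return tickTupleInterval;
    }

    int getFileRetryCount() {
        return fileRetryCount;
    }
}
```

Usage would then read `new SketchSequenceFileBolt().withTickTupleIntervalSeconds(15).withRetryCount(5)`.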

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-145433935

          +1 Overall it looks good once the minor comments are addressed.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-145528207

          Thank you for catching those, @arunmahadevan. PR updated per your feedback.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-149346069

          @arunmahadevan @harshach Do either of you have additional feedback on this PR? Thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-149425136

          looks good to me

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/storm/pull/767#discussion_r42497270

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java —
          @@ -99,6 +109,13 @@ public final void prepare(Map conf, TopologyContext topologyContext, OutputColle
          }
          }

          + // If interval is non-zero then it has already been explicitly set and we should not default it
          + if (conf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0)
          + {
          + Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
          — End diff –

          Can we please use `Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS))` instead?
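One plausible motivation for this request is that a configuration map holds plain objects, so the timeout may arrive as an Integer, a Long, or a String, and `Integer.parseInt(value.toString())` fails on inputs such as a missing key. The sketch below shows the kind of coercion a getInt-style helper performs. `ConfInts.toInt` is a hypothetical helper written for illustration and is not Storm's `Utils.getInt`.

```java
// Hypothetical helper illustrating object-to-int coercion for config values.
class ConfInts {
    static int toInt(Object value) {
        if (value instanceof Number) {
            long asLong = ((Number) value).longValue();
            if (asLong < Integer.MIN_VALUE || asLong > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("Out of int range: " + value);
            }
            return (int) asLong;
        }
        if (value instanceof String) {
            return Integer.parseInt((String) value);
        }
        // Covers null and unsupported types with one clear error.
        throw new IllegalArgumentException("Cannot convert " + value + " to int");
    }
}
```

Keeping the conversion in one helper also means every caller reports a bad value the same way, instead of a mix of NullPointerException and NumberFormatException.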

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/storm/pull/767#discussion_r42497567

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java —
          @@ -127,9 +144,110 @@ public void run() {
          }

          @Override
          + public final void execute(Tuple tuple) {
          +
          + synchronized (this.writeLock) {
          + boolean forceSync = false;
          + if (TupleUtils.isTick(tuple)) {
          + LOG.debug("TICK! forcing a file system flush");
          + forceSync = true;
          + } else {
          + try {
          + writeTuple(tuple);
          + tupleBatch.add(tuple);
          + } catch (IOException e) {
          + //If the write failed, try to sync anything already written
          + LOG.info("Tuple failed to write, forcing a flush of existing data.");
          + this.collector.reportError(e);
          + forceSync = true;
          + this.collector.fail(tuple);
          + }
          + }
          +
          + if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
          + int attempts = 0;
          + boolean success = false;
          + IOException lastException = null;
          + // Make every attempt to sync the data we have. If it can't be done then kill the bolt with
          + // a runtime exception. The filesystem is presumably in a very bad state.
          + while (success == false && attempts < fileRetryCount) {
          + attempts += 1;
          + try {
          + syncTuples();
          + LOG.debug("Data synced to filesystem. Ack'ing [" + tupleBatch.size() + "] tuples");
          — End diff –

          Please use the slf4j logging replacement API especially for debug logs.

          `LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());`
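The benefit of the placeholder style is that the message is only assembled when the log level is enabled, whereas string concatenation builds the message before the logger is even called. The sketch below demonstrates that behaviour with a hypothetical miniature logger, `SketchLogger`, so that it runs without slf4j on the classpath; it mimics the `{}` substitution and is not slf4j's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature logger that mimics "{}" placeholders.
class SketchLogger {
    private final boolean debugEnabled;
    final List<String> lines = new ArrayList<>();

    SketchLogger(boolean debugEnabled) {
        this.debugEnabled = debugEnabled;
    }

    void debug(String format, Object... args) {
        if (!debugEnabled) {
            return; // No formatting and no toString() calls when disabled.
        }
        StringBuilder sb = new StringBuilder();
        int argIndex = 0;
        int from = 0;
        int at;
        // Replace each "{}" with the next argument, left to right.
        while ((at = format.indexOf("{}", from)) >= 0 && argIndex < args.length) {
            sb.append(format, from, at).append(args[argIndex++]);
            from = at + 2;
        }
        sb.append(format.substring(from));
        lines.add(sb.toString());
    }
}
```

With debug disabled, an argument's `toString()` is never invoked, which is the cost the reviewer is asking to avoid on hot paths such as the per-batch ack message.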

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/storm/pull/767#discussion_r42497617

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java —
          @@ -127,9 +144,110 @@ public void run() {
          }

          @Override
          + public final void execute(Tuple tuple) {
          +
          + synchronized (this.writeLock) {
          + boolean forceSync = false;
          + if (TupleUtils.isTick(tuple)) {
          + LOG.debug("TICK! forcing a file system flush");
          + forceSync = true;
          + } else {
          + try {
          + writeTuple(tuple);
          + tupleBatch.add(tuple);
          + } catch (IOException e) {
          + //If the write failed, try to sync anything already written
          + LOG.info("Tuple failed to write, forcing a flush of existing data.");
          + this.collector.reportError(e);
          + forceSync = true;
          + this.collector.fail(tuple);
          + }
          + }
          +
          + if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
          + int attempts = 0;
          + boolean success = false;
          + IOException lastException = null;
          + // Make every attempt to sync the data we have. If it can't be done then kill the bolt with
          + // a runtime exception. The filesystem is presumably in a very bad state.
          + while (success == false && attempts < fileRetryCount) {
          + attempts += 1;
          + try {
          + syncTuples();
          + LOG.debug("Data synced to filesystem. Ack'ing [" + tupleBatch.size() + "] tuples");
          + for (Tuple t : tupleBatch)
          — End diff –

          We prefer to have all bodies of for loops in curly braces '{}'

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-149576539

          @dossett the code looks fine. I have two minor nits, but they both existed in the previous code prior to the re-factor so I am +1 either way.

          The build failure looks like it was a network hiccup with the maven repository.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-149578473

          @revans2 Thank you for the feedback. I was the author of the previous code, so I have addressed those issues in this PR. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-149588958

          The code looks fine and I am +1 on this, but I would like to give @harshach and @arunmahadevan some time if they want to comment. Any feedback?

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on a diff in the pull request:

          https://github.com/apache/storm/pull/767#discussion_r42546136

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java —
          @@ -127,9 +145,112 @@ public void run() {
          }

          @Override
          + public final void execute(Tuple tuple) {
          +
          + synchronized (this.writeLock) {
          + boolean forceSync = false;
          + if (TupleUtils.isTick(tuple)) {
          + LOG.debug("TICK! forcing a file system flush");
          + forceSync = true;
          + } else {
          + try {
          + writeTuple(tuple);
          + tupleBatch.add(tuple);
          + } catch (IOException e) {
          + //If the write failed, try to sync anything already written
          + LOG.info("Tuple failed to write, forcing a flush of existing data.");
          + this.collector.reportError(e);
          + forceSync = true;
          + this.collector.fail(tuple);
          + }
          + }
          +
          + if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
          + int attempts = 0;
          + boolean success = false;
          + IOException lastException = null;
          + // Make every attempt to sync the data we have. If it can't be done then kill the bolt with
          + // a runtime exception. The filesystem is presumably in a very bad state.
          + while (success == false && attempts < fileRetryCount) {
          + attempts += 1;
          + try {
          + syncTuples();
          + LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
          + for (Tuple t : tupleBatch) {
          + this.collector.ack(t);
          + }
          + tupleBatch.clear();
          + syncPolicy.reset();
          + success = true;
          + } catch (IOException e) {
          + LOG.warn("Data could not be synced to filesystem on attempt [" + attempts + "]");
          — End diff –

          @dossett as @revans2 brought up, I think it's better to use the curly brace {} anchor for logging throughout, to be consistent.

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on a diff in the pull request:

          https://github.com/apache/storm/pull/767#discussion_r42546202

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java —
          @@ -127,9 +145,112 @@ public void run() {
          }

          @Override
          + public final void execute(Tuple tuple) {
          +
          + synchronized (this.writeLock) {
          + boolean forceSync = false;
          + if (TupleUtils.isTick(tuple)) {
          + LOG.debug("TICK! forcing a file system flush");
          + forceSync = true;
          + } else {
          + try {
          + writeTuple(tuple);
          + tupleBatch.add(tuple);
          + } catch (IOException e) {
          + //If the write failed, try to sync anything already written
          + LOG.info("Tuple failed to write, forcing a flush of existing data.");
          + this.collector.reportError(e);
          + forceSync = true;
          + this.collector.fail(tuple);
          + }
          + }
          +
          + if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
          + int attempts = 0;
          + boolean success = false;
          + IOException lastException = null;
          + // Make every attempt to sync the data we have. If it can't be done then kill the bolt with
          + // a runtime exception. The filesystem is presumably in a very bad state.
          + while (success == false && attempts < fileRetryCount) {
          + attempts += 1;
          + try {
          + syncTuples();
          + LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
          + for (Tuple t : tupleBatch) {
          + this.collector.ack(t);
          + }
          + tupleBatch.clear();
          + syncPolicy.reset();
          + success = true;
          + } catch (IOException e) {
          + LOG.warn("Data could not be synced to filesystem on attempt [" + attempts + "]");
          + this.collector.reportError(e);
          + lastException = e;
          + }
          + }
          +
          + // If unsuccesful fail the pending tuples
          + if (success == false) {
          + LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
          + for (Tuple t : tupleBatch) {
          + this.collector.fail(t);
          + }
          + tupleBatch.clear();
          +
          + throw new RuntimeException("Sync failed [" + attempts + "] times.", lastException);
          + }
          + }
          +
          + if(this.rotationPolicy.mark(tuple, this.offset)) {
          + try {
          + rotateOutputFile();
          + this.rotationPolicy.reset();
          + this.offset = 0;
          + } catch (IOException e) {
          + this.collector.reportError(e);
          + LOG.warn("File could not be rotated");
          + //At this point there is nothing to do. In all likelihood any filesystem operations will fail.
          + //The next tuple will almost certainly fail to write and/or sync, which force a rotation. That
          + //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
          + }
          + }
          + }
          + }
          +
          + @Override
          + public Map<String, Object> getComponentConfiguration() {
          + Map<String, Object> conf = super.getComponentConfiguration();
          + if (conf == null)
          + conf = new Config();
          +
          + if (tickTupleInterval > 0) {
          + LOG.info("Enabling tick tuple with interval [" + tickTupleInterval + "]");
          — End diff –

          Curly brace anchor for logging as above.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-149691038

          Thanks for catching those @arunmahadevan. Fixed those and squashed everything into one commit.

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/767#issuecomment-150937269

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/storm/pull/767


            People

            • Assignee: Aaron Dossett (dossett@gmail.com)
            • Reporter: Aaron Dossett (dossett@gmail.com)
            • Votes: 0
            • Watchers: 2
