  Apache Storm / STORM-969

HDFS Bolt can end up in an unrecoverable state

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.0.0
    • Component/s: storm-hdfs
    • Labels:
      None

      Description

      The body of the HdfsBolt.execute() method is essentially one try-catch block. The catch block reports the error and fails the current tuple. In some cases the bolt's FSDataOutputStream object (named 'out') is left in an unrecoverable state, and no subsequent call to execute() can succeed.

      To reproduce this scenario:

      • process some tuples through HDFS bolt
      • put the underlying HDFS system into safemode
      • process some more tuples and observe the expected ClosedChannelException
      • take the underlying HDFS system out of safemode
      • subsequent tuples continue to fail with the same exception
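
      The safemode toggle can also be driven programmatically for testing. The sketch below is illustrative only; it assumes the Hadoop 2.x MiniDFSCluster test harness (which the PR below also uses for its unit tests) and is not part of this patch:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;

public class SafeModeRepro {
    public static void main(String[] args) throws Exception {
        // In-process HDFS, as used by the MiniDFSCluster unit tests mentioned in the PR.
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration()).build();
        DistributedFileSystem fs = cluster.getFileSystem();
        try {
            // While the namenode is in safemode, the bolt's writes fail with IOExceptions;
            // before this fix, the bolt's output stream never recovered afterwards.
            fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
            // ... process some tuples through the topology and watch them fail ...
            fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
            // ... before the fix, tuples still fail here with ClosedChannelException ...
        } finally {
            cluster.shutdown();
        }
    }
}
```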

      The three fundamental operations that execute() performs (writing, syncing, and rotating) need to be isolated so that errors from each can be handled specifically.
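
      For context, the pre-fix shape of the method (condensed from the diff quoted in the review comments below) is a single try-catch; once `out` is broken, every later call lands in the same catch block:

```java
// Condensed pre-fix HdfsBolt.execute(): one try-catch around write, sync, and rotate.
@Override
public void execute(Tuple tuple) {
    try {
        byte[] bytes = this.format.format(tuple);
        synchronized (this.writeLock) {
            out.write(bytes);                     // once the stream is dead, this always throws
            this.offset += bytes.length;
            if (this.syncPolicy.mark(tuple, this.offset)) {
                this.out.hsync();                 // (hsync with UPDATE_LENGTH when the stream is an HdfsDataOutputStream)
                this.syncPolicy.reset();
            }
        }
        this.collector.ack(tuple);
        if (this.rotationPolicy.mark(tuple, this.offset)) {
            rotateOutputFile();                   // only reached when everything above succeeded
            this.offset = 0;
            this.rotationPolicy.reset();
        }
    } catch (IOException e) {
        this.collector.reportError(e);            // reports and fails the tuple, but never repairs `out`
        this.collector.fail(tuple);
    }
}
```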

        Issue Links

          Activity

          dossett@gmail.com Aaron Dossett added a comment -

          I should have a PR for this shortly.

          githubbot ASF GitHub Bot added a comment -

          GitHub user dossett opened a pull request:

          https://github.com/apache/storm/pull/664

          STORM-969: HDFS Bolt can end up in an unrecoverable state

          A few notes about this PR:

          • I updated the storm-hdfs pom.xml to align with other external modules. The most significant change was probably going from hdfs version 2.2 to ${hadoop.version} (i.e. currently 2.6).

          • Many errors are recovered from by forcing a file rotation, which opens a new, valid file. So rotation now occurs either according to the rotation policy or when a serious error happens. Work could probably be done to reopen the same file name to reduce the number of rotations.
          • Added unit tests with MiniDFSCluster

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/dossett/storm STORM-969

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/storm/pull/664.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #664


          commit 795aaf93af78bf664727b91c179e0d96f673f674
          Author: Aaron Dossett <aaron.dossett@target.com>
          Date: 2015-08-02T22:22:51Z

          STORM-969: HDFS Bolt can end up in an unrecoverable state


          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-131588005

          @arunmahadevan since you've made changes to HdfsBolt as part of STORM-837, can you take a look at this PR and check if it's already handled by your changes?

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-131602837

          @harshach the changes I made were in the trident implementation (HDFSState), which is independent of this. Anyway, I reviewed the changes.

          Overall it appears that the exceptions are now handled individually at write, sync and rotate phases. But I see a few issues with the change.

          • the tuples are acked only on sync - if the tuple rate is low, they will never be acked, will keep timing out, and the same tuples will be replayed again and again.
          • there is an attempt to sync even when the write fails with an IOException. Since write already threw an IOException, chances are high that the sync would also fail with IOException.

          I think it might be simpler to keep the existing logic and just rotate the file whenever we see an IOException (or maybe after a few times we repeatedly hit the IOException) and completely fail by propagating the exception up if the situation does not improve after a few rotations.

          Also I see that the existing implementation acks the tuples before actually syncing them to disk, which might result in data loss. I think we should change this to ack only after an hsync and have a sync policy that considers both count and time based thresholds.
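
          As a rough illustration of the count-plus-time idea (not part of this PR; the class name is made up), a policy written against the storm-hdfs SyncPolicy interface used by the bolt code quoted in this thread could trigger on whichever threshold is hit first:

```java
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.tuple.Tuple;

// Hypothetical policy: sync when either a tuple-count or an elapsed-time threshold is reached,
// so low-rate streams still get synced (and their tuples acked) before the message timeout.
public class CountOrTimeSyncPolicy implements SyncPolicy {
    private final int countThreshold;
    private final long intervalMillis;
    private int count = 0;
    private long lastSync = System.currentTimeMillis();

    public CountOrTimeSyncPolicy(int countThreshold, long intervalMillis) {
        this.countThreshold = countThreshold;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public boolean mark(Tuple tuple, long offset) {
        count++;
        return count >= countThreshold
                || System.currentTimeMillis() - lastSync >= intervalMillis;
    }

    @Override
    public void reset() {
        count = 0;
        lastSync = System.currentTimeMillis();
    }
}
```

          Note that a time-based policy only helps if execute() keeps getting called, which is exactly what the tick tuples discussed below provide.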

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-131648086

          @arunmahadevan Thank you for the feedback! I have added a tick tuple feature to address your first point (I am already using this locally, I forgot to include it in this PR).

          You are right that I attempt to sync even when the write fails. I agree that this is unlikely to succeed, but it seems worthwhile to try to "save" as many tuples as possible.

          With this tick tuple change I think the code is very close to your recommendation. If the bolt is in a bad state the tick tuples will periodically try to rotate the file and all errors will be reported up through ```this.collector.reportError(e)```
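
          For readers unfamiliar with tick tuples: a bolt opts into them through its component configuration and then distinguishes them in execute(). A minimal, generic sketch (the class and method names here are illustrative, not the PR's code):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

public abstract class TickingBolt extends BaseRichBolt {
    private final int tickIntervalSecs;

    protected TickingBolt(int tickIntervalSecs) {
        this.tickIntervalSecs = tickIntervalSecs;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Ask Storm to send this bolt a tick tuple every tickIntervalSecs seconds.
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickIntervalSecs);
        return conf;
    }

    @Override
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            onTick();            // e.g. force a flush/sync even when no data tuples arrive
        } else {
            onDataTuple(tuple);
        }
    }

    protected abstract void onTick();
    protected abstract void onDataTuple(Tuple tuple);
}
```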

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on a diff in the pull request:

          https://github.com/apache/storm/pull/664#discussion_r37160568

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java —

@@ -80,6 +86,11 @@ public HdfsBolt addRotationAction(RotationAction action)
         return this;
     }

+    public HdfsBolt withTickTupleIntervalSeconds(int interval) {

          — End diff —

          Could give a more meaningful name to convey the actual usage (e.g. `withFlushIntervalSeconds`). I think we need to have a default value for this to ensure a sync is `always` done even if the user doesn't specify this option, and also ensure the option value is within some lower and upper thresholds so that tuples are acked within TOPOLOGY_MESSAGE_TIMEOUT_SECS and a sync doesn't happen too frequently.
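
          A small sketch of the default-plus-clamping idea (the constants and helper name are illustrative, not what the PR implements):

```java
// Clamp a user-supplied flush interval: never disable flushing, never flush absurdly often,
// and stay comfortably under the topology message timeout so tuples are acked in time.
private static final int MIN_FLUSH_INTERVAL_SECS = 1;
private static final int DEFAULT_FLUSH_INTERVAL_SECS = 15;

static int clampFlushIntervalSecs(Integer requested, int messageTimeoutSecs) {
    int interval = (requested == null) ? DEFAULT_FLUSH_INTERVAL_SECS : requested;
    interval = Math.max(interval, MIN_FLUSH_INTERVAL_SECS);
    return Math.min(interval, messageTimeoutSecs / 2);   // ack well within TOPOLOGY_MESSAGE_TIMEOUT_SECS
}
```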

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on a diff in the pull request:

          https://github.com/apache/storm/pull/664#discussion_r37160668

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java —

@@ -88,35 +99,94 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector

     @Override
     public void execute(Tuple tuple) {
-        try {
-            byte[] bytes = this.format.format(tuple);
-            synchronized (this.writeLock) {
-                out.write(bytes);
-                this.offset += bytes.length;
-
-                if (this.syncPolicy.mark(tuple, this.offset)) {
-                    if (this.out instanceof HdfsDataOutputStream) {
-                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-                    } else {
-                        this.out.hsync();
-                    }
-                    this.syncPolicy.reset();
-                }
-            }
-            this.collector.ack(tuple);
+        boolean forceRotate = false;
+        synchronized (this.writeLock) {
+            boolean forceSync = false;
+            if (TupleUtils.isTick(tuple)) {
+                LOG.debug("TICK! forcing a file system flush");
+                forceSync = true;
+            } else {
+                try {
+                    writeAndAddTuple(tuple);
+                } catch (IOException e) {
+                    //If the write failed, try to sync anything already written
+                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
+                    this.collector.reportError(e);
+                    forceSync = true;
+                    this.collector.fail(tuple);
+                }
+            }

+            if (this.syncPolicy.mark(tuple, this.offset) || forceSync) {
+                try {
+                    syncAndAckTuples();
+                } catch (IOException e) {
+                    LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
+                    this.collector.reportError(e);
+                    //Force rotation to get a new file handle
+                    forceRotate = true;
+                    for (Tuple t : tupleBatch)
+                        this.collector.fail(t);
+                    tupleBatch.clear();
+                }
+            }
+        }

-            if(this.rotationPolicy.mark(tuple, this.offset)){
-                rotateOutputFile(); // synchronized
-                this.offset = 0;
-                this.rotationPolicy.reset();
+        if(this.rotationPolicy.mark(tuple, this.offset) || forceRotate) {
+            try {
+                rotateAndReset();
+            } catch (IOException e) {
+                this.collector.reportError(e);
+                LOG.warn("File could not be rotated");
+                //At this point there is nothing to do. In all likelihood any filesystem operations will fail.
+                //The next tuple will almost certainly fail to write and/or sync, which force a rotation. That
+                //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
             }
-        } catch (IOException e) {
-            this.collector.reportError(e);
-            this.collector.fail(tuple);
         }
     }

+    private void rotateAndReset() throws IOException {
+        rotateOutputFile(); // synchronized
+        this.offset = 0;
+        this.rotationPolicy.reset();
+    }
+
+    private void syncAndAckTuples() throws IOException {

          — End diff —

          An optimization could be to do the sync only if some tuples were actually written out since the last sync.

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on a diff in the pull request:

          https://github.com/apache/storm/pull/664#discussion_r37161325

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java —

          @@ -88,35 +99,94 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
          (the same execute() hunk quoted in full earlier in this thread; this review comment is anchored at the line below)

+    private void rotateAndReset() throws IOException {

          — End diff —

          What would be the behavior if the file system goes into an irrecoverable state where writes are continuously failing? Won't it end up creating a lot of empty files? You might want to consider cleaning this up and also killing the task by throwing a runtime exception after 'n' failures, in the hope that the new worker might be able to start writing successfully.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on a diff in the pull request:

          https://github.com/apache/storm/pull/664#discussion_r37241329

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java —

@@ -80,6 +86,11 @@ public HdfsBolt addRotationAction(RotationAction action)
         return this;
     }

+    public HdfsBolt withTickTupleIntervalSeconds(int interval) {

          — End diff —

          I like that change since it separates the function from the implementation. A similar change could be made to HiveBolt, to which I also added tick tuple functionality recently. If this PR is otherwise OK, I'd like to open a new JIRA to change this bolt and HiveBolt at the same time.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on a diff in the pull request:

          https://github.com/apache/storm/pull/664#discussion_r37241514

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java —

          @@ -88,35 +99,94 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
          (the same execute() hunk quoted in full earlier in this thread; this review comment is anchored at the line below)

+    private void rotateAndReset() throws IOException {

          — End diff —

          I suppose it's possible that empty files would be created, but if a filesystem is in such a bad state I would expect creating new files to fail as well.

          Killing the task after enough failures is an interesting idea... what default value for # of attempts would you consider reasonable?

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on a diff in the pull request:

          https://github.com/apache/storm/pull/664#discussion_r37242296

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java —

          @@ -88,35 +99,94 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
          (the same execute() hunk quoted in full earlier in this thread; this review comment is anchored at the line below)

+    private void syncAndAckTuples() throws IOException {

          — End diff —

          Agreed, just pushed a commit to do that.

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on a diff in the pull request:

          https://github.com/apache/storm/pull/664#discussion_r37263240

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java —

@@ -80,6 +86,11 @@ public HdfsBolt addRotationAction(RotationAction action)
         return this;
     }

+    public HdfsBolt withTickTupleIntervalSeconds(int interval) {

          — End diff —

          Assuming that would address the default, min, and max values for the interval, it should be fine.

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on a diff in the pull request:

          https://github.com/apache/storm/pull/664#discussion_r37263411

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java —

          @@ -88,35 +99,94 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
          (the same execute() hunk quoted in full earlier in this thread; this review comment is anchored at the line below)

+    private void rotateAndReset() throws IOException {

          — End diff —

          You could have a low default (say 10 consecutive failures or so) and also add an option for users to change this. @harshach can you recommend a default number of retries after which the task can exit, so that Storm would spawn the task in another worker where it might succeed?

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on a diff in the pull request:

          https://github.com/apache/storm/pull/664#discussion_r38272576

          — Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java —

          @@ -88,35 +99,94 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
          (the same execute() hunk quoted in full earlier in this thread; this review comment is anchored at the line below)

+    private void rotateAndReset() throws IOException {

          — End diff —

          @dossett
          "I suppose it's possible that empty files would be created, but if a filesystem is in such a bad state I would expect creating new files to fail as well."
          This might bring down the namenode with too many files.

          As @arunmahadevan pointed out, we should have a retryCount. I would start with 3 failed attempts and a configurable option.
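
          One way the retry-count suggestion could be realized (sketch only; the field and option names are hypothetical and not what the PR implements):

```java
// Track consecutive rotation failures inside the bolt; after maxConsecutiveFailures,
// throw so the worker dies and Storm reschedules the task, instead of endlessly
// creating new (possibly empty) files on a broken filesystem.
private int consecutiveRotationFailures = 0;
private int maxConsecutiveFailures = 3;   // suggested default, user-configurable

private void tryRotate() {
    try {
        rotateAndReset();                 // the PR's rotate-and-reset helper
        consecutiveRotationFailures = 0;
    } catch (IOException e) {
        this.collector.reportError(e);
        if (++consecutiveRotationFailures >= maxConsecutiveFailures) {
            throw new RuntimeException("Giving up after "
                    + consecutiveRotationFailures + " failed file rotations", e);
        }
    }
}
```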

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-136433305

          Thank you @harshach and @arunmahadevan, I will incorporate this feedback soon.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-136499556

          @harshach @arunmahadevan Changes made based on feedback, PR also rebased and squashed to one commit. Additional feedback welcome, thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-136775368

          +1, overall the patch looks good once the [comment](https://github.com/apache/storm/pull/664#commitcomment-13003095) is addressed.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-136876170

          @arunmahadevan Our approach is to set the tick tuple frequency to be half of the message timeout setting for the topology. Can the bolt get access to that topology setting? prepare() passes a TopologyContext to the bolt, but I don't see a way to get configuration information out of it. However, I am not that familiar with TopologyContext.

          If that's not possible, the bolt could just set a flush frequency of 15 seconds, since the Storm timeout default is 30 seconds, and assume that anyone changing the timeout setting should also adjust the flush frequency.

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-136889084

          The value of `topology.message.timeout.secs` can be read from the `conf` that gets passed as the first parameter to `doPrepare`
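
          A minimal sketch of that lookup (the helper name is illustrative), defaulting to half the timeout as discussed above and falling back to 15 seconds if the key is missing:

```java
import java.util.Map;
import org.apache.storm.Config;

// Derive the flush/tick interval from the topology's message timeout, read out of the
// conf Map that Storm passes to prepare()/doPrepare().
static int defaultFlushIntervalSecs(Map conf) {
    Object timeout = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
    return (timeout instanceof Number) ? ((Number) timeout).intValue() / 2 : 15;
}
```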

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-137064748

          @arunmahadevan So it can! PR updated.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-138607186

          @arunmahadevan @harshach Do you have any further comments about my updated PR? Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user arunmahadevan commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-138639789

          Looks good to me.

          githubbot ASF GitHub Bot added a comment -

          Github user dossett commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-143341056

          @harshach (or other committers) do you have feedback about this PR?

          Anecdotally, this has been very useful to us in production. We had an HDFS restart, which created the exact situation I tested with (failed writes due to HDFS safe mode) but the bolt recovered without a topology restart.

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-143884369

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user harshach commented on the pull request:

          https://github.com/apache/storm/pull/664#issuecomment-143894338

          Thanks @dossett, merged into master.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/storm/pull/664


            People

            • Assignee: dossett Aaron Dossett
            • Reporter: dossett@gmail.com Aaron Dossett
            • Votes: 0
            • Watchers: 3
