Tephra / TEPHRA-243

Fix concurrency issues in Transaction log writer; also improve the logging

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.12.0-incubating
    • Fix Version/s: 0.13.0-incubating
    • Component/s: None
    • Labels: None

      Description

      Currently we get this message:

      2017-08-12 00:59:46,938 - INFO [TTransactionServer-rpc-857:o.a.t.p.AbstractTransactionLog@102] - Slow append to log txlog.1502517541689, took 1431 msec.
      

      It would be more useful to know how many bytes were written and how many edits were in this sync.
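
      As a rough illustration of the request, a slow-sync message could report the entry count and byte count next to the elapsed time. This is only a sketch: `SlowSyncMessage`, `describe`, and its parameters are hypothetical names invented here, not Tephra code, and the entry and byte counts passed in `main` are made-up values. The wording of the message follows the `LOG.info` call in the pull request diff quoted in the comments.

```java
class SlowSyncMessage {
  // Hypothetical helper (not part of Tephra): formats a slow-sync message that
  // includes how many entries and how many bytes the sync covered.
  public static String describe(String logName, long elapsedMillis, int entries, long bytes) {
    return String.format("Slow append to log %s, took %d ms for %d entr%s and %d bytes.",
        logName, elapsedMillis, entries, entries == 1 ? "y" : "ies", bytes);
  }

  public static void main(String[] args) {
    // Log name and duration come from the message above; 3 and 4096 are invented.
    System.out.println(describe("txlog.1502517541689", 1431, 3, 4096));
  }
}
```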

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/incubator-tephra/pull/53

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138759678

          --- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java ---
          @@ -85,105 +100,145 @@ public long getTimestamp() {

             @Override
             public void append(TransactionEdit edit) throws IOException {
          -    long startTime = System.nanoTime();
          -    synchronized (this) {
          -      ensureAvailable();
          -
          -      Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
          -
          -      // add to pending edits
          -      append(entry);
          -    }
          -
          -    // wait for sync to complete
          -    sync();
          -    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
          -    if (durationMillis > SLOW_APPEND_THRESHOLD) {
          -      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
          -    }
          +    append(Collections.singletonList(edit));
             }

             @Override
             public void append(List<TransactionEdit> edits) throws IOException {
          -    long startTime = System.nanoTime();
          -    synchronized (this) {
          -      ensureAvailable();
          -
          +    if (closing) { // or closed, which implies closing
          +      throw new IOException("Log " + getName() + " is closing or already closed, cannot append");
          +    }
          +    if (!initialized) {
          +      init();
          +    }
          +    // synchronizing here ensures that elements in the queue are ordered by seq number
          +    synchronized (logSequence) {
                 for (TransactionEdit edit : edits) {
          -        Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
          -
          -        // add to pending edits
          -        append(entry);
          +        pendingWrites.add(new Entry(new LongWritable(logSequence.getAndIncrement()), edit));
                 }
               }
          -
          -    // wait for sync to complete
          +    // try to sync all pending edits (competing for this with other threads)
               sync();
          -    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
          -    if (durationMillis > SLOW_APPEND_THRESHOLD) {
          -      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
          -    }
             }

          -  private void ensureAvailable() throws IOException {
          -    if (closed) {
          -      throw new IOException("Log " + getName() + " is already closed, cannot append!");
          -    }
          -    if (!initialized) {
          -      init();
          +  /**
          +   * Return all pending writes at the time the method is called, or null if no writes are pending.
          +   *
          +   * Note that after this method returns, there can be additional pending writes,
          +   * added concurrently while the existing pending writes are removed.
          +   */
          +  @Nullable
          +  private Entry[] getPendingWrites() {
          +    synchronized (this) {
          +      if (pendingWrites.isEmpty()) {
          +        return null;
          +      }
          +      Entry[] entriesToSync = new Entry[pendingWrites.size()];
          +      for (int i = 0; i < entriesToSync.length; i++) {
          +        entriesToSync[i] = pendingWrites.remove();
          +      }
          +      return entriesToSync;
               }
             }

          -  /*
          -   * Appends new writes to the pendingWrites. It is better to keep it in
          -   * our own queue rather than writing it to the HDFS output stream because
          -   * HDFSOutputStream.writeChunk is not lightweight at all.
          +  /**
          +   * When multiple threads try to log edits at the same time, they all will call (@link #append}
          +   * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
          +   * are followed by a single {@code sync}, or vice versa.
          +   *
          +   * We want to record the time and position of the first {@code append()} after a {@code sync()},
          +   * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
          +   * Therefore this is called every time before we write the pending list out to the log writer.
          +   *
          +   * See {@link #stopTimer(TransactionLogWriter)}.
          +   *
          +   * @throws IOException if the position of the writer cannot be determined
              */
          -  private void append(Entry e) throws IOException {
          -    pendingWrites.add(e);
          +  private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
          +    // no sync needed because this is only called within a sync block
          +    if (positionBeforeWrite == -1L) {
          +      positionBeforeWrite = writer.getPosition();
          +      countSinceLastSync = 0;
          +      stopWatch.reset().start();
          +    }
          +    countSinceLastSync += entryCount;
             }

          -  // Returns all currently pending writes. New writes
          -  // will accumulate in a new list.
          -  private List<Entry> getPendingWrites() {
          -    synchronized (this) {
          -      List<Entry> save = this.pendingWrites;
          -      this.pendingWrites = new LinkedList<>();
          -      return save;
          +  /**
          +   * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
          +   * together exceed a threshold.
          +   *
          +   * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
          +   *
          +   * @throws IOException if the position of the writer cannot be determined
          +   */
          +  private void stopTimer(TransactionLogWriter writer) throws IOException {
          +    // this method is only called by a thread if it actually called sync(), inside a sync block
          +    if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
          +      stopWatch.stop();
          +      long elapsed = stopWatch.elapsedMillis();
          +      long bytesWritten = writer.getPosition() - positionBeforeWrite;
          +      if (elapsed >= slowAppendThreshold) {
          +        LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
          +                 getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
          +      }
          +      metricsCollector.histogram("wal.sync.size", countSinceLastSync);
          +      metricsCollector.histogram("wal.sync.bytes", (int) bytesWritten); // single sync won't exceed max int
               }
          +    positionBeforeWrite = -1L;
          +    countSinceLastSync = 0;
             }

             private void sync() throws IOException {
               // writes out pending entries to the HLog
          -    TransactionLogWriter tmpWriter = null;
               long latestSeq = 0;
               int entryCount = 0;
               synchronized (this) {
                 if (closed) {
          -        return;
          -      }
          -      // prevent writer being dereferenced
          -      tmpWriter = writer;
          -
          -      List<Entry> currentPending = getPendingWrites();
          -      if (!currentPending.isEmpty()) {
          -        tmpWriter.commitMarker(currentPending.size());
          +        if (pendingWrites.isEmpty()) {
          +          // this expected: close() sets closed to true after syncing all pending writes (including ours)
          +          return;
          +        }
          +        // this should never happen because close() only sets closed=true after syncing.
          +        // but if it should happen, we must fail this call because we don't know whether the edit was persisted
          +        throw new IOException(
          +          "Unexpected state: Writer is closed but there are pending edits. Cannot guarantee that edits were persisted");
                 }
          -
          -      // write out all accumulated entries to log.
          -      for (Entry e : currentPending) {
          -        tmpWriter.append(e);
          -        entryCount++;
          -        latestSeq = Math.max(latestSeq, e.getKey().get());
          +      Entry[] currentPending = getPendingWrites();
          +      if (currentPending != null) {
          +        entryCount = currentPending.length;
          +        startTimerIfNeeded(writer, entryCount);
          +        writer.commitMarker(entryCount);
          +        for (Entry e }
                 }
               }

          -    long lastSynced = syncedUpTo.get();
          +    // giving up the sync lock here allows other threads to write their edits before the sync happens.
          +    // hence, we can have the edits from n threads in one sync.
          +
               // someone else might have already synced our edits, avoid double syncing
          -    if (lastSynced < latestSeq) {
          -      tmpWriter.sync();
          -      metricsCollector.histogram("wal.sync.size", entryCount);
          -      syncedUpTo.compareAndSet(lastSynced, latestSeq);
          +    if (syncedUpTo < latestSeq) {
          --- End diff --

          Please see the previous discussion of this at https://github.com/apache/incubator-tephra/pull/53#pullrequestreview-62374698
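
          For readers following the timer bookkeeping described in the Javadoc of the diff above, here is a minimal standalone sketch of how several writes can be folded into one measurement that ends at the next sync. It is an assumption-laden illustration, not the Tephra implementation: `SyncTimerSketch` is a hypothetical class, the writer position is passed in as a plain `long`, `System.nanoTime()` stands in for the stopwatch, and `stopTimer` returns the numbers instead of logging them.

```java
class SyncTimerSketch {
  private long positionBeforeWrite = -1L; // -1 means nothing was written since the last sync
  private int countSinceLastSync = 0;
  private long startNanos;

  // Called before each batch is written. Only the first batch after a sync
  // records the start position and starts the clock; later batches add to the count.
  public void startTimerIfNeeded(long writerPosition, int entryCount) {
    if (positionBeforeWrite == -1L) {
      positionBeforeWrite = writerPosition;
      countSinceLastSync = 0;
      startNanos = System.nanoTime();
    }
    countSinceLastSync += entryCount;
  }

  // Called after a sync. Returns {entries, bytes, elapsedMillis}, or null if
  // no write was recorded since the previous sync. Always resets the state.
  public long[] stopTimer(long writerPosition) {
    long[] stats = null;
    if (positionBeforeWrite != -1L) {
      long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
      stats = new long[] {countSinceLastSync, writerPosition - positionBeforeWrite, elapsedMillis};
    }
    positionBeforeWrite = -1L;
    countSinceLastSync = 0;
    return stats;
  }

  public static void main(String[] args) {
    SyncTimerSketch timer = new SyncTimerSketch();
    timer.startTimerIfNeeded(100, 2); // first batch after a sync: clock starts at position 100
    timer.startTimerIfNeeded(180, 3); // second batch: only the entry count grows
    long[] stats = timer.stopTimer(300);
    System.out.println(stats[0] + " entries, " + stats[1] + " bytes"); // prints "5 entries, 200 bytes"
  }
}
```

          In the diff both methods are only called while holding the lock, so the sketch makes no attempt to be thread-safe on its own.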

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138757206

          --- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java ---
          @@ -85,105 +100,145 @@ public long getTimestamp() {

             @Override
             public void append(TransactionEdit edit) throws IOException {
          -    long startTime = System.nanoTime();
          -    synchronized (this) {
          -      ensureAvailable();
          -
          -      Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
          -
          -      // add to pending edits
          -      append(entry);
          -    }
          -
          -    // wait for sync to complete
          -    sync();
          -    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
          -    if (durationMillis > SLOW_APPEND_THRESHOLD) {
          -      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
          -    }
          +    append(Collections.singletonList(edit));
             }

             @Override
             public void append(List<TransactionEdit> edits) throws IOException {
          -    long startTime = System.nanoTime();
          -    synchronized (this) {
          -      ensureAvailable();
          -
          +    if (closing) { // or closed, which implies closing
          +      throw new IOException("Log " + getName() + " is closing or already closed, cannot append");
          +    }
          +    if (!initialized) {
          +      init();
          +    }
          +    // synchronizing here ensures that elements in the queue are ordered by seq number
          +    synchronized (logSequence) {
                 for (TransactionEdit edit : edits) {
          -        Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
          -
          -        // add to pending edits
          -        append(entry);
          +        pendingWrites.add(new Entry(new LongWritable(logSequence.getAndIncrement()), edit));
                 }
               }
          -
          -    // wait for sync to complete
          +    // try to sync all pending edits (competing for this with other threads)
               sync();
          -    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
          -    if (durationMillis > SLOW_APPEND_THRESHOLD) {
          -      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
          -    }
             }

          -  private void ensureAvailable() throws IOException {
          -    if (closed) {
          -      throw new IOException("Log " + getName() + " is already closed, cannot append!");
          -    }
          -    if (!initialized) {
          -      init();
          +  /**
          +   * Return all pending writes at the time the method is called, or null if no writes are pending.
          +   *
          +   * Note that after this method returns, there can be additional pending writes,
          +   * added concurrently while the existing pending writes are removed.
          +   */
          +  @Nullable
          +  private Entry[] getPendingWrites() {
          +    synchronized (this) {
          +      if (pendingWrites.isEmpty()) {
          +        return null;
          +      }
          +      Entry[] entriesToSync = new Entry[pendingWrites.size()];
          +      for (int i = 0; i < entriesToSync.length; i++) {
          +        entriesToSync[i] = pendingWrites.remove();
          +      }
          +      return entriesToSync;
               }
             }

          -  /*
          -   * Appends new writes to the pendingWrites. It is better to keep it in
          -   * our own queue rather than writing it to the HDFS output stream because
          -   * HDFSOutputStream.writeChunk is not lightweight at all.
          +  /**
          +   * When multiple threads try to log edits at the same time, they all will call (@link #append}
          +   * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
          +   * are followed by a single {@code sync}, or vice versa.
          +   *
          +   * We want to record the time and position of the first {@code append()} after a {@code sync()},
          +   * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
          +   * Therefore this is called every time before we write the pending list out to the log writer.
          +   *
          +   * See {@link #stopTimer(TransactionLogWriter)}.
          +   *
          +   * @throws IOException if the position of the writer cannot be determined
              */
          -  private void append(Entry e) throws IOException {
          -    pendingWrites.add(e);
          +  private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
          +    // no sync needed because this is only called within a sync block
          +    if (positionBeforeWrite == -1L) {
          +      positionBeforeWrite = writer.getPosition();
          +      countSinceLastSync = 0;
          +      stopWatch.reset().start();
          +    }
          +    countSinceLastSync += entryCount;
             }

          -  // Returns all currently pending writes. New writes
          -  // will accumulate in a new list.
          -  private List<Entry> getPendingWrites() {
          -    synchronized (this) {
          -      List<Entry> save = this.pendingWrites;
          -      this.pendingWrites = new LinkedList<>();
          -      return save;
          +  /**
          +   * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
          +   * together exceed a threshold.
          +   *
          +   * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
          +   *
          +   * @throws IOException if the position of the writer cannot be determined
          +   */
          +  private void stopTimer(TransactionLogWriter writer) throws IOException {
          +    // this method is only called by a thread if it actually called sync(), inside a sync block
          +    if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
          +      stopWatch.stop();
          +      long elapsed = stopWatch.elapsedMillis();
          +      long bytesWritten = writer.getPosition() - positionBeforeWrite;
          +      if (elapsed >= slowAppendThreshold) {
          +        LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
          +                 getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
          +      }
          +      metricsCollector.histogram("wal.sync.size", countSinceLastSync);
          +      metricsCollector.histogram("wal.sync.bytes", (int) bytesWritten); // single sync won't exceed max int
               }
          +    positionBeforeWrite = -1L;
          +    countSinceLastSync = 0;
             }

             private void sync() throws IOException {
               // writes out pending entries to the HLog
          -    TransactionLogWriter tmpWriter = null;
               long latestSeq = 0;
               int entryCount = 0;
               synchronized (this) {
                 if (closed) {
          -        return;
          -      }
          -      // prevent writer being dereferenced
          -      tmpWriter = writer;
          -
          -      List<Entry> currentPending = getPendingWrites();
          -      if (!currentPending.isEmpty()) {
          -        tmpWriter.commitMarker(currentPending.size());
          +        if (pendingWrites.isEmpty()) {
          +          // this expected: close() sets closed to true after syncing all pending writes (including ours)
          +          return;
          +        }
          +        // this should never happen because close() only sets closed=true after syncing.
          +        // but if it should happen, we must fail this call because we don't know whether the edit was persisted
          +        throw new IOException(
          +          "Unexpected state: Writer is closed but there are pending edits. Cannot guarantee that edits were persisted");
                 }
          -
          -      // write out all accumulated entries to log.
          -      for (Entry e : currentPending) {
          -        tmpWriter.append(e);
          -        entryCount++;
          -        latestSeq = Math.max(latestSeq, e.getKey().get());
          +      Entry[] currentPending = getPendingWrites();
          +      if (currentPending != null) {
          +        entryCount = currentPending.length;
          +        startTimerIfNeeded(writer, entryCount);
          +        writer.commitMarker(entryCount);
          +        for (Entry e }
                 }
               }

          -    long lastSynced = syncedUpTo.get();
          +    // giving up the sync lock here allows other threads to write their edits before the sync happens.
          +    // hence, we can have the edits from n threads in one sync.
          +
               // someone else might have already synced our edits, avoid double syncing
          -    if (lastSynced < latestSeq) {
          -      tmpWriter.sync();
          -      metricsCollector.histogram("wal.sync.size", entryCount);
          -      syncedUpTo.compareAndSet(lastSynced, latestSeq);
          +    if (syncedUpTo < latestSeq) {
          --- End diff --

          Since this check is happening outside the synchronized block, `latestSeq` may not give the right value - we can just use `writtenUpTo` instead of `latestSeq`.

          To be safe, let's move the definition of both `latestSeq` and `entryCount` into the synchronized block on line 196, so that they cannot be used outside the block.
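
          One possible reading of this suggestion, sketched as a self-contained toy rather than as the actual patch: `latestSeq` and `entryCount` are declared inside the synchronized block, and the decision to flush compares the shared fields `syncedUpTo` and `writtenUpTo`. Everything except those two field names is hypothetical (`GroupSyncSketch`, the `Long` queue standing in for `Entry` objects, the `syncCalls` counter standing in for `writer.sync()`), and the sketch takes the lock again for the flush check, which is an assumption of this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class GroupSyncSketch {
  private final Queue<Long> pendingWrites = new ArrayDeque<>(); // sequence ids waiting to be written
  private final List<Long> written = new ArrayList<>();         // stand-in for the log writer
  private long nextSeq = 0;
  private long writtenUpTo = -1; // highest sequence id handed to the writer
  private long syncedUpTo = -1;  // highest sequence id known to be flushed
  private int syncCalls = 0;     // counts simulated flushes

  public void append(int count) {
    synchronized (this) {
      for (int i = 0; i < count; i++) {
        pendingWrites.add(nextSeq++);
      }
    }
    sync();
  }

  public void sync() {
    synchronized (this) {
      // Declared inside the block so they cannot be read after the lock is released.
      int entryCount = pendingWrites.size();
      long latestSeq = writtenUpTo;
      for (int i = 0; i < entryCount; i++) {
        latestSeq = pendingWrites.remove(); // ids are ascending, so the last one is the greatest
        written.add(latestSeq);
      }
      writtenUpTo = latestSeq;
    }
    // The lock is released here, so other threads can write before the flush.
    synchronized (this) {
      // Compare shared state only: another thread may already have flushed our entries.
      if (syncedUpTo < writtenUpTo) {
        syncCalls++; // stand-in for writer.sync()
        syncedUpTo = writtenUpTo;
      }
    }
  }

  public synchronized int syncCalls() { return syncCalls; }
  public synchronized long syncedUpTo() { return syncedUpTo; }

  public static void main(String[] args) {
    GroupSyncSketch log = new GroupSyncSketch();
    log.append(2);
    log.append(1);
    log.sync(); // nothing pending and nothing unflushed, so no extra flush
    System.out.println(log.syncCalls() + " syncs, synced up to " + log.syncedUpTo());
  }
}
```

          Run single-threaded as in `main`, the third call finds `syncedUpTo` equal to `writtenUpTo` and skips the flush, which is the double-sync avoidance the check is meant to provide.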

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138733253

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
          }
          }

          - /*
          - * Appends new writes to the pendingWrites. It is better to keep it in
          - * our own queue rather than writing it to the HDFS output stream because
          - * HDFSOutputStream.writeChunk is not lightweight at all.
          + /**
          + * Return all pending writes at the time the method is called, or null if no writes are pending.
          + *
          + * Note that after this method returns, there can be additional pending writes,
          + * added concurrently while the existing pending writes are removed.
          + */
          + @Nullable
          + private Entry[] getPendingWrites() {
          + synchronized (this) {
          + if (pendingWrites.isEmpty()) {
          + return null;
          + }
          + Entry[] entriesToSync = new Entry[pendingWrites.size()];
          + for (int i = 0; i < entriesToSync.length; i++) {
          + entriesToSync[i] = pendingWrites.remove();
          + }
          + return entriesToSync;
          + }
          + }
          +
          + /**
          + * When multiple threads try to log edits at the same time, they all will call (@link #append}
          + * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
          + * are followed by a single {@code sync}, or vice versa.
          + *
          + * We want to record the time and position of the first {@code append()} after a {@code sync()},
          + * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
          + * Therefore this is called every time before we write the pending list out to the log writer.
          + *
          + * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
          + *
          + * @throws IOException if the position of the writer cannot be determined
          */
          - private void append(Entry e) throws IOException {
          - pendingWrites.add(e);
          + private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
          + // no sync needed because this is only called within a sync block
          + if (positionBeforeWrite == -1L) {
          + positionBeforeWrite = writer.getPosition();
          + countSinceLastSync = 0;
          + stopWatch.reset().start();
          + }
          + countSinceLastSync += entryCount;
          }
          - // Returns all currently pending writes. New writes
          - // will accumulate in a new list.
          - private List<Entry> getPendingWrites() {
          - synchronized (this) {
          - List<Entry> save = this.pendingWrites;
          - this.pendingWrites = new LinkedList<>();
          - return save;
          + /**
          + * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
          + * together exceed a threshold.
          + *
          + * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
          + *
          + * @throws IOException if the position of the writer cannot be determined
          + */
          + private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException {
          + // this method is only called by a thread if it actually called sync(), inside a sync block
          + if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
          + stopWatch.stop();
          + long elapsed = stopWatch.elapsedMillis();
          + if (elapsed >= slowAppendThreshold) {
          + long currentPosition = writer.getPosition();
          + long bytesWritten = currentPosition - positionBeforeWrite;
          + LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
          + getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
          + }
          }
          + positionBeforeWrite = -1L;
          + countSinceLastSync = 0;
          }

          private void sync() throws IOException {
          // writes out pending entries to the HLog
          - TransactionLogWriter tmpWriter = null;
          long latestSeq = 0;
          int entryCount = 0;
          synchronized (this) {
          if (closed) {
          return;
          }
          - // prevent writer being dereferenced
          - tmpWriter = writer;
          -
          - List<Entry> currentPending = getPendingWrites();
          - if (!currentPending.isEmpty()) {
          - tmpWriter.commitMarker(currentPending.size());
          - }
          -
          - // write out all accumulated entries to log.
          - for (Entry e : currentPending) {
          - tmpWriter.append(e);
          - entryCount++;
          - latestSeq = Math.max(latestSeq, e.getKey().get());
          + Entry[] currentPending = getPendingWrites();
          + if (currentPending != null) {
          + entryCount = currentPending.length;
          + startTimerIfNeeded(writer, entryCount);
          + writer.commitMarker(entryCount);
          + for (Entry e }
          }
          - long lastSynced = syncedUpTo.get();
          + // giving up the sync lock here allows other threads to write their edits before the sync happens.
          + // hence, we can have the edits from n threads in one sync.
          +
          // someone else might have already synced our edits, avoid double syncing
          - if (lastSynced < latestSeq) {
          - tmpWriter.sync();
          - metricsCollector.histogram("wal.sync.size", entryCount);
          - syncedUpTo.compareAndSet(lastSynced, latestSeq);
          + if (syncedUpTo < latestSeq) {
          + synchronized (this) {
          + // someone else might have synced our edits while we were waiting
          + if (syncedUpTo < latestSeq) {
          + writer.sync();
          + syncedUpTo = writtenUpTo;
          + stopTimerIfNeeded(writer);
          + }
          + }
          }
          + // in any case, emit metrics for the number entries we wrote.
          + // because the thread that actually syncs does not know how many it synced (it not write all of them)
          + metricsCollector.histogram("wal.sync.size", entryCount);

              • End diff –

          we could emit that metric in stopTimerIfNeeded(). And actually this one, too.
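
          A minimal sketch of what this suggestion could look like, assuming the counters from the quoted diff (positionBeforeWrite, countSinceLastSync). The class name, the string list standing in for metricsCollector.histogram(...), and the "wal.sync.bytes" metric name are illustrative assumptions, not the Tephra code. The idea is that the thread which performs the sync emits the totals accumulated since the previous sync, rather than each writer emitting only its own entry count.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the timer bookkeeping; not the actual AbstractTransactionLog. */
class SyncMetricsSketch {
  // Stands in for metricsCollector.histogram(name, value); an assumption for illustration.
  final List<String> emitted = new ArrayList<>();

  private long positionBeforeWrite = -1L;
  private int countSinceLastSync = 0;

  /** Called before pending entries are written; remembers where the first write started. */
  void startTimerIfNeeded(long writerPosition, int entryCount) {
    if (positionBeforeWrite == -1L) {
      positionBeforeWrite = writerPosition;
      countSinceLastSync = 0;
    }
    countSinceLastSync += entryCount;
  }

  /** Called after the sync; emits totals for everything written since the last sync. */
  void stopTimerIfNeeded(long writerPosition) {
    if (positionBeforeWrite != -1L) {
      long bytesWritten = writerPosition - positionBeforeWrite;
      emitted.add("wal.sync.size=" + countSinceLastSync);
      emitted.add("wal.sync.bytes=" + bytesWritten);
    }
    positionBeforeWrite = -1L;
    countSinceLastSync = 0;
  }
}
```

          With this shape, two writers that add 2 and 3 entries before one sync produce a single size sample of 5, which matches what was actually flushed.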

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138732890

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
          }
          }

          - /*
          - * Appends new writes to the pendingWrites. It is better to keep it in
          - * our own queue rather than writing it to the HDFS output stream because
          - * HDFSOutputStream.writeChunk is not lightweight at all.
          + /**
          + * Return all pending writes at the time the method is called, or null if no writes are pending.
          + *
          + * Note that after this method returns, there can be additional pending writes,
          + * added concurrently while the existing pending writes are removed.
          + */
          + @Nullable
          + private Entry[] getPendingWrites() {
          + synchronized (this) {
          + if (pendingWrites.isEmpty()) {
          + return null;
          + }
          + Entry[] entriesToSync = new Entry[pendingWrites.size()];
          + for (int i = 0; i < entriesToSync.length; i++) {
          + entriesToSync[i] = pendingWrites.remove();
          + }
          + return entriesToSync;
          + }
          + }
          +
          + /**
          + * When multiple threads try to log edits at the same time, they all will call (@link #append}
          + * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
          + * are followed by a single {@code sync}, or vice versa.
          + *
          + * We want to record the time and position of the first {@code append()} after a {@code sync()},
          + * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
          + * Therefore this is called every time before we write the pending list out to the log writer.
          + *
          + * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
          + *
          + * @throws IOException if the position of the writer cannot be determined
          */
          - private void append(Entry e) throws IOException {
          - pendingWrites.add(e);
          + private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
          + // no sync needed because this is only called within a sync block
          + if (positionBeforeWrite == -1L) {
          + positionBeforeWrite = writer.getPosition();
          + countSinceLastSync = 0;
          + stopWatch.reset().start();
          + }
          + countSinceLastSync += entryCount;
          }
          - // Returns all currently pending writes. New writes
          - // will accumulate in a new list.
          - private List<Entry> getPendingWrites() {
          - synchronized (this) {
          - List<Entry> save = this.pendingWrites;
          - this.pendingWrites = new LinkedList<>();
          - return save;
          + /**
          + * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
          + * together exceed a threshold.
          + *
          + * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
          + *
          + * @throws IOException if the position of the writer cannot be determined
          + */
          + private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException {
          + // this method is only called by a thread if it actually called sync(), inside a sync block
          + if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
          + stopWatch.stop();
          + long elapsed = stopWatch.elapsedMillis();
          + if (elapsed >= slowAppendThreshold) {
          + long currentPosition = writer.getPosition();
          + long bytesWritten = currentPosition - positionBeforeWrite;
          + LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
          + getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
          + }
          }
          + positionBeforeWrite = -1L;
          + countSinceLastSync = 0;
          }

          private void sync() throws IOException {
          // writes out pending entries to the HLog
          - TransactionLogWriter tmpWriter = null;
          long latestSeq = 0;
          int entryCount = 0;
          synchronized (this) {
          if (closed) {
          return;
              • End diff –

          It would not always be correct to fail, because the edits from the current thread may actually have been synced.

          The logic in close() (which is also synchronized) is that this only gets closed after syncing. So a sync has just happened. However, it seems that it is possible that append() is called concurrently and it might append new edits, so there is a possible race condition.

          I guess we need an extra flag named closing. Close would first set closing to true. Now all calls to append() will fail. Then it will sync and set closed to true, such that any remaining threads do not attempt to sync.
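
          The proposed sequence (set closing, reject new appends, sync, set closed) can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the Tephra implementation: the class name, the String edits, and the in-memory list standing in for the log writer are all hypothetical, and sync() here holds the lock for its whole duration, whereas the code under review releases it between writing and syncing.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified stand-in for the transaction log; not the actual Tephra class. */
class ClosingFlagLogSketch {
  private final Queue<String> pendingWrites = new ArrayDeque<>();
  private final List<String> persisted = new ArrayList<>(); // stands in for the log writer
  private boolean closing; // set first: no new edits are accepted
  private boolean closed;  // set last: nothing is left to sync

  public void append(String edit) throws IOException {
    synchronized (this) {
      if (closing) {
        throw new IOException("Log is closing or closed, cannot append!");
      }
      pendingWrites.add(edit);
    }
    sync();
  }

  private synchronized void sync() {
    if (closed) {
      // close() already flushed every edit that was accepted before closing was set
      return;
    }
    while (!pendingWrites.isEmpty()) {
      persisted.add(pendingWrites.remove());
    }
  }

  public synchronized void close() {
    if (closed) {
      return;
    }
    closing = true; // from here on, append() rejects new edits
    sync();         // flush whatever was accepted before the flag flipped
    closed = true;  // remaining threads skip syncing
  }

  public synchronized List<String> persisted() {
    return new ArrayList<>(persisted);
  }
}
```

          Because append() checks closing under the same lock that close() holds, an edit is either queued before close() flushes or rejected with an exception; a thread that reaches sync() after closed is set can return quietly, since its edit was already flushed.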

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138731295

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -85,44 +99,20 @@ public long getTimestamp() {

          @Override
          public void append(TransactionEdit edit) throws IOException {

          - long startTime = System.nanoTime();
          - synchronized (this) {
          - ensureAvailable();
          -
          - Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
          -
          - // add to pending edits
          - append(entry);
          - }
          -
          - // wait for sync to complete
          - sync();
          - long durationMillis = (System.nanoTime() - startTime) / 1000000L;
          - if (durationMillis > SLOW_APPEND_THRESHOLD) {
          - LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
          - }
          + append(Collections.singletonList(edit));
          }

          @Override
          public void append(List<TransactionEdit> edits) throws IOException {
          - long startTime = System.nanoTime();
          - synchronized (this) {
          + // synchronizing here ensures that elements in the queue are ordered by seq number
          + synchronized (logSequence) {
          ensureAvailable();
              • End diff –

          Good point, actually it is only used in one place, so I will just inline it outside the synchronize.

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138724401

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
          }
          }

          - /*
          - * Appends new writes to the pendingWrites. It is better to keep it in
          - * our own queue rather than writing it to the HDFS output stream because
          - * HDFSOutputStream.writeChunk is not lightweight at all.
          + /**
          + * Return all pending writes at the time the method is called, or null if no writes are pending.
          + *
          + * Note that after this method returns, there can be additional pending writes,
          + * added concurrently while the existing pending writes are removed.
          + */
          + @Nullable
          + private Entry[] getPendingWrites() {
          + synchronized (this) {
          + if (pendingWrites.isEmpty()) {
          + return null;
          + }
          + Entry[] entriesToSync = new Entry[pendingWrites.size()];
          + for (int i = 0; i < entriesToSync.length; i++) {
          + entriesToSync[i] = pendingWrites.remove();
          + }
          + return entriesToSync;
          + }
          + }
          +
          + /**
          + * When multiple threads try to log edits at the same time, they all will call (@link #append}
          + * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
          + * are followed by a single {@code sync}, or vice versa.
          + *
          + * We want to record the time and position of the first {@code append()} after a {@code sync()},
          + * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
          + * Therefore this is called every time before we write the pending list out to the log writer.
          + *
          + * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
          + *
          + * @throws IOException if the position of the writer cannot be determined
          */
          - private void append(Entry e) throws IOException {
          - pendingWrites.add(e);
          + private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
          + // no sync needed because this is only called within a sync block
          + if (positionBeforeWrite == -1L) {
          + positionBeforeWrite = writer.getPosition();
          + countSinceLastSync = 0;
          + stopWatch.reset().start();
          + }
          + countSinceLastSync += entryCount;
          }
          - // Returns all currently pending writes. New writes
          - // will accumulate in a new list.
          - private List<Entry> getPendingWrites() {
          - synchronized (this) {
          - List<Entry> save = this.pendingWrites;
          - this.pendingWrites = new LinkedList<>();
          - return save;
          + /**
          + * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
          + * together exceed a threshold.
          + *
          + * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
          + *
          + * @throws IOException if the position of the writer cannot be determined
          + */
          + private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException {
          + // this method is only called by a thread if it actually called sync(), inside a sync block
          + if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
          + stopWatch.stop();
          + long elapsed = stopWatch.elapsedMillis();
          + if (elapsed >= slowAppendThreshold) {
          + long currentPosition = writer.getPosition();
          + long bytesWritten = currentPosition - positionBeforeWrite;
          + LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
          + getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
          + }
          }
          + positionBeforeWrite = -1L;
          + countSinceLastSync = 0;
          }

          private void sync() throws IOException {
          // writes out pending entries to the HLog
          - TransactionLogWriter tmpWriter = null;
          long latestSeq = 0;
          int entryCount = 0;
          synchronized (this) {
          if (closed) {
          return;
              • End diff –

          It would be good if this throws an exception to indicate that the entries could not be synced. Silently returning will return success to the clients.
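
          A minimal sketch of the behaviour suggested here, assuming a much simplified log class. `ClosedLogSketch`, its exception message and its `syncCount` field are hypothetical names for illustration and are not taken from the pull request:

          ```java
          import java.io.IOException;

          /** Hypothetical stand-in for a transaction log; it only models the closed check. */
          class ClosedLogSketch {
            private volatile boolean closed;
            private int syncCount; // number of simulated syncs, guarded by "this"

            void close() {
              closed = true;
            }

            void sync() throws IOException {
              synchronized (this) {
                if (closed) {
                  // Fail loudly so the caller cannot mistake a skipped sync for success.
                  throw new IOException("Log is closed, pending entries were not synced");
                }
                syncCount++; // stands in for writing the pending entries and calling writer.sync()
              }
            }

            synchronized int getSyncCount() {
              return syncCount;
            }
          }
          ```

          With this shape, a caller that appends after the log was closed gets an IOException instead of a silent success.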

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138720628

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
          }
          }

          • /*
          • * Appends new writes to the pendingWrites. It is better to keep it in
          • * our own queue rather than writing it to the HDFS output stream because
          • * HDFSOutputStream.writeChunk is not lightweight at all.
            + /**
            + * Return all pending writes at the time the method is called, or null if no writes are pending.
            + *
            + * Note that after this method returns, there can be additional pending writes,
            + * added concurrently while the existing pending writes are removed.
            + */
            + @Nullable
            + private Entry[] getPendingWrites() {
            + synchronized (this) {
            + if (pendingWrites.isEmpty()) {
            + return null;
            + }
            + Entry[] entriesToSync = new Entry[pendingWrites.size()];
            + for (int i = 0; i < entriesToSync.length; i++) {
            + entriesToSync[i] = pendingWrites.remove();
            + }
            + return entriesToSync;
            + }
            + }
            +
            + /**
            + * When multiple threads try to log edits at the same time, they all will call (@link #append}
            + * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
            + * are followed by a single {@code sync}, or vice versa.
            + *
            + * We want to record the time and position of the first {@code append()} after a {@code sync()},
            + * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
            + * Therefore this is called every time before we write the pending list out to the log writer.
            + *
            + * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
            + *
            + * @throws IOException if the position of the writer cannot be determined
            */

          • private void append(Entry e) throws IOException {
          • pendingWrites.add(e);
            + private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
            + // no sync needed because this is only called within a sync block
            + if (positionBeforeWrite == -1L) {
            + positionBeforeWrite = writer.getPosition();
            + countSinceLastSync = 0;
            + stopWatch.reset().start();
            + }
            + countSinceLastSync += entryCount;
            }
          • // Returns all currently pending writes. New writes
          • // will accumulate in a new list.
          • private List<Entry> getPendingWrites() {
          • synchronized (this) {
          • List<Entry> save = this.pendingWrites;
          • this.pendingWrites = new LinkedList<>();
          • return save;
            + /**
            + * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
            + * together exceed a threshold.
            + *
            + * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
            + *
            + * @throws IOException if the position of the writer cannot be determined
            + */
            + private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException {
            + // this method is only called by a thread if it actually called sync(), inside a sync block
            + if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
            + stopWatch.stop();
            + long elapsed = stopWatch.elapsedMillis();
            + if (elapsed >= slowAppendThreshold) {
            + long currentPosition = writer.getPosition();
            + long bytesWritten = currentPosition - positionBeforeWrite;
            + LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
            + getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
            + }
            }
            + positionBeforeWrite = -1L;
            + countSinceLastSync = 0;
            }

          private void sync() throws IOException {
          // writes out pending entries to the HLog

          • TransactionLogWriter tmpWriter = null;
            long latestSeq = 0;
            int entryCount = 0;
            synchronized (this) {
            if (closed) { return; }
          • // prevent writer being dereferenced
          • tmpWriter = writer;
            -
          • List<Entry> currentPending = getPendingWrites();
          • if (!currentPending.isEmpty()) {
          • tmpWriter.commitMarker(currentPending.size());
          • }

            -

          • // write out all accumulated entries to log.
          • for (Entry e : currentPending) {
          • tmpWriter.append(e);
          • entryCount++;
          • latestSeq = Math.max(latestSeq, e.getKey().get());
            + Entry[] currentPending = getPendingWrites();
            + if (currentPending != null) {
            + entryCount = currentPending.length;
            + startTimerIfNeeded(writer, entryCount);
            + writer.commitMarker(entryCount);
            + for (Entry e
            }

            }

          • long lastSynced = syncedUpTo.get();
            + // giving up the sync lock here allows other threads to write their edits before the sync happens.
            + // hence, we can have the edits from n threads in one sync.
            +
            // someone else might have already synced our edits, avoid double syncing
          • if (lastSynced < latestSeq) {
          • tmpWriter.sync();
          • metricsCollector.histogram("wal.sync.size", entryCount);
          • syncedUpTo.compareAndSet(lastSynced, latestSeq);
            + if (syncedUpTo < latestSeq) {
            + synchronized (this) {
            + // someone else might have synced our edits while we were waiting
            + if (syncedUpTo < latestSeq) {
            + writer.sync();
            + syncedUpTo = writtenUpTo;
            + stopTimerIfNeeded(writer);
            + }
            + }

            }
            + // in any case, emit metrics for the number entries we wrote.
            + // because the thread that actually syncs does not know how many it synced (it not write all of them)
            + metricsCollector.histogram("wal.sync.size", entryCount);

              • End diff –

          Can we emit metrics for sync bytes then?

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138723289

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -85,44 +99,20 @@ public long getTimestamp() {

          @Override
          public void append(TransactionEdit edit) throws IOException {

          • long startTime = System.nanoTime();
          • synchronized (this) {
          • ensureAvailable();
          •
          • Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
          •
          • // add to pending edits
          • append(entry);
          • }

            -

          • // wait for sync to complete
          • sync();
          • long durationMillis = (System.nanoTime() - startTime) / 1000000L;
          • if (durationMillis > SLOW_APPEND_THRESHOLD) {
          • LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
          • }

            + append(Collections.singletonList(edit));
            }

          @Override
          public void append(List<TransactionEdit> edits) throws IOException {

          • long startTime = System.nanoTime();
          • synchronized (this) {
            + // synchronizing here ensures that elements in the queue are ordered by seq number
            + synchronized (logSequence) {
            ensureAvailable();
              • End diff –

          `ensureAvailable()` can move out of the synchronized block since it uses a couple of volatile variables and calls a synchronized method `init()`
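
          A sketch of what this suggestion could look like, assuming that `ensureAvailable()` only reads two volatile flags and delegates to a synchronized `init()`. The method bodies below are guesses for illustration; `EnsureAvailableSketch` and `initCalls` are hypothetical names and this is not the Tephra implementation:

          ```java
          import java.io.IOException;
          import java.util.concurrent.atomic.AtomicLong;

          /** Hypothetical stand-in; it only models the availability check and sequence numbering. */
          class EnsureAvailableSketch {
            private volatile boolean initialized;
            private volatile boolean closed;
            private int initCalls; // guarded by "this"
            private final AtomicLong logSequence = new AtomicLong();

            private void ensureAvailable() throws IOException {
              if (closed) {
                throw new IOException("Log is closed");
              }
              if (!initialized) {
                init();
              }
            }

            private synchronized void init() {
              if (initialized) {
                return; // another thread initialized while we waited for the lock
              }
              initCalls++; // stands in for opening the file writer
              initialized = true;
            }

            long append() throws IOException {
              // Safe without the sequence lock: volatile reads plus a synchronized init().
              ensureAvailable();
              synchronized (logSequence) {
                // Only numbering and enqueueing need to be ordered.
                return logSequence.getAndIncrement();
              }
            }

            void close() {
              closed = true;
            }

            synchronized int getInitCalls() {
              return initCalls;
            }
          }
          ```

          The lock on `logSequence` then covers only the work that must stay ordered by sequence number.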

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on the issue:

          https://github.com/apache/incubator-tephra/pull/53

          LGTM

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138692966

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +51,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;

          • private AtomicLong syncedUpTo = new AtomicLong();
          • private List<Entry> pendingWrites = Lists.newLinkedList();
            + private long writtenUpTo = 0L;
            + private volatile long syncedUpTo = 0L;
            + private final Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
            private TransactionLogWriter writer;
          • public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
            + private int countSinceLastSync = 0;
            + private long positionBeforeWrite = -1L;
            + private final Stopwatch stopWatch = new Stopwatch();
            +
            + private final long slowAppendThreshold;
            +
            + AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector, Configuration conf) {
            this.timestamp = timestamp;
            this.metricsCollector = metricsCollector;
            + this.slowAppendThreshold = conf.getLong(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD,
            + TxConstants.TransactionLog.DEFAULT_SLOW_APPEND_THRESHOLD);
            }

          /**
           * Initializes the log file, opening a file writer. Clients calling {@code init()} should ensure that they
           * also call {@link HDFSTransactionLog#close()}.
           * @throws java.io.IOException If an error is encountered initializing the file writer.
           */
          • public synchronized void init() throws IOException {
            + private synchronized void init() throws IOException {
              • End diff –

          Yes, I noticed that too. That's why I made it private, but I forgot to update the Javadoc.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on the issue:

          https://github.com/apache/incubator-tephra/pull/53

          Just one last comment about javadoc, rest LGTM.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138691780

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +51,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;

          • private AtomicLong syncedUpTo = new AtomicLong();
          • private List<Entry> pendingWrites = Lists.newLinkedList();
            + private long writtenUpTo = 0L;
            + private volatile long syncedUpTo = 0L;
            + private final Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
            private TransactionLogWriter writer;
          • public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
            + private int countSinceLastSync = 0;
            + private long positionBeforeWrite = -1L;
            + private final Stopwatch stopWatch = new Stopwatch();
            +
            + private final long slowAppendThreshold;
            +
            + AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector, Configuration conf) {
            this.timestamp = timestamp;
            this.metricsCollector = metricsCollector;
            + this.slowAppendThreshold = conf.getLong(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD,
            + TxConstants.TransactionLog.DEFAULT_SLOW_APPEND_THRESHOLD);
            }

          /**
           * Initializes the log file, opening a file writer. Clients calling {@code init()} should ensure that they
           * also call {@link HDFSTransactionLog#close()}.
           * @throws java.io.IOException If an error is encountered initializing the file writer.
           */
          • public synchronized void init() throws IOException {
            + private synchronized void init() throws IOException {
              • End diff –

          Seems like the javadoc is actually wrong. No "client" is calling this method, and shouldn't the contract be that whoever constructs this class should call "close"?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138690276

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -165,26 +203,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          tmpWriter = writer;

          • List<Entry> currentPending = getPendingWrites();
          • if (!currentPending.isEmpty()) {
          • tmpWriter.commitMarker(currentPending.size());
          • }

            -

          • // write out all accumulated entries to log.
          • for (Entry e : currentPending) {
          • tmpWriter.append(e);
          • entryCount++;
          • latestSeq = Math.max(latestSeq, e.getKey().get());
            + Entry[] currentPending = getPendingWrites();
            + if (currentPending != null) {
            + entryCount = currentPending.length;
            + startTimerIfNeeded(tmpWriter, entryCount);
            + tmpWriter.commitMarker(entryCount);
            + for (Entry e : currentPending) {
            + tmpWriter.append(e);
            + latestSeq = Math.max(latestSeq, e.getKey().get());
            + }

            + writtenUpTo.set(latestSeq);

              • End diff –

          Sounds good.

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on the issue:

          https://github.com/apache/incubator-tephra/pull/53

          Thanks for the review. I agree with everything but your last comment. I have pushed another commit to address these comments.

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138688512

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -165,26 +203,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          tmpWriter = writer;

          • List<Entry> currentPending = getPendingWrites();
          • if (!currentPending.isEmpty()) {
          • tmpWriter.commitMarker(currentPending.size());
          • }

            -

          • // write out all accumulated entries to log.
          • for (Entry e : currentPending) {
          • tmpWriter.append(e);
          • entryCount++;
          • latestSeq = Math.max(latestSeq, e.getKey().get());
            + Entry[] currentPending = getPendingWrites();
            + if (currentPending != null) {
            + entryCount = currentPending.length;
            + startTimerIfNeeded(tmpWriter, entryCount);
            + tmpWriter.commitMarker(entryCount);
            + for (Entry e : currentPending) { + tmpWriter.append(e); + latestSeq = Math.max(latestSeq, e.getKey().get()); + }

            + writtenUpTo.set(latestSeq);

              • End diff –

          I was thinking about that. However, it would add latency. Suppose:

          • thread 1 writes up to seq id 5
          • thread 2 writes up to seq id 10
          • thread 2 syncs and sets syncedUpTo to 10
          • thread 3 writes up to seq id 15
          • now thread 1 checks whether it needs to sync. Because its own latestSeq is 5, it can return. But if it compared against writtenUpTo instead, it would try to sync.

          That would add latency to thread 1, while thread 3 must wait anyway. That's why I introduced writtenUpTo.

          By the way, the existing code had syncedUpTo wrong. It was always set to the latestSeq of the thread that syncs, but that thread might have synced more than its own entries. For example, in the scenario above, if thread 1 syncs instead of thread 2, it has synced everything up to 10 but would set syncedUpTo to 5. As a consequence, thread 2 would sync again, needlessly.
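The bookkeeping fix described above can be illustrated with a minimal sketch. It is not Tephra's actual class: `GroupSyncSketch` and its methods are hypothetical, the write and sync phases are collapsed into a single synchronized method, and a counter stands in for the expensive `writer.sync()` call. The point it shows is only that `syncedUpTo` should record everything that was written before the sync, not the syncing caller's own sequence id.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified model of the discussion; not the real AbstractTransactionLog. */
class GroupSyncSketch {
  private final Queue<Long> pending = new ArrayDeque<>();
  private long nextSeq = 1;
  private long writtenUpTo = 0; // highest sequence id handed to the writer
  private long syncedUpTo = 0;  // highest sequence id known to be durable
  private int syncCalls = 0;    // counts simulated calls to writer.sync()

  /** Queues an edit and returns its sequence id. */
  synchronized long append() {
    long seq = nextSeq++;
    pending.add(seq);
    return seq;
  }

  /** Makes the edit with the given sequence id durable, syncing only if needed. */
  synchronized void sync(long mySeq) {
    if (mySeq <= syncedUpTo) {
      return; // an earlier sync by another caller already covered this edit
    }
    // "Write" everything that is pending, not just the caller's own entries.
    Long seq;
    while ((seq = pending.poll()) != null) {
      writtenUpTo = seq;
    }
    syncCalls++;              // stands in for the expensive writer.sync()
    syncedUpTo = writtenUpTo; // record all that was synced, not mySeq
  }

  synchronized long syncedUpTo() {
    return syncedUpTo;
  }

  synchronized int syncCalls() {
    return syncCalls;
  }

  public static void main(String[] args) {
    GroupSyncSketch log = new GroupSyncSketch();
    for (int i = 0; i < 10; i++) {
      log.append();
    }
    log.sync(5);  // the caller owning seq 5 syncs; entries up to 10 are covered
    log.sync(10); // the caller owning seq 10 finds its edit already synced
    System.out.println("syncedUpTo=" + log.syncedUpTo() + " syncCalls=" + log.syncCalls());
  }
}
```

Had `sync` stored `mySeq` in `syncedUpTo`, the second call would have triggered another sync even though its edit was already durable.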

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138686433

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();
          — End diff –

          good suggestion, done

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138686057

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          — End diff –

          Yes, it is only accessed within synchronized {}.

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138684944

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -165,26 +203,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          tmpWriter = writer;

          - List<Entry> currentPending = getPendingWrites();
          - if (!currentPending.isEmpty()) {
          -   tmpWriter.commitMarker(currentPending.size());
          - }
          -
          - // write out all accumulated entries to log.
          - for (Entry e : currentPending) {
          -   tmpWriter.append(e);
          -   entryCount++;
          -   latestSeq = Math.max(latestSeq, e.getKey().get());
          + Entry[] currentPending = getPendingWrites();
          + if (currentPending != null) {
          +   entryCount = currentPending.length;
          +   startTimerIfNeeded(tmpWriter, entryCount);
          +   tmpWriter.commitMarker(entryCount);
          +   for (Entry e : currentPending) {
          +     tmpWriter.append(e);
          +     latestSeq = Math.max(latestSeq, e.getKey().get());
          — End diff –

          yes, actually that has always been the case, even before these changes...

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138683102

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();

          - private List<Entry> pendingWrites = Lists.newLinkedList();
          + private Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
          private TransactionLogWriter writer;
          - public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
          + private final AtomicLong positionBeforeAppend = new AtomicLong(0);
          + private final AtomicInteger countSinceLastSync = new AtomicInteger(0);
          — End diff –

          yes

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138683090

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();

          - private List<Entry> pendingWrites = Lists.newLinkedList();
          + private Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
          private TransactionLogWriter writer;
          - public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
          + private final AtomicLong positionBeforeAppend = new AtomicLong(0);
          — End diff –

          oh yes, after the refactoring in the second commit it does not need to be atomic

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138682945

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -211,26 +212,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          — End diff –

          That's what I thought... removing it now.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on the issue:

          https://github.com/apache/incubator-tephra/pull/53

          I think the change is logically correct; I have just a couple of minor comments about simplifying the field types.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138565633

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();
          — End diff –

          Or a `volatile long` would also work, since we only use `get` and `set`.
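A small sketch of the trade-off behind this suggestion, assuming the field is only ever read and overwritten. The class `WatermarkSketch` and its accessors are hypothetical names for illustration. Java guarantees that reads and writes of a `volatile long` are atomic and visible across threads, so `AtomicLong` is only needed when compound updates such as `incrementAndGet()` or `compareAndSet()` come into play.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder contrasting the two field types; names are illustrative only. */
class WatermarkSketch {
  // Option A: AtomicLong, required for compound updates like incrementAndGet().
  private final AtomicLong syncedUpToAtomic = new AtomicLong();

  // Option B: volatile long, sufficient when the field is only read and overwritten.
  private volatile long syncedUpToVolatile;

  void setAtomic(long seq) {
    syncedUpToAtomic.set(seq);
  }

  long getAtomic() {
    return syncedUpToAtomic.get();
  }

  void setVolatile(long seq) {
    syncedUpToVolatile = seq; // a single volatile write is atomic and visible
  }

  long getVolatile() {
    return syncedUpToVolatile; // a single volatile read sees the latest write
  }

  public static void main(String[] args) {
    WatermarkSketch w = new WatermarkSketch();
    w.setAtomic(10);
    w.setVolatile(10);
    System.out.println(w.getAtomic() + " " + w.getVolatile());
  }
}
```

Note that `syncedUpToVolatile++` would not be safe without a lock, because it is a read followed by a write.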

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138565447

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -165,26 +203,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          tmpWriter = writer;

          - List<Entry> currentPending = getPendingWrites();
          - if (!currentPending.isEmpty()) {
          -   tmpWriter.commitMarker(currentPending.size());
          - }
          -
          - // write out all accumulated entries to log.
          - for (Entry e : currentPending) {
          -   tmpWriter.append(e);
          -   entryCount++;
          -   latestSeq = Math.max(latestSeq, e.getKey().get());
          + Entry[] currentPending = getPendingWrites();
          + if (currentPending != null) {
          +   entryCount = currentPending.length;
          +   startTimerIfNeeded(tmpWriter, entryCount);
          +   tmpWriter.commitMarker(entryCount);
          +   for (Entry e : currentPending) {
          +     tmpWriter.append(e);
          +     latestSeq = Math.max(latestSeq, e.getKey().get());
          +   }
          +   writtenUpTo.set(latestSeq);
          — End diff –

          We actually don't need this. Just make `latestSeq` a `volatile long` field and update it here to the latest seq number in the current entry array. Then, in the "sync" synchronized block below, the value can be set into `syncedUpTo`.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138564366

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();
          — End diff –

          but, please make it `final`.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138564293

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();
          — End diff –

          Actually not. I just noticed it is used outside of the writer sync block.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138564128

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();

          - private List<Entry> pendingWrites = Lists.newLinkedList();
          + private Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
          — End diff –

          final

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138564073

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();
          — End diff –

          And this can be `long`.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138563780

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          — End diff –

          I think this can also be just a `long`.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138563420

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -165,26 +203,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          tmpWriter = writer;

          - List<Entry> currentPending = getPendingWrites();
          - if (!currentPending.isEmpty()) {
          -   tmpWriter.commitMarker(currentPending.size());
          - }
          -
          - // write out all accumulated entries to log.
          - for (Entry e : currentPending) {
          -   tmpWriter.append(e);
          -   entryCount++;
          -   latestSeq = Math.max(latestSeq, e.getKey().get());
          + Entry[] currentPending = getPendingWrites();
          — End diff –

          Hmm... never mind, it seems this doesn't gain much.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138562706

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -165,26 +203,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          tmpWriter = writer;

          - List<Entry> currentPending = getPendingWrites();
          - if (!currentPending.isEmpty()) {
          -   tmpWriter.commitMarker(currentPending.size());
          - }
          -
          - // write out all accumulated entries to log.
          - for (Entry e : currentPending) {
          -   tmpWriter.append(e);
          -   entryCount++;
          -   latestSeq = Math.max(latestSeq, e.getKey().get());
          + Entry[] currentPending = getPendingWrites();
          — End diff –

          Shall we move this statement out of the synchronized block, call it before the sync block, and hold the `logSequence` object lock instead?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138562104

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -165,26 +203,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          tmpWriter = writer;

          - List<Entry> currentPending = getPendingWrites();
          - if (!currentPending.isEmpty()) {
          -   tmpWriter.commitMarker(currentPending.size());
          - }
          -
          - // write out all accumulated entries to log.
          - for (Entry e : currentPending) {
          -   tmpWriter.append(e);
          -   entryCount++;
          -   latestSeq = Math.max(latestSeq, e.getKey().get());
          + Entry[] currentPending = getPendingWrites();
          + if (currentPending != null) {
          +   entryCount = currentPending.length;
          +   startTimerIfNeeded(tmpWriter, entryCount);
          +   tmpWriter.commitMarker(entryCount);
          +   for (Entry e : currentPending) {
          +     tmpWriter.append(e);
          +     latestSeq = Math.max(latestSeq, e.getKey().get());
          — End diff –

          Is the `Math.max` actually needed? The sequence numbers are always increasing, and that order is maintained in the queue (and hence in the array here). I think we just need to have `latestSeq = currentPending[currentPending.length - 1].getKey().get()` after the loop, right?
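The equivalence being asked about can be sketched as follows, under the assumption stated in the comment: the pending entries arrive in strictly increasing sequence order and the array is non-empty. A plain `long[]` stands in for the `Entry[]` of the diff, and `LatestSeqSketch` and its methods are hypothetical names.

```java
/** Hypothetical helper comparing the two ways of finding the latest sequence id. */
class LatestSeqSketch {

  /** Mirrors the loop in the diff: track the maximum while iterating. */
  static long latestByMax(long[] pendingSeqs) {
    long latestSeq = 0;
    for (long seq : pendingSeqs) {
      latestSeq = Math.max(latestSeq, seq);
    }
    return latestSeq;
  }

  /** The suggested shortcut: for an ascending, non-empty array the last element is the maximum. */
  static long latestByLast(long[] pendingSeqs) {
    return pendingSeqs[pendingSeqs.length - 1];
  }

  public static void main(String[] args) {
    long[] pendingSeqs = {6, 7, 8, 9, 10};
    System.out.println(latestByMax(pendingSeqs) + " " + latestByLast(pendingSeqs));
  }
}
```

The shortcut only holds while the ordering guarantee does; if entries could ever be queued out of order, the `Math.max` loop would remain the safe choice.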

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138561233

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -211,26 +212,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          — End diff –

          I don't think it is needed, since `writer` only gets assigned once, in the `init` method. It has been there since the beginning without any reason.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138559950

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();

          - private List<Entry> pendingWrites = Lists.newLinkedList();
          + private Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
          private TransactionLogWriter writer;
          - public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
          + private final AtomicLong positionBeforeAppend = new AtomicLong(0);
          + private final AtomicInteger countSinceLastSync = new AtomicInteger(0);
          — End diff –

          And this can be just an `int` as well.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138559674

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -48,21 +52,30 @@
          protected long timestamp;
          private volatile boolean initialized;
          private volatile boolean closed;
          + private AtomicLong writtenUpTo = new AtomicLong();
          private AtomicLong syncedUpTo = new AtomicLong();

          - private List<Entry> pendingWrites = Lists.newLinkedList();
          + private Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
          private TransactionLogWriter writer;
          - public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
          + private final AtomicLong positionBeforeAppend = new AtomicLong(0);
          — End diff –

          This can be just a `long` field, right? Since it is only used by the start/stop timer method, which only gets called inside the writer sync block.
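
          A minimal sketch of the reasoning in the comment above, assuming that every read and write of the field happens while holding the same lock. In that case the lock already provides mutual exclusion and visibility, so a plain `long` (or `int`) behaves correctly and the atomic wrapper adds nothing. The field names mirror the diff; the class, the lock object, and the methods are hypothetical.

```java
public class SyncGuardedFieldsSketch {
  private final Object writerLock = new Object();

  // Plain fields: safe only because every access below holds writerLock.
  private long positionBeforeAppend;
  private int countSinceLastSync;

  // Hypothetical stand-in for starting the timer before a batch is written.
  void startBatch(long writerPosition, int entryCount) {
    synchronized (writerLock) {
      positionBeforeAppend = writerPosition;
      countSinceLastSync = entryCount;
    }
  }

  // Bytes written since startBatch, given the writer's current position.
  long bytesWrittenSince(long currentPosition) {
    synchronized (writerLock) {
      return currentPosition - positionBeforeAppend;
    }
  }

  int entriesInBatch() {
    synchronized (writerLock) {
      return countSinceLastSync;
    }
  }
}
```

          The trade-off is that the invariant "only accessed under the lock" is no longer enforced by the type, so it is worth documenting on the field.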

          githubbot ASF GitHub Bot added a comment -

          Github user anew commented on a diff in the pull request:

          https://github.com/apache/incubator-tephra/pull/53#discussion_r138534049

          — Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java —
          @@ -211,26 +212,36 @@ private void sync() throws IOException {
          // prevent writer being dereferenced
          — End diff –

          I am not sure why this is needed. As long as this method is running, there is a stack frame (= a GC root) that has a reference to this, and hence to its members. Anybody know why this is here?

          githubbot ASF GitHub Bot added a comment -

          GitHub user anew opened a pull request:

          https://github.com/apache/incubator-tephra/pull/53

          (TEPHRA-243) Improve logging for slow log append

          • adds more detail to the log message (number of entries, bytes written)
          • ensures that only one thread logs the message, by moving the logging into the method that actually writes to storage. Previously, all threads logged it even if they did not write themselves
          • changed ThriftTransactionSystemTest to use LocalTransactionLog (otherwise it does not get tested at all)
          • fixed a few unrelated compiler warnings encountered along the way
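
          The first two points can be illustrated with a minimal sketch. This is not the Tephra implementation: the class, the message format, and the in-memory list standing in for a logger are all hypothetical. It only shows the idea of queueing edits without a lock, letting whichever thread holds the sync lock drain and "write" the whole batch, and reporting a slow sync once, from that thread, with the entry and byte counts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class GroupCommitLogSketch {
  private final long slowSyncThresholdMillis; // hypothetical, configurable
  private final Queue<byte[]> pendingWrites = new ConcurrentLinkedQueue<>();
  // Stands in for a logger so the behavior can be inspected; guarded by "this".
  private final List<String> slowSyncMessages = new ArrayList<>();
  private long totalBytes;   // guarded by "this"
  private int totalEntries;  // guarded by "this"

  public GroupCommitLogSketch(long slowSyncThresholdMillis) {
    this.slowSyncThresholdMillis = slowSyncThresholdMillis;
  }

  // Any thread may call this: enqueue without a lock, then wait for a sync.
  public void append(byte[] edit) {
    pendingWrites.add(edit);
    sync();
  }

  // Only one thread at a time drains the queue and writes the batch.
  private synchronized void sync() {
    long startNanos = System.nanoTime();
    int entryCount = 0;
    long byteCount = 0;
    byte[] edit;
    while ((edit = pendingWrites.poll()) != null) {
      byteCount += edit.length; // a real writer would write to storage here
      entryCount++;
    }
    if (entryCount == 0) {
      return; // another thread already wrote this edit: nothing to log
    }
    totalBytes += byteCount;
    totalEntries += entryCount;
    long durationMillis = (System.nanoTime() - startNanos) / 1_000_000L;
    if (durationMillis > slowSyncThresholdMillis) {
      slowSyncMessages.add("Slow sync: " + entryCount + " entries, "
          + byteCount + " bytes, took " + durationMillis + " msec.");
    }
  }

  public synchronized long totalBytes() { return totalBytes; }
  public synchronized int totalEntries() { return totalEntries; }
  public synchronized int slowSyncCount() { return slowSyncMessages.size(); }
}
```

          A thread whose edit was already written by another thread finds the queue empty and returns without logging, which is the behavior the second point describes.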

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/anew/incubator-tephra tephra-243

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/incubator-tephra/pull/53.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #53


          commit 60085beef1c5b8cef90b1b14751ae7aa78b6d909
          Author: anew <anew@apache.org>
          Date: 2017-09-09T03:04:38Z

          (TEPHRA-243) Improve logging for slow log append



            People

            • Assignee:
              anew Andreas Neumann
              Reporter:
              anew Andreas Neumann