Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-3266

The fsyncPerTransaction parameter is not working

    XML | Word | Printable | JSON

    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.8.0
    • Fix Version/s: None
    • Component/s: File Channel
    • Labels:
      None

      Description

      Does every take or put transaction sync to disk, regardless of the fsyncPerTransaction setting?

      this.fsyncPerTransaction = fsyncPerTransaction;
            this.fsyncInterval = fsyncInterval;
            if (!fsyncPerTransaction) {
              LOG.info("Sync interval = " + fsyncInterval);
              syncExecutor = Executors.newSingleThreadScheduledExecutor();
              syncExecutor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                  try {
                    sync();
                  } catch (Throwable ex) {
                    LOG.error("Data file, " + getFile().toString() + " could not " +
                        "be synced to disk due to an error.", ex);
                  }
                }
              }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS);
            } else {
              syncExecutor = null;
            }
      
      /**
       * Writes a commit record for the given transaction to one of the log
       * files and then calls {@code sync()} on that log file's writer.
       *
       * <p>Note that {@code sync()} is invoked unconditionally after every
       * {@code commit(buffer)} call in this method; nothing here consults
       * {@code fsyncPerTransaction}. Whether that call actually forces data to
       * disk is decided inside {@code LogFile.Writer.sync()}, which is not
       * shown here -- TODO confirm against that implementation.
       *
       * @param transactionID id of the transaction being committed; also used
       *        to select the log file via {@code nextLogWriter}
       * @param type commit type stored in the {@code Commit} record
       * @throws IOException if the selected log file's usable space is not
       *         greater than {@code minimumRequiredSpace} plus the record size,
       *         or if writing/syncing fails (a
       *         {@code LogFileRetryableIOException} is rethrown only when the
       *         log is no longer open)
       */
      private void commit(long transactionID, short type) throws IOException {
          // Refuse to commit once the log has been closed.
          Preconditions.checkState(open, "Log is closed");
          // Build the commit record and serialize it for writing.
          Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type);
          ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
          int logFileIndex = nextLogWriter(transactionID);
          // Require strictly more free space than the configured minimum plus
          // the size of the serialized record before attempting the write.
          long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
          long requiredSpace = minimumRequiredSpace + buffer.limit();
          if (usableSpace <= requiredSpace) {
            throw new IOException("Usable space exhausted, only " + usableSpace +
                " bytes remaining, required " + requiredSpace + " bytes");
          }
          // Stays true unless one of the write+sync attempts below completes;
          // the finally block uses it to decide whether to roll the log file.
          boolean error = true;
          try {
            try {
              LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
              // If multiple transactions are committing at the same time,
              // this ensures that the number of actual fsyncs is small and a
              // number of them are grouped together into one.
              logFileWriter.commit(buffer);
              // Called on every commit, independent of fsyncPerTransaction.
              logFileWriter.sync();
              error = false;
            } catch (LogFileRetryableIOException e) {
              // A retryable failure is only retried while the log is open.
              if (!open) {
                throw e;
              }
              // Roll this log file, then re-fetch the writer at the same index
              // and retry the commit + sync once.
              roll(logFileIndex, buffer);
              LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
              logFileWriter.commit(buffer);
              logFileWriter.sync();
              error = false;
            }
          } finally {
            // Neither attempt succeeded and the log is still open: roll the
            // log file at this index before the exception propagates.
            if (error && open) {
              roll(logFileIndex);
            }
          }
        }
      
       logFileWriter.commit(buffer);
       logFileWriter.sync();
      

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              zhaiyuyong scott.zhai
            • Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

              • Created:
                Updated: