Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-3266

The fsyncPerTransaction parameter has no effect

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Minor
    • Resolution: Unresolved
    • 1.8.0
    • None
    • File Channel
    • None

    Description

      Does every take or put transaction still sync to disk, even when fsyncPerTransaction is set to false?

      this.fsyncPerTransaction = fsyncPerTransaction;
            this.fsyncInterval = fsyncInterval;
            if (!fsyncPerTransaction) {
              LOG.info("Sync interval = " + fsyncInterval);
              syncExecutor = Executors.newSingleThreadScheduledExecutor();
              syncExecutor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                  try {
                    sync();
                  } catch (Throwable ex) {
                    LOG.error("Data file, " + getFile().toString() + " could not " +
                        "be synced to disk due to an error.", ex);
                  }
                }
              }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS);
            } else {
              syncExecutor = null;
            }
      
      /**
       * Writes a commit record for the given transaction to one of the log
       * files and then calls {@code sync()} on that log file writer.
       *
       * <p>If the first write/sync attempt raises a
       * {@code LogFileRetryableIOException} while the log is still open, the
       * log file is rolled and the write/sync is attempted once more. If the
       * method is about to exit without a successful write/sync and the log is
       * still open, the log file is rolled in the {@code finally} block.
       *
       * <p>NOTE(review): {@code sync()} is invoked unconditionally after every
       * commit in this method; no {@code fsyncPerTransaction} check is visible
       * here. Whether the writer's {@code sync()} honors that setting
       * internally cannot be determined from this snippet - confirm against
       * {@code LogFile.Writer}.
       *
       * @param transactionID id of the transaction being committed; also used
       *     to select the log file writer via {@code nextLogWriter}
       * @param type commit type stored in the commit record (presumably
       *     distinguishes put from take commits - confirm against callers)
       * @throws IOException if the selected log file does not have more usable
       *     space than the required minimum plus the record size, or if
       *     writing/syncing fails
       */
      private void commit(long transactionID, short type) throws IOException {
          // Fail the precondition check if the log is not open.
          Preconditions.checkState(open, "Log is closed");
          // Build the commit record and serialize it for writing.
          Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type);
          ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
          int logFileIndex = nextLogWriter(transactionID);
          // Refuse to write unless usable space exceeds the configured minimum
          // plus the size of this record.
          long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
          long requiredSpace = minimumRequiredSpace + buffer.limit();
          if (usableSpace <= requiredSpace) {
            throw new IOException("Usable space exhausted, only " + usableSpace +
                " bytes remaining, required " + requiredSpace + " bytes");
          }
          // Stays true until a write + sync pair completes; checked in finally.
          boolean error = true;
          try {
            try {
              LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
              // If multiple transactions are committing at the same time,
              // this ensures that the number of actual fsyncs is small and a
              // number of them are grouped together into one.
              logFileWriter.commit(buffer);
              logFileWriter.sync();
              error = false;
            } catch (LogFileRetryableIOException e) {
              // Do not retry once the log has been closed; propagate instead.
              if (!open) {
                throw e;
              }
              // Roll the log file, then retry the write + sync once with the
              // writer now registered at the same index.
              roll(logFileIndex, buffer);
              LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
              logFileWriter.commit(buffer);
              logFileWriter.sync();
              error = false;
            }
          } finally {
            // On failure with the log still open, roll this log file.
            if (error && open) {
              roll(logFileIndex);
            }
          }
        }
      
       logFileWriter.commit(buffer);
       logFileWriter.sync();
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            zhaiyuyong scott.zhai
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: