import java.util.ArrayList;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.SequenceFile.CompressionType;


/**
 * Micro-benchmark for concurrent {@link SequenceFile} appends.
 *
 * <p>For every thread count from 1 to {@link #MAX_THREADS}, that many threads
 * are started; each creates its own uncompressed sequence file under
 * {@code /dev/shm/toddtest/} and appends {@link #NUM_WRITES} records to it.
 * One tab-separated result row is printed per thread count, and the whole
 * sweep is repeated {@link #NUM_TESTS} times.
 *
 * <p>Usage: {@code TestWriteConcurrency <lazysize>}. The {@code lazysize}
 * value is only echoed in the first output column; nothing in this class
 * applies it to the Hadoop {@link Configuration}.
 */
public class TestWriteConcurrency {
  /** Number of key/value pairs each writer thread appends. */
  public static final int NUM_WRITES = 1000000;
  /** Number of times the full 1..MAX_THREADS sweep is repeated. */
  public static final int NUM_TESTS = 10;
  /** Largest number of concurrent writer threads tried in one sweep. */
  public static final int MAX_THREADS = 8;

  /** Directory prefix of the per-thread output files (file name is the thread index). */
  private static final String OUTPUT_DIR = "/dev/shm/toddtest/";

  /**
   * Prints the column header and runs {@link #NUM_TESTS} sweeps.
   *
   * @param args {@code args[0]} is the integer label printed in the "lazy" column
   * @throws IllegalArgumentException if no argument is supplied
   * @throws NumberFormatException if {@code args[0]} is not an integer
   * @throws Exception if a sweep fails (see {@link #doTest(int, int)})
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      throw new IllegalArgumentException("usage: TestWriteConcurrency <lazysize>");
    }
    int lazysize = Integer.parseInt(args[0]);
    System.out.println("lazy\trun\tthr\trate");
    for (int i = 0; i < NUM_TESTS; i++) {
      doTest(lazysize, i);
    }
  }

  /**
   * Runs one sweep over thread counts 1..{@link #MAX_THREADS} and prints one
   * row per thread count: {@code lazysize}, {@code run}, thread count, and
   * {@link #NUM_WRITES} divided by the mean per-thread elapsed seconds.
   *
   * @param lazysize label echoed in the first output column
   * @param run      index of this sweep, echoed in the second output column
   * @throws IOException if any writer thread failed; no row is printed for
   *                     that thread count because the rate would be skewed
   * @throws Exception   if obtaining the file system fails or the calling
   *                     thread is interrupted while joining the writers
   */
  public static void doTest(int lazysize, int run) throws Exception {
    final Configuration c = new Configuration();
    final FileSystem fs = FileSystem.get(c);

    for (int numThreads = 1; numThreads <= MAX_THREADS; numThreads++) {
      // Request a collection up front to reduce GC noise inside the timed section.
      System.gc();
      ArrayList<Thread> workers = new ArrayList<Thread>(numThreads);
      final AtomicLong totalNanos = new AtomicLong();
      final AtomicLong failures = new AtomicLong();

      for (int i = 0; i < numThreads; i++) {
        final int fileIndex = i;

        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
              try {
                // Writer creation is deliberately outside the timed section.
                Writer writer = SequenceFile.createWriter(fs, c, new Path(OUTPUT_DIR + fileIndex),
                    ByteWritable.class, ByteWritable.class, CompressionType.NONE);
                ByteWritable value = new ByteWritable();

                long start = System.nanoTime();
                try {
                  for (int n = 0; n < NUM_WRITES; n++) {
                    writer.append(value, value);
                  }
                } finally {
                  // Always release the writer, even when an append fails.
                  writer.close();
                }
                // close() is part of the measured interval, as the final flush happens there.
                totalNanos.addAndGet(System.nanoTime() - start);
              } catch (Exception e) {
                e.printStackTrace();
                failures.incrementAndGet();
              }
            }
          });
        t.start();
        workers.add(t);
      }

      for (Thread t : workers) {
        t.join();
      }

      long failed = failures.get();
      if (failed > 0) {
        // A failed thread contributes no elapsed time, which would inflate the rate.
        throw new IOException(failed + " of " + numThreads + " writer threads failed");
      }

      double avgSecs = totalNanos.get() / 1e9 / (double) numThreads;
      System.out.printf("%d\t%d\t%d\t%.0f\n", lazysize, run, numThreads, NUM_WRITES / avgSecs);
    }
  }
}
