Index: core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java (revision 1702164) +++ core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java (working copy) @@ -321,10 +321,10 @@ private MessageManager messenger; private FileSystem fs; private int checkPointInterval; - volatile private long lastCheckPointStep; - volatile private boolean checkpointState; - volatile private FSDataOutputStream checkpointStream; - volatile private long checkpointMessageCount; + private long lastCheckPointStep; + private boolean checkpointState; + private FSDataOutputStream checkpointStream; + private long checkpointMessageCount; public void initialize(BSPJob job, @SuppressWarnings("rawtypes") BSPPeer bspPeer, @@ -343,16 +343,17 @@ Constants.DEFAULT_CHECKPOINT_INTERVAL); this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, Constants.DEFAULT_CHECKPOINT_INTERVAL); + synchronized (this) { + this.checkpointState = conf.getBoolean(Constants.CHECKPOINT_ENABLED, + false); - this.checkpointState = conf.getBoolean(Constants.CHECKPOINT_ENABLED, - false); - - if (superstep > 0) { - this.lastCheckPointStep = this.superstep; - } else { - this.lastCheckPointStep = 1; + if (superstep > 0) { + this.lastCheckPointStep = this.superstep; + } else { + this.lastCheckPointStep = 1; + } + this.checkpointMessageCount = 0L; } - this.checkpointMessageCount = 0L; } private String checkpointPath(long step) { Index: core/src/main/java/org/apache/hama/ipc/Server.java =================================================================== --- core/src/main/java/org/apache/hama/ipc/Server.java (revision 1702164) +++ core/src/main/java/org/apache/hama/ipc/Server.java (working copy) @@ -56,6 +56,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -875,7 +876,7 @@ private ByteBuffer data; private ByteBuffer dataLengthBuffer; private LinkedList responseQueue; - private volatile int rpcCount = 0; // number of outstanding rpcs + private AtomicInteger rpcCount = new AtomicInteger(); // number of outstanding rpcs private long lastContact; private int dataLength; private Socket socket; @@ -949,17 +950,17 @@ /* Return true if the connection has no outstanding rpc */ private boolean isIdle() { - return rpcCount == 0; + return rpcCount.get() == 0; } /* Decrement the outstanding RPC count */ private void decRpcCount() { - rpcCount--; + rpcCount.decrementAndGet(); } /* Increment the outstanding RPC count */ private void incRpcCount() { - rpcCount++; + rpcCount.incrementAndGet(); } private boolean timedOut(long currentTime) {