e: c.entrySet()) {
String key = e.getKey().toString();
@@ -860,15 +877,7 @@
HRegion region = this.onlineRegions.get(mapKey);
if (region == null) {
try {
- region = new HRegion(HTableDescriptor.getTableDir(rootDir,
- regionInfo.getTableDesc().getName()),
- this.log, this.fs, conf, regionInfo, null, this.cacheFlusher,
- new Progressable() {
- public void progress() {
- addProcessingMessage(regionInfo);
- }
- }
- );
+ region = instantiateRegion(regionInfo);
// Startup a compaction early if one is needed.
this.compactSplitThread.compactionRequested(region);
} catch (IOException e) {
@@ -891,6 +900,17 @@
reportOpen(regionInfo);
}
+ protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+ throws IOException {
+ return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
+ .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null,
+ this.cacheFlusher, new Progressable() {
+ public void progress() {
+ addProcessingMessage(regionInfo);
+ }
+ });
+ }
+
/*
* Add a MSG_REPORT_PROCESS_OPEN to the outbound queue.
* This method is called while region is in the queue of regions to process
@@ -1172,16 +1192,9 @@
requestCount.incrementAndGet();
try {
HRegion r = getRegion(regionName);
- long scannerId = -1L;
InternalScanner s =
r.getScanner(cols, firstRow, timestamp, filter);
- scannerId = rand.nextLong();
- String scannerName = String.valueOf(scannerId);
- synchronized(scanners) {
- scanners.put(scannerName, s);
- }
- this.leases.
- createLease(scannerName, new ScannerListener(scannerName));
+ long scannerId = addScanner(s);
return scannerId;
} catch (IOException e) {
LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
@@ -1191,6 +1204,18 @@
}
}
+ protected long addScanner(InternalScanner s) throws LeaseStillHeldException {
+ long scannerId = rand.nextLong();
+ String scannerName = String.valueOf(scannerId);
+ synchronized(scanners) {
+ scanners.put(scannerName, s);
+ }
+ this.leases.
+ createLease(scannerName, new ScannerListener(scannerName));
+ return scannerId;
+ }
+
/** {@inheritDoc} */
public void close(final long scannerId) throws IOException {
checkOpen();
@@ -1409,7 +1434,7 @@
*
* @throws IOException
*/
- private void checkOpen() throws IOException {
+ protected void checkOpen() throws IOException {
if (this.stopRequested.get() || this.abortRequested) {
throw new IOException("Server not running");
}
@@ -1567,6 +1592,34 @@
* @param args
*/
public static void main(String [] args) {
- doMain(args, HRegionServer.class);
+ Configuration conf = new HBaseConfiguration();
+ @SuppressWarnings("unchecked")
+ Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+ .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+ doMain(args, regionServerClass);
+ }
+
+ /**
+ * Get the leases.
+ * @return Return the leases.
+ */
+ protected Leases getLeases() {
+ return leases;
+ }
+
+ /**
+ * Get the rootDir.
+ * @return Return the rootDir.
+ */
+ protected Path getRootDir() {
+ return rootDir;
+ }
+
+ /**
+ * Get the fs.
+ * @return Return the fs.
+ */
+ protected FileSystem getFileSystem() {
+ return fs;
}
}
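
The HRegionServer hunks above are a refactoring for extensibility: region construction moves into the new protected instantiateRegion() hook, scanner registration into addScanner(), checkOpen() widens from private to protected, and main() loads the server class named by HConstants.REGION_SERVER_IMPL. A minimal sketch of a subclass built on the new hook (hypothetical class, for illustration only; the constructor signature mirrors the one TransactionalRegionServer uses below):

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;

/** Hypothetical subclass: keeps the default region wiring, logs each open. */
public class AuditingRegionServer extends HRegionServer {
  public AuditingRegionServer(final HServerAddress address,
      final HBaseConfiguration conf) throws IOException {
    super(address, conf);
  }

  @Override
  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
      throws IOException {
    // New hook from this patch: decorate, then delegate to the default wiring.
    System.err.println("Opening " + regionInfo.getRegionNameAsString());
    return super.instantiateRegion(regionInfo);
  }
}
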
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java (revision 0)
@@ -0,0 +1,39 @@
+/*
+ * $Id$
+ * Created on Jun 6, 2008
+ *
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Cleans up committed transactions when they are no longer needed to verify
+ * pending transactions.
+ */
+class CleanOldTransactionsChore extends Chore {
+
+ private static final String SLEEP_CONF = "hbase.transaction.clean.sleep";
+ private static final int DEFAULT_SLEEP = 60 * 1000;
+
+ private final TransactionalRegionServer regionServer;
+
+ public CleanOldTransactionsChore(
+ final TransactionalRegionServer regionServer,
+ final AtomicBoolean stopRequest) {
+ super(regionServer.getConfiguration().getInt(SLEEP_CONF, DEFAULT_SLEEP),
+ stopRequest);
+ this.regionServer = regionServer;
+ }
+
+ @Override
+ protected void chore() {
+ for (HRegion region : regionServer.getOnlineRegions()) {
+ ((TransactionalRegion) region).removeUnNeededCommitedTransactions();
+ }
+ }
+
+}
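
The chore period above is read from the hbase.transaction.clean.sleep property (SLEEP_CONF), defaulting to one minute. A small sketch, assuming the stock Configuration.setInt() API, of tightening it for a test cluster:

// Sketch: run the cleanup chore every ten seconds instead of every minute.
HBaseConfiguration conf = new HBaseConfiguration();
conf.setInt("hbase.transaction.clean.sleep", 10 * 1000);
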
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (revision 0)
@@ -0,0 +1,510 @@
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.LeaseException;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.Leases;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Region which provides transactional support for atomic transactions.
+ * This is achieved with optimistic concurrency control (see
+ * http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf). We keep track of
+ * the read and write sets for each transaction, and hold off on processing the
+ * writes. To decide whether a transaction can commit, we check its read set
+ * for overlaps with all transactions that committed while it was running.
+ *
+ * Because transactions can span multiple regions, all regions must agree to
+ * commit a transaction. The client side of this commit protocol is encoded in
+ * org.apache.hadoop.hbase.client.transactional.TransactionManager
+ *
+ * In the event of a failure of the client mid-commit (after we voted yes), we
+ * will have to consult the transaction log to determine the final decision of
+ * the transaction. This is not yet implemented.
+ */
+class TransactionalRegion extends HRegion {
+
+ private static final String LEASE_TIME = "hbase.transaction.leaseTime";
+ private static final int DEFAULT_LEASE_TIME = 60 * 1000;
+ private static final int LEASE_CHECK_FREQUENCY = 1000;
+
+ private static final Log LOG = LogFactory.getLog(TransactionalRegion.class);
+
+ // Collection of active transactions (PENDING) keyed by id.
+ private Map<String, TransactionState> transactionsById = new HashMap<String, TransactionState>();
+
+ // Map of recent transactions that are COMMIT_PENDING or COMMITED keyed by
+ // their sequence number
+ private SortedMap<Integer, TransactionState> commitedTransactionsBySequenceNumber = Collections
+ .synchronizedSortedMap(new TreeMap<Integer, TransactionState>());
+
+ // Collection of transactions that are COMMIT_PENDING
+ private Set<TransactionState> commitPendingTransactions = Collections
+ .synchronizedSet(new HashSet<TransactionState>());
+
+ private final Leases transactionLeases;
+ private AtomicInteger nextSequenceId = new AtomicInteger(0);
+ private Object commitCheckLock = new Object();
+
+ public TransactionalRegion(final Path basedir, final HLog log,
+ final FileSystem fs, final HBaseConfiguration conf,
+ final HRegionInfo regionInfo, final Path initialFiles,
+ final FlushRequester flushListener, final Progressable reporter)
+ throws IOException {
+ super(basedir, log, fs, conf, regionInfo, initialFiles, flushListener,
+ reporter);
+ transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME),
+ LEASE_CHECK_FREQUENCY);
+ }
+
+ public void beginTransaction(final long transactionId) throws IOException {
+ String key = String.valueOf(transactionId);
+ if (transactionsById.get(key) != null) {
+ TransactionState alias = getTransactionState(transactionId);
+ if (alias != null) {
+ alias.setStatus(Status.ABORTED);
+ retireTransaction(alias);
+ }
+ throw new IOException("Already exiting transaction id: " + key);
+ }
+
+ TransactionState state = new TransactionState(transactionId);
+
+ // Order is important here
+ for (TransactionState commitPending : commitPendingTransactions) {
+ state.addTransactionToCheck(commitPending);
+ }
+ state.setStartSequenceNumber(nextSequenceId.get());
+
+ transactionsById.put(key, state);
+ try {
+ transactionLeases.createLease(key, new TransactionLeaseListener(key));
+ } catch (LeaseStillHeldException e) {
+ throw new RuntimeException(e);
+ }
+ LOG.debug("Begining transaction " + key + " in region "
+ + super.getRegionInfo().getRegionNameAsString());
+ }
+
+ /**
+ * Fetch a single data item.
+ *
+ * @param transactionId
+ * @param row
+ * @param column
+ * @return column value
+ * @throws IOException
+ */
+ public Cell get(final long transactionId, final byte[] row,
+ final byte[] column) throws IOException {
+ Cell[] results = get(transactionId, row, column, 1);
+ return (results == null || results.length == 0) ? null : results[0];
+ }
+
+ /**
+ * Fetch multiple versions of a single data item
+ *
+ * @param transactionId
+ * @param row
+ * @param column
+ * @param numVersions
+ * @return array of values one element per version
+ * @throws IOException
+ */
+ public Cell[] get(final long transactionId, final byte[] row,
+ final byte[] column, final int numVersions) throws IOException {
+ return get(transactionId, row, column, Long.MAX_VALUE, numVersions);
+ }
+
+ /**
+ * Fetch multiple versions of a single data item, with timestamp.
+ *
+ * @param transactionId
+ * @param row
+ * @param column
+ * @param timestamp
+ * @param numVersions
+ * @return array of values one element per version that matches the timestamp
+ * @throws IOException
+ */
+ public Cell[] get(final long transactionId, final byte[] row,
+ final byte[] column, final long timestamp, final int numVersions)
+ throws IOException {
+ TransactionState state = getTransactionState(transactionId);
+ state.addRead(row);
+
+ return get(row, column, timestamp, numVersions);
+ }
+
+ /**
+ * Fetch all the columns for the indicated row at a specified timestamp.
+ * Returns a TreeMap that maps column names to values.
+ *
+ * @param transactionId
+ * @param row
+ * @param columns Array of columns you'd like to retrieve. When null, get all.
+ * @param ts
+ * @return Map values
+ * @throws IOException
+ */
+ public Map<byte[], Cell> getFull(final long transactionId, final byte[] row,
+ final Set<byte[]> columns, final long ts) throws IOException {
+ TransactionState state = getTransactionState(transactionId);
+ state.addRead(row);
+ return getFull(row, columns, ts);
+ }
+
+ /**
+ * Return an iterator that scans over the HRegion, returning the indicated
+ * columns for only the rows that match the data filter. This Iterator must be
+ * closed by the caller.
+ *
+ * @param transactionId
+ * @param cols columns to scan. If column name is a column family, all columns
+ * of the specified column family are returned. Its also possible to pass a
+ * regex in the column qualifier. A column qualifier is judged to be a regex
+ * if it contains at least one of the following characters:
+ * \+|^&*$[]]}{)(.
+ * @param firstRow row which is the starting point of the scan
+ * @param timestamp only return rows whose timestamp is <= this value
+ * @param filter row filter
+ * @return InternalScanner
+ * @throws IOException
+ */
+ public InternalScanner getScanner(final long transactionId,
+ final byte[][] cols, final byte[] firstRow, final long timestamp,
+ final RowFilterInterface filter) throws IOException {
+ return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow,
+ timestamp, filter));
+ }
+
+ /**
+ * Add a write to the transaction. Does not get applied until commit process.
+ *
+ * @param b
+ * @throws IOException
+ */
+ public void batchUpdate(final long transactionId, final BatchUpdate b)
+ throws IOException {
+ TransactionState state = getTransactionState(transactionId);
+ state.addWrite(b);
+ // FIXME write to the WAL
+ }
+
+ public boolean commitRequest(final long transactionId) throws IOException {
+ synchronized (commitCheckLock) {
+ TransactionState state = getTransactionState(transactionId);
+ if (state == null) {
+ return false;
+ }
+
+ if (hasConflict(state)) {
+ state.setStatus(Status.ABORTED);
+ retireTransaction(state);
+ return false;
+ }
+
+ // No conflicts, we can commit.
+ LOG.trace("No conflicts for transaction " + transactionId
+ + " found in region " + super.getRegionInfo().getRegionNameAsString()
+ + ". Voting for commit");
+ state.setStatus(Status.COMMIT_PENDING);
+
+ // If there are writes we must keep a record of the transaction
+ if (state.getWriteSet().size() > 0) {
+ // Order is important
+ commitPendingTransactions.add(state);
+ state.setSequenceNumber(nextSequenceId.getAndIncrement());
+ commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(),
+ state);
+ }
+
+ return true;
+ }
+ }
+
+ private boolean hasConflict(final TransactionState state) {
+ // Check against transactions that committed while we were running
+ for (int i = state.getStartSequenceNumber(); i < nextSequenceId.get(); i++) {
+ TransactionState other = commitedTransactionsBySequenceNumber.get(i);
+ if (other == null) {
+ continue;
+ }
+ if (hasConflict(state, other)) {
+ return true;
+
+ }
+ }
+
+ // Check against transactions that were COMMIT_PENDING when we started
+ for (TransactionState other : state.getTransactionsToCheck()) {
+ if (other.getStatus().equals(TransactionState.Status.ABORTED)) {
+ continue;
+ }
+ if (hasConflict(state, other)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean hasConflict(final TransactionState checkTransaction,
+ final TransactionState checkAgainst) {
+ for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) {
+ if (checkTransaction.getReadSet().contains(otherUpdate.getRow())) {
+ LOG.trace("Transaction " + checkTransaction.toString()
+ + " conflicts with " + checkAgainst.toString());
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Commit the transaction.
+ *
+ * @param transactionId
+ * @throws IOException
+ */
+ public void commit(final long transactionId) throws IOException {
+ TransactionState state;
+ try {
+ state = getTransactionState(transactionId);
+ } catch (UnknownTransactionException e) {
+ LOG.fatal("Asked to commit unknown transaction: " + transactionId
+ + " in region " + super.getRegionInfo().getRegionNameAsString());
+ // FIXME Write to the transaction log that this transaction was corrupted
+ throw e;
+ }
+
+ if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
+ LOG.fatal("Asked to commit a non pending transaction");
+ // FIXME Write to the transaction log that this transaction was corrupted
+ throw new IOException("commit failure");
+ }
+
+ commit(state);
+ }
+
+ /**
+ * Abort the transaction.
+ *
+ * @param transactionId
+ */
+ public void abort(final long transactionId) {
+ TransactionState state;
+ try {
+ state = getTransactionState(transactionId);
+ } catch (UnknownTransactionException e) {
+ LOG.error("Asked to abort unknown transaction: " + transactionId);
+ return;
+ }
+
+ state.setStatus(Status.ABORTED);
+
+ // FIXME write abort message to the WAL (if there are writes)
+
+ // Following removes needed if we have voted
+ if (state.getSequenceNumber() != null) {
+ commitedTransactionsBySequenceNumber.remove(state.getSequenceNumber());
+ }
+ commitPendingTransactions.remove(state);
+
+ retireTransaction(state);
+ }
+
+ private void commit(final TransactionState state) throws IOException {
+
+ LOG.debug("Commiting transaction: " + state.toString() + " to "
+ + super.getRegionInfo().getRegionNameAsString());
+
+ // FIXME write commit message to WAL if there are writes
+
+ for (BatchUpdate update : state.getWriteSet()) {
+ super.batchUpdate(update);
+ // FIXME. These updates do not need to be WALed. They were written as
+ // they were received
+ }
+
+ state.setStatus(Status.COMMITED);
+ if (state.getWriteSet().size() > 0
+ && !commitPendingTransactions.remove(state)) {
+ LOG.fatal("Committing a non-query transaction that is not in commitPendingTransactions");
+ throw new IOException("commit failure"); // FIXME, how to handle?
+ }
+ retireTransaction(state);
+ }
+
+ // Cancel the lease, and remove it from the lease lookup. The transaction may
+ // still live in commitedTransactionsBySequenceNumber and commitPendingTransactions
+ private void retireTransaction(final TransactionState state) {
+ String key = String.valueOf(state.getTransactionId());
+ try {
+ transactionLeases.cancelLease(key);
+ } catch (LeaseException e) {
+ // Ignore
+ }
+
+ transactionsById.remove(key);
+ }
+
+ private TransactionState getTransactionState(final long transactionId)
+ throws UnknownTransactionException {
+ String key = String.valueOf(transactionId);
+ TransactionState state = transactionsById.get(key);
+
+ if (state == null) {
+ LOG.trace("Unknown transaction: " + key);
+ throw new UnknownTransactionException(key);
+ }
+
+ try {
+ transactionLeases.renewLease(key);
+ } catch (LeaseException e) {
+ throw new RuntimeException(e);
+ }
+
+ return state;
+ }
+
+ /**
+ * Cleanup references to committed transactions that are no longer needed.
+ *
+ */
+ void removeUnNeededCommitedTransactions() {
+ Integer minStartSeqNumber = getMinStartSequenceNumber();
+ if (minStartSeqNumber == null) {
+ minStartSeqNumber = Integer.MAX_VALUE; // Remove all
+ }
+
+ Iterator<Entry<Integer, TransactionState>> it = commitedTransactionsBySequenceNumber
+ .entrySet().iterator();
+
+ int numRemoved = 0;
+ while (it.hasNext()) {
+ if (it.next().getKey() >= minStartSeqNumber) {
+ break;
+ }
+ it.remove();
+ numRemoved++;
+ }
+
+ LOG.debug("Removed " + numRemoved
+ + " commited transactions with sequence lower than "
+ + minStartSeqNumber);
+ }
+
+ private Integer getMinStartSequenceNumber() {
+ Integer min = null;
+ for (TransactionState transactionState : transactionsById.values()) {
+ if (min == null || transactionState.getStartSequenceNumber() < min) {
+ min = transactionState.getStartSequenceNumber();
+ }
+ }
+ return min;
+ }
+
+ // TODO
+ private void resolveTransactionFromLog(final TransactionState s) {
+ throw new RuntimeException("NYI");
+ }
+
+ private class TransactionLeaseListener implements LeaseListener {
+ private final String transactionName;
+
+ TransactionLeaseListener(final String n) {
+ this.transactionName = n;
+ }
+
+ /** {@inheritDoc} */
+ public void leaseExpired() {
+ LOG.info("Transaction " + this.transactionName + " lease expired");
+ TransactionState s = null;
+ synchronized (transactionsById) {
+ s = transactionsById.remove(transactionName);
+ }
+ if (s == null) {
+ LOG.warn("Unknown transaction expired " + this.transactionName);
+ return;
+ }
+
+ switch (s.getStatus()) {
+ case PENDING:
+ s.setStatus(Status.ABORTED); // Other transactions may have a ref
+ break;
+ case COMMIT_PENDING:
+ LOG.info("Transaction " + s.getTransactionId()
+ + " expired in COMMIT_PENDING state");
+ LOG.info("Checking transaction status in transaction log");
+ resolveTransactionFromLog(s);
+ break;
+ default:
+ LOG.warn("Unexpected status on expired lease");
+ }
+ }
+ }
+
+ /** Wrapper which keeps track of rows returned by scanner. */
+ private class ScannerWrapper implements InternalScanner {
+ private long transactionId;
+ private InternalScanner scanner;
+
+ public ScannerWrapper(final long transactionId,
+ final InternalScanner scanner) {
+ this.transactionId = transactionId;
+ this.scanner = scanner;
+ }
+
+ public void close() throws IOException {
+ scanner.close();
+ }
+
+ public boolean isMultipleMatchScanner() {
+ return scanner.isMultipleMatchScanner();
+ }
+
+ public boolean isWildcardScanner() {
+ return scanner.isWildcardScanner();
+ }
+
+ public boolean next(final HStoreKey key,
+ final SortedMap<byte[], Cell> results) throws IOException {
+ boolean result = scanner.next(key, results);
+ getTransactionState(transactionId).addRead(key.getRow());
+ return result;
+ }
+ }
+}
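
For review purposes, here is the conflict rule from commitRequest()/hasConflict() reduced to plain sets (a hypothetical, standalone sketch, not part of the patch): a transaction may vote yes only if no row in its read set appears in the write set of a transaction that committed while it was running.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of the optimistic conflict check. */
class ConflictCheckSketch {
  static boolean mayCommit(final Set<String> readSet,
      final List<Set<String>> concurrentWriteSets) {
    for (Set<String> writes : concurrentWriteSets) {
      for (String row : writes) {
        if (readSet.contains(row)) {
          return false; // read/write overlap: vote to abort
        }
      }
    }
    return true; // no overlap: vote yes (COMMIT_PENDING)
  }

  public static void main(final String[] args) {
    Set<String> reads = new HashSet<String>(Arrays.asList("row1", "row2"));
    Set<String> committedWrites = new HashSet<String>(Arrays.asList("row2"));
    // row2 was both read by us and written by a concurrently committed
    // transaction, so the vote must be no.
    System.out.println(mayCommit(reads,
        Arrays.<Set<String>> asList(committedWrites))); // prints false
  }
}
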
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (revision 0)
@@ -0,0 +1,267 @@
+/*
+ * $Id$
+ * Created on Jun 5, 2008
+ *
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * RegionServer with support for transactions. Transactional logic is at the
+ * region level, so we mostly just delegate to the appropriate
+ * TransactionalRegion.
+ */
+public class TransactionalRegionServer extends HRegionServer implements
+ TransactionalRegionInterface {
+ static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class);
+
+ private final CleanOldTransactionsChore cleanOldTransactionsThread;
+
+ public TransactionalRegionServer(final HBaseConfiguration conf)
+ throws IOException {
+ this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+ DEFAULT_REGIONSERVER_ADDRESS)), conf);
+ }
+
+ public TransactionalRegionServer(final HServerAddress address,
+ final HBaseConfiguration conf) throws IOException {
+ super(address, conf);
+ cleanOldTransactionsThread = new CleanOldTransactionsChore(this,
+ super.stopRequested);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getProtocolVersion(final String protocol, final long clientVersion)
+ throws IOException {
+ if (protocol.equals(TransactionalRegionInterface.class.getName())) {
+ return TransactionalRegionInterface.versionID;
+ }
+ return super.getProtocolVersion(protocol, clientVersion);
+ }
+
+ @Override
+ protected void init(final MapWritable c) throws IOException {
+ super.init(c);
+ String n = Thread.currentThread().getName();
+ UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ abort();
+ LOG.fatal("Set stop flag in " + t.getName(), e);
+ }
+ };
+ Threads.setDaemonThreadRunning(this.cleanOldTransactionsThread, n
+ + ".oldTransactionCleaner", handler);
+
+ }
+
+ @Override
+ protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+ throws IOException {
+ return new TransactionalRegion(HTableDescriptor.getTableDir(super
+ .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+ .getFileSystem(), super.conf, regionInfo, null, super
+ .getFlushRequester(), new Progressable() {
+ public void progress() {
+ addProcessingMessage(regionInfo);
+ }
+ });
+ }
+
+ protected TransactionalRegion getTransactionalRegion(final byte[] regionName)
+ throws NotServingRegionException {
+ return (TransactionalRegion) super.getRegion(regionName);
+ }
+
+ public void abort(final byte[] regionName, final long transactionId)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ getTransactionalRegion(regionName).abort(transactionId);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public void batchUpdate(final long transactionId, final byte[] regionName,
+ final BatchUpdate b) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ getTransactionalRegion(regionName).batchUpdate(transactionId, b);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public void commit(final byte[] regionName, final long transactionId)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ getTransactionalRegion(regionName).commit(transactionId);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public boolean commitRequest(final byte[] regionName, final long transactionId)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).commitRequest(transactionId);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public Cell get(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).get(transactionId, row, column);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public Cell[] get(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final int numVersions)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).get(transactionId, row, column,
+ numVersions);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public Cell[] get(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final long timestamp,
+ final int numVersions) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).get(transactionId, row, column,
+ timestamp, numVersions);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public RowResult getRow(final long transactionId, final byte[] regionName,
+ final byte[] row, final long ts) throws IOException {
+ return getRow(transactionId, regionName, row, null, ts);
+ }
+
+ public RowResult getRow(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns) throws IOException {
+ return getRow(transactionId, regionName, row, columns,
+ HConstants.LATEST_TIMESTAMP);
+ }
+
+ public RowResult getRow(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns, final long ts)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ // convert the columns array into a set so it's easy to check later.
+ Set<byte[]> columnSet = null;
+ if (columns != null) {
+ columnSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ columnSet.addAll(Arrays.asList(columns));
+ }
+
+ TransactionalRegion region = getTransactionalRegion(regionName);
+ Map<byte[], Cell> map = region.getFull(transactionId, row, columnSet, ts);
+ HbaseMapWritable<byte[], Cell> result = new HbaseMapWritable<byte[], Cell>();
+ result.putAll(map);
+ return new RowResult(row, result);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+
+ }
+
+ public long openScanner(final long transactionId, final byte[] regionName,
+ final byte[][] cols, final byte[] firstRow, final long timestamp,
+ final RowFilterInterface filter) throws IOException {
+ checkOpen();
+ NullPointerException npe = null;
+ if (regionName == null) {
+ npe = new NullPointerException("regionName is null");
+ } else if (cols == null) {
+ npe = new NullPointerException("columns to scan is null");
+ } else if (firstRow == null) {
+ npe = new NullPointerException("firstRow for scanner is null");
+ }
+ if (npe != null) {
+ IOException io = new IOException("Invalid arguments to openScanner");
+ io.initCause(npe);
+ throw io;
+ }
+ super.getRequestCount().incrementAndGet();
+ try {
+ TransactionalRegion r = getTransactionalRegion(regionName);
+ InternalScanner s = r.getScanner(transactionId, cols, firstRow,
+ timestamp, filter);
+ return super.addScanner(s);
+ } catch (IOException e) {
+ LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
+ RemoteExceptionHandler.checkIOException(e));
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public void beginTransaction(final long transactionId, final byte[] regionName)
+ throws IOException {
+ getTransactionalRegion(regionName).beginTransaction(transactionId);
+ }
+
+}
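
With the main() change to HRegionServer above, selecting this implementation is purely a configuration matter; the new tests below do exactly this in their constructors. An in-process sketch (a standalone deployment would set the corresponding keys in its site configuration):

// Sketch: point the client interface and the server implementation at the
// transactional classes, as the tests in this patch do.
HBaseConfiguration conf = new HBaseConfiguration();
conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class.getName());
conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class.getName());
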
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (revision 0)
@@ -0,0 +1,159 @@
+/*
+ * $Id$
+ * Created on Jun 4, 2008
+ *
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Holds the state of a transaction.
+ */
+class TransactionState {
+
+ /** Current status. */
+ public enum Status {
+ /** Initial status, still performing operations. */
+ PENDING,
+ /**
+ * Checked if we can commit, and said yes. Still need to determine the
+ * global decision.
+ */
+ COMMIT_PENDING,
+ /** Committed. */
+ COMMITED,
+ /** Aborted. */
+ ABORTED
+ }
+
+ private final long transactionId;
+ private Status status;
+ private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
+ Bytes.BYTES_COMPARATOR);
+ private Set<BatchUpdate> writeSet = new HashSet<BatchUpdate>();
+ private Set<TransactionState> additionalTransactionsToCheck = new HashSet<TransactionState>();
+ private int startSequenceNumber;
+ private Integer sequenceNumber;
+
+ public TransactionState(final long transactionId) {
+ this.transactionId = transactionId;
+ this.status = Status.PENDING;
+ }
+
+ public void addRead(final byte[] rowKey) {
+ readSet.add(rowKey);
+ }
+
+ public Set<byte[]> getReadSet() {
+ return readSet;
+ }
+
+ public void addWrite(final BatchUpdate write) {
+ writeSet.add(write);
+ }
+
+ public Set<BatchUpdate> getWriteSet() {
+ return writeSet;
+ }
+
+ public void addTransactionToCheck(final TransactionState transaction) {
+ additionalTransactionsToCheck.add(transaction);
+ }
+
+ public Set<TransactionState> getTransactionsToCheck() {
+ return additionalTransactionsToCheck;
+ }
+
+ /**
+ * Get the status.
+ *
+ * @return Return the status.
+ */
+ public Status getStatus() {
+ return status;
+ }
+
+ /**
+ * Set the status.
+ *
+ * @param status The status to set.
+ */
+ public void setStatus(final Status status) {
+ this.status = status;
+ }
+
+ /**
+ * Get the startSequenceNumber.
+ *
+ * @return Return the startSequenceNumber.
+ */
+ public int getStartSequenceNumber() {
+ return startSequenceNumber;
+ }
+
+ /**
+ * Get the sequenceNumber.
+ *
+ * @return Return the sequenceNumber.
+ */
+ public Integer getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ /**
+ * Set the sequenceNumber.
+ *
+ * @param sequenceNumber The sequenceNumber to set.
+ */
+ public void setSequenceNumber(final Integer sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder result = new StringBuilder();
+ result.append("[transactionId: ");
+ result.append(transactionId);
+ result.append(" status: ");
+ result.append(status.name());
+ result.append(" read Size: ");
+ result.append(readSet.size());
+ result.append(" write Size: ");
+ result.append(writeSet.size());
+ result.append(" startSQ: ");
+ result.append(startSequenceNumber);
+ if (sequenceNumber != null) {
+ result.append(" commitedSQ:");
+ result.append(sequenceNumber);
+ }
+ result.append("]");
+
+ return result.toString();
+ }
+
+ /**
+ * Get the transactionId.
+ *
+ * @return Return the transactionId.
+ */
+ public long getTransactionId() {
+ return transactionId;
+ }
+
+ /**
+ * Set the startSequenceNumber.
+ *
+ * @param startSequenceNumber The startSequenceNumber to set.
+ */
+ public void setStartSequenceNumber(final int startSequenceNumber) {
+ this.startSequenceNumber = startSequenceNumber;
+ }
+
+}
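
To make the sequence-number bookkeeping concrete, a hypothetical walk-through (the class is package-private, so imagine this inside the same package; illustration only):

// A writer commits at sequence 3; a reader that started at sequence 2 and
// read one of the writer's rows must then fail its commitRequest().
TransactionState writer = new TransactionState(1L);
writer.setStartSequenceNumber(0);
writer.addWrite(new BatchUpdate(Bytes.toBytes("row1")));
writer.setSequenceNumber(3); // committed with sequence number 3

TransactionState reader = new TransactionState(2L);
reader.setStartSequenceNumber(2); // started before writer committed
reader.addRead(Bytes.toBytes("row1"));

// writer committed inside reader's window [2, current) and reader read a row
// writer wrote, so TransactionalRegion.hasConflict() reports a conflict:
boolean overlap = reader.getReadSet().contains(Bytes.toBytes("row1")); // true
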
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/util/Bytes.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/util/Bytes.java (revision 665754)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/util/Bytes.java (working copy)
@@ -133,9 +133,9 @@
* @param bytes
- * @return the long value
+ * @return the int value
*/
- public static long toInt(byte[] bytes) {
+ public static int toInt(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
- return -1L;
+ return -1;
}
return ByteBuffer.wrap(bytes).getInt();
}
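
The hunk above only corrects the declared return type; the method already read four bytes. A round-trip sanity sketch, using the toBytes(int) overload that the new tests also rely on:

import org.apache.hadoop.hbase.util.Bytes;

/** Sketch: round-trip through the corrected int signature. */
class BytesToIntSketch {
  public static void main(final String[] args) {
    int v = Bytes.toInt(Bytes.toBytes(42)); // now typed as int, not long
    System.out.println(v == 42); // prints true
  }
}
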
Index: /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/StressTestTransactions.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/StressTestTransactions.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/StressTestTransactions.java (revision 0)
@@ -0,0 +1,424 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.DummyTransactionLogger;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Stress test the transaction functionality. This requires running a
+ * {@link TransactionalRegionServer}. We run many threads doing reads/writes
+ * which may conflict with each other. We have two types of transactions, those
+ * which operate on rows of a single table, and those which operate on rows
+ * across multiple tables. Each transaction type has a modification operation
+ * which changes two values while maintaining the sum. Also each transaction
+ * type has a consistency-check operation which sums all rows and verifies that
+ * the sum is as expected.
+ */
+public class StressTestTransactions extends HBaseClusterTestCase {
+ private static final Log LOG = LogFactory
+ .getLog(StressTestTransactions.class);
+
+ private static final int NUM_TABLES = 3;
+ private static final int NUM_ST_ROWS = 3;
+ private static final int NUM_MT_ROWS = 3;
+ private static final int NUM_TRANSACTIONS_PER_THREAD = 100;
+ private static final int NUM_SINGLE_TABLE_THREADS = 6;
+ private static final int NUM_MULTI_TABLE_THREADS = 6;
+ private static final int PRE_COMMIT_SLEEP = 10;
+ private static final Random RAND = new Random();
+
+ private static final byte[] FAMILY = Bytes.toBytes("family:");
+ private static final byte[] COL = Bytes.toBytes("family:a");
+
+ private HBaseAdmin admin;
+ private TransactionalTable[] tables;
+ private TransactionManager transactionManager;
+
+ /** constructor */
+ public StressTestTransactions() {
+ conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+ .getName());
+ conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+ .getName());
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ tables = new TransactionalTable[NUM_TABLES];
+
+ for (int i = 0; i < tables.length; i++) {
+ HTableDescriptor desc = new HTableDescriptor(makeTableName(i));
+ desc.addFamily(new HColumnDescriptor(FAMILY));
+ admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+ tables[i] = new TransactionalTable(conf, desc.getName());
+ }
+
+ transactionManager = new TransactionManager(new DummyTransactionLogger(),
+ conf);
+ }
+
+ private String makeTableName(final int i) {
+ return "table" + i;
+ }
+
+ private void writeInitalValues() throws IOException {
+ for (TransactionalTable table : tables) {
+ for (int i = 0; i < NUM_ST_ROWS; i++) {
+ byte[] row = makeSTRow(i);
+ BatchUpdate b = new BatchUpdate(row);
+ b.put(COL, Bytes.toBytes(SingleTableTransactionThread.INITIAL_VALUE));
+ table.commit(b);
+ }
+ for (int i = 0; i < NUM_MT_ROWS; i++) {
+ byte[] row = makeMTRow(i);
+ BatchUpdate b = new BatchUpdate(row);
+ b.put(COL, Bytes.toBytes(MultiTableTransactionThread.INITIAL_VALUE));
+ table.commit(b);
+ }
+ }
+ }
+
+ private byte[] makeSTRow(final int i) {
+ return Bytes.toBytes("st" + i);
+ }
+
+ private byte[] makeMTRow(final int i) {
+ return Bytes.toBytes("mt" + i);
+ }
+
+ private static int nextThreadNum = 1;
+ private static final AtomicBoolean stopRequest = new AtomicBoolean(false);
+ private static final AtomicBoolean consistencyFailure = new AtomicBoolean(
+ false);
+
+ // Thread which runs transactions
+ abstract class TransactionThread extends Thread {
+ private int numRuns = 0;
+ private int numAborts = 0;
+ private int numUnknowns = 0;
+
+ public TransactionThread(final String namePrefix) {
+ super.setName(namePrefix + "transaction " + nextThreadNum++);
+ }
+
+ @Override
+ public void run() {
+ for (int i = 0; i < NUM_TRANSACTIONS_PER_THREAD; i++) {
+ if (stopRequest.get()) {
+ return;
+ }
+ try {
+ numRuns++;
+ transaction();
+ } catch (UnknownTransactionException e) {
+ numUnknowns++;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (CommitUnsuccessfulException e) {
+ numAborts++;
+ }
+ }
+ }
+
+ protected abstract void transaction() throws IOException,
+ CommitUnsuccessfulException;
+
+ public int getNumAborts() {
+ return numAborts;
+ }
+
+ public int getNumUnknowns() {
+ return numUnknowns;
+ }
+
+ protected void preCommitSleep() {
+ try {
+ Thread.sleep(PRE_COMMIT_SLEEP);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void consistencyFailure() {
+ LOG.fatal("Consistency failure");
+ stopRequest.set(true);
+ consistencyFailure.set(true);
+ }
+
+ /**
+ * Get the numRuns.
+ *
+ * @return Return the numRuns.
+ */
+ public int getNumRuns() {
+ return numRuns;
+ }
+
+ }
+
+ // Atomically change the value of two rows while maintaining the sum.
+ // This should preserve the global sum of the rows, which is also checked
+ // with a transaction.
+ private class SingleTableTransactionThread extends TransactionThread {
+ private static final int INITIAL_VALUE = 10;
+ public static final int TOTAL_SUM = INITIAL_VALUE * NUM_ST_ROWS;
+ private static final int MAX_TRANSFER_AMT = 100;
+
+ private TransactionalTable table;
+ boolean doCheck = false;
+
+ public SingleTableTransactionThread() {
+ super("single table ");
+ }
+
+ @Override
+ protected void transaction() throws IOException,
+ CommitUnsuccessfulException {
+ if (doCheck) {
+ checkTotalSum();
+ } else {
+ doSingleRowChange();
+ }
+ doCheck = !doCheck;
+ }
+
+ private void doSingleRowChange() throws IOException,
+ CommitUnsuccessfulException {
+ table = tables[RAND.nextInt(NUM_TABLES)];
+ int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+ - MAX_TRANSFER_AMT;
+ int row1Index = RAND.nextInt(NUM_ST_ROWS);
+ int row2Index;
+ do {
+ row2Index = RAND.nextInt(NUM_ST_ROWS);
+ } while (row2Index == row1Index);
+ byte[] row1 = makeSTRow(row1Index);
+ byte[] row2 = makeSTRow(row2Index);
+
+ TransactionState transactionState = transactionManager.beginTransaction();
+ int row1Amount = Bytes.toInt(table.get(transactionState, row1, COL)
+ .getValue());
+ int row2Amount = Bytes.toInt(table.get(transactionState, row2, COL)
+ .getValue());
+
+ row1Amount -= transferAmount;
+ row2Amount += transferAmount;
+
+ BatchUpdate update = new BatchUpdate(row1);
+ update.put(COL, Bytes.toBytes(row1Amount));
+ table.commit(transactionState, update);
+ update = new BatchUpdate(row2);
+ update.put(COL, Bytes.toBytes(row2Amount));
+ table.commit(transactionState, update);
+
+ super.preCommitSleep();
+
+ transactionManager.tryCommit(transactionState);
+ LOG.debug("Commited");
+ }
+
+ // Check the table we last mutated
+ private void checkTotalSum() throws IOException,
+ CommitUnsuccessfulException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+ int totalSum = 0;
+ for (int i = 0; i < NUM_ST_ROWS; i++) {
+ totalSum += Bytes.toInt(table.get(transactionState, makeSTRow(i), COL)
+ .getValue());
+ }
+
+ transactionManager.tryCommit(transactionState);
+ if (TOTAL_SUM != totalSum) {
+ super.consistencyFailure();
+ }
+ }
+
+ }
+
+ // Similar to SingleTable, but this time we maintain consistency across tables
+ // rather than rows
+ private class MultiTableTransactionThread extends TransactionThread {
+ private static final int INITIAL_VALUE = 1000;
+ public static final int TOTAL_SUM = INITIAL_VALUE * NUM_TABLES;
+ private static final int MAX_TRANSFER_AMT = 100;
+
+ private byte[] row;
+ boolean doCheck = false;
+
+ public MultiTableTransactionThread() {
+ super("multi table");
+ }
+
+ @Override
+ protected void transaction() throws IOException,
+ CommitUnsuccessfulException {
+ if (doCheck) {
+ checkTotalSum();
+ } else {
+ doSingleRowChange();
+ }
+ doCheck = !doCheck;
+ }
+
+ private void doSingleRowChange() throws IOException,
+ CommitUnsuccessfulException {
+ row = makeMTRow(RAND.nextInt(NUM_MT_ROWS));
+ int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+ - MAX_TRANSFER_AMT;
+ int table1Index = RAND.nextInt(tables.length);
+ int table2Index;
+ do {
+ table2Index = RAND.nextInt(tables.length);
+ } while (table2Index == table1Index);
+
+ TransactionalTable table1 = tables[table1Index];
+ TransactionalTable table2 = tables[table2Index];
+
+ TransactionState transactionState = transactionManager.beginTransaction();
+ int table1Amount = Bytes.toInt(table1.get(transactionState, row, COL)
+ .getValue());
+ int table2Amount = Bytes.toInt(table2.get(transactionState, row, COL)
+ .getValue());
+
+ table1Amount -= transferAmount;
+ table2Amount += transferAmount;
+
+ BatchUpdate update = new BatchUpdate(row);
+ update.put(COL, Bytes.toBytes(table1Amount));
+ table1.commit(transactionState, update);
+
+ update = new BatchUpdate(row);
+ update.put(COL, Bytes.toBytes(table2Amount));
+ table2.commit(transactionState, update);
+
+ super.preCommitSleep();
+
+ transactionManager.tryCommit(transactionState);
+
+ LOG.trace(Bytes.toString(table1.getTableName()) + ": " + table1Amount);
+ LOG.trace(Bytes.toString(table2.getTableName()) + ": " + table2Amount);
+
+ }
+
+ private void checkTotalSum() throws IOException,
+ CommitUnsuccessfulException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+ int totalSum = 0;
+ int[] amounts = new int[tables.length];
+ for (int i = 0; i < tables.length; i++) {
+ int amount = Bytes.toInt(tables[i].get(transactionState, row, COL)
+ .getValue());
+ amounts[i] = amount;
+ totalSum += amount;
+ }
+
+ transactionManager.tryCommit(transactionState);
+
+ for (int i = 0; i < tables.length; i++) {
+ LOG.trace(Bytes.toString(tables[i].getTableName()) + ": " + amounts[i]);
+ }
+
+ if (TOTAL_SUM != totalSum) {
+ super.consistencyFailure();
+ }
+ }
+
+ }
+
+ public void testStressTransactions() throws IOException, InterruptedException {
+ writeInitalValues();
+
+ List<TransactionThread> transactionThreads = new LinkedList<TransactionThread>();
+
+ for (int i = 0; i < NUM_SINGLE_TABLE_THREADS; i++) {
+ TransactionThread transactionThread = new SingleTableTransactionThread();
+ transactionThread.start();
+ transactionThreads.add(transactionThread);
+ }
+
+ for (int i = 0; i < NUM_MULTI_TABLE_THREADS; i++) {
+ TransactionThread transactionThread = new MultiTableTransactionThread();
+ transactionThread.start();
+ transactionThreads.add(transactionThread);
+ }
+
+ for (TransactionThread transactionThread : transactionThreads) {
+ transactionThread.join();
+ }
+
+ for (TransactionThread transactionThread : transactionThreads) {
+ LOG.info(transactionThread.getName() + " done with "
+ + transactionThread.getNumAborts() + " aborts, and "
+ + transactionThread.getNumUnknowns() + " unknown transactions of "
+ + transactionThread.getNumRuns());
+ }
+
+ doFinalConsistencyChecks();
+ }
+
+ private void doFinalConsistencyChecks() throws IOException {
+
+ int[] mtSums = new int[NUM_MT_ROWS];
+ for (int i = 0; i < mtSums.length; i++) {
+ mtSums[i] = 0;
+ }
+
+ for (TransactionalTable table : tables) {
+ int thisTableSum = 0;
+ for (int i = 0; i < NUM_ST_ROWS; i++) {
+ byte[] row = makeSTRow(i);
+ thisTableSum += Bytes.toInt(table.get(row, COL).getValue());
+ }
+ Assert.assertEquals(SingleTableTransactionThread.TOTAL_SUM, thisTableSum);
+
+ for (int i = 0; i < NUM_MT_ROWS; i++) {
+ byte[] row = makeMTRow(i);
+ mtSums[i] += Bytes.toInt(table.get(row, COL).getValue());
+ }
+ }
+
+ for (int mtSum : mtSums) {
+ Assert.assertEquals(MultiTableTransactionThread.TOTAL_SUM, mtSum);
+ }
+ }
+}
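
The invariant behind both checkTotalSum() methods is plain bookkeeping, worked once with the constants above (illustration only): each single-table row starts at INITIAL_VALUE = 10, and a transfer of t between two rows leaves the table sum unchanged.

// (10 - t) + (10 + t) + 10 == 30 == SingleTableTransactionThread.TOTAL_SUM
int t = 7;
int sum = (10 - t) + (10 + t) + 10; // 30, regardless of t
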
Index: /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/TestTransactions.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/TestTransactions.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/TestTransactions.java (revision 0)
@@ -0,0 +1,149 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.DummyTransactionLogger;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test the transaction functionality. This requires running a
+ * {@link TransactionalRegionServer}.
+ */
+public class TestTransactions extends HBaseClusterTestCase {
+
+ private static final String[] TABLE_NAMES = new String[] { "table1",
+ "table2", "table3" };
+
+ private static final byte[] FAMILY = Bytes.toBytes("family:");
+ private static final byte[] COL_A = Bytes.toBytes("family:a");
+
+ private static final byte[] ROW1 = Bytes.toBytes("row1");
+ private static final byte[] ROW2 = Bytes.toBytes("row2");
+ private static final byte[] ROW3 = Bytes.toBytes("row3");
+
+ private HBaseAdmin admin;
+ private TransactionalTable[] tables;
+ private TransactionManager transactionManager;
+
+ /** constructor */
+ public TestTransactions() {
+ conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+ .getName());
+ conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+ .getName());
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ tables = new TransactionalTable[TABLE_NAMES.length];
+
+ for (int i = 0; i < TABLE_NAMES.length; i++) {
+ HTableDescriptor desc = new HTableDescriptor(TABLE_NAMES[i]);
+ desc.addFamily(new HColumnDescriptor(FAMILY));
+ admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+ tables[i] = new TransactionalTable(conf, desc.getName());
+ }
+ transactionManager = new TransactionManager(new DummyTransactionLogger(),
+ conf);
+ writeInitalRow();
+ }
+
+ private void writeInitalRow() throws IOException {
+ BatchUpdate update = new BatchUpdate(ROW1);
+ update.put(COL_A, Bytes.toBytes(1));
+ tables[0].commit(update);
+ }
+
+ public void testSimpleTransaction() throws IOException,
+ CommitUnsuccessfulException {
+ TransactionState transactionState = makeTransaction1();
+ transactionManager.tryCommit(transactionState);
+ }
+
+ public void testTwoTransactionsWithoutConflict() throws IOException,
+ CommitUnsuccessfulException {
+ TransactionState transactionState1 = makeTransaction1();
+ TransactionState transactionState2 = makeTransaction2();
+
+ transactionManager.tryCommit(transactionState1);
+ transactionManager.tryCommit(transactionState2);
+ }
+
+ public void testTwoTransactionsWithConflict() throws IOException,
+ CommitUnsuccessfulException {
+ TransactionState transactionState1 = makeTransaction1();
+ TransactionState transactionState2 = makeTransaction2();
+
+ transactionManager.tryCommit(transactionState2);
+
+ try {
+ transactionManager.tryCommit(transactionState1);
+ fail();
+ } catch (CommitUnsuccessfulException e) {
+ // Good
+ }
+ }
+
+ // Read from ROW1,COL_A and put it in ROW2,COL_A and ROW3,COL_A
+ private TransactionState makeTransaction1() throws IOException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+
+ Cell row1_A = tables[0].get(transactionState, ROW1, COL_A);
+
+ BatchUpdate write1 = new BatchUpdate(ROW2);
+ write1.put(COL_A, row1_A.getValue());
+ tables[0].commit(transactionState, write1);
+
+ BatchUpdate write2 = new BatchUpdate(ROW3);
+ write2.put(COL_A, row1_A.getValue());
+ tables[0].commit(transactionState, write2);
+
+ return transactionState;
+ }
+
+ // Read ROW1,COL_A, increment its (integer) value, write back
+ private TransactionState makeTransaction2() throws IOException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+
+ Cell row1_A = tables[0].get(transactionState, ROW1, COL_A);
+
+ int value = Bytes.toInt(row1_A.getValue());
+
+ BatchUpdate write = new BatchUpdate(ROW1);
+ write.put(COL_A, Bytes.toBytes(value + 1));
+ tables[0].commit(transactionState, write);
+
+ return transactionState;
+ }
+}