Index: /opt/eclipse/workspace/hbase/conf/hbase-default.xml
===================================================================
--- /opt/eclipse/workspace/hbase/conf/hbase-default.xml (revision 665754)
+++ /opt/eclipse/workspace/hbase/conf/hbase-default.xml (working copy)
@@ -105,6 +105,12 @@
+  <property>
+    <name>hbase.regionserver.impl</name>
+    <value>org.apache.hadoop.hbase.regionserver.HRegionServer</value>
+    <description>An interface that is assignable to HRegionInterface.
+    Used when starting a region server.</description>
+  </property>
   <property>
     <name>hbase.client.pause</name>
     <value>10000</value>
     <description>General client pause value. Used mostly as value to wait
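[Reviewer note] To actually exercise the transactional server, this new default has to be overridden. A minimal sketch of the wiring, assuming the TransactionalRegionServer and TransactionalRegionInterface classes added later in this patch; switching hbase.regionserver.class on the client side is my reading of how HConnection builds its RPC proxy, not something the patch itself states:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.LocalHBaseCluster;

    public class StartTransactionalCluster {
      public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        // Server side: which HRegionServer subclass to instantiate.
        conf.set("hbase.regionserver.impl",
            "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer");
        // Client side: which RPC interface to proxy, so the casts to
        // TransactionalRegionInterface in the client classes succeed (my assumption).
        conf.set("hbase.regionserver.class",
            "org.apache.hadoop.hbase.ipc.TransactionalRegionInterface");
        // LocalHBaseCluster picks the server class up via HConstants.REGION_SERVER_IMPL.
        new LocalHBaseCluster(conf).startup();
      }
    }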
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/HTable.java (revision 665754)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -333,7 +333,7 @@
     return this.tableName;
   }
 
-  protected HConnection getConnection() {
+  public HConnection getConnection() {
     return this.connection;
   }
 
@@ -1232,7 +1232,7 @@
    * If there are multiple regions in a table, this scanner will iterate
    * through them all.
    */
-  private class ClientScanner implements Scanner {
+  protected class ClientScanner implements Scanner {
     private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
     private byte[][] columns;
     private byte [] startRow;
@@ -1270,6 +1270,18 @@
       }
       nextScanner();
     }
+
+    protected byte[][] getColumns() {
+      return columns;
+    }
+
+    protected long getTimestamp() {
+      return scanTime;
+    }
+
+    protected RowFilterInterface getFilter() {
+      return filter;
+    }
 
     /*
      * Gets a scanner for the next region.
@@ -1306,8 +1318,7 @@
       }
       try {
-        callable = new ScannerCallable(getConnection(), getTableName(), columns,
-          localStartKey, scanTime, filter);
+        callable = getScannerCallable(localStartKey);
         // open a scanner on the region server starting at the
         // beginning of the region
         getConnection().getRegionServerWithRetries(callable);
@@ -1318,6 +1329,11 @@
       }
       return true;
     }
+
+    protected ScannerCallable getScannerCallable(byte [] localStartKey) {
+      return new ScannerCallable(getConnection(), getTableName(), columns,
+        localStartKey, scanTime, filter);
+    }
 
     /** {@inheritDoc} */
     public RowResult next() throws IOException {
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 665754)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy)
@@ -23,6 +23,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.RowResult;
@@ -38,7 +39,7 @@
   private final long timestamp;
   private final RowFilterInterface filter;
 
-  ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns,
+  protected ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns,
       byte [] startRow, long timestamp, RowFilterInterface filter) {
     super(connection, tableName, startRow);
     this.columns = columns;
@@ -65,9 +66,7 @@
       scannerId = -1L;
     } else if (scannerId == -1L && !closed) {
       // open the scanner
-      scannerId = server.openScanner(
-        this.location.getRegionInfo().getRegionName(), columns, row,
-        timestamp, filter);
+      scannerId = openScanner();
     } else {
       return server.next(scannerId);
     }
@@ -74,6 +73,24 @@
     return null;
   }
 
+  protected long openScanner() throws IOException {
+    return server.openScanner(
+      this.location.getRegionInfo().getRegionName(), columns, row,
+      timestamp, filter);
+  }
+
+  protected byte [][] getColumns() {
+    return columns;
+  }
+
+  protected long getTimestamp() {
+    return timestamp;
+  }
+
+  protected RowFilterInterface getFilter() {
+    return filter;
+  }
+
   /**
    * Call this when the next invocation of call should close the scanner
    */
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java (revision 0)
@@ -0,0 +1,29 @@
+/*
+ * $Id$
+ * Created on Jun 5, 2008
+ *
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/** Thrown when a transaction cannot be committed.
+ *
+ */
+public class CommitUnsuccessfulException extends Exception {
+
+  public CommitUnsuccessfulException() {
+    super();
+  }
+
+  public CommitUnsuccessfulException(String arg0, Throwable arg1) {
+    super(arg0, arg1);
+  }
+
+  public CommitUnsuccessfulException(String arg0) {
+    super(arg0);
+  }
+
+  public CommitUnsuccessfulException(Throwable arg0) {
+    super(arg0);
+  }
+
+}
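[Reviewer note] The visibility changes above (public getConnection, protected ClientScanner, protected ScannerCallable constructor, plus the new getScannerCallable/openScanner hooks) exist so a subclass can substitute its own scanner RPC. A minimal sketch of the seam using a hypothetical LoggingScannerCallable, purely for illustration and not part of the patch:

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.ScannerCallable;
    import org.apache.hadoop.hbase.filter.RowFilterInterface;

    /** Hypothetical subclass that logs every scanner it opens. */
    class LoggingScannerCallable extends ScannerCallable {
      private static final Log LOG = LogFactory.getLog(LoggingScannerCallable.class);

      LoggingScannerCallable(HConnection connection, byte[] tableName,
          byte[][] columns, byte[] startRow, long timestamp,
          RowFilterInterface filter) {
        super(connection, tableName, columns, startRow, timestamp, filter);
      }

      @Override
      protected long openScanner() throws IOException {
        long id = super.openScanner();
        // 'row' is the protected start row inherited from ServerCallable.
        LOG.debug("Opened scanner " + id + " at row " + new String(row));
        return id;
      }
    }

A matching HTable subclass would then override ClientScanner.getScannerCallable to return this callable, which is exactly the pattern TransactionalTable uses below.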
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/DummyTransactionLogger.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/DummyTransactionLogger.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/DummyTransactionLogger.java (revision 0)
@@ -0,0 +1,33 @@
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A local, in-memory implementation of the transaction logger. Useful only for
+ * non-distributed testing.
+ */
+public class DummyTransactionLogger implements TransactionLogger {
+
+  private AtomicLong nextTransactionId = new AtomicLong(0);
+  private Map<Long, TransactionStatus> transactionIdToStatusMap = Collections
+      .synchronizedMap(new HashMap<Long, TransactionStatus>());
+
+  public long createNewTransactionLog() {
+    long id = nextTransactionId.getAndIncrement();
+    transactionIdToStatusMap.put(id, TransactionStatus.PENDING);
+    return id;
+  }
+
+  public TransactionStatus getStatusForTransaction(final long transactionId) {
+    return transactionIdToStatusMap.get(transactionId);
+  }
+
+  public void setStatusForTransaction(final long transactionId,
+      final TransactionStatus status) {
+    transactionIdToStatusMap.put(transactionId, status);
+  }
+}
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (revision 0)
@@ -0,0 +1,377 @@
+/*
+ * $Id$
+ * Created on Jun 5, 2008
+ *
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Table with transactional support.
+ */
+public class TransactionalTable extends HTable {
+
+  public TransactionalTable(final HBaseConfiguration conf,
+      final String tableName) throws IOException {
+    super(conf, tableName);
+  }
+
+  public TransactionalTable(final HBaseConfiguration conf, final Text tableName)
+      throws IOException {
+    super(conf, tableName);
+  }
+
+  public TransactionalTable(final HBaseConfiguration conf,
+      final byte[] tableName) throws IOException {
+    super(conf, tableName);
+  }
+
+  private static abstract class TransactionalServerCallable<T> extends
+      ServerCallable<T> {
+    protected TransactionState transactionState;
+
+    protected TransactionalRegionInterface getTransactionServer() {
+      return (TransactionalRegionInterface) server;
+    }
+
+    protected void recordServer() throws IOException {
+      if (transactionState.addRegion(location)) {
+        getTransactionServer().beginTransaction(
+            transactionState.getTransactionId(),
+            location.getRegionInfo().getRegionName());
+      }
+    }
+
+    public TransactionalServerCallable(final HConnection connection,
+        final byte[] tableName, final byte[] row,
+        final TransactionState transactionState) {
+      super(connection, tableName, row);
+      this.transactionState = transactionState;
+    }
+
+  }
+
+  /**
+   * Get a single value for the specified row and column
+   *
+   * @param row row key
+   * @param column column name
+   * @return value for specified row/column
+   * @throws IOException
+   */
+  public Cell get(final TransactionState transactionState, final byte[] row,
+      final byte[] column) throws IOException {
+    return super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Cell>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public Cell call() throws IOException {
+            recordServer();
+            return getTransactionServer().get(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, column);
+          }
+        });
+  }
+
+  /**
+   * Get the specified number of versions of the specified row and column
+   *
+   * @param row - row key
+   * @param column - column name
+   * @param numVersions - number of versions to retrieve
+   * @return - array of byte values
+   * @throws IOException
+   */
+  public Cell[] get(final TransactionState transactionState, final byte[] row,
+      final byte[] column, final int numVersions) throws IOException {
+    Cell[] values = null;
+    values = super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Cell[]>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public Cell[] call() throws IOException {
+            recordServer();
+            return getTransactionServer().get(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, column,
+                numVersions);
+          }
+        });
+
+    // FIXME, not sure why this is needed, but copied over from HTable
+    if (values != null) {
+      ArrayList<Cell> cellValues = new ArrayList<Cell>();
+      for (int i = 0; i < values.length; i++) {
+        cellValues.add(values[i]);
+      }
+      return cellValues.toArray(new Cell[values.length]);
+    }
+    return null;
+  }
+
+  /**
+   * Get the specified number of versions of the specified row and column with
+   * the specified timestamp.
+   *
+   * @param row - row key
+   * @param column - column name
+   * @param timestamp - timestamp
+   * @param numVersions - number of versions to retrieve
+   * @return - array of values that match the above criteria
+   * @throws IOException
+   */
+  public Cell[] get(final TransactionState transactionState, final byte[] row,
+      final byte[] column, final long timestamp, final int numVersions)
+      throws IOException {
+    Cell[] values = null;
+    values = super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Cell[]>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public Cell[] call() throws IOException {
+            recordServer();
+            return getTransactionServer().get(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, column,
+                timestamp, numVersions);
+          }
+        });
+
+    if (values != null) {
+      ArrayList<Cell> cellValues = new ArrayList<Cell>();
+      for (int i = 0; i < values.length; i++) {
+        cellValues.add(values[i]);
+      }
+      return cellValues.toArray(new Cell[values.length]);
+    }
+    return null;
+  }
+
+  /**
+   * Get all the data for the specified row at the latest timestamp
+   *
+   * @param row row key
+   * @return RowResult is empty if row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row) throws IOException {
+    return getRow(transactionState, row, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Get all the data for the specified row at a specified timestamp
+   *
+   * @param row row key
+   * @param ts timestamp
+   * @return RowResult is empty if row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row, final long ts) throws IOException {
+    return super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<RowResult>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public RowResult call() throws IOException {
+            recordServer();
+            return getTransactionServer().getRow(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, ts);
+          }
+        });
+  }
+
+  /**
+   * Get selected columns for the specified row at the latest timestamp
+   *
+   * @param row row key
+   * @param columns Array of column names you want to retrieve.
+   * @return RowResult is empty if row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row, final byte[][] columns) throws IOException {
+    return getRow(transactionState, row, columns, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Get selected columns for the specified row at a specified timestamp
+   *
+   * @param row row key
+   * @param columns Array of column names you want to retrieve.
+   * @param ts timestamp
+   * @return RowResult is empty if row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row, final byte[][] columns, final long ts)
+      throws IOException {
+    return super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<RowResult>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public RowResult call() throws IOException {
+            recordServer();
+            return getTransactionServer().getRow(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, columns, ts);
+          }
+        });
+  }
+
+  /**
+   * Get a scanner on the current table starting at first row. Return the
+   * specified columns.
+   *
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns) throws IOException {
+    return getScanner(transactionState, columns, HConstants.EMPTY_START_ROW,
+        HConstants.LATEST_TIMESTAMP, null);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   *
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @param startRow starting row in table to scan
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow) throws IOException {
+    return getScanner(transactionState, columns, startRow,
+        HConstants.LATEST_TIMESTAMP, null);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   *
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp is <= this value
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow, final long timestamp)
+      throws IOException {
+    return getScanner(transactionState, columns, startRow, timestamp, null);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   *
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @param startRow starting row in table to scan
+   * @param filter a row filter using row-key regexp and/or column data filter.
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow,
+      final RowFilterInterface filter) throws IOException {
+    return getScanner(transactionState, columns, startRow,
+        HConstants.LATEST_TIMESTAMP, filter);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   *
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp is <= this value
+   * @param filter a row filter using row-key regexp and/or column data filter.
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow, final long timestamp,
+      final RowFilterInterface filter) throws IOException {
+    return new TransactionalClientScanner(transactionState, columns, startRow,
+        timestamp, filter);
+  }
+
+  /**
+   * Commit a BatchUpdate to the table.
+   *
+   * @param batchUpdate
+   * @throws IOException
+   */
+  public synchronized void commit(final TransactionState transactionState,
+      final BatchUpdate batchUpdate) throws IOException {
+    super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Boolean>(super.getConnection(), super
+            .getTableName(), batchUpdate.getRow(), transactionState) {
+          public Boolean call() throws IOException {
+            recordServer();
+            getTransactionServer().batchUpdate(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), batchUpdate);
+            return null;
+          }
+        });
+  }
+
+  protected class TransactionalClientScanner extends HTable.ClientScanner {
+
+    private TransactionState transactionState;
+
+    protected TransactionalClientScanner(
+        final TransactionState transactionState, final byte[][] columns,
+        final byte[] startRow, final long timestamp,
+        final RowFilterInterface filter) throws IOException {
+      super(columns, startRow, timestamp, filter);
+      this.transactionState = transactionState;
+    }
+
+    @Override
+    protected ScannerCallable getScannerCallable(final byte[] localStartKey) {
+      return new TransactionScannerCallable(transactionState, getConnection(),
+          getTableName(), getColumns(), localStartKey, getTimestamp(),
+          getFilter());
+    }
+  }
+
+}
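[Reviewer note] End-to-end client usage, stitched together from the APIs in this patch (TransactionManager and DummyTransactionLogger appear further down). A sketch only: the table and column names are made up, and the BatchUpdate put signature is my recollection of the 0.2 io package, not something shown in this diff:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
    import org.apache.hadoop.hbase.client.transactional.DummyTransactionLogger;
    import org.apache.hadoop.hbase.client.transactional.TransactionManager;
    import org.apache.hadoop.hbase.client.transactional.TransactionState;
    import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
    import org.apache.hadoop.hbase.io.BatchUpdate;
    import org.apache.hadoop.hbase.io.Cell;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TransferExample {
      public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        TransactionManager manager =
            new TransactionManager(new DummyTransactionLogger(), conf);
        TransactionalTable table =
            new TransactionalTable(conf, Bytes.toBytes("accounts"));

        TransactionState tx = manager.beginTransaction();
        try {
          // The read joins tx's read set on the region server.
          Cell balance = table.get(tx, Bytes.toBytes("row1"),
              Bytes.toBytes("info:balance"));
          BatchUpdate update = new BatchUpdate(Bytes.toBytes("row1"));
          update.put(Bytes.toBytes("info:balance"), Bytes.toBytes("0"));
          table.commit(tx, update); // buffered server-side until commit
          manager.tryCommit(tx);    // two-phase commit across participating regions
        } catch (CommitUnsuccessfulException e) {
          // A conflicting writer won; tryCommit already aborted the transaction.
        }
      }
    }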
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java (revision 0)
@@ -0,0 +1,31 @@
+/*
+ * $Id$
+ * Created on Jun 5, 2008
+ *
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/**
+ * Simple interface used to provide a log about transaction status. Written to
+ * by the client, and read by regionservers in case of failure.
+ */
+public interface TransactionLogger {
+
+  enum TransactionStatus {
+    PENDING, COMMITTED, ABORTED
+  }
+
+  /**
+   * Create a new transaction log. Return the transaction's globally unique id.
+   * The log's initial value should be PENDING.
+   *
+   * @return transaction id
+   */
+  long createNewTransactionLog();
+
+  TransactionStatus getStatusForTransaction(long transactionId);
+
+  void setStatusForTransaction(long transactionId, TransactionStatus status);
+
+}
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (revision 0)
@@ -0,0 +1,119 @@
+/*
+ * $Id$
+ * Created on Jun 5, 2008
+ *
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+
+/**
+ * Transaction Manager. Responsible for committing transactions.
+ */
+public class TransactionManager {
+  static final Log LOG = LogFactory.getLog(TransactionManager.class);
+
+  private final HConnection connection;
+  private final TransactionLogger transactionLogger;
+
+  public TransactionManager(final TransactionLogger transactionLogger) {
+    this(transactionLogger, new HBaseConfiguration());
+  }
+
+  public TransactionManager(final TransactionLogger transactionLogger,
+      final HBaseConfiguration conf) {
+    this.transactionLogger = transactionLogger;
+    connection = HConnectionManager.getConnection(conf);
+  }
+
+  /**
+   * Called to start a transaction.
+   *
+   * @return new transaction state
+   */
+  public TransactionState beginTransaction() {
+    long transactionId = transactionLogger.createNewTransactionLog();
+    LOG.debug("Beginning transaction " + transactionId);
+    return new TransactionState(transactionId);
+  }
+
+  /**
+   * Try to commit a transaction.
+   *
+   * @param transactionState
+   * @throws CommitUnsuccessfulException if a participating region votes no
+   * @throws IOException
+   */
+  public void tryCommit(final TransactionState transactionState)
+      throws CommitUnsuccessfulException, IOException {
+    LOG.debug("Attempting to commit transaction: "
+        + transactionState.toString());
+
+    for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+      TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+          .getHRegionConnection(location.getServerAddress());
+      boolean canCommit = transactionalRegionServer
+          .commitRequest(location.getRegionInfo().getRegionName(),
+              transactionState.getTransactionId());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Region [" + location.getRegionInfo().getRegionNameAsString()
+            + "] votes " + (canCommit ? "to commit" : "to abort")
+            + " transaction " + transactionState.getTransactionId());
+      }
+
+      if (!canCommit) {
+        LOG.debug("Aborting [" + transactionState.getTransactionId() + "]");
+        abort(transactionState, location);
+        throw new CommitUnsuccessfulException();
+      }
+    }
+
+    LOG.debug("Committing [" + transactionState.getTransactionId() + "]");
+    transactionLogger.setStatusForTransaction(transactionState
+        .getTransactionId(), TransactionLogger.TransactionStatus.COMMITTED);
+
+    for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+      TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+          .getHRegionConnection(location.getServerAddress());
+      transactionalRegionServer.commit(
+          location.getRegionInfo().getRegionName(), transactionState
+              .getTransactionId());
+    }
+    // Tran log can be deleted now ...
+  }
+
+  /**
+   * Abort a transaction.
+   *
+   * @param transactionState
+   * @throws IOException
+   */
+  public void abort(final TransactionState transactionState)
+      throws IOException {
+    abort(transactionState, null);
+  }
+
+  private void abort(final TransactionState transactionState,
+      final HRegionLocation locationToIgnore) throws IOException {
+    transactionLogger.setStatusForTransaction(transactionState
+        .getTransactionId(), TransactionLogger.TransactionStatus.ABORTED);
+
+    for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+      if (locationToIgnore != null && location.equals(locationToIgnore)) {
+        continue;
+      }
+
+      TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+          .getHRegionConnection(location.getServerAddress());
+
+      transactionalRegionServer.abort(location.getRegionInfo().getRegionName(),
+          transactionState.getTransactionId());
+    }
+  }
+}
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java (revision 0)
@@ -0,0 +1,37 @@
+/*
+ * $Id$
+ * Created on Jun 5, 2008
+ *
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+
+class TransactionScannerCallable extends ScannerCallable {
+
+  private TransactionState transactionState;
+
+  TransactionScannerCallable(final TransactionState transactionState,
+      final HConnection connection, final byte[] tableName,
+      final byte[][] columns, final byte[] startRow, final long timestamp,
+      final RowFilterInterface filter) {
+    super(connection, tableName, columns, startRow, timestamp, filter);
+    this.transactionState = transactionState;
+  }
+
+  @Override
+  protected long openScanner() throws IOException {
+    if (transactionState.addRegion(location)) {
+      ((TransactionalRegionInterface) server).beginTransaction(transactionState
+          .getTransactionId(), location.getRegionInfo().getRegionName());
+    }
+    return ((TransactionalRegionInterface) server).openScanner(transactionState
+        .getTransactionId(), this.location.getRegionInfo().getRegionName(),
+        getColumns(), row, getTimestamp(), getFilter());
+  }
+}
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java (revision 0)
@@ -0,0 +1,61 @@
+/*
+ * $Id$
+ * Created on Jun 5, 2008
+ *
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+
+/**
+ * Holds client-side transaction information. Clients use them as opaque
+ * objects passed around to transaction operations.
+ */
+public class TransactionState {
+  static final Log LOG = LogFactory.getLog(TransactionState.class);
+
+  private final long transactionId;
+
+  private Set<HRegionLocation> participatingRegions = new HashSet<HRegionLocation>();
+
+  TransactionState(final long transactionId) {
+    this.transactionId = transactionId;
+  }
+
+  boolean addRegion(final HRegionLocation hregion) {
+    boolean added = participatingRegions.add(hregion);
+
+    if (added) {
+      LOG.debug("Adding new hregion ["
+          + hregion.getRegionInfo().getRegionNameAsString()
+          + "] to transaction [" + transactionId + "]");
+    }
+
+    return added;
+  }
+
+  Set<HRegionLocation> getParticipatingRegions() {
+    return participatingRegions;
+  }
+
+  /**
+   * Get the transactionId.
+   *
+   * @return Return the transactionId.
+   */
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  @Override
+  public String toString() {
+    return "id: " + transactionId + ", participants: "
+        + participatingRegions.size();
+  }
+}
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java (revision 0)
@@ -0,0 +1,27 @@
+/*
+ * $Id$
+ * Created on Jun 7, 2008
+ *
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Thrown if a region server is passed an unknown transaction id
+ */
+public class UnknownTransactionException extends DoNotRetryIOException {
+
+  /** constructor */
+  public UnknownTransactionException() {
+    super();
+  }
+
+  /**
+   * Constructor
+   * @param s message
+   */
+  public UnknownTransactionException(String s) {
+    super(s);
+  }
+}
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (revision 665754)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -84,6 +84,9 @@
   /** Parameter name for what region server interface to use. */
   static final String REGION_SERVER_CLASS = "hbase.regionserver.class";
 
+  /** Parameter name for what region server implementation to use. */
+  static final String REGION_SERVER_IMPL = "hbase.regionserver.impl";
+
   /** Default region server interface class name. */
   static final String DEFAULT_REGION_SERVER_CLASS = HRegionInterface.class.getName();
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (revision 0)
@@ -0,0 +1,163 @@
+/*
+ * $Id$
+ * Created on Jun 4, 2008
+ *
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+
+/**
+ * Interface for transactional region servers.
+ */
+public interface TransactionalRegionInterface extends HRegionInterface {
+  /** Interface version number */
+  public static final long versionID = 1L;
+
+  /**
+   * Sent to initiate a transaction.
+   *
+   * @param transactionId
+   * @param regionName name of region
+   */
+  public void beginTransaction(long transactionId, final byte[] regionName)
+      throws IOException;
+
+  /**
+   * Retrieve a single value from the specified region for the specified row
+   * and column keys
+   *
+   * @param regionName name of region
+   * @param row row key
+   * @param column column key
+   * @return value for that region/row/column
+   * @throws IOException
+   */
+  public Cell get(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column) throws IOException;
+
+  /**
+   * Get the specified number of versions of the specified row and column
+   *
+   * @param regionName region name
+   * @param row row key
+   * @param column column key
+   * @param numVersions number of versions to return
+   * @return array of values
+   * @throws IOException
+   */
+  public Cell[] get(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final int numVersions)
+      throws IOException;
+
+  /**
+   * Get the specified number of versions of the specified row and column with
+   * the specified timestamp.
+   *
+   * @param regionName region name
+   * @param row row key
+   * @param column column key
+   * @param timestamp timestamp
+   * @param numVersions number of versions to return
+   * @return array of values
+   * @throws IOException
+   */
+  public Cell[] get(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final long timestamp,
+      final int numVersions) throws IOException;
+
+  /**
+   * Get all the data for the specified row at a given timestamp
+   *
+   * @param regionName region name
+   * @param row row key
+   * @param ts timestamp
+   * @return map of values
+   * @throws IOException
+   */
+  public RowResult getRow(long transactionId, final byte[] regionName,
+      final byte[] row, final long ts) throws IOException;
+
+  /**
+   * Get selected columns for the specified row at a given timestamp.
+   *
+   * @param regionName region name
+   * @param row row key
+   * @param columns columns to return
+   * @param ts timestamp
+   * @return map of values
+   * @throws IOException
+   */
+  public RowResult getRow(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[][] columns, final long ts)
+      throws IOException;
+
+  /**
+   * Get selected columns for the specified row at the latest timestamp.
+   *
+   * @param regionName region name
+   * @param row row key
+   * @param columns columns to return
+   * @return map of values
+   * @throws IOException
+   */
+  public RowResult getRow(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[][] columns) throws IOException;
+
+  /**
+   * Opens a remote scanner with a RowFilter.
+   *
+   * @param transactionId
+   * @param regionName name of region to scan
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex for the column family name. A column name is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @param startRow starting row to scan
+   * @param timestamp only return values whose timestamp is <= this value
+   * @param filter RowFilter for filtering results at the row-level.
+   *
+   * @return scannerId scanner identifier used in other calls
+   * @throws IOException
+   */
+  public long openScanner(final long transactionId, final byte[] regionName,
+      final byte[][] columns, final byte[] startRow, long timestamp,
+      RowFilterInterface filter) throws IOException;
+
+  /**
+   * Applies a batch of updates via one RPC
+   *
+   * @param regionName name of the region to update
+   * @param b BatchUpdate
+   * @throws IOException
+   */
+  public void batchUpdate(long transactionId, final byte[] regionName,
+      final BatchUpdate b) throws IOException;
+
+  /**
+   * Ask if we can commit the given transaction.
+   *
+   * @param transactionId
+   * @return true if we can commit
+   */
+  public boolean commitRequest(final byte[] regionName, long transactionId)
+      throws IOException;
+
+  /**
+   * Commit the transaction.
+   *
+   * @param transactionId
+   */
+  public void commit(final byte[] regionName, long transactionId)
+      throws IOException;
+
+  /**
+   * Abort the transaction.
+   *
+   * @param transactionId
+   */
+  public void abort(final byte[] regionName, long transactionId)
+      throws IOException;
+}
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 665754)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy)
@@ -65,6 +65,7 @@
   /** 'local:' */
   public static final String LOCAL_COLON = LOCAL + ":";
   private final HBaseConfiguration conf;
+  private final Class<? extends HRegionServer> regionServerClass;
 
   /**
    * Constructor.
@@ -98,6 +99,7 @@
     // start/stop ports at different times during the life of the test.
     conf.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
     this.regionThreads = new ArrayList<RegionServerThread>();
+    regionServerClass = (Class<? extends HRegionServer>) conf.getClass(
+        HConstants.REGION_SERVER_IMPL, HRegionServer.class);
     for (int i = 0; i < noRegionServers; i++) {
       addRegionServer();
     }
@@ -112,7 +114,13 @@
    */
   public RegionServerThread addRegionServer() throws IOException {
     synchronized (regionThreads) {
-      RegionServerThread t = new RegionServerThread(new HRegionServer(conf),
+      HRegionServer server;
+      try {
+        server = regionServerClass.getConstructor(HBaseConfiguration.class)
+            .newInstance(conf);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      RegionServerThread t = new RegionServerThread(server,
         this.regionThreads.size());
       this.regionThreads.add(t);
       return t;
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 665754)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -70,6 +70,7 @@
 import org.apache.hadoop.hbase.RegionServerRunningException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
@@ -476,7 +480,7 @@
    * Run init. Sets up hlog and starts up all server threads.
    * @param c Extra configuration.
    */
-  private void init(final MapWritable c) throws IOException {
+  protected void init(final MapWritable c) throws IOException {
     try {
       for (Map.Entry e: c.entrySet()) {
         String key = e.getKey().toString();
@@ -860,15 +877,7 @@
     HRegion region = this.onlineRegions.get(mapKey);
     if (region == null) {
       try {
-        region = new HRegion(HTableDescriptor.getTableDir(rootDir,
-          regionInfo.getTableDesc().getName()),
-          this.log, this.fs, conf, regionInfo, null, this.cacheFlusher,
-          new Progressable() {
-            public void progress() {
-              addProcessingMessage(regionInfo);
-            }
-          }
-        );
+        region = instantiateRegion(regionInfo);
         // Startup a compaction early if one is needed.
         this.compactSplitThread.compactionRequested(region);
       } catch (IOException e) {
@@ -891,6 +900,17 @@
     reportOpen(regionInfo);
   }
 
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
+        .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null,
+        this.cacheFlusher, new Progressable() {
+          public void progress() {
+            addProcessingMessage(regionInfo);
+          }
+        });
+  }
+
   /*
    * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue.
    * This method is called while region is in the queue of regions to process
@@ -1172,16 +1192,9 @@
     requestCount.incrementAndGet();
     try {
       HRegion r = getRegion(regionName);
-      long scannerId = -1L;
       InternalScanner s = r.getScanner(cols, firstRow, timestamp, filter);
-      scannerId = rand.nextLong();
-      String scannerName = String.valueOf(scannerId);
-      synchronized(scanners) {
-        scanners.put(scannerName, s);
-      }
-      this.leases.
-        createLease(scannerName, new ScannerListener(scannerName));
+      long scannerId = addScanner(s);
       return scannerId;
     } catch (IOException e) {
       LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
@@ -1191,6 +1204,18 @@
     }
   }
 
+  protected long addScanner(InternalScanner s) throws LeaseStillHeldException {
+    long scannerId = -1L;
+    scannerId = rand.nextLong();
+    String scannerName = String.valueOf(scannerId);
+    synchronized(scanners) {
+      scanners.put(scannerName, s);
+    }
+    this.leases.
+      createLease(scannerName, new ScannerListener(scannerName));
+    return scannerId;
+  }
+
   /** {@inheritDoc} */
   public void close(final long scannerId) throws IOException {
     checkOpen();
@@ -1409,7 +1434,7 @@
    *
    * @throws IOException
    */
-  private void checkOpen() throws IOException {
+  protected void checkOpen() throws IOException {
     if (this.stopRequested.get() || this.abortRequested) {
       throw new IOException("Server not running");
     }
@@ -1567,6 +1592,34 @@
    * @param args
    */
   public static void main(String [] args) {
-    doMain(args, HRegionServer.class);
+    Configuration conf = new HBaseConfiguration();
+    @SuppressWarnings("unchecked")
+    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+    doMain(args, regionServerClass);
+  }
+
+  /**
+   * Get the leases.
+   * @return Return the leases.
+   */
+  protected Leases getLeases() {
+    return leases;
+  }
+
+  /**
+   * Get the rootDir.
+   * @return Return the rootDir.
+   */
+  protected Path getRootDir() {
+    return rootDir;
+  }
+
+  /**
+   * Get the fs.
+   * @return Return the fs.
+   */
+  protected FileSystem getFileSystem() {
+    return fs;
   }
 }
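[Reviewer note] How the new hbase.regionserver.impl knob is consumed: the class is looked up from the configuration and instantiated reflectively through its (HBaseConfiguration) constructor. A condensed restatement of what LocalHBaseCluster.addRegionServer and HRegionServer.main now do; the RegionServerFactory wrapper is mine, for illustration only:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;

    public class RegionServerFactory {
      @SuppressWarnings("unchecked")
      public static HRegionServer create(HBaseConfiguration conf) throws Exception {
        Class<? extends HRegionServer> cls = (Class<? extends HRegionServer>)
            conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
        // Any subclass must therefore expose a public (HBaseConfiguration) constructor.
        return cls.getConstructor(HBaseConfiguration.class).newInstance(conf);
      }
    }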
+ */ +class CleanOldTransactionsChore extends Chore { + + private static final String SLEEP_CONF = "hbase.transaction.clean.sleep"; + private static final int DEFAULT_SLEEP = 60 * 1000; + + private final TransactionalRegionServer regionServer; + + public CleanOldTransactionsChore( + final TransactionalRegionServer regionServer, + final AtomicBoolean stopRequest) { + super(regionServer.getConfiguration().getInt(SLEEP_CONF, DEFAULT_SLEEP), + stopRequest); + this.regionServer = regionServer; + } + + @Override + protected void chore() { + for (HRegion region : regionServer.getOnlineRegions()) { + ((TransactionalRegion) region).removeUnNeededCommitedTransactions(); + } + } + +} Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java =================================================================== --- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (revision 0) +++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (revision 0) @@ -0,0 +1,510 @@ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.LeaseException; +import org.apache.hadoop.hbase.LeaseListener; +import org.apache.hadoop.hbase.Leases; +import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HLog; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status; +import org.apache.hadoop.util.Progressable; + +/** + * Regionserver which provides transactional support for atomic transactions. + * This is achieved with optimistic concurrency control (see + * http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf). We keep track + * read and write sets for each transaction, and hold off on processing the + * writes. To decide to commit a transaction we check its read sets with all + * transactions that have committed while it was running for overlaps. + *

+ * Because transactions can span multiple regions, all regions must agree to + * commit a transactions. The client side of this commit protocol is encoded in + * org.apache.hadoop.hbase.client.transactional.TransactionManger + *

+ * In the event of an failure of the client mid-commit, (after we voted yes), we + * will have to consult the transaction log to determine the final decision of + * the transaction. This is not yet implemented. + */ +class TransactionalRegion extends HRegion { + + private static final String LEASE_TIME = "hbase.transaction.leaseTime"; + private static final int DEFAULT_LEASE_TIME = 60 * 1000; + private static final int LEASE_CHECK_FREQUENCY = 1000; + + private static final Log LOG = LogFactory.getLog(TransactionalRegion.class); + + // Collection of active transactions (PENDING) keyed by id. + private Map transactionsById = new HashMap(); + + // Map of recent transactions that are COMMIT_PENDING or COMMITED keyed by + // their sequence number + private SortedMap commitedTransactionsBySequenceNumber = Collections + .synchronizedSortedMap(new TreeMap()); + + // Collection of transactions that are COMMIT_PENDING + private Set commitPendingTransactions = Collections + .synchronizedSet(new HashSet()); + + private final Leases transactionLeases; + private AtomicInteger nextSequenceId = new AtomicInteger(0); + private Object commitCheckLock = new Object(); + + public TransactionalRegion(final Path basedir, final HLog log, + final FileSystem fs, final HBaseConfiguration conf, + final HRegionInfo regionInfo, final Path initialFiles, + final FlushRequester flushListener, final Progressable reporter) + throws IOException { + super(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, + reporter); + transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME), + LEASE_CHECK_FREQUENCY); + } + + public void beginTransaction(final long transactionId) throws IOException { + String key = String.valueOf(transactionId); + if (transactionsById.get(key) != null) { + TransactionState alias = getTransactionState(transactionId); + if (alias != null) { + alias.setStatus(Status.ABORTED); + retireTransaction(alias); + } + throw new IOException("Already exiting transaction id: " + key); + } + + TransactionState state = new TransactionState(transactionId); + + // Order is important here + for (TransactionState commitPending : commitPendingTransactions) { + state.addTransactionToCheck(commitPending); + } + state.setStartSequenceNumber(nextSequenceId.get()); + + transactionsById.put(String.valueOf(key), state); + try { + transactionLeases.createLease(key, new TransactionLeaseListener(key)); + } catch (LeaseStillHeldException e) { + throw new RuntimeException(e); + } + LOG.debug("Begining transaction " + key + " in region " + + super.getRegionInfo().getRegionNameAsString()); + } + + /** + * Fetch a single data item. + * + * @param transactionId + * @param row + * @param column + * @return column value + * @throws IOException + */ + public Cell get(final long transactionId, final byte[] row, + final byte[] column) throws IOException { + Cell[] results = get(transactionId, row, column, 1); + return (results == null || results.length == 0) ? null : results[0]; + } + + /** + * Fetch multiple versions of a single data item + * + * @param transactionId + * @param row + * @param column + * @param numVersions + * @return array of values one element per version + * @throws IOException + */ + public Cell[] get(final long transactionId, final byte[] row, + final byte[] column, final int numVersions) throws IOException { + return get(transactionId, row, column, Long.MAX_VALUE, numVersions); + } + + /** + * Fetch multiple versions of a single data item, with timestamp. 
+ * + * @param transactionId + * @param row + * @param column + * @param timestamp + * @param numVersions + * @return array of values one element per version that matches the timestamp + * @throws IOException + */ + public Cell[] get(final long transactionId, final byte[] row, + final byte[] column, final long timestamp, final int numVersions) + throws IOException { + TransactionState state = getTransactionState(transactionId); + state.addRead(row); + + return get(row, column, timestamp, numVersions); + } + + /** + * Fetch all the columns for the indicated row at a specified timestamp. + * Returns a TreeMap that maps column names to values. + * + * @param transactionId + * @param row + * @param columns Array of columns you'd like to retrieve. When null, get all. + * @param ts + * @return Map values + * @throws IOException + */ + public Map getFull(final long transactionId, final byte[] row, + final Set columns, final long ts) throws IOException { + TransactionState state = getTransactionState(transactionId); + state.addRead(row); + return getFull(row, columns, ts); + } + + /** + * Return an iterator that scans over the HRegion, returning the indicated + * columns for only the rows that match the data filter. This Iterator must be + * closed by the caller. + * + * @param transactionId + * @param cols columns to scan. If column name is a column family, all columns + * of the specified column family are returned. Its also possible to pass a + * regex in the column qualifier. A column qualifier is judged to be a regex + * if it contains at least one of the following characters: + * \+|^&*$[]]}{)(. + * @param firstRow row which is the starting point of the scan + * @param timestamp only return rows whose timestamp is <= this value + * @param filter row filter + * @return InternalScanner + * @throws IOException + */ + public InternalScanner getScanner(final long transactionId, + final byte[][] cols, final byte[] firstRow, final long timestamp, + final RowFilterInterface filter) throws IOException { + return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow, + timestamp, filter)); + } + + /** + * Add a write to the transaction. Does not get applied until commit process. + * + * @param b + * @throws IOException + */ + public void batchUpdate(final long transactionId, final BatchUpdate b) + throws IOException { + TransactionState state = getTransactionState(transactionId); + state.addWrite(b); + // FIXME write to the WAL + } + + public boolean commitRequest(final long transactionId) throws IOException { + synchronized (commitCheckLock) { + TransactionState state = getTransactionState(transactionId); + if (state == null) { + return false; + } + + if (hasConflict(state)) { + state.setStatus(Status.ABORTED); + retireTransaction(state); + return false; + } + + // No conflicts, we can commit. + LOG.trace("No conflicts for transaction " + transactionId + + " found in region " + super.getRegionInfo().getRegionNameAsString() + + ". 
Voting for commit"); + state.setStatus(Status.COMMIT_PENDING); + + // If there are writes we must keep record off the transaction + if (state.getWriteSet().size() > 0) { + // Order is important + commitPendingTransactions.add(state); + state.setSequenceNumber(nextSequenceId.getAndIncrement()); + commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(), + state); + } + + return true; + } + } + + private boolean hasConflict(final TransactionState state) { + // Check against transactions that committed while we were running + for (int i = state.getStartSequenceNumber(); i < nextSequenceId.get(); i++) { + TransactionState other = commitedTransactionsBySequenceNumber.get(i); + if (other == null) { + continue; + } + if (hasConflict(state, other)) { + return true; + + } + } + + // Check against transactions that were COMMIT_PENDING when we started + for (TransactionState other : state.getTransactionsToCheck()) { + if (other.getStatus().equals(TransactionState.Status.ABORTED)) { + continue; + } + if (hasConflict(state, other)) { + return true; + } + } + return false; + } + + private boolean hasConflict(final TransactionState checkTransaction, + final TransactionState checkAgainst) { + for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) { + if (checkTransaction.getReadSet().contains(otherUpdate.getRow())) { + LOG.trace("Transaction " + checkTransaction.toString() + + " conflicts with " + checkAgainst.toString()); + return true; + } + } + return false; + } + + /** + * Commit the transaction. + * + * @param transactionId + * @return + * @throws IOException + */ + public void commit(final long transactionId) throws IOException { + TransactionState state; + try { + state = getTransactionState(transactionId); + } catch (UnknownTransactionException e) { + LOG.fatal("Asked to commit unknown transaction: " + transactionId + + " in region " + super.getRegionInfo().getRegionNameAsString()); + // FIXME Write to the transaction log that this transaction was corrupted + throw e; + } + + if (!state.getStatus().equals(Status.COMMIT_PENDING)) { + LOG.fatal("Asked to commit a non pending transaction"); + // FIXME Write to the transaction log that this transaction was corrupted + throw new IOException("commit failure"); + } + + commit(state); + } + + /** + * Commit the transaction. + * + * @param transactionId + * @return + */ + public void abort(final long transactionId) { + TransactionState state; + try { + state = getTransactionState(transactionId); + } catch (UnknownTransactionException e) { + LOG.error("Asked to abort unknown transaction: " + transactionId); + return; + } + + state.setStatus(Status.ABORTED); + + // FIXME write abort message to the WAL (if there are writes) + + // Following removes needed if we have voted + if (state.getSequenceNumber() != null) { + commitedTransactionsBySequenceNumber.remove(state.getSequenceNumber()); + } + commitPendingTransactions.remove(state); + + retireTransaction(state); + } + + private void commit(final TransactionState state) throws IOException { + + LOG.debug("Commiting transaction: " + state.toString() + " to " + + super.getRegionInfo().getRegionNameAsString()); + + // FIXME write commit message to WAL if there are writes + + for (BatchUpdate update : state.getWriteSet()) { + super.batchUpdate(update); + // FIXME. These updates do not need to be WALed. 
They were written as + // they were received + } + + state.setStatus(Status.COMMITED); + if (state.getWriteSet().size() > 0 + && !commitPendingTransactions.remove(state)) { + LOG + .fatal("Commiting a non-query transaction that is not in commitPendingTransactions"); + throw new IOException("commit failure"); // FIXME, how to handle? + } + retireTransaction(state); + } + + // Cancel leases, and removed from lease lookup. This transaction may still + // live in commitedTransactionsBySequenceNumber and commitPendingTransactions + private void retireTransaction(final TransactionState state) { + String key = String.valueOf(state.getTransactionId()); + try { + transactionLeases.cancelLease(key); + } catch (LeaseException e) { + // Ignore + } + + transactionsById.remove(key); + } + + private TransactionState getTransactionState(final long transactionId) + throws UnknownTransactionException { + String key = String.valueOf(transactionId); + TransactionState state = null; + + state = transactionsById.get(key); + + if (state == null) { + LOG.trace("Unknown transaction: " + key); + throw new UnknownTransactionException(key); + } + + try { + transactionLeases.renewLease(key); + } catch (LeaseException e) { + throw new RuntimeException(e); + } + + return state; + } + + /** + * Cleanup references to committed transactions that are no longer needed. + * + */ + void removeUnNeededCommitedTransactions() { + Integer minStartSeqNumber = getMinStartSequenceNumber(); + if (minStartSeqNumber == null) { + minStartSeqNumber = Integer.MAX_VALUE; // Remove all + } + + Iterator> it = commitedTransactionsBySequenceNumber + .entrySet().iterator(); + + int numRemoved = 0; + while (it.hasNext()) { + if (it.next().getKey() >= minStartSeqNumber) { + break; + } + it.remove(); + numRemoved++; + } + + LOG.debug("Removed " + numRemoved + + " commited transactions with sequence lower than " + + minStartSeqNumber); + } + + private Integer getMinStartSequenceNumber() { + Integer min = null; + for (TransactionState transactionState : transactionsById.values()) { + if (min == null || transactionState.getStartSequenceNumber() < min) { + min = transactionState.getStartSequenceNumber(); + } + } + return min; + } + + // TODO + private void resolveTransactionFromLog(final TransactionState s) { + throw new RuntimeException("NYI"); + } + + private class TransactionLeaseListener implements LeaseListener { + private final String transactionName; + + TransactionLeaseListener(final String n) { + this.transactionName = n; + } + + /** {@inheritDoc} */ + public void leaseExpired() { + LOG.info("Transaction " + this.transactionName + " lease expired"); + TransactionState s = null; + synchronized (transactionsById) { + s = transactionsById.remove(transactionName); + } + if (s == null) { + LOG.warn("Unknown transaction expired " + this.transactionName); + return; + } + + switch (s.getStatus()) { + case PENDING: + s.setStatus(Status.ABORTED); // Other transactions may have a ref + break; + case COMMIT_PENDING: + LOG.info("Transaction " + s.getTransactionId() + + " expired in COMMIT_PENDING state"); + LOG.info("Checking transaction status in transaction log"); + resolveTransactionFromLog(s); + break; + default: + LOG.warn("Unexpected status on expired lease"); + } + } + } + + /** Wrapper which keeps track of rows returned by scanner. 
+
+  /** Wrapper which keeps track of rows returned by scanner. */
+  private class ScannerWrapper implements InternalScanner {
+    private long transactionId;
+    private InternalScanner scanner;
+
+    public ScannerWrapper(final long transactionId,
+        final InternalScanner scanner) {
+      this.transactionId = transactionId;
+      this.scanner = scanner;
+    }
+
+    public void close() throws IOException {
+      scanner.close();
+    }
+
+    public boolean isMultipleMatchScanner() {
+      return scanner.isMultipleMatchScanner();
+    }
+
+    public boolean isWildcardScanner() {
+      return scanner.isWildcardScanner();
+    }
+
+    public boolean next(final HStoreKey key,
+        final SortedMap<byte[], Cell> results) throws IOException {
+      boolean result = scanner.next(key, results);
+      getTransactionState(transactionId).addRead(key.getRow());
+      return result;
+    }
+  }
+}
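ScannerWrapper is what makes scans participate in conflict detection: every row returned by next() joins the transaction's read set, exactly as if it had been fetched with get. A package-local sketch of the consequence, using only the TransactionState API added later in this patch (illustrative, not test code):

// Rows observed by a transactional scan become reads, so a concurrent
// writer of any scanned row fails this transaction's commit vote.
static void scanReadConflictExample() {
  TransactionState reader = new TransactionState(1);
  reader.addRead(Bytes.toBytes("row2"));          // row seen while scanning

  TransactionState writer = new TransactionState(2);
  BatchUpdate update = new BatchUpdate(Bytes.toBytes("row2"));
  update.put(Bytes.toBytes("family:a"), Bytes.toBytes(42));
  writer.addWrite(update);

  // If writer commits first, the region's hasConflict(reader, writer) is
  // true: writer's write set contains a row present in reader's read set.
  boolean conflict = false;
  for (BatchUpdate otherUpdate : writer.getWriteSet()) {
    conflict |= reader.getReadSet().contains(otherUpdate.getRow());
  }
  assert conflict; // reader's commit vote would fail
}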
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java	(revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java	(revision 0)
@@ -0,0 +1,267 @@
+/*
+ * $Id$
+ * Created on Jun 5, 2008
+ *
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * RegionServer with support for transactions. Transactional logic is at the
+ * region level, so we mostly just delegate to the appropriate
+ * TransactionalRegion.
+ */
+public class TransactionalRegionServer extends HRegionServer implements
+    TransactionalRegionInterface {
+  static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class);
+
+  private final CleanOldTransactionsChore cleanOldTransactionsThread;
+
+  public TransactionalRegionServer(final HBaseConfiguration conf)
+      throws IOException {
+    this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+        DEFAULT_REGIONSERVER_ADDRESS)), conf);
+  }
+
+  public TransactionalRegionServer(final HServerAddress address,
+      final HBaseConfiguration conf) throws IOException {
+    super(address, conf);
+    cleanOldTransactionsThread = new CleanOldTransactionsChore(this,
+        super.stopRequested);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getProtocolVersion(final String protocol, final long clientVersion)
+      throws IOException {
+    if (protocol.equals(TransactionalRegionInterface.class.getName())) {
+      return TransactionalRegionInterface.versionID;
+    }
+    return super.getProtocolVersion(protocol, clientVersion);
+  }
+
+  @Override
+  protected void init(final MapWritable c) throws IOException {
+    super.init(c);
+    String n = Thread.currentThread().getName();
+    UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+      public void uncaughtException(Thread t, Throwable e) {
+        abort();
+        LOG.fatal("Set stop flag in " + t.getName(), e);
+      }
+    };
+    Threads.setDaemonThreadRunning(this.cleanOldTransactionsThread, n
+        + ".oldTransactionCleaner", handler);
+  }
+
+  @Override
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    return new TransactionalRegion(HTableDescriptor.getTableDir(super
+        .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+        .getFileSystem(), super.conf, regionInfo, null, super
+        .getFlushRequester(), new Progressable() {
+      public void progress() {
+        addProcessingMessage(regionInfo);
+      }
+    });
+  }
+
+  protected TransactionalRegion getTransactionalRegion(final byte[] regionName)
+      throws NotServingRegionException {
+    return (TransactionalRegion) super.getRegion(regionName);
+  }
+
+  public void abort(final byte[] regionName, final long transactionId)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      getTransactionalRegion(regionName).abort(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public void batchUpdate(final long transactionId, final byte[] regionName,
+      final BatchUpdate b) throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      getTransactionalRegion(regionName).batchUpdate(transactionId, b);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public void commit(final byte[] regionName, final long transactionId)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      getTransactionalRegion(regionName).commit(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public boolean commitRequest(final byte[] regionName, final long transactionId)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).commitRequest(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
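Nothing above is wired in by default; as both tests at the end of this patch do, a deployment opts in by pointing the client RPC interface and the server implementation at the transactional classes. A minimal sketch of the two settings (the same ones the test constructors apply):

// Select the transactional interface and region server implementation.
HBaseConfiguration conf = new HBaseConfiguration();
conf.set(HConstants.REGION_SERVER_CLASS,
    TransactionalRegionInterface.class.getName());  // client-side interface
conf.set(HConstants.REGION_SERVER_IMPL,
    TransactionalRegionServer.class.getName());     // server-side class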
+
+  public Cell get(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column) throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).get(transactionId, row, column);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public Cell[] get(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final int numVersions)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).get(transactionId, row, column,
+          numVersions);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public Cell[] get(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final long timestamp,
+      final int numVersions) throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).get(transactionId, row, column,
+          timestamp, numVersions);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public RowResult getRow(final long transactionId, final byte[] regionName,
+      final byte[] row, final long ts) throws IOException {
+    return getRow(transactionId, regionName, row, null, ts);
+  }
+
+  public RowResult getRow(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[][] columns) throws IOException {
+    return getRow(transactionId, regionName, row, columns,
+        HConstants.LATEST_TIMESTAMP);
+  }
+
+  public RowResult getRow(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[][] columns, final long ts)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      // convert the columns array into a set so it's easy to check later.
+      Set<byte[]> columnSet = null;
+      if (columns != null) {
+        columnSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+        columnSet.addAll(Arrays.asList(columns));
+      }
+
+      TransactionalRegion region = getTransactionalRegion(regionName);
+      Map<byte[], Cell> map = region.getFull(transactionId, row, columnSet, ts);
+      HbaseMapWritable<byte[], Cell> result = new HbaseMapWritable<byte[], Cell>();
+      result.putAll(map);
+      return new RowResult(row, result);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public long openScanner(final long transactionId, final byte[] regionName,
+      final byte[][] cols, final byte[] firstRow, final long timestamp,
+      final RowFilterInterface filter) throws IOException {
+    checkOpen();
+    NullPointerException npe = null;
+    if (regionName == null) {
+      npe = new NullPointerException("regionName is null");
+    } else if (cols == null) {
+      npe = new NullPointerException("columns to scan is null");
+    } else if (firstRow == null) {
+      npe = new NullPointerException("firstRow for scanner is null");
+    }
+    if (npe != null) {
+      IOException io = new IOException("Invalid arguments to openScanner");
+      io.initCause(npe);
+      throw io;
+    }
+    super.getRequestCount().incrementAndGet();
+    try {
+      TransactionalRegion r = getTransactionalRegion(regionName);
+      long scannerId = -1L;
+      InternalScanner s = r.getScanner(transactionId, cols, firstRow,
+          timestamp, filter);
+      scannerId = super.addScanner(s);
+      return scannerId;
+    } catch (IOException e) {
+      LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
+          RemoteExceptionHandler.checkIOException(e));
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public void beginTransaction(final long transactionId, final byte[] regionName)
+      throws IOException {
+    getTransactionalRegion(regionName).beginTransaction(transactionId);
+  }
+
+}
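Taken together, the methods above are the full region-server surface of a transaction. A sketch of one single-region round trip against this interface (error handling elided; the transaction id would normally be chosen by the client-side TransactionManager, and what the region does after a false vote is not shown in this excerpt):

// One transactional increment against a single region (sketch).
static void incrementOnce(TransactionalRegionInterface server,
    byte[] regionName, byte[] row, byte[] column, long transactionId)
    throws IOException {
  server.beginTransaction(transactionId, regionName);
  Cell cell = server.get(transactionId, regionName, row, column); // read set
  BatchUpdate update = new BatchUpdate(row);
  update.put(column, Bytes.toBytes(Bytes.toInt(cell.getValue()) + 1));
  server.batchUpdate(transactionId, regionName, update);          // write set
  if (server.commitRequest(regionName, transactionId)) {
    server.commit(regionName, transactionId); // sole participant, so commit
  }
  // A false vote means the region rejected the transaction; a coordinator
  // with multiple participants would abort the others here.
}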
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java	(revision 0)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java	(revision 0)
@@ -0,0 +1,159 @@
+/*
+ * $Id$
+ * Created on Jun 4, 2008
+ *
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Holds the state of a transaction.
+ */
+class TransactionState {
+
+  /** Current status. */
+  public enum Status {
+    /** Initial status, still performing operations. */
+    PENDING,
+    /**
+     * Checked if we can commit, and said yes. Still need to determine the
+     * global decision.
+     */
+    COMMIT_PENDING,
+    /** Committed. */
+    COMMITED,
+    /** Aborted. */
+    ABORTED
+  }
+
+  private final long transactionId;
+  private Status status;
+  private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
+      Bytes.BYTES_COMPARATOR);
+  private Set<BatchUpdate> writeSet = new HashSet<BatchUpdate>();
+  private Set<TransactionState> additionalTransactionsToCheck = new HashSet<TransactionState>();
+  private int startSequenceNumber;
+  private Integer sequenceNumber;
+
+  public TransactionState(final long transactionId) {
+    this.transactionId = transactionId;
+    this.status = Status.PENDING;
+  }
+
+  public void addRead(final byte[] rowKey) {
+    readSet.add(rowKey);
+  }
+
+  public Set<byte[]> getReadSet() {
+    return readSet;
+  }
+
+  public void addWrite(final BatchUpdate write) {
+    writeSet.add(write);
+  }
+
+  public Set<BatchUpdate> getWriteSet() {
+    return writeSet;
+  }
+
+  public void addTransactionToCheck(final TransactionState transaction) {
+    additionalTransactionsToCheck.add(transaction);
+  }
+
+  public Set<TransactionState> getTransactionsToCheck() {
+    return additionalTransactionsToCheck;
+  }
+
+  /**
+   * Get the status.
+   *
+   * @return Return the status.
+   */
+  public Status getStatus() {
+    return status;
+  }
+
+  /**
+   * Set the status.
+   *
+   * @param status The status to set.
+   */
+  public void setStatus(final Status status) {
+    this.status = status;
+  }
+
+  /**
+   * Get the startSequenceNumber.
+   *
+   * @return Return the startSequenceNumber.
+   */
+  public int getStartSequenceNumber() {
+    return startSequenceNumber;
+  }
+
+  /**
+   * Set the startSequenceNumber.
+   *
+   * @param startSequenceNumber The startSequenceNumber to set.
+   */
+  public void setStartSequenceNumber(final int startSequenceNumber) {
+    this.startSequenceNumber = startSequenceNumber;
+  }
+
+  /**
+   * Get the sequenceNumber.
+   *
+   * @return Return the sequenceNumber.
+   */
+  public Integer getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  /**
+   * Set the sequenceNumber.
+   *
+   * @param sequenceNumber The sequenceNumber to set.
+   */
+  public void setSequenceNumber(final Integer sequenceNumber) {
+    this.sequenceNumber = sequenceNumber;
+  }
+
+  /**
+   * Get the transactionId.
+   *
+   * @return Return the transactionId.
+   */
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append("[transactionId: ");
+    result.append(transactionId);
+    result.append(" status: ");
+    result.append(status.name());
+    result.append(" read Size: ");
+    result.append(readSet.size());
+    result.append(" write Size: ");
+    result.append(writeSet.size());
+    result.append(" startSQ: ");
+    result.append(startSequenceNumber);
+    if (sequenceNumber != null) {
+      result.append(" committedSQ:");
+      result.append(sequenceNumber);
+    }
+    result.append("]");
+
+    return result.toString();
+  }
+
+}
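For orientation, the fields above are driven by TransactionalRegion earlier in this patch. A compact sketch of one successful lifecycle (illustrative only; the sequence numbers are made up):

// Lifecycle of a writing transaction, as TransactionalRegion drives it.
TransactionState state = new TransactionState(42);        // begin: PENDING
state.setStartSequenceNumber(5);   // snapshot of nextSequenceId at begin
// ... reads and writes accumulate via addRead()/addWrite() ...
state.setStatus(TransactionState.Status.COMMIT_PENDING);  // voted yes
state.setSequenceNumber(9);        // commit order assigned at vote time
state.setStatus(TransactionState.Status.COMMITED);        // commit applied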
Index: /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/util/Bytes.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/util/Bytes.java	(revision 665754)
+++ /opt/eclipse/workspace/hbase/src/java/org/apache/hadoop/hbase/util/Bytes.java	(working copy)
@@ -133,9 +133,9 @@
    * @param bytes
    * @return the long value
    */
-  public static long toInt(byte[] bytes) {
+  public static int toInt(byte[] bytes) {
     if (bytes == null || bytes.length == 0) {
-      return -1L;
+      return -1;
     }
     return ByteBuffer.wrap(bytes).getInt();
   }
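The Bytes.toInt change above fixes the declared return type to match the int that is actually decoded; the tests below depend on int round-trips. A minimal check, assuming the existing Bytes.toBytes(int) encoder:

// Round-trip an int through the byte[] cell encoding (sketch).
byte[] encoded = Bytes.toBytes(123);
int decoded = Bytes.toInt(encoded);     // now typed int, matching the data
assert decoded == 123;
assert Bytes.toInt(new byte[0]) == -1;  // null/empty input sentinel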
Index: /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/StressTestTransactions.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/StressTestTransactions.java	(revision 0)
+++ /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/StressTestTransactions.java	(revision 0)
@@ -0,0 +1,424 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.DummyTransactionLogger;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Stress test the transaction functionality. This requires running a
+ * {@link TransactionalRegionServer}. We run many threads doing reads/writes
+ * which may conflict with each other. We have two types of transactions, those
+ * which operate on rows of a single table, and those which operate on rows
+ * across multiple tables. Each transaction type has a modification operation
+ * which changes two values while maintaining the sum. Each transaction type
+ * also has a consistency-check operation which sums all rows and verifies that
+ * the sum is as expected.
+ */
+public class StressTestTransactions extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory
+      .getLog(StressTestTransactions.class);
+
+  private static final int NUM_TABLES = 3;
+  private static final int NUM_ST_ROWS = 3;
+  private static final int NUM_MT_ROWS = 3;
+  private static final int NUM_TRANSACTIONS_PER_THREAD = 100;
+  private static final int NUM_SINGLE_TABLE_THREADS = 6;
+  private static final int NUM_MULTI_TABLE_THREADS = 6;
+  private static final int PRE_COMMIT_SLEEP = 10;
+  private static final Random RAND = new Random();
+
+  private static final byte[] FAMILY = Bytes.toBytes("family:");
+  private static final byte[] COL = Bytes.toBytes("family:a");
+
+  private HBaseAdmin admin;
+  private TransactionalTable[] tables;
+  private TransactionManager transactionManager;
+
+  /** constructor */
+  public StressTestTransactions() {
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    tables = new TransactionalTable[NUM_TABLES];
+
+    for (int i = 0; i < tables.length; i++) {
+      HTableDescriptor desc = new HTableDescriptor(makeTableName(i));
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      admin = new HBaseAdmin(conf);
+      admin.createTable(desc);
+      tables[i] = new TransactionalTable(conf, desc.getName());
+    }
+
+    transactionManager = new TransactionManager(new DummyTransactionLogger(),
+        conf);
+  }
+
+  private String makeTableName(final int i) {
+    return "table" + i;
+  }
+
+  private void writeInitialValues() throws IOException {
+    for (TransactionalTable table : tables) {
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        byte[] row = makeSTRow(i);
+        BatchUpdate b = new BatchUpdate(row);
+        b.put(COL, Bytes.toBytes(SingleTableTransactionThread.INITIAL_VALUE));
+        table.commit(b);
+      }
+      for (int i = 0; i < NUM_MT_ROWS; i++) {
+        byte[] row = makeMTRow(i);
+        BatchUpdate b = new BatchUpdate(row);
+        b.put(COL, Bytes.toBytes(MultiTableTransactionThread.INITIAL_VALUE));
+        table.commit(b);
+      }
+    }
+  }
+
+  private byte[] makeSTRow(final int i) {
+    return Bytes.toBytes("st" + i);
+  }
+
+  private byte[] makeMTRow(final int i) {
+    return Bytes.toBytes("mt" + i);
+  }
+
+  private static int nextThreadNum = 1;
+  private static final AtomicBoolean stopRequest = new AtomicBoolean(false);
+  private static final AtomicBoolean consistencyFailure = new AtomicBoolean(
+      false);
+
+  // Thread which runs transactions
+  abstract class TransactionThread extends Thread {
+    private int numRuns = 0;
+    private int numAborts = 0;
+    private int numUnknowns = 0;
+
+    public TransactionThread(final String namePrefix) {
+      super.setName(namePrefix + "transaction " + nextThreadNum++);
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < NUM_TRANSACTIONS_PER_THREAD; i++) {
+        if (stopRequest.get()) {
+          return;
+        }
+        try {
+          numRuns++;
+          transaction();
+        } catch (UnknownTransactionException e) {
+          numUnknowns++;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        } catch (CommitUnsuccessfulException e) {
+          numAborts++;
+        }
+      }
+    }
+
+    protected abstract void transaction() throws IOException,
+        CommitUnsuccessfulException;
+
+    public int getNumAborts() {
+      return numAborts;
+    }
+
+    public int getNumUnknowns() {
+      return numUnknowns;
+    }
+
+    protected void preCommitSleep() {
+      try {
+        Thread.sleep(PRE_COMMIT_SLEEP);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    protected void consistencyFailure() {
+      LOG.fatal("Consistency failure");
+      stopRequest.set(true);
+      consistencyFailure.set(true);
+    }
+
+    /**
+     * Get the numRuns.
+     *
+     * @return Return the numRuns.
+     */
+    public int getNumRuns() {
+      return numRuns;
+    }
+
+  }
+
+  // Atomically change the value of two rows while maintaining the sum.
+  // This should preserve the global sum of the rows, which is also checked
+  // with a transaction.
+  private class SingleTableTransactionThread extends TransactionThread {
+    private static final int INITIAL_VALUE = 10;
+    public static final int TOTAL_SUM = INITIAL_VALUE * NUM_ST_ROWS;
+    private static final int MAX_TRANSFER_AMT = 100;
+
+    private TransactionalTable table;
+    boolean doCheck = false;
+
+    public SingleTableTransactionThread() {
+      super("single table ");
+    }
+
+    @Override
+    protected void transaction() throws IOException,
+        CommitUnsuccessfulException {
+      if (doCheck) {
+        checkTotalSum();
+      } else {
+        doSingleRowChange();
+      }
+      doCheck = !doCheck;
+    }
+
+    private void doSingleRowChange() throws IOException,
+        CommitUnsuccessfulException {
+      table = tables[RAND.nextInt(NUM_TABLES)];
+      int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+          - MAX_TRANSFER_AMT;
+      int row1Index = RAND.nextInt(NUM_ST_ROWS);
+      int row2Index;
+      do {
+        row2Index = RAND.nextInt(NUM_ST_ROWS);
+      } while (row2Index == row1Index);
+      byte[] row1 = makeSTRow(row1Index);
+      byte[] row2 = makeSTRow(row2Index);
+
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int row1Amount = Bytes.toInt(table.get(transactionState, row1, COL)
+          .getValue());
+      int row2Amount = Bytes.toInt(table.get(transactionState, row2, COL)
+          .getValue());
+
+      row1Amount -= transferAmount;
+      row2Amount += transferAmount;
+
+      BatchUpdate update = new BatchUpdate(row1);
+      update.put(COL, Bytes.toBytes(row1Amount));
+      table.commit(transactionState, update);
+      update = new BatchUpdate(row2);
+      update.put(COL, Bytes.toBytes(row2Amount));
+      table.commit(transactionState, update);
+
+      super.preCommitSleep();
+
+      transactionManager.tryCommit(transactionState);
+      LOG.debug("Committed");
+    }
+
+    // Check the table we last mutated
+    private void checkTotalSum() throws IOException,
+        CommitUnsuccessfulException {
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int totalSum = 0;
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        totalSum += Bytes.toInt(table.get(transactionState, makeSTRow(i), COL)
+            .getValue());
+      }
+
+      transactionManager.tryCommit(transactionState);
+      if (TOTAL_SUM != totalSum) {
+        super.consistencyFailure();
+      }
+    }
+
+  }
+
+  // Similar to SingleTable, but this time we maintain consistency across
+  // tables rather than rows
+  private class MultiTableTransactionThread extends TransactionThread {
+    private static final int INITIAL_VALUE = 1000;
+    public static final int TOTAL_SUM = INITIAL_VALUE * NUM_TABLES;
+    private static final int MAX_TRANSFER_AMT = 100;
+
+    private byte[] row;
+    boolean doCheck = false;
+
+    public MultiTableTransactionThread() {
+      super("multi table ");
+    }
+
+    @Override
+    protected void transaction() throws IOException,
+        CommitUnsuccessfulException {
+      if (doCheck) {
+        checkTotalSum();
+      } else {
+        doSingleRowChange();
+      }
+      doCheck = !doCheck;
+    }
+
+    private void doSingleRowChange() throws IOException,
+        CommitUnsuccessfulException {
+      row = makeMTRow(RAND.nextInt(NUM_MT_ROWS));
+      int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+          - MAX_TRANSFER_AMT;
+      int table1Index = RAND.nextInt(tables.length);
+      int table2Index;
+      do {
+        table2Index = RAND.nextInt(tables.length);
+      } while (table2Index == table1Index);
+
+      TransactionalTable table1 = tables[table1Index];
+      TransactionalTable table2 = tables[table2Index];
+
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int table1Amount = Bytes.toInt(table1.get(transactionState, row, COL)
+          .getValue());
+      int table2Amount = Bytes.toInt(table2.get(transactionState, row, COL)
+          .getValue());
+
+      table1Amount -= transferAmount;
+      table2Amount += transferAmount;
+
+      BatchUpdate update = new BatchUpdate(row);
+      update.put(COL, Bytes.toBytes(table1Amount));
+      table1.commit(transactionState, update);
+
+      update = new BatchUpdate(row);
+      update.put(COL, Bytes.toBytes(table2Amount));
+      table2.commit(transactionState, update);
+
+      super.preCommitSleep();
+
+      transactionManager.tryCommit(transactionState);
+
+      LOG.trace(Bytes.toString(table1.getTableName()) + ": " + table1Amount);
+      LOG.trace(Bytes.toString(table2.getTableName()) + ": " + table2Amount);
+    }
+
+    private void checkTotalSum() throws IOException,
+        CommitUnsuccessfulException {
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int totalSum = 0;
+      int[] amounts = new int[tables.length];
+      for (int i = 0; i < tables.length; i++) {
+        int amount = Bytes.toInt(tables[i].get(transactionState, row, COL)
+            .getValue());
+        amounts[i] = amount;
+        totalSum += amount;
+      }
+
+      transactionManager.tryCommit(transactionState);
+
+      for (int i = 0; i < tables.length; i++) {
+        LOG.trace(Bytes.toString(tables[i].getTableName()) + ": " + amounts[i]);
+      }
+
+      if (TOTAL_SUM != totalSum) {
+        super.consistencyFailure();
+      }
+    }
+
+  }
+
+  public void testStressTransactions() throws IOException, InterruptedException {
+    writeInitialValues();
+
+    List<TransactionThread> transactionThreads = new LinkedList<TransactionThread>();
+
+    for (int i = 0; i < NUM_SINGLE_TABLE_THREADS; i++) {
+      TransactionThread transactionThread = new SingleTableTransactionThread();
+      transactionThread.start();
+      transactionThreads.add(transactionThread);
+    }
+
+    for (int i = 0; i < NUM_MULTI_TABLE_THREADS; i++) {
+      TransactionThread transactionThread = new MultiTableTransactionThread();
+      transactionThread.start();
+      transactionThreads.add(transactionThread);
+    }
+
+    for (TransactionThread transactionThread : transactionThreads) {
+      transactionThread.join();
+    }
+
+    for (TransactionThread transactionThread : transactionThreads) {
+      LOG.info(transactionThread.getName() + " done with "
+          + transactionThread.getNumAborts() + " aborts, and "
+          + transactionThread.getNumUnknowns() + " unknown transactions of "
+          + transactionThread.getNumRuns());
+    }
+
+    doFinalConsistencyChecks();
+  }
+
+  private void doFinalConsistencyChecks() throws IOException {
+
+    int[] mtSums = new int[NUM_MT_ROWS];
+    for (int i = 0; i < mtSums.length; i++) {
+      mtSums[i] = 0;
+    }
+
+    for (TransactionalTable table : tables) {
+      int thisTableSum = 0;
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        byte[] row = makeSTRow(i);
+        thisTableSum += Bytes.toInt(table.get(row, COL).getValue());
+      }
+      Assert.assertEquals(SingleTableTransactionThread.TOTAL_SUM, thisTableSum);
+
+      for (int i = 0; i < NUM_MT_ROWS; i++) {
+        byte[] row = makeMTRow(i);
+        mtSums[i] += Bytes.toInt(table.get(row, COL).getValue());
+      }
+    }
+
+    for (int mtSum : mtSums) {
+      Assert.assertEquals(MultiTableTransactionThread.TOTAL_SUM, mtSum);
+    }
+  }
+}
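The stress test's consistency argument fits in one line: every modification moves an amount between two cells, so every committed transaction preserves the total. A tiny sketch of that invariant (standalone arithmetic, not test code):

// Transfer preserves the sum, whatever amount is chosen.
int row1Amount = 10, row2Amount = 10;
int transferAmount = 7;                 // any value, positive or negative
int sumBefore = row1Amount + row2Amount;
row1Amount -= transferAmount;
row2Amount += transferAmount;
assert row1Amount + row2Amount == sumBefore;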
Index: /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/TestTransactions.java
===================================================================
--- /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/TestTransactions.java	(revision 0)
+++ /opt/eclipse/workspace/hbase/src/test/org/apache/hadoop/hbase/TestTransactions.java	(revision 0)
@@ -0,0 +1,149 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.DummyTransactionLogger;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test the transaction functionality. This requires running a
+ * {@link TransactionalRegionServer}.
+ */
+public class TestTransactions extends HBaseClusterTestCase {
+
+  private static final String[] TABLE_NAMES = new String[] { "table1",
+      "table2", "table3" };
+
+  private static final byte[] FAMILY = Bytes.toBytes("family:");
+  private static final byte[] COL_A = Bytes.toBytes("family:a");
+
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW3 = Bytes.toBytes("row3");
+
+  private HBaseAdmin admin;
+  private TransactionalTable[] tables;
+  private TransactionManager transactionManager;
+
+  /** constructor */
+  public TestTransactions() {
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    tables = new TransactionalTable[TABLE_NAMES.length];
+
+    for (int i = 0; i < TABLE_NAMES.length; i++) {
+      HTableDescriptor desc = new HTableDescriptor(TABLE_NAMES[i]);
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      admin = new HBaseAdmin(conf);
+      admin.createTable(desc);
+      tables[i] = new TransactionalTable(conf, desc.getName());
+    }
+    transactionManager = new TransactionManager(new DummyTransactionLogger(),
+        conf);
+    writeInitialRow();
+  }
+
+  private void writeInitialRow() throws IOException {
+    BatchUpdate update = new BatchUpdate(ROW1);
+    update.put(COL_A, Bytes.toBytes(1));
+    tables[0].commit(update);
+  }
+
+  public void testSimpleTransaction() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState = makeTransaction1();
+    transactionManager.tryCommit(transactionState);
+  }
+
+  public void testTwoTransactionsWithoutConflict() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState1 = makeTransaction1();
+    TransactionState transactionState2 = makeTransaction2();
+
+    transactionManager.tryCommit(transactionState1);
+    transactionManager.tryCommit(transactionState2);
+  }
+
+  public void testTwoTransactionsWithConflict() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState1 = makeTransaction1();
+    TransactionState transactionState2 = makeTransaction2();
+
+    transactionManager.tryCommit(transactionState2);
+
+    try {
+      transactionManager.tryCommit(transactionState1);
+      fail();
+    } catch (CommitUnsuccessfulException e) {
+      // Good
+    }
+  }
+
+  // Read from (ROW1, COL_A) and write the value to (ROW2, COL_A) and
+  // (ROW3, COL_A)
+  private TransactionState makeTransaction1() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    Cell row1_A = tables[0].get(transactionState, ROW1, COL_A);
+
+    BatchUpdate write1 = new BatchUpdate(ROW2);
+    write1.put(COL_A, row1_A.getValue());
+    tables[0].commit(transactionState, write1);
+
+    BatchUpdate write2 = new BatchUpdate(ROW3);
+    write2.put(COL_A, row1_A.getValue());
+    tables[0].commit(transactionState, write2);
+
+    return transactionState;
+  }
+
+  // Read (ROW1, COL_A), increment its (integer) value, and write it back
+  private TransactionState makeTransaction2() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    Cell row1_A = tables[0].get(transactionState, ROW1, COL_A);
+
+    int value = Bytes.toInt(row1_A.getValue());
+
+    BatchUpdate write = new BatchUpdate(ROW1);
+    write.put(COL_A, Bytes.toBytes(value + 1));
+    tables[0].commit(transactionState, write);
+
+    return transactionState;
+  }
+}
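To make the expected failure in testTwoTransactionsWithConflict concrete, here is the region-side view of the interleaving (a walkthrough in comments; the real bookkeeping is the TransactionalRegion code earlier in this patch):

// Why transactionState1 fails after transactionState2 commits:
//
// 1. transaction1 begins; its read set gains ROW1 when makeTransaction1()
//    reads (ROW1, COL_A). Its write set holds ROW2 and ROW3.
// 2. transaction2 begins, reads ROW1, and writes ROW1 back incremented.
// 3. transaction2 votes and commits first; its write set {ROW1} is recorded
//    in commitedTransactionsBySequenceNumber under a fresh sequence number.
// 4. transaction1 votes: hasConflict() scans sequence numbers from
//    transaction1's start sequence forward, finds transaction2, and sees
//    ROW1 in both transaction2's write set and transaction1's read set.
// 5. The vote fails, so tryCommit() (which presumably aborts the losing
//    transaction) throws CommitUnsuccessfulException, exactly what the
//    test expects. Note the reverse check never fires: transaction2 read
//    ROW1, but transaction1 writes only ROW2 and ROW3.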