Index: src/test/org/apache/hadoop/hbase/TestTransactions.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestTransactions.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/TestTransactions.java (revision 0)
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.DummyTransactionLogger;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test transactional gets, puts, and conflict detection. This requires running
+ * a {@link TransactionalRegionServer}.
+ */
+public class TestTransactions extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory.getLog(TestTransactions.class);
+
+  private static final byte[] FAMILY = Bytes.toBytes("family:");
+  private static final byte[] COL_A = Bytes.toBytes("family:a");
+  private static final byte[] COL_B = Bytes.toBytes("family:b");
+
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW3 = Bytes.toBytes("row3");
+
+  private HTableDescriptor desc;
+  private HBaseAdmin admin;
+  private TransactionalTable table;
+  private TransactionManager transactionManager;
+
+  /** constructor */
+  public TestTransactions() {
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+  }
+
+  /**
+   * Since all the "tests" depend on the results of the previous test, they are
+   * not JUnit tests that can stand alone. Consequently we have a single JUnit
+   * test that runs the "sub-tests" as private methods.
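+   * <p>
+   * As a sketch of the client API exercised here (every call below exists in
+   * this patch), a single transaction looks like:
+   *
+   * <pre>
+   * TransactionState transactionState = transactionManager.beginTransaction();
+   * Cell cell = table.get(transactionState, ROW1, COL_A);
+   * BatchUpdate update = new BatchUpdate(ROW2);
+   * update.put(COL_A, cell.getValue());
+   * table.commit(transactionState, update);
+   * transactionManager.tryCommit(transactionState); // may throw CommitUnsuccessfulException
+   * </pre>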
+   *
+   * @throws IOException
+   * @throws CommitUnsuccessfulException
+   */
+  public void test() throws IOException, CommitUnsuccessfulException {
+    setup();
+    writeInitialRows();
+    testSimpleTransaction();
+    testTwoTransactionsWithoutConflict();
+    testTwoTransactionsWithConflict();
+  }
+
+  private void setup() throws IOException {
+    desc = new HTableDescriptor("test");
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    table = new TransactionalTable(conf, desc.getName());
+    transactionManager = new TransactionManager(new DummyTransactionLogger(),
+        conf);
+  }
+
+  private void writeInitialRows() throws IOException {
+    BatchUpdate update = new BatchUpdate(ROW1);
+    update.put(COL_A, Bytes.toBytes(1));
+    table.commit(update);
+  }
+
+  private void testSimpleTransaction() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState = makeTransaction1();
+    transactionManager.tryCommit(transactionState);
+
+    // TODO -- Verify went through
+  }
+
+  private void testTwoTransactionsWithoutConflict() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState1 = makeTransaction1();
+    TransactionState transactionState2 = makeTransaction2();
+
+    transactionManager.tryCommit(transactionState1);
+    transactionManager.tryCommit(transactionState2);
+  }
+
+  private void testTwoTransactionsWithConflict() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState1 = makeTransaction1();
+    TransactionState transactionState2 = makeTransaction2();
+
+    transactionManager.tryCommit(transactionState2);
+
+    try {
+      transactionManager.tryCommit(transactionState1);
+      fail();
+    } catch (CommitUnsuccessfulException e) {
+      // Good
+    }
+  }
+
+  // Read from ROW1,COL_A and put it in ROW2,COL_A and ROW3,COL_A
+  private TransactionState makeTransaction1() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    Cell row1_A = table.get(transactionState, ROW1, COL_A);
+
+    BatchUpdate write1 = new BatchUpdate(ROW2);
+    write1.put(COL_A, row1_A.getValue());
+    table.commit(transactionState, write1);
+
+    BatchUpdate write2 = new BatchUpdate(ROW3);
+    write2.put(COL_A, row1_A.getValue());
+    table.commit(transactionState, write2);
+
+    return transactionState;
+  }
+
+  // Read ROW1,COL_A, increment its (integer) value, write back
+  private TransactionState makeTransaction2() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    Cell row1_A = table.get(transactionState, ROW1, COL_A);
+
+    int value = Bytes.toInt(row1_A.getValue());
+
+    BatchUpdate write = new BatchUpdate(ROW1);
+    write.put(COL_A, Bytes.toBytes(value + 1));
+    table.commit(transactionState, write);
+
+    return transactionState;
+  }
+}
Index: src/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HConstants.java (revision 663789)
+++ src/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -84,6 +84,9 @@
   /** Parameter name for what region server interface to use. */
   static final String REGION_SERVER_CLASS = "hbase.regionserver.class";
 
+  /** Parameter name for what region server implementation to use. */
+  static final String REGION_SERVER_IMPL = "hbase.regionserver.impl";
+
   /** Default region server interface class name.
*/ static final String DEFAULT_REGION_SERVER_CLASS = HRegionInterface.class.getName(); Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 663789) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.RegionServerRunningException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.Cell; @@ -857,15 +861,7 @@ HRegion region = this.onlineRegions.get(mapKey); if (region == null) { try { - region = new HRegion(HTableDescriptor.getTableDir(rootDir, - regionInfo.getTableDesc().getName()), - this.log, this.fs, conf, regionInfo, null, this.cacheFlusher, - new Progressable() { - public void progress() { - addProcessingMessage(regionInfo); - } - } - ); + region = instantiateRegion(regionInfo); // Startup a compaction early if one is needed. this.compactSplitThread.compactionRequested(region); } catch (IOException e) { @@ -888,6 +884,17 @@ reportOpen(regionInfo); } + protected HRegion instantiateRegion(final HRegionInfo regionInfo) + throws IOException { + return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo + .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null, + this.cacheFlusher, new Progressable() { + public void progress() { + addProcessingMessage(regionInfo); + } + }); + } + /* * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue. * This method is called while region is in the queue of regions to process @@ -1169,16 +1176,9 @@ requestCount.incrementAndGet(); try { HRegion r = getRegion(regionName); - long scannerId = -1L; InternalScanner s = r.getScanner(cols, firstRow, timestamp, filter); - scannerId = rand.nextLong(); - String scannerName = String.valueOf(scannerId); - synchronized(scanners) { - scanners.put(scannerName, s); - } - this.leases. - createLease(scannerName, new ScannerListener(scannerName)); + long scannerId = addScanner(s); return scannerId; } catch (IOException e) { LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", @@ -1188,6 +1188,18 @@ } } + protected long addScanner(InternalScanner s) throws LeaseStillHeldException { + long scannerId = -1L; + scannerId = rand.nextLong(); + String scannerName = String.valueOf(scannerId); + synchronized(scanners) { + scanners.put(scannerName, s); + } + this.leases. + createLease(scannerName, new ScannerListener(scannerName)); + return scannerId; + } + /** {@inheritDoc} */ public void close(final long scannerId) throws IOException { checkOpen(); @@ -1372,7 +1384,7 @@ * * @throws IOException */ - private void checkOpen() throws IOException { + protected void checkOpen() throws IOException { if (this.stopRequested.get() || this.abortRequested) { throw new IOException("Server not running"); } @@ -1530,6 +1542,34 @@ * @param args */ public static void main(String [] args) { - doMain(args, HRegionServer.class); + Configuration conf = new HBaseConfiguration(); + @SuppressWarnings("unchecked") + Class regionServerClass = (Class) conf + .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); + doMain(args, regionServerClass); } + + /** + * Get the leases. + * @return Return the leases. 
+ */ + protected Leases getLeases() { + return leases; + } + + /** + * Get the rootDir. + * @return Return the rootDir. + */ + protected Path getRootDir() { + return rootDir; + } + + /** + * Get the fs. + * @return Return the fs. + */ + protected FileSystem getFileSystem() { + return fs; + } } Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (revision 0) +++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (revision 0) @@ -0,0 +1,136 @@ +/* + * $Id$ + * Created on Jun 4, 2008 + * + */ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.util.HashSet; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Holds the state of a transaction. + */ +class TransactionState { + + /** Current status. */ + public enum Status { + /** Initial status, still performing operations. */ + PENDING, + /** Checked if we can commit, and said yes. */ + COMMIT_PENDING, + /** Committed */ + COMMITED, + } + + private final long transactionId; + private Status status; + private SortedSet readSet = new TreeSet( + Bytes.BYTES_COMPARATOR); + private Set writeSet = new HashSet(); + private final int startSequenceNumber; + private Integer sequenceNumber; + + public TransactionState(final int startSequenceNumber, + final long transactionId) { + this.startSequenceNumber = startSequenceNumber; + this.transactionId = transactionId; + this.status = Status.PENDING; + } + + public void addRead(final byte[] rowKey) { + readSet.add(rowKey); + } + + public Set getReadSet() { + return readSet; + } + + public void addWrite(final BatchUpdate write) { + writeSet.add(write); + } + + public Set getWriteSet() { + return writeSet; + } + + /** + * Get the status. + * + * @return Return the status. + */ + public Status getStatus() { + return status; + } + + /** + * Set the status. + * + * @param status The status to set. + */ + public void setStatus(final Status status) { + this.status = status; + } + + /** + * Get the startSequenceNumber. + * + * @return Return the startSequenceNumber. + */ + public int getStartSequenceNumber() { + return startSequenceNumber; + } + + /** + * Get the sequenceNumber. + * + * @return Return the sequenceNumber. + */ + public Integer getSequenceNumber() { + return sequenceNumber; + } + + /** + * Set the sequenceNumber. + * + * @param sequenceNumber The sequenceNumber to set. + */ + public void setSequenceNumber(final Integer sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append("[transactionId: "); + result.append(transactionId); + result.append(" status: "); + result.append(status.name()); + result.append(" read Size: "); + result.append(readSet.size()); + result.append(" write Size: "); + result.append(writeSet.size()); + result.append(" start SQ:"); + result.append(startSequenceNumber); + result.append(" SQ:"); + result.append(sequenceNumber); + result.append("]"); + + return result.toString(); + } + + /** + * Get the transactionId. + * + * @return Return the transactionId. 
+ */ + public long getTransactionId() { + return transactionId; + } + +} Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (revision 0) +++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (revision 0) @@ -0,0 +1,364 @@ + +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.LeaseException; +import org.apache.hadoop.hbase.LeaseListener; +import org.apache.hadoop.hbase.Leases; +import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HLog; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status; +import org.apache.hadoop.util.Progressable; + +public class TransactionalRegion extends HRegion { + + private static final String TRANSACTION_LEASE_TIMEOUT = "hbase.transaction.timeout"; + private static final int DEFAULT_TRANSACTION_LEASE_TIMEOUT = 3 * 60 * 1000; + private static final int LEASE_CHECK_FREQUENCY = 1000; + + static final Log LOG = LogFactory.getLog(TransactionalRegion.class); + + private Map transactionsById = Collections + .synchronizedMap(new HashMap()); + + // Collection of recent transactions that are COMMIT_PENDING or COMMITED. + // FIXME We need to clean this out occasionally (can remove transactions with + // sequence number < min(pending transactions) + private SortedMap commitedTransactionsBySequenceNumber = Collections + .synchronizedSortedMap(new TreeMap()); + + private Leases transactionLeases; + + private AtomicInteger nextSequenceId = new AtomicInteger(0); + private Object commitCheckLock = new Object(); + + public TransactionalRegion(final Path basedir, final HLog log, + final FileSystem fs, final HBaseConfiguration conf, + final HRegionInfo regionInfo, final Path initialFiles, + final FlushRequester flushListener, final Progressable reporter) + throws IOException { + super(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, + reporter); + transactionLeases = new Leases(conf.getInt(TRANSACTION_LEASE_TIMEOUT, + DEFAULT_TRANSACTION_LEASE_TIMEOUT), LEASE_CHECK_FREQUENCY); + } + + /** + * Fetch a single data item. + * + * @param transactionId + * @param row + * @param column + * @return column value + * @throws IOException + */ + public Cell get(final long transactionId, final byte[] row, + final byte[] column) throws IOException { + Cell[] results = get(transactionId, row, column, 1); + return (results == null || results.length == 0) ? 
null : results[0]; + } + + /** + * Fetch multiple versions of a single data item + * + * @param transactionId + * @param row + * @param column + * @param numVersions + * @return array of values one element per version + * @throws IOException + */ + public Cell[] get(final long transactionId, final byte[] row, + final byte[] column, final int numVersions) throws IOException { + return get(transactionId, row, column, Long.MAX_VALUE, numVersions); + } + + /** + * Fetch multiple versions of a single data item, with timestamp. + * + * @param transactionId + * @param row + * @param column + * @param timestamp + * @param numVersions + * @return array of values one element per version that matches the timestamp + * @throws IOException + */ + public Cell[] get(final long transactionId, final byte[] row, + final byte[] column, final long timestamp, final int numVersions) + throws IOException { + TransactionState state = getTransactionState(transactionId, true); + state.addRead(row); + + return get(row, column, timestamp, numVersions); + } + + /** + * Fetch all the columns for the indicated row at a specified timestamp. + * Returns a TreeMap that maps column names to values. + * + * @param transactionId + * @param row + * @param columns Array of columns you'd like to retrieve. When null, get all. + * @param ts + * @return Map values + * @throws IOException + */ + public Map getFull(final long transactionId, final byte[] row, + final Set columns, final long ts) throws IOException { + TransactionState state = getTransactionState(transactionId, true); + state.addRead(row); + return getFull(row, columns, ts); + } + + /** + * Return an iterator that scans over the HRegion, returning the indicated + * columns for only the rows that match the data filter. This Iterator must be + * closed by the caller. + * + * @param transactionId + * @param cols columns to scan. If column name is a column family, all columns + * of the specified column family are returned. Its also possible to pass a + * regex in the column qualifier. A column qualifier is judged to be a regex + * if it contains at least one of the following characters: + * \+|^&*$[]]}{)(. 
+   * @param firstRow row which is the starting point of the scan
+   * @param timestamp only return rows whose timestamp is <= this value
+   * @param filter row filter
+   * @return InternalScanner
+   * @throws IOException
+   */
+  public InternalScanner getScanner(final long transactionId,
+      final byte[][] cols, final byte[] firstRow, final long timestamp,
+      final RowFilterInterface filter) throws IOException {
+    return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow,
+        timestamp, filter));
+  }
+
+  /**
+   * Add a write to the transaction's write set and apply it.
+   *
+   * @param b
+   * @throws IOException
+   */
+  public void batchUpdate(final long transactionId, final BatchUpdate b)
+      throws IOException {
+    TransactionState state = getTransactionState(transactionId, true);
+    state.addWrite(b);
+    batchUpdate(b);
+  }
+
+  public boolean commitRequest(final long transactionId) {
+    synchronized (commitCheckLock) {
+      TransactionState state = getTransactionState(transactionId, false);
+      if (state == null) {
+        return false;
+      }
+
+      // Check every transaction that committed after we started: if its write
+      // set overlaps our read set, we have a conflict and must vote no.
+      for (int i = state.getStartSequenceNumber(); i < nextSequenceId.get(); i++) {
+        TransactionState other = commitedTransactionsBySequenceNumber.get(i);
+        if (other == null) {
+          continue;
+        }
+        for (BatchUpdate otherUpdate : other.getWriteSet()) {
+          if (state.getReadSet().contains(otherUpdate.getRow())) {
+            LOG.trace("Cannot commit because transaction sequence [" + i
+                + "] conflicts");
+            retireTransaction(state);
+            return false;
+          }
+        }
+      }
+      // No conflicts, we can commit.
+      // FIXME, if there are no writes we can forget the transaction
+      LOG.trace("No conflicts for transaction [" + transactionId
+          + "] found. Voting for commit");
+      state.setStatus(Status.COMMIT_PENDING);
+      state.setSequenceNumber(nextSequenceId.getAndIncrement());
+      commitedTransactionsBySequenceNumber
+          .put(state.getSequenceNumber(), state);
+
+      return true;
+    }
+  }
+
+  /**
+   * Commit the transaction.
+   *
+   * @param transactionId
+   * @throws IOException
+   */
+  public void commit(final long transactionId) throws IOException {
+    TransactionState state = getTransactionState(transactionId, false);
+    if (state == null) {
+      LOG.fatal("Asked to commit unknown transaction: " + transactionId);
+      // FIXME, how to handle
+      throw new IOException("commit failure");
+    }
+
+    if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
+      LOG.fatal("Asked to commit a non pending transaction");
+      // FIXME, how to handle
+      throw new IOException("commit failure");
+    }
+
+    commit(state);
+    retireTransaction(state);
+  }
+
+  /**
+   * Abort the transaction.
+   *
+   * @param transactionId
+   * @throws IOException
+   */
+  public void abort(final long transactionId) throws IOException {
+    TransactionState state = getTransactionState(transactionId, false);
+    if (state == null) {
+      LOG.debug("Asked to abort unknown transaction: " + transactionId);
+      // Assume that we voted no, and thus discarded the transaction
+      return;
+    }
+
+    // FIXME write to the WAL
+
+    if (commitedTransactionsBySequenceNumber.remove(state.getSequenceNumber()) == null
+        && state.getWriteSet().size() > 0) {
+      LOG.fatal("Aborting transaction [" + transactionId
+          + "] that we did not have in committed transactions list");
+      throw new IOException("abort failure");
+    }
+
+    retireTransaction(state);
+  }
+
+  private void commit(final TransactionState state) throws IOException {
+    LOG.debug("Committing transaction: " + state.toString());
+
+    // FIXME write commit message to WAL
+
+    for (BatchUpdate update : state.getWriteSet()) {
+      super.batchUpdate(update); // Strictly speaking, these updates do not
+      // need to be WALed... Does it hurt?
+    }
+  }
+
+  // Cancel the lease and remove it from the lease lookup. This transaction may
+  // still live in commitedTransactionsBySequenceNumber
+  private void retireTransaction(final TransactionState state) {
+    String key = String.valueOf(state.getTransactionId());
+    try {
+      transactionLeases.cancelLease(key);
+    } catch (LeaseException e) {
+      // Ignore
+    }
+
+    transactionsById.remove(key);
+  }
+
+  private TransactionState getTransactionState(final long transactionId,
+      final boolean create) {
+    String key = String.valueOf(transactionId);
+    TransactionState state = null;
+    boolean created = false;
+    synchronized (transactionsById) {
+      state = transactionsById.get(key);
+      if (state == null && create) {
+        state = new TransactionState(nextSequenceId.get(), transactionId);
+        created = true;
+        transactionsById.put(key, state);
+        try {
+          transactionLeases.createLease(key, new TransactionListener(key));
+        } catch (LeaseStillHeldException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    if (state != null && !created) {
+      try {
+        transactionLeases.renewLease(key);
+      } catch (LeaseException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return state;
+  }
+
+  /**
+   * Instantiated as a transaction lease listener. If the lease times out, the
+   * transaction is discarded (any later commit request for it will vote no).
+   */
+  private class TransactionListener implements LeaseListener {
+    private final String transactionName;
+
+    TransactionListener(final String n) {
+      this.transactionName = n;
+    }
+
+    /** {@inheritDoc} */
+    public void leaseExpired() {
+      LOG.info("Transaction " + this.transactionName + " lease expired");
+      TransactionState s = null;
+      synchronized (transactionsById) {
+        s = transactionsById.remove(transactionName);
+      }
+      if (s == null) {
+        LOG.warn("Unknown transaction expired " + this.transactionName);
+      }
+    }
+  }
+
+  /** Wrapper which keeps track of rows returned by scanner.
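+   * Each row handed back by next() is added to the transaction's read set
+   * (via addRead), so scanned rows take part in commit-time conflict
+   * detection exactly like transactional gets.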
*/ + private class ScannerWrapper implements InternalScanner { + private long transactionId; + private InternalScanner scanner; + + public ScannerWrapper(final long transactionId, + final InternalScanner scanner) { + this.transactionId = transactionId; + this.scanner = scanner; + } + + public void close() throws IOException { + scanner.close(); + } + + public boolean isMultipleMatchScanner() { + return scanner.isMultipleMatchScanner(); + } + + public boolean isWildcardScanner() { + return scanner.isWildcardScanner(); + } + + public boolean next(final HStoreKey key, + final SortedMap results) throws IOException { + boolean result = scanner.next(key, results); + getTransactionState(transactionId, true).addRead(key.getRow()); + return result; + } + } +} Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (revision 0) +++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (revision 0) @@ -0,0 +1,238 @@ +/* + * $Id$ + * Created on Jun 5, 2008 + * + */ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Progressable; + +/** + * RegionServer with transactional support. Transactional logic is at the region + * level, so we mostly just delegate to the appropriate TransactionalRegion. 
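+ * <p>
+ * To enable it, both the client interface and the server implementation must
+ * be configured, exactly as TestTransactions does (a sketch):
+ *
+ * <pre>
+ * conf.set(HConstants.REGION_SERVER_CLASS,
+ *     TransactionalRegionInterface.class.getName());
+ * conf.set(HConstants.REGION_SERVER_IMPL,
+ *     TransactionalRegionServer.class.getName());
+ * </pre>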
+ */ +public class TransactionalRegionServer extends HRegionServer implements + TransactionalRegionInterface { + static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class); + + public TransactionalRegionServer(final HBaseConfiguration conf) + throws IOException { + super(conf); + } + + public TransactionalRegionServer(final HServerAddress address, + final HBaseConfiguration conf) throws IOException { + super(address, conf); + } + + /** {@inheritDoc} */ + @Override + public long getProtocolVersion(final String protocol, final long clientVersion) + throws IOException { + if (protocol.equals(TransactionalRegionInterface.class.getName())) { + return TransactionalRegionInterface.versionID; + } + return super.getProtocolVersion(protocol, clientVersion); + } + + @Override + protected HRegion instantiateRegion(final HRegionInfo regionInfo) + throws IOException { + return new TransactionalRegion(HTableDescriptor.getTableDir(super + .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super + .getFileSystem(), super.conf, regionInfo, null, super + .getFlushRequester(), new Progressable() { + public void progress() { + addProcessingMessage(regionInfo); + } + }); + } + + protected TransactionalRegion getTransactionalRegion(final byte[] regionName) + throws NotServingRegionException { + return (TransactionalRegion) super.getRegion(regionName); + } + + public void abort(final byte[] regionName, final long transactionId) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + getTransactionalRegion(regionName).abort(transactionId); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public void batchUpdate(final long transactionId, final byte[] regionName, + final BatchUpdate b) throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + getTransactionalRegion(regionName).batchUpdate(transactionId, b); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public void commit(final byte[] regionName, final long transactionId) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + getTransactionalRegion(regionName).commit(transactionId); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public boolean commitRequest(final byte[] regionName, final long transactionId) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + return getTransactionalRegion(regionName).commitRequest(transactionId); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public Cell get(final long transactionId, final byte[] regionName, + final byte[] row, final byte[] column) throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + return getTransactionalRegion(regionName).get(transactionId, row, column); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public Cell[] get(final long transactionId, final byte[] regionName, + final byte[] row, final byte[] column, final int numVersions) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + return getTransactionalRegion(regionName).get(transactionId, row, column, + numVersions); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public Cell[] get(final long transactionId, final byte[] regionName, + final byte[] row, final byte[] column, final long timestamp, + final int numVersions) throws IOException { + checkOpen(); + 
super.getRequestCount().incrementAndGet(); + try { + return getTransactionalRegion(regionName).get(transactionId, row, column, + timestamp, numVersions); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public RowResult getRow(final long transactionId, final byte[] regionName, + final byte[] row, final long ts) throws IOException { + return getRow(transactionId, regionName, row, null, ts); + } + + public RowResult getRow(final long transactionId, final byte[] regionName, + final byte[] row, final byte[][] columns) throws IOException { + return getRow(transactionId, regionName, row, columns, + HConstants.LATEST_TIMESTAMP); + } + + public RowResult getRow(final long transactionId, final byte[] regionName, + final byte[] row, final byte[][] columns, final long ts) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + // convert the columns array into a set so it's easy to check later. + Set columnSet = null; + if (columns != null) { + columnSet = new TreeSet(Bytes.BYTES_COMPARATOR); + columnSet.addAll(Arrays.asList(columns)); + } + + TransactionalRegion region = getTransactionalRegion(regionName); + Map map = region.getFull(transactionId, row, columnSet, ts); + HbaseMapWritable result = new HbaseMapWritable(); + result.putAll(map); + return new RowResult(row, result); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + + } + + public long openScanner(final long transactionId, final byte[] regionName, + final byte[][] cols, final byte[] firstRow, final long timestamp, + final RowFilterInterface filter) throws IOException { + checkOpen(); + NullPointerException npe = null; + if (regionName == null) { + npe = new NullPointerException("regionName is null"); + } else if (cols == null) { + npe = new NullPointerException("columns to scan is null"); + } else if (firstRow == null) { + npe = new NullPointerException("firstRow for scanner is null"); + } + if (npe != null) { + IOException io = new IOException("Invalid arguments to openScanner"); + io.initCause(npe); + throw io; + } + super.getRequestCount().incrementAndGet(); + try { + TransactionalRegion r = getTransactionalRegion(regionName); + long scannerId = -1L; + InternalScanner s = r.getScanner(transactionId, cols, firstRow, + timestamp, filter); + scannerId = super.addScanner(s); + return scannerId; + } catch (IOException e) { + LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", + RemoteExceptionHandler.checkIOException(e)); + checkFileSystem(); + throw e; + } + } + +} Index: src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java =================================================================== --- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 663789) +++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy) @@ -65,6 +65,7 @@ /** 'local:' */ public static final String LOCAL_COLON = LOCAL + ":"; private final HBaseConfiguration conf; + private final Class regionServerClass; /** * Constructor. @@ -98,6 +99,7 @@ // start/stop ports at different times during the life of the test. 
     conf.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
     this.regionThreads = new ArrayList();
+    regionServerClass = (Class) conf.getClass(HConstants.REGION_SERVER_IMPL,
+        HRegionServer.class);
     for (int i = 0; i < noRegionServers; i++) {
       addRegionServer();
     }
@@ -112,7 +114,13 @@
    */
   public RegionServerThread addRegionServer() throws IOException {
     synchronized (regionThreads) {
-      RegionServerThread t = new RegionServerThread(new HRegionServer(conf),
+      HRegionServer server;
+      try {
+        server = (HRegionServer) regionServerClass.getConstructor(
+            HBaseConfiguration.class).newInstance(conf);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      RegionServerThread t = new RegionServerThread(server,
           this.regionThreads.size());
       this.regionThreads.add(t);
       return t;
Index: src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (revision 0)
@@ -0,0 +1,156 @@
+/*
+ * $Id$
+ * Created on Jun 4, 2008
+ *
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+
+/** Interface for transactional region servers.
+ *
+ */
+public interface TransactionalRegionInterface extends HRegionInterface {
+  /** Interface version number */
+  public static final long versionID = 1L;
+
+  /**
+   * Retrieve a single value from the specified region for the specified row
+   * and column keys
+   *
+   * @param regionName name of region
+   * @param row row key
+   * @param column column key
+   * @return value for that region/row/column
+   * @throws IOException
+   */
+  public Cell get(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column) throws IOException;
+
+  /**
+   * Get the specified number of versions of the specified row and column
+   *
+   * @param regionName region name
+   * @param row row key
+   * @param column column key
+   * @param numVersions number of versions to return
+   * @return array of values
+   * @throws IOException
+   */
+  public Cell[] get(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final int numVersions)
+      throws IOException;
+
+  /**
+   * Get the specified number of versions of the specified row and column with
+   * the specified timestamp.
+   *
+   * @param regionName region name
+   * @param row row key
+   * @param column column key
+   * @param timestamp timestamp
+   * @param numVersions number of versions to return
+   * @return array of values
+   * @throws IOException
+   */
+  public Cell[] get(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final long timestamp,
+      final int numVersions) throws IOException;
+
+  /**
+   * Get all the data for the specified row at a given timestamp
+   *
+   * @param regionName region name
+   * @param row row key
+   * @return map of values
+   * @throws IOException
+   */
+  public RowResult getRow(long transactionId, final byte[] regionName,
+      final byte[] row, final long ts) throws IOException;
+
+  /**
+   * Get selected columns for the specified row at a given timestamp.
+ * + * @param regionName region name + * @param row row key + * @return map of values + * @throws IOException + */ + public RowResult getRow(long transactionId, final byte[] regionName, + final byte[] row, final byte[][] columns, final long ts) + throws IOException; + + /** + * Get selected columns for the specified row at the latest timestamp. + * + * @param regionName region name + * @param row row key + * @return map of values + * @throws IOException + */ + public RowResult getRow(long transactionId, final byte[] regionName, + final byte[] row, final byte[][] columns) throws IOException; + + /** + * Opens a remote scanner with a RowFilter. + * + * @param transactionId + * @param regionName name of region to scan + * @param columns columns to scan. If column name is a column family, all + * columns of the specified column family are returned. Its also possible to + * pass a regex for column family name. A column name is judged to be regex if + * it contains at least one of the following characters: + * \+|^&*$[]]}{)(. + * @param startRow starting row to scan + * @param timestamp only return values whose timestamp is <= this value + * @param filter RowFilter for filtering results at the row-level. + * + * @return scannerId scanner identifier used in other calls + * @throws IOException + */ + public long openScanner(final long transactionId, final byte[] regionName, + final byte[][] columns, final byte[] startRow, long timestamp, + RowFilterInterface filter) throws IOException; + + /** + * Applies a batch of updates via one RPC + * + * @param regionName name of the region to update + * @param b BatchUpdate + * @throws IOException + */ + public void batchUpdate(long transactionId, final byte[] regionName, + final BatchUpdate b) throws IOException; + + /** + * Ask if we can commit the given transaction. + * + * @param transactionId + * @return true if we can commit + */ + public boolean commitRequest(final byte[] regionName, long transactionId) + throws IOException; + + /** + * Commit the transaction. + * + * @param transactionId + * @return + */ + public void commit(final byte[] regionName, long transactionId) + throws IOException; + + /** + * Abort the transaction. + * + * @param transactionId + * @return + */ + public void abort(final byte[] regionName, long transactionId) + throws IOException; +} Index: src/java/org/apache/hadoop/hbase/util/Bytes.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/Bytes.java (revision 663789) +++ src/java/org/apache/hadoop/hbase/util/Bytes.java (working copy) @@ -133,9 +133,9 @@ * @param bytes * @return the long value */ - public static long toInt(byte[] bytes) { + public static int toInt(byte[] bytes) { if (bytes == null || bytes.length == 0) { - return -1L; + return -1; } return ByteBuffer.wrap(bytes).getInt(); } Index: src/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HTable.java (revision 663789) +++ src/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -333,7 +333,7 @@ return this.tableName; } - protected HConnection getConnection() { + public HConnection getConnection() { return this.connection; } @@ -1232,7 +1232,7 @@ * If there are multiple regions in a table, this scanner will iterate * through them all. 
*/ - private class ClientScanner implements Scanner { + protected class ClientScanner implements Scanner { private final Log CLIENT_LOG = LogFactory.getLog(this.getClass()); private byte[][] columns; private byte [] startRow; @@ -1270,6 +1270,18 @@ } nextScanner(); } + + protected byte[][] getColumns() { + return columns; + } + + protected long getTimestamp() { + return scanTime; + } + + protected RowFilterInterface getFilter() { + return filter; + } /* * Gets a scanner for the next region. @@ -1306,8 +1318,7 @@ } try { - callable = new ScannerCallable(getConnection(), getTableName(), columns, - localStartKey, scanTime, filter); + callable = getScannerCallable(localStartKey); // open a scanner on the region server starting at the // beginning of the region getConnection().getRegionServerWithRetries(callable); @@ -1318,6 +1329,11 @@ } return true; } + + protected ScannerCallable getScannerCallable(byte [] localStartKey) { + return new ScannerCallable(getConnection(), getTableName(), columns, + localStartKey, scanTime, filter); + } /** {@inheritDoc} */ public RowResult next() throws IOException { Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java (revision 0) @@ -0,0 +1,34 @@ +/* + * $Id$ + * Created on Jun 5, 2008 + * + */ +package org.apache.hadoop.hbase.client.transactional; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; + +public class TransactionScannerCallable extends ScannerCallable { + + private TransactionState transactionState; + + TransactionScannerCallable(final TransactionState transactionState, + final HConnection connection, final byte[] tableName, + final byte[][] columns, final byte[] startRow, final long timestamp, + final RowFilterInterface filter) { + super(connection, tableName, columns, startRow, timestamp, filter); + this.transactionState = transactionState; + } + + @Override + protected long openScanner() throws IOException { + transactionState.addRegion(location); + return ((TransactionalRegionInterface) server).openScanner(transactionState + .getTransactionId(), this.location.getRegionInfo().getRegionName(), + getColumns(), row, getTimestamp(), getFilter()); + } +} Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (revision 0) @@ -0,0 +1,117 @@ +/* + * $Id$ + * Created on Jun 5, 2008 + * + */ +package org.apache.hadoop.hbase.client.transactional; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; + +/** + * Transaction Manager. 
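Responsible for committing client transactions across the
+ * participating region servers using two-phase commit. The names below are
+ * methods of this class and of TransactionalRegionInterface; this is a
+ * condensed sketch of the flow implemented by tryCommit(), not extra API:
+ *
+ * <pre>
+ * // Phase one: every participating region votes via commitRequest(); any
+ * // "no" vote aborts the transaction on the other regions and raises
+ * // CommitUnsuccessfulException.
+ * // Commit point: setStatusForTransaction(transactionId, COMMITTED) is
+ * // recorded in the TransactionLogger.
+ * // Phase two: commit() is sent to every participating region.
+ * </pre>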
+ *
+ */
+public class TransactionManager {
+  static final Log LOG = LogFactory.getLog(TransactionManager.class);
+
+  private final HConnection connection;
+  private final TransactionLogger transactionLogger;
+
+  public TransactionManager(final TransactionLogger transactionLogger) {
+    this(transactionLogger, new HBaseConfiguration());
+  }
+
+  public TransactionManager(final TransactionLogger transactionLogger,
+      final HBaseConfiguration conf) {
+    this.transactionLogger = transactionLogger;
+    connection = HConnectionManager.getConnection(conf);
+  }
+
+  /**
+   * Called to start a transaction.
+   *
+   * @return new transaction state
+   */
+  public TransactionState beginTransaction() {
+    long transactionId = transactionLogger.createNewTransactionLog();
+    return new TransactionState(transactionId);
+  }
+
+  /**
+   * Try and commit a transaction.
+   *
+   * @param transactionState
+   * @throws IOException
+   */
+  public void tryCommit(final TransactionState transactionState)
+      throws CommitUnsuccessfulException, IOException {
+    LOG.debug("Attempting to commit transaction: "
+        + transactionState.toString());
+
+    for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+      TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+          .getHRegionConnection(location.getServerAddress());
+      boolean canCommit = transactionalRegionServer
+          .commitRequest(location.getRegionInfo().getRegionName(),
+              transactionState.getTransactionId());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Region [" + location.getRegionInfo().toString()
+            + "] votes " + (canCommit ? "to commit" : "to abort"));
+      }
+
+      if (!canCommit) {
+        LOG.debug("Aborting [" + transactionState.getTransactionId() + "]");
+        abort(transactionState, location);
+        throw new CommitUnsuccessfulException();
+      }
+    }
+
+    LOG.debug("Committing [" + transactionState.getTransactionId() + "]");
+    transactionLogger.setStatusForTransaction(transactionState
+        .getTransactionId(), TransactionLogger.TransactionStatus.COMMITTED);
+
+    for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+      TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+          .getHRegionConnection(location.getServerAddress());
+      transactionalRegionServer.commit(
+          location.getRegionInfo().getRegionName(), transactionState
+              .getTransactionId());
+    }
+    // Transaction log can be deleted now ...
+  }
+
+  /**
+   * Abort a transaction.
+ * + * @param transactionState + * @throws IOException + */ + public void abort(final TransactionState transactionState) throws IOException { + abort(transactionState, null); + } + + private void abort(final TransactionState transactionState, + final HRegionLocation locationToIgnore) throws IOException { + transactionLogger.setStatusForTransaction(transactionState + .getTransactionId(), TransactionLogger.TransactionStatus.ABORTED); + for (HRegionLocation location : transactionState.getParticipatingRegions()) { + if (locationToIgnore != null && location.equals(locationToIgnore)) { + continue; + } + + TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection + .getHRegionConnection(location.getServerAddress()); + + transactionalRegionServer.abort(location.getRegionInfo().getRegionName(), + transactionState.getTransactionId()); + } + } +} Index: src/java/org/apache/hadoop/hbase/client/transactional/DummyTransactionLogger.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/transactional/DummyTransactionLogger.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/transactional/DummyTransactionLogger.java (revision 0) @@ -0,0 +1,33 @@ +package org.apache.hadoop.hbase.client.transactional; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A local, in-memory implementation of the transaction logger. Useful only for + * non-distributed testing. + * + */ +public class DummyTransactionLogger implements TransactionLogger { + + private AtomicLong nextTransactionId = new AtomicLong(0); + private Map transactionIdToStatusMap = Collections + .synchronizedMap(new HashMap()); + + public long createNewTransactionLog() { + long id = nextTransactionId.getAndIncrement(); + transactionIdToStatusMap.put(id, TransactionStatus.PENDING); + return id; + } + + public TransactionStatus getStatusForTransaction(final long transactionId) { + return transactionIdToStatusMap.get(transactionId); + } + + public void setStatusForTransaction(final long transactionId, + final TransactionStatus status) { + transactionIdToStatusMap.put(transactionId, status); + } +} Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java (revision 0) @@ -0,0 +1,31 @@ +/* + * $Id$ + * Created on Jun 5, 2008 + * + */ +package org.apache.hadoop.hbase.client.transactional; + +/** + * Simple interface used to provide a log about transaction status. Written to + * by the client, and read by regionservers in case of failure. + * + */ +public interface TransactionLogger { + + enum TransactionStatus { + PENDING, COMMITTED, ABORTED + } + + /** + * Create a new transaction log. Return the transaction's globally unique id. 
+ * Log's initial value should be PENDING + * + * @return transaction id + */ + long createNewTransactionLog(); + + TransactionStatus getStatusForTransaction(long transactionId); + + void setStatusForTransaction(long transactionId, TransactionStatus status); + +} Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java (revision 0) @@ -0,0 +1,51 @@ +/* + * $Id$ + * Created on Jun 5, 2008 + * + */ +package org.apache.hadoop.hbase.client.transactional; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionLocation; + +/** + * Holds client-side transactional information. Client's use them as opaque + * objects passed around to transactional operations. + * + */ +public class TransactionState { + static final Log LOG = LogFactory.getLog(TransactionState.class); + + private final long transactionId; + + private Set participatingRegions = new HashSet(); + + TransactionState(final long transactionId) { + this.transactionId = transactionId; + } + + void addRegion(final HRegionLocation hregion) { + if (!participatingRegions.contains(hregion)) { + LOG.debug("Adding new hregion [" + hregion.toString() + + "] to transaction [" + transactionId + "]"); + participatingRegions.add(hregion); + } + } + + Set getParticipatingRegions() { + return participatingRegions; + } + + /** + * Get the transactionId. + * + * @return Return the transactionId. + */ + long getTransactionId() { + return transactionId; + } +} Index: src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java (revision 0) @@ -0,0 +1,29 @@ +/* + * $Id$ + * Created on Jun 5, 2008 + * + */ +package org.apache.hadoop.hbase.client.transactional; + +/** Thrown when a transaction cannot be committed. 
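+ * Typically raised by TransactionManager.tryCommit() when a participating
+ * region votes no; tryCommit() first aborts the transaction on the other
+ * regions, so the caller can simply retry with a fresh transaction. A sketch
+ * of the handling used in TestTransactions:
+ *
+ * <pre>
+ * try {
+ *   transactionManager.tryCommit(transactionState);
+ * } catch (CommitUnsuccessfulException e) {
+ *   // Conflict detected; begin a new transaction and retry the work.
+ * }
+ * </pre>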
+ * + */ +public class CommitUnsuccessfulException extends Exception { + + public CommitUnsuccessfulException() { + super(); + } + + public CommitUnsuccessfulException(String arg0, Throwable arg1) { + super(arg0, arg1); + } + + public CommitUnsuccessfulException(String arg0) { + super(arg0); + } + + public CommitUnsuccessfulException(Throwable arg0) { + super(arg0); + } + +} Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (revision 0) @@ -0,0 +1,372 @@ +/* + * $Id$ + * Created on Jun 5, 2008 + * + */ +package org.apache.hadoop.hbase.client.transactional; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scanner; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; +import org.apache.hadoop.io.Text; + +/** Table with transactional support. + * + */ +public class TransactionalTable extends HTable { + + public TransactionalTable(final HBaseConfiguration conf, + final String tableName) throws IOException { + super(conf, tableName); + } + + public TransactionalTable(final HBaseConfiguration conf, final Text tableName) + throws IOException { + super(conf, tableName); + } + + public TransactionalTable(final HBaseConfiguration conf, + final byte[] tableName) throws IOException { + super(conf, tableName); + } + + private static abstract class TransactionalServerCallable extends + ServerCallable { + protected TransactionState transactionState; + + protected TransactionalRegionInterface getTransactionServer() { + return (TransactionalRegionInterface) server; + } + + protected void recordServer() { + transactionState.addRegion(location); + } + + public TransactionalServerCallable(final HConnection connection, + final byte[] tableName, final byte[] row, + final TransactionState transactionState) { + super(connection, tableName, row); + this.transactionState = transactionState; + } + + } + + /** + * Get a single value for the specified row and column + * + * @param row row key + * @param column column name + * @return value for specified row/column + * @throws IOException + */ + public Cell get(final TransactionState transactionState, final byte[] row, + final byte[] column) throws IOException { + return super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public Cell call() throws IOException { + recordServer(); + return getTransactionServer().get( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, column); + } + }); + } + + /** + * Get the specified number of versions of the specified row and column + * + * @param row - row key + * @param column - column name + * @param numVersions - number of versions to retrieve + * @return - 
array byte values + * @throws IOException + */ + public Cell[] get(final TransactionState transactionState, final byte[] row, + final byte[] column, final int numVersions) throws IOException { + Cell[] values = null; + values = super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public Cell[] call() throws IOException { + recordServer(); + return getTransactionServer().get( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, column, + numVersions); + } + }); + + // FIXME, not sure why this is needed, but copied over from HTable + if (values != null) { + ArrayList cellValues = new ArrayList(); + for (int i = 0; i < values.length; i++) { + cellValues.add(values[i]); + } + return cellValues.toArray(new Cell[values.length]); + } + return null; + } + + /** + * Get the specified number of versions of the specified row and column with + * the specified timestamp. + * + * @param row - row key + * @param column - column name + * @param timestamp - timestamp + * @param numVersions - number of versions to retrieve + * @return - array of values that match the above criteria + * @throws IOException + */ + public Cell[] get(final TransactionState transactionState, final byte[] row, + final byte[] column, final long timestamp, final int numVersions) + throws IOException { + Cell[] values = null; + values = super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public Cell[] call() throws IOException { + recordServer(); + return getTransactionServer().get( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, column, + timestamp, numVersions); + } + }); + + if (values != null) { + ArrayList cellValues = new ArrayList(); + for (int i = 0; i < values.length; i++) { + cellValues.add(values[i]); + } + return cellValues.toArray(new Cell[values.length]); + } + return null; + } + + /** + * Get all the data for the specified row at the latest timestamp + * + * @param row row key + * @return RowResult is empty if row does not exist. + * @throws IOException + */ + public RowResult getRow(final TransactionState transactionState, + final byte[] row) throws IOException { + return getRow(transactionState, row, HConstants.LATEST_TIMESTAMP); + } + + /** + * Get all the data for the specified row at a specified timestamp + * + * @param row row key + * @param ts timestamp + * @return RowResult is empty if row does not exist. + * @throws IOException + */ + public RowResult getRow(final TransactionState transactionState, + final byte[] row, final long ts) throws IOException { + return super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public RowResult call() throws IOException { + recordServer(); + return getTransactionServer().getRow( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, ts); + } + }); + } + + /** + * Get selected columns for the specified row at the latest timestamp + * + * @param row row key + * @param columns Array of column names you want to retrieve. + * @return RowResult is empty if row does not exist. 
+  /**
+   * Get selected columns for the specified row at the latest timestamp.
+   *
+   * @param transactionState transaction to perform the read in
+   * @param row row key
+   * @param columns Array of column names you want to retrieve.
+   * @return RowResult is empty if row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row, final byte[][] columns) throws IOException {
+    return getRow(transactionState, row, columns, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Get selected columns for the specified row at a specified timestamp.
+   *
+   * @param transactionState transaction to perform the read in
+   * @param row row key
+   * @param columns Array of column names you want to retrieve.
+   * @param ts timestamp
+   * @return RowResult is empty if row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row, final byte[][] columns, final long ts)
+      throws IOException {
+    return super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public RowResult call() throws IOException {
+            recordServer();
+            return getTransactionServer().getRow(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, columns, ts);
+          }
+        });
+  }
+
+  /**
+   * Get a scanner on the current table starting at first row. Return the
+   * specified columns.
+   *
+   * @param transactionState transaction to perform the scan in
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns) throws IOException {
+    return getScanner(transactionState, columns, HConstants.EMPTY_START_ROW,
+        HConstants.LATEST_TIMESTAMP, null);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   *
+   * @param transactionState transaction to perform the scan in
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @param startRow starting row in table to scan
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow) throws IOException {
+    return getScanner(transactionState, columns, startRow,
+        HConstants.LATEST_TIMESTAMP, null);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   *
+   * @param transactionState transaction to perform the scan in
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp is <= this value
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow, final long timestamp)
+      throws IOException {
+    return getScanner(transactionState, columns, startRow, timestamp, null);
+  }
+
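+  /* Illustrative sketch: scans take the same TransactionState. Assuming
+   * Scanner follows the HTable convention of returning null once the scan
+   * is exhausted:
+   *
+   *   Scanner scanner = table.getScanner(state,
+   *       new byte[][] { Bytes.toBytes("family:") });
+   *   RowResult row;
+   *   while ((row = scanner.next()) != null) {
+   *     // process row within the transaction
+   *   }
+   *   scanner.close();
+   */
+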
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   *
+   * @param transactionState transaction to perform the scan in
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @param startRow starting row in table to scan
+   * @param filter a row filter using row-key regexp and/or column data filter.
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow,
+      final RowFilterInterface filter) throws IOException {
+    return getScanner(transactionState, columns, startRow,
+        HConstants.LATEST_TIMESTAMP, filter);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   *
+   * @param transactionState transaction to perform the scan in
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * \+|^&*$[]]}{)(.
+   * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp is <= this value
+   * @param filter a row filter using row-key regexp and/or column data filter.
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow, final long timestamp,
+      final RowFilterInterface filter) throws IOException {
+    return new TransactionalClientScanner(transactionState, columns, startRow,
+        timestamp, filter);
+  }
+
+  /**
+   * Commit a BatchUpdate to the table as part of the transaction.
+   *
+   * @param transactionState transaction to perform the update in
+   * @param batchUpdate the update to apply
+   * @throws IOException
+   */
+  public synchronized void commit(final TransactionState transactionState,
+      final BatchUpdate batchUpdate) throws IOException {
+    super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable(super.getConnection(), super
+            .getTableName(), batchUpdate.getRow(), transactionState) {
+          public Boolean call() throws IOException {
+            recordServer();
+            getTransactionServer().batchUpdate(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), batchUpdate);
+            return null;
+          }
+        });
+  }
+
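+  /* Design note: TransactionalClientScanner (below) hooks into
+   * HTable.ClientScanner by overriding getScannerCallable(), and the
+   * companion changes to ScannerCallable (next diff) widen its constructor
+   * to protected and expose its columns/timestamp/filter so subclasses can
+   * customize how a scanner is opened. TransactionScannerCallable itself is
+   * not shown in this excerpt; a hypothetical sketch of its core override,
+   * assuming the transactional region interface exposes an openScanner that
+   * takes a transaction id:
+   *
+   *   @Override
+   *   protected long openScanner() throws IOException {
+   *     return ((TransactionalRegionInterface) server).openScanner(
+   *         transactionState.getTransactionId(),
+   *         location.getRegionInfo().getRegionName(), getColumns(), row,
+   *         getTimestamp(), getFilter());
+   *   }
+   */
+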
+  /**
+   * Scanner that issues its region calls through a transaction-aware
+   * ScannerCallable.
+   */
+  protected class TransactionalClientScanner extends HTable.ClientScanner {
+
+    private final TransactionState transactionState;
+
+    protected TransactionalClientScanner(
+        final TransactionState transactionState, final byte[][] columns,
+        final byte[] startRow, final long timestamp,
+        final RowFilterInterface filter) throws IOException {
+      super(columns, startRow, timestamp, filter);
+      this.transactionState = transactionState;
+    }
+
+    @Override
+    protected ScannerCallable getScannerCallable(final byte[] localStartKey) {
+      return new TransactionScannerCallable(transactionState, getConnection(),
+          getTableName(), getColumns(), localStartKey, getTimestamp(),
+          getFilter());
+    }
+  }
+
+}
Index: src/java/org/apache/hadoop/hbase/client/ScannerCallable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 663789)
+++ src/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy)
@@ -23,6 +23,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.RowResult;
@@ -38,7 +39,7 @@
   private final long timestamp;
   private final RowFilterInterface filter;
 
-  ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns,
+  protected ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns,
       byte [] startRow, long timestamp, RowFilterInterface filter) {
     super(connection, tableName, startRow);
     this.columns = columns;
@@ -65,15 +66,31 @@
       scannerId = -1L;
     } else if (scannerId == -1L && !closed) {
       // open the scanner
-      scannerId = server.openScanner(
-          this.location.getRegionInfo().getRegionName(), columns, row,
-          timestamp, filter);
+      scannerId = openScanner();
     } else {
       return server.next(scannerId);
     }
     return null;
   }
 
+  protected long openScanner() throws IOException {
+    return server.openScanner(
+        this.location.getRegionInfo().getRegionName(), columns, row,
+        timestamp, filter);
+  }
+
+  protected byte [][] getColumns() {
+    return columns;
+  }
+
+  protected long getTimestamp() {
+    return timestamp;
+  }
+
+  protected RowFilterInterface getFilter() {
+    return filter;
+  }
+
   /**
    * Call this when the next invocation of call should close the scanner
   */