getTransactionsToCheck() {
+ return additionalTransactionsToCheck;
+ }
+
+ /**
+ * Get the status.
+ *
+ * @return Return the status.
+ */
+ public Status getStatus() {
+ return status;
+ }
+
+ /**
+ * Set the status.
+ *
+ * @param status The status to set.
+ */
+ public void setStatus(final Status status) {
+ this.status = status;
+ }
+
+ /**
+ * Get the startSequenceNumber.
+ *
+ * @return Return the startSequenceNumber.
+ */
+ public int getStartSequenceNumber() {
+ return startSequenceNumber;
+ }
+
+ /**
+ * Set the startSequenceNumber.
+ *
+ * @param startSequenceNumber The startSequenceNumber to set.
+ */
+ public void setStartSequenceNumber(final int startSequenceNumber) {
+ this.startSequenceNumber = startSequenceNumber;
+ }
+
+ /**
+ * Get the sequenceNumber.
+ *
+ * @return Return the sequenceNumber.
+ */
+ public Integer getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ /**
+ * Set the sequenceNumber.
+ *
+ * @param sequenceNumber The sequenceNumber to set.
+ */
+ public void setSequenceNumber(final Integer sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder result = new StringBuilder();
+ result.append("[transactionId: ");
+ result.append(transactionId);
+ result.append(" status: ");
+ result.append(status.name());
+ result.append(" read Size: ");
+ result.append(readSet.size());
+ result.append(" write Size: ");
+ result.append(writeSet.size());
+ result.append(" startSQ: ");
+ result.append(startSequenceNumber);
+ if (sequenceNumber != null) {
+ result.append(" commitedSQ:");
+ result.append(sequenceNumber);
+ }
+ result.append("]");
+
+ return result.toString();
+ }
+
+ /**
+ * Get the transactionId.
+ *
+ * @return Return the transactionId.
+ */
+ public long getTransactionId() {
+ return transactionId;
+ }
+
+ /**
+ * Get the hLogStartSequenceId.
+ * @return Return the hLogStartSequenceId.
+ */
+ public long getHLogStartSequenceId() {
+ return hLogStartSequenceId;
+ }
+
+}
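
The accessors above are driven by TransactionalRegion (the next file) over a
transaction's life. A minimal sketch of that lifecycle, assuming the
TransactionState constructor and the addRead()/Status members defined earlier
in this file:

    TransactionState state = new TransactionState(42L, log.getSequenceNumber());
    state.setStartSequenceNumber(0);            // start of the OCC window
    state.addRead(Bytes.toBytes("row1"));       // reads recorded for conflict checks
    state.setStatus(TransactionState.Status.COMMIT_PENDING); // after a yes vote
    state.setSequenceNumber(1);                 // commit order among writers
    state.setStatus(TransactionState.Status.COMMITED);       // enum constant spelling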
Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (revision 0)
@@ -0,0 +1,625 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.LeaseException;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.Leases;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Region which provides transactional support for atomic transactions.
+ * This is achieved with optimistic concurrency control (see
+ * http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf). We keep track
+ * of the read and write sets for each transaction, and hold off on processing
+ * the writes. To decide if a transaction can commit, we check its read set
+ * for overlaps with the write sets of all transactions that committed while
+ * it was running.
+ *
+ * Because transactions can span multiple regions, all regions must agree to
+ * commit a transaction. The client side of this commit protocol is encoded in
+ * org.apache.hadoop.hbase.client.transactional.TransactionManager
+ *
+ * In the event of a failure of the client mid-commit (after we have voted
+ * yes), we will have to consult the transaction log to determine the final
+ * decision of the transaction. This is not yet implemented.
+ */
+class TransactionalRegion extends HRegion {
+
+ private static final String LEASE_TIME = "hbase.transaction.leaseTime";
+ private static final int DEFAULT_LEASE_TIME = 60 * 1000;
+ private static final int LEASE_CHECK_FREQUENCY = 1000;
+
+ private static final Log LOG = LogFactory.getLog(TransactionalRegion.class);
+
+ // Collection of active transactions (PENDING), keyed by id.
+ private Map<String, TransactionState> transactionsById = new HashMap<String, TransactionState>();
+
+ // Map of recent transactions that are COMMIT_PENDING or COMMITED, keyed by
+ // their sequence number
+ private SortedMap<Integer, TransactionState> commitedTransactionsBySequenceNumber = Collections
+ .synchronizedSortedMap(new TreeMap<Integer, TransactionState>());
+
+ // Collection of transactions that are COMMIT_PENDING
+ private Set<TransactionState> commitPendingTransactions = Collections
+ .synchronizedSet(new HashSet<TransactionState>());
+
+ private final Leases transactionLeases;
+ private AtomicInteger nextSequenceId = new AtomicInteger(0);
+ private Object commitCheckLock = new Object();
+ private TransactionalHLogManager logManager;
+
+ public TransactionalRegion(final Path basedir, final HLog log,
+ final FileSystem fs, final HBaseConfiguration conf,
+ final HRegionInfo regionInfo, final FlushRequester flushListener) {
+ super(basedir, log, fs, conf, regionInfo, flushListener);
+ transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME),
+ LEASE_CHECK_FREQUENCY);
+ logManager = new TransactionalHLogManager(this);
+ }
+
+ @Override
+ protected void doReconstructionLog(final Path oldLogFile,
+ final long maxSeqId, final Progressable reporter)
+ throws UnsupportedEncodingException, IOException {
+ super.doReconstructionLog(oldLogFile, maxSeqId, reporter);
+
+ Map<Long, List<BatchUpdate>> commitedTransactionsById = logManager
+ .getCommitsFromLog(oldLogFile, maxSeqId, reporter);
+
+ if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
+ LOG.debug("found " + commitedTransactionsById.size()
+ + " COMMITED transactions");
+
+ for (Entry<Long, List<BatchUpdate>> entry : commitedTransactionsById
+ .entrySet()) {
+ LOG.debug("Writing " + entry.getValue().size()
+ + " updates for transaction " + entry.getKey());
+ for (BatchUpdate b : entry.getValue()) {
+ super.batchUpdate(b, true); // These are WALed so they live forever
+ }
+ }
+
+ // LOG.debug("Flushing cache"); // We must trigger a cache flush,
+ // otherwise
+ // we will would ignore the log on subsequent failure
+ // if (!super.flushcache()) {
+ // LOG.warn("Did not flush cache");
+ // }
+ }
+ }
+
+ /**
+ * We need to make sure that we don't complete a cache flush between running
+ * transactions. If we did, then we would not find all log messages needed to
+ * restore the transaction, as some of them would be before the last
+ * "complete" flush id.
+ */
+ @Override
+ protected long getCompleteCacheFlushSequenceId(final long currentSequenceId) {
+ long minPendingStartSequenceId = currentSequenceId;
+ for (TransactionState transactionState : transactionsById.values()) {
+ minPendingStartSequenceId = Math.min(minPendingStartSequenceId,
+ transactionState.getHLogStartSequenceId());
+ }
+ return minPendingStartSequenceId;
+ }
+
+ public void beginTransaction(final long transactionId) throws IOException {
+ String key = String.valueOf(transactionId);
+ if (transactionsById.get(key) != null) {
+ TransactionState alias = getTransactionState(transactionId);
+ if (alias != null) {
+ alias.setStatus(Status.ABORTED);
+ retireTransaction(alias);
+ }
+ throw new IOException("Already exiting transaction id: " + key);
+ }
+
+ TransactionState state = new TransactionState(transactionId, super.getLog()
+ .getSequenceNumber());
+
+ // Order is important here
+ for (TransactionState commitPending : commitPendingTransactions) {
+ state.addTransactionToCheck(commitPending);
+ }
+ state.setStartSequenceNumber(nextSequenceId.get());
+
+ transactionsById.put(key, state);
+ try {
+ transactionLeases.createLease(key, new TransactionLeaseListener(key));
+ } catch (LeaseStillHeldException e) {
+ throw new RuntimeException(e);
+ }
+ LOG.debug("Begining transaction " + key + " in region "
+ + super.getRegionInfo().getRegionNameAsString());
+ logManager.writeStartToLog(transactionId);
+ }
+
+ /**
+ * Fetch a single data item.
+ *
+ * @param transactionId
+ * @param row
+ * @param column
+ * @return column value
+ * @throws IOException
+ */
+ public Cell get(final long transactionId, final byte[] row,
+ final byte[] column) throws IOException {
+ Cell[] results = get(transactionId, row, column, 1);
+ return (results == null || results.length == 0) ? null : results[0];
+ }
+
+ /**
+ * Fetch multiple versions of a single data item
+ *
+ * @param transactionId
+ * @param row
+ * @param column
+ * @param numVersions
+ * @return array of values one element per version
+ * @throws IOException
+ */
+ public Cell[] get(final long transactionId, final byte[] row,
+ final byte[] column, final int numVersions) throws IOException {
+ return get(transactionId, row, column, Long.MAX_VALUE, numVersions);
+ }
+
+ /**
+ * Fetch multiple versions of a single data item, with timestamp.
+ *
+ * @param transactionId
+ * @param row
+ * @param column
+ * @param timestamp
+ * @param numVersions
+ * @return array of values one element per version that matches the timestamp
+ * @throws IOException
+ */
+ public Cell[] get(final long transactionId, final byte[] row,
+ final byte[] column, final long timestamp, final int numVersions)
+ throws IOException {
+ TransactionState state = getTransactionState(transactionId);
+
+ state.addRead(row);
+
+ Cell [] localCells = state.localGet(row, column, timestamp);
+
+ if (localCells != null && localCells.length > 0) {
+ LOG.info("Transactional get of something we've written in the same transaction "+transactionId);
+
+ if (numVersions > 1) {
+ Cell [] globalCells = get(row, column, timestamp, numVersions - 1);
+ if (globalCells == null) {
+ return localCells; // nothing committed globally for this cell
+ }
+ Cell [] result = new Cell[globalCells.length + localCells.length];
+ System.arraycopy(localCells, 0, result, 0, localCells.length);
+ System.arraycopy(globalCells, 0, result, localCells.length, globalCells.length);
+ return result;
+ }
+ return localCells;
+ }
+
+ return get(row, column, timestamp, numVersions);
+ }
+
+ /**
+ * Fetch all the columns for the indicated row at a specified timestamp.
+ * Returns a TreeMap that maps column names to values.
+ *
+ * @param transactionId
+ * @param row
+ * @param columns Array of columns you'd like to retrieve. When null, get all.
+ * @param ts
+ * @return Map values
+ * @throws IOException
+ */
+ public Map<byte [], Cell> getFull(final long transactionId, final byte[] row,
+ final Set<byte []> columns, final long ts) throws IOException {
+ TransactionState state = getTransactionState(transactionId);
+
+ state.addRead(row);
+
+ Map<byte [], Cell> localCells = state.localGetFull(row, columns, ts);
+
+ if (localCells != null && localCells.size() > 0) {
+ LOG.info("Transactional get of something we've written in the same transaction "+transactionId);
+ Map internalResults = getFull(row, columns, ts);
+ internalResults.putAll(localCells);
+ return internalResults;
+ }
+
+ return getFull(row, columns, ts);
+ }
+
+ /**
+ * Return a scanner over the HRegion, returning the indicated columns for
+ * only the rows that match the data filter. The scanner must be closed by
+ * the caller.
+ *
+ * @param transactionId
+ * @param cols columns to scan. If column name is a column family, all columns
+ * of the specified column family are returned. It's also possible to pass a
+ * regex in the column qualifier. A column qualifier is judged to be a regex
+ * if it contains at least one of the following characters:
+ * \+|^&*$[]]}{)(.
+ * @param firstRow row which is the starting point of the scan
+ * @param timestamp only return rows whose timestamp is <= this value
+ * @param filter row filter
+ * @return InternalScanner
+ * @throws IOException
+ */
+ public InternalScanner getScanner(final long transactionId,
+ final byte[][] cols, final byte[] firstRow, final long timestamp,
+ final RowFilterInterface filter) throws IOException {
+ return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow,
+ timestamp, filter));
+ }
+
+ /**
+ * Add a write to the transaction. Does not get applied until commit process.
+ *
+ * @param transactionId
+ * @param b
+ * @throws IOException
+ */
+ public void batchUpdate(final long transactionId, final BatchUpdate b)
+ throws IOException {
+ TransactionState state = getTransactionState(transactionId);
+ state.addWrite(b);
+ logManager.writeUpdateToLog(transactionId, b);
+ }
+
+ public boolean commitRequest(final long transactionId) throws IOException {
+ synchronized (commitCheckLock) {
+ TransactionState state = getTransactionState(transactionId);
+ if (state == null) {
+ return false;
+ }
+
+ if (hasConflict(state)) {
+ state.setStatus(Status.ABORTED);
+ retireTransaction(state);
+ return false;
+ }
+
+ // No conflicts, we can commit.
+ LOG.trace("No conflicts for transaction " + transactionId
+ + " found in region " + super.getRegionInfo().getRegionNameAsString()
+ + ". Voting for commit");
+ state.setStatus(Status.COMMIT_PENDING);
+
+ // If there are writes we must keep a record of the transaction
+ if (state.getWriteSet().size() > 0) {
+ // Order is important
+ commitPendingTransactions.add(state);
+ state.setSequenceNumber(nextSequenceId.getAndIncrement());
+ commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(),
+ state);
+ }
+
+ return true;
+ }
+ }
+
+ private boolean hasConflict(final TransactionState state) {
+ // Check against transactions that committed while we were running
+ for (int i = state.getStartSequenceNumber(); i < nextSequenceId.get(); i++) {
+ TransactionState other = commitedTransactionsBySequenceNumber.get(i);
+ if (other == null) {
+ continue;
+ }
+ if (hasConflict(state, other)) {
+ return true;
+ }
+ }
+
+ // Check against transactions that were COMMIT_PENDING when we started
+ for (TransactionState other : state.getTransactionsToCheck()) {
+ if (other.getStatus().equals(TransactionState.Status.ABORTED)) {
+ continue;
+ }
+ if (hasConflict(state, other)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean hasConflict(final TransactionState checkTransaction,
+ final TransactionState checkAgainst) {
+ for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) {
+ if (checkTransaction.getReadSet().contains(otherUpdate.getRow())) {
+ LOG.trace("Transaction " + checkTransaction.toString()
+ + " conflicts with " + checkAgainst.toString());
+ return true;
+ }
+ }
+ return false;
+ }
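+
+ // Example of the rule above: if transaction A read row "x" while
+ // transaction B, which committed during A's lifetime, wrote row "x", then
+ // A conflicts and must abort; transactions with disjoint sets both commit.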
+
+ /**
+ * Commit the transaction.
+ *
+ * @param transactionId
+ * @throws IOException
+ */
+ public void commit(final long transactionId) throws IOException {
+ TransactionState state;
+ try {
+ state = getTransactionState(transactionId);
+ } catch (UnknownTransactionException e) {
+ LOG.fatal("Asked to commit unknown transaction: " + transactionId
+ + " in region " + super.getRegionInfo().getRegionNameAsString());
+ // FIXME Write to the transaction log that this transaction was corrupted
+ throw e;
+ }
+
+ if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
+ LOG.fatal("Asked to commit a non pending transaction");
+ // FIXME Write to the transaction log that this transaction was corrupted
+ throw new IOException("commit failure");
+ }
+
+ commit(state);
+ }
+
+ /**
+ * Abort the transaction.
+ *
+ * @param transactionId
+ * @throws IOException
+ */
+ public void abort(final long transactionId) throws IOException {
+ TransactionState state;
+ try {
+ state = getTransactionState(transactionId);
+ } catch (UnknownTransactionException e) {
+ LOG.error("Asked to abort unknown transaction: " + transactionId);
+ return;
+ }
+
+ state.setStatus(Status.ABORTED);
+
+ if (state.getWriteSet().size() > 0) {
+ logManager.writeAbortToLog(state.getTransactionId());
+ }
+
+ // Following removes needed if we have voted
+ if (state.getSequenceNumber() != null) {
+ commitedTransactionsBySequenceNumber.remove(state.getSequenceNumber());
+ }
+ commitPendingTransactions.remove(state);
+
+ retireTransaction(state);
+ }
+
+ private void commit(final TransactionState state) throws IOException {
+
+ LOG.debug("Commiting transaction: " + state.toString() + " to "
+ + super.getRegionInfo().getRegionNameAsString());
+
+ if (state.getWriteSet().size() > 0) {
+ logManager.writeCommitToLog(state.getTransactionId());
+ }
+
+ for (BatchUpdate update : state.getWriteSet()) {
+ super.batchUpdate(update, false); // Don't need to WAL these
+ // FIXME: maybe should be WALed so we don't need to look so far back.
+ }
+
+ state.setStatus(Status.COMMITED);
+ if (state.getWriteSet().size() > 0
+ && !commitPendingTransactions.remove(state)) {
+ LOG.fatal("Committing a non-query transaction that is not in commitPendingTransactions");
+ throw new IOException("commit failure"); // FIXME, how to handle?
+ }
+ retireTransaction(state);
+ }
+
+ // Cancel the lease and remove the transaction from the by-id lookup. The
+ // transaction may still live on in commitedTransactionsBySequenceNumber
+ // and commitPendingTransactions.
+ private void retireTransaction(final TransactionState state) {
+ String key = String.valueOf(state.getTransactionId());
+ try {
+ transactionLeases.cancelLease(key);
+ } catch (LeaseException e) {
+ // Ignore
+ }
+
+ transactionsById.remove(key);
+ }
+
+ private TransactionState getTransactionState(final long transactionId)
+ throws UnknownTransactionException {
+ String key = String.valueOf(transactionId);
+ TransactionState state = transactionsById.get(key);
+
+ if (state == null) {
+ LOG.trace("Unknown transaction: " + key);
+ throw new UnknownTransactionException(key);
+ }
+
+ try {
+ transactionLeases.renewLease(key);
+ } catch (LeaseException e) {
+ throw new RuntimeException(e);
+ }
+
+ return state;
+ }
+
+ /**
+ * Cleanup references to committed transactions that are no longer needed.
+ */
+ void removeUnNeededCommitedTransactions() {
+ Integer minStartSeqNumber = getMinStartSequenceNumber();
+ if (minStartSeqNumber == null) {
+ minStartSeqNumber = Integer.MAX_VALUE; // Remove all
+ }
+
+ Iterator<Entry<Integer, TransactionState>> it = commitedTransactionsBySequenceNumber
+ .entrySet().iterator();
+
+ int numRemoved = 0;
+ while (it.hasNext()) {
+ if (it.next().getKey() >= minStartSeqNumber) {
+ break;
+ }
+ it.remove();
+ numRemoved++;
+ }
+
+ LOG.debug("Removed " + numRemoved
+ + " commited transactions with sequence lower than "
+ + minStartSeqNumber);
+ }
+
+ private Integer getMinStartSequenceNumber() {
+ Integer min = null;
+ for (TransactionState transactionState : transactionsById.values()) {
+ if (min == null || transactionState.getStartSequenceNumber() < min) {
+ min = transactionState.getStartSequenceNumber();
+ }
+ }
+ return min;
+ }
+
+ // TODO, resolve from the global transaction log
+ private void resolveTransactionFromLog(final long transactionId) {
+ throw new RuntimeException("NYI");
+ }
+
+ private class TransactionLeaseListener implements LeaseListener {
+ private final String transactionName;
+
+ TransactionLeaseListener(final String n) {
+ this.transactionName = n;
+ }
+
+ /** {@inheritDoc} */
+ public void leaseExpired() {
+ LOG.info("Transaction " + this.transactionName + " lease expired");
+ TransactionState s = null;
+ synchronized (transactionsById) {
+ s = transactionsById.remove(transactionName);
+ }
+ if (s == null) {
+ LOG.warn("Unknown transaction expired " + this.transactionName);
+ return;
+ }
+
+ switch (s.getStatus()) {
+ case PENDING:
+ s.setStatus(Status.ABORTED); // Other transactions may have a ref
+ break;
+ case COMMIT_PENDING:
+ LOG.info("Transaction " + s.getTransactionId()
+ + " expired in COMMIT_PENDING state");
+ LOG.info("Checking transaction status in transaction log");
+ resolveTransactionFromLog(s.getTransactionId());
+ break;
+ default:
+ LOG.warn("Unexpected status on expired lease");
+ }
+ }
+ }
+
+ /** Wrapper which keeps track of rows returned by scanner. */
+ private class ScannerWrapper implements InternalScanner {
+ private long transactionId;
+ private InternalScanner scanner;
+
+ public ScannerWrapper(final long transactionId,
+ final InternalScanner scanner) {
+ this.transactionId = transactionId;
+ this.scanner = scanner;
+ }
+
+ public void close() throws IOException {
+ scanner.close();
+ }
+
+ public boolean isMultipleMatchScanner() {
+ return scanner.isMultipleMatchScanner();
+ }
+
+ public boolean isWildcardScanner() {
+ return scanner.isWildcardScanner();
+ }
+
+ public boolean next(final HStoreKey key,
+ final SortedMap<byte [], Cell> results) throws IOException {
+ boolean result = scanner.next(key, results);
+ TransactionState state = getTransactionState(transactionId);
+ state.addRead(key.getRow());
+
+ if (result) {
+ // Note: the timestamp cutoff must be a long; Integer.MAX_VALUE as a
+ // timestamp would exclude everything written after early 1970.
+ Map<byte [], Cell> localWrites = state.localGetFull(key.getRow(), null, Long.MAX_VALUE);
+ if (localWrites != null) {
+ LOG.info("Scanning over row that has been written to " + transactionId);
+ for (Entry<byte [], Cell> entry : localWrites.entrySet()) {
+ results.put(entry.getKey(), entry.getValue().getValue());
+ }
+ }
+ }
+
+ return result;
+ }
+ }
+}
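
The region-side votes above are driven by a client-side coordinator. A
minimal sketch of that loop, assuming server implements the
TransactionalRegionInterface defined later in this patch and regions holds
the names of all regions the transaction touched:

    boolean allVoteYes = true;
    for (byte[] regionName : regions) {
      // Phase one: each region checks for read/write conflicts and votes.
      allVoteYes &= server.commitRequest(regionName, transactionId);
    }
    for (byte[] regionName : regions) {
      if (allVoteYes) {
        server.commit(regionName, transactionId);  // phase two: apply write set
      } else {
        server.abort(regionName, transactionId);
      }
    }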
Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java (revision 0)
@@ -0,0 +1,254 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HLogEdit;
+import org.apache.hadoop.hbase.regionserver.HLogKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Responsible for writing and reading (recovering) transactional information
+ * to/from the HLog.
+ */
+class TransactionalHLogManager {
+
+ private static final Log LOG = LogFactory
+ .getLog(TransactionalHLogManager.class);
+
+ private final HLog hlog;
+ private final FileSystem fileSystem;
+ private final HRegionInfo regionInfo;
+ private final HBaseConfiguration conf;
+
+ public TransactionalHLogManager(final TransactionalRegion region) {
+ this.hlog = region.getLog();
+ this.fileSystem = region.getFilesystem();
+ this.regionInfo = region.getRegionInfo();
+ this.conf = region.getConf();
+ }
+
+ // For Testing
+ TransactionalHLogManager(final HLog hlog, final FileSystem fileSystem,
+ final HRegionInfo regionInfo, final HBaseConfiguration conf) {
+ this.hlog = hlog;
+ this.fileSystem = fileSystem;
+ this.regionInfo = regionInfo;
+ this.conf = conf;
+ }
+
+ public void writeStartToLog(final long transactionId) throws IOException {
+ HLogEdit logEdit;
+ logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.START);
+
+ hlog.append(regionInfo, logEdit);
+ }
+
+ public void writeUpdateToLog(final long transactionId,
+ final BatchUpdate update) throws IOException {
+
+ long commitTime = update.getTimestamp() == HConstants.LATEST_TIMESTAMP ? System
+ .currentTimeMillis()
+ : update.getTimestamp();
+
+ for (BatchOperation op : update) {
+ HLogEdit logEdit = new HLogEdit(transactionId, op, commitTime);
+ hlog.append(regionInfo, update.getRow(), logEdit);
+ }
+ }
+
+ public void writeCommitToLog(final long transactionId) throws IOException {
+ HLogEdit logEdit;
+ logEdit = new HLogEdit(transactionId,
+ HLogEdit.TransactionalOperation.COMMIT);
+
+ hlog.append(regionInfo, logEdit);
+ }
+
+ public void writeAbortToLog(final long transactionId) throws IOException {
+ HLogEdit logEdit;
+ logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.ABORT);
+
+ hlog.append(regionInfo, logEdit);
+ }
+
+ public Map<Long, List<BatchUpdate>> getCommitsFromLog(
+ final Path reconstructionLog, final long maxSeqID,
+ final Progressable reporter) throws UnsupportedEncodingException,
+ IOException {
+ if (reconstructionLog == null || !fileSystem.exists(reconstructionLog)) {
+ // Nothing to do.
+ return null;
+ }
+ // Check it's not empty.
+ FileStatus[] stats = fileSystem.listStatus(reconstructionLog);
+ if (stats == null || stats.length == 0) {
+ LOG.warn("Passed reconstruction log " + reconstructionLog
+ + " is zero-length");
+ return null;
+ }
+
+ SortedMap<Long, List<BatchUpdate>> pendingTransactionsById = new TreeMap<Long, List<BatchUpdate>>();
+ SortedMap<Long, List<BatchUpdate>> commitedTransactionsById = new TreeMap<Long, List<BatchUpdate>>();
+ Set<Long> abortedTransactions = new HashSet<Long>();
+
+ SequenceFile.Reader logReader = new SequenceFile.Reader(fileSystem,
+ reconstructionLog, conf);
+
+ try {
+ HLogKey key = new HLogKey();
+ HLogEdit val = new HLogEdit();
+ long skippedEdits = 0;
+ long totalEdits = 0;
+ long startCount = 0;
+ long writeCount = 0;
+ long abortCount = 0;
+ long commitCount = 0;
+ // How many edits to apply before we send a progress report.
+ int reportInterval = conf.getInt("hbase.hstore.report.interval.edits",
+ 2000);
+ while (logReader.next(key, val)) {
+ LOG.debug("Processing edit: key: "+key.toString()+" val: " + val.toString());
+ if (key.getLogSeqNum() <= maxSeqID) {
+ skippedEdits++;
+ continue;
+ }
+
+ // Check this edit is for me.
+ byte[] column = val.getColumn();
+ Long transactionId = val.getTransactionId();
+ if (!val.isTransactionEntry() || HLog.isMetaColumn(column)
+ || !Bytes.equals(key.getRegionName(), regionInfo.getRegionName())) {
+ continue;
+ }
+
+ List<BatchUpdate> updates = pendingTransactionsById.get(transactionId);
+ switch (val.getOperation()) {
+ case START:
+ if (updates != null || abortedTransactions.contains(transactionId)
+ || commitedTransactionsById.containsKey(transactionId)) {
+ LOG.error("Processing start for transaction: " + transactionId
+ + ", but have already seen start message");
+ throw new IOException("Corrupted transaction log");
+ }
+ updates = new LinkedList<BatchUpdate>();
+ pendingTransactionsById.put(transactionId, updates);
+ startCount++;
+ break;
+
+ case WRITE:
+ if (updates == null) {
+ LOG.error("Processing edit for transaction: " + transactionId
+ + ", but have not seen start message");
+ throw new IOException("Corrupted transaction log");
+ }
+
+ BatchUpdate tranUpdate = new BatchUpdate(key.getRow());
+ tranUpdate.put(val.getColumn(), val.getVal());
+ updates.add(tranUpdate);
+ writeCount++;
+ break;
+
+ case ABORT:
+ if (updates == null) {
+ LOG.error("Processing abort for transaction: " + transactionId
+ + ", but have not seen start message");
+ throw new IOException("Corrupted transaction log");
+ }
+ abortedTransactions.add(transactionId);
+ abortCount++;
+ break;
+
+ case COMMIT:
+ if (updates == null) {
+ LOG.error("Processing commit for transaction: " + transactionId
+ + ", but have not seen start message");
+ throw new IOException("Corrupted transaction log");
+ }
+ if (abortedTransactions.contains(transactionId)) {
+ LOG.error("Processing commit for transaction: " + transactionId
+ + ", but also have abort message");
+ throw new IOException("Corrupted transaction log");
+ }
+ if (updates.size() == 0) {
+ LOG.warn("Transaction " + transactionId + " has no writes in log.");
+ }
+ if (commitedTransactionsById.containsKey(transactionId)) {
+ LOG.error("Processing commit for transaction: " + transactionId
+ + ", but have already commited transaction with that id");
+ throw new IOException("Corrupted transaction log");
+ }
+ pendingTransactionsById.remove(transactionId);
+ commitedTransactionsById.put(transactionId, updates);
+ commitCount++;
+ }
+ totalEdits++;
+
+ if (reporter != null && (totalEdits % reportInterval) == 0) {
+ reporter.progress();
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Read " + totalEdits + " tranasctional operations (skipped "
+ + skippedEdits + " because sequence id <= " + maxSeqID + "): "
+ + startCount + " starts, " + writeCount + " writes, " + abortCount
+ + " aborts, and " + commitCount + " commits.");
+ }
+ } finally {
+ logReader.close();
+ }
+
+ if (pendingTransactionsById.size() > 0) {
+ LOG.info("Region log has " + pendingTransactionsById.size()
+ + " unfinished transactions. Going to the transaction log to resolve");
+ throw new RuntimeException("Transaction log not yet implemented");
+ }
+
+ return commitedTransactionsById;
+ }
+}
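
For a committed transaction the manager above leaves a START record, one
WRITE record per BatchOperation, and a COMMIT record in the shared HLog;
recovery replays only complete sequences. A small sketch, with manager a
TransactionalHLogManager for the region:

    manager.writeStartToLog(7L);           // START 7
    manager.writeUpdateToLog(7L, update);  // WRITE 7, one entry per operation
    manager.writeCommitToLog(7L);          // COMMIT 7

    // On region reopen: only transactions that reached COMMIT are returned;
    // aborted ones are dropped, and unfinished ones currently fail recovery
    // pending the global transaction log.
    Map<Long, List<BatchUpdate>> commits =
        manager.getCommitsFromLog(reconstructionLog, maxSeqId, null);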
Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java (revision 0)
@@ -0,0 +1,53 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Cleans up committed transactions when they are no longer needed to verify
+ * pending transactions.
+ */
+class CleanOldTransactionsChore extends Chore {
+
+ private static final String SLEEP_CONF = "hbase.transaction.clean.sleep";
+ private static final int DEFAULT_SLEEP = 60 * 1000;
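+ // The sweep period is configurable: setting hbase.transaction.clean.sleep
+ // to 30000 in hbase-site.xml, for example, runs this chore every 30
+ // seconds instead of the default minute.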
+
+ private final TransactionalRegionServer regionServer;
+
+ public CleanOldTransactionsChore(
+ final TransactionalRegionServer regionServer,
+ final AtomicBoolean stopRequest) {
+ super(regionServer.getConfiguration().getInt(SLEEP_CONF, DEFAULT_SLEEP),
+ stopRequest);
+ this.regionServer = regionServer;
+ }
+
+ @Override
+ protected void chore() {
+ for (HRegion region : regionServer.getOnlineRegions()) {
+ ((TransactionalRegion) region).removeUnNeededCommitedTransactions();
+ }
+ }
+
+}
Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (revision 0)
@@ -0,0 +1,283 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * RegionServer with support for transactions. Transactional logic is at the
+ * region level, so we mostly just delegate to the appropriate
+ * TransactionalRegion.
+ */
+public class TransactionalRegionServer extends HRegionServer implements
+ TransactionalRegionInterface {
+ static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class);
+
+ private final CleanOldTransactionsChore cleanOldTransactionsThread;
+
+ public TransactionalRegionServer(final HBaseConfiguration conf)
+ throws IOException {
+ this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+ DEFAULT_REGIONSERVER_ADDRESS)), conf);
+ }
+
+ public TransactionalRegionServer(final HServerAddress address,
+ final HBaseConfiguration conf) throws IOException {
+ super(address, conf);
+ cleanOldTransactionsThread = new CleanOldTransactionsChore(this,
+ super.stopRequested);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getProtocolVersion(final String protocol, final long clientVersion)
+ throws IOException {
+ if (protocol.equals(TransactionalRegionInterface.class.getName())) {
+ return TransactionalRegionInterface.versionID;
+ }
+ return super.getProtocolVersion(protocol, clientVersion);
+ }
+
+ @Override
+ protected void init(final MapWritable c) throws IOException {
+ super.init(c);
+ String n = Thread.currentThread().getName();
+ UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ abort();
+ LOG.fatal("Set stop flag in " + t.getName(), e);
+ }
+ };
+ Threads.setDaemonThreadRunning(this.cleanOldTransactionsThread, n
+ + ".oldTransactionCleaner", handler);
+
+ }
+
+ @Override
+ protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+ throws IOException {
+ HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
+ .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+ .getFileSystem(), super.conf, regionInfo, super
+ .getFlushRequester());
+ r.initialize(null, new Progressable() {
+ public void progress() {
+ addProcessingMessage(regionInfo);
+ }
+ });
+ return r;
+ }
+
+ protected TransactionalRegion getTransactionalRegion(final byte[] regionName)
+ throws NotServingRegionException {
+ return (TransactionalRegion) super.getRegion(regionName);
+ }
+
+ public void abort(final byte[] regionName, final long transactionId)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ getTransactionalRegion(regionName).abort(transactionId);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public void batchUpdate(final long transactionId, final byte[] regionName,
+ final BatchUpdate b) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ getTransactionalRegion(regionName).batchUpdate(transactionId, b);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public void commit(final byte[] regionName, final long transactionId)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ getTransactionalRegion(regionName).commit(transactionId);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public boolean commitRequest(final byte[] regionName, final long transactionId)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).commitRequest(transactionId);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public Cell get(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).get(transactionId, row, column);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public Cell[] get(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final int numVersions)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).get(transactionId, row, column,
+ numVersions);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public Cell[] get(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final long timestamp,
+ final int numVersions) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).get(transactionId, row, column,
+ timestamp, numVersions);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public RowResult getRow(final long transactionId, final byte[] regionName,
+ final byte[] row, final long ts) throws IOException {
+ return getRow(transactionId, regionName, row, null, ts);
+ }
+
+ public RowResult getRow(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns) throws IOException {
+ return getRow(transactionId, regionName, row, columns,
+ HConstants.LATEST_TIMESTAMP);
+ }
+
+ public RowResult getRow(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns, final long ts)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ // convert the columns array into a set so it's easy to check later.
+ Set<byte []> columnSet = null;
+ if (columns != null) {
+ columnSet = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+ columnSet.addAll(Arrays.asList(columns));
+ }
+
+ TransactionalRegion region = getTransactionalRegion(regionName);
+ Map<byte [], Cell> map = region.getFull(transactionId, row, columnSet, ts);
+ HbaseMapWritable<byte [], Cell> result = new HbaseMapWritable<byte [], Cell>();
+ result.putAll(map);
+ return new RowResult(row, result);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+
+ }
+
+ public long openScanner(final long transactionId, final byte[] regionName,
+ final byte[][] cols, final byte[] firstRow, final long timestamp,
+ final RowFilterInterface filter) throws IOException {
+ checkOpen();
+ NullPointerException npe = null;
+ if (regionName == null) {
+ npe = new NullPointerException("regionName is null");
+ } else if (cols == null) {
+ npe = new NullPointerException("columns to scan is null");
+ } else if (firstRow == null) {
+ npe = new NullPointerException("firstRow for scanner is null");
+ }
+ if (npe != null) {
+ IOException io = new IOException("Invalid arguments to openScanner");
+ io.initCause(npe);
+ throw io;
+ }
+ super.getRequestCount().incrementAndGet();
+ try {
+ TransactionalRegion r = getTransactionalRegion(regionName);
+ long scannerId = -1L;
+ InternalScanner s = r.getScanner(transactionId, cols, firstRow,
+ timestamp, filter);
+ scannerId = super.addScanner(s);
+ return scannerId;
+ } catch (IOException e) {
+ LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
+ RemoteExceptionHandler.checkIOException(e));
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public void beginTransaction(final long transactionId, final byte[] regionName)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ getTransactionalRegion(regionName).beginTransaction(transactionId);
+ }
+
+}
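
End to end, a client drives these endpoints in a begin/write/vote/commit
sequence. A minimal sketch, assuming server is a remote
TransactionalRegionInterface proxy and the row and column names are
placeholders:

    long txId = 77L;  // assigned by the client-side transaction manager
    server.beginTransaction(txId, regionName);

    BatchUpdate update = new BatchUpdate(Bytes.toBytes("row1"));
    update.put(Bytes.toBytes("colfam:qual"), Bytes.toBytes("value"));
    server.batchUpdate(txId, regionName, update);  // buffered, not yet applied

    if (server.commitRequest(regionName, txId)) {  // phase one: vote
      server.commit(regionName, txId);             // phase two: apply
    } else {
      server.abort(regionName, txId);
    }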
Index: src/java/org/apache/hadoop/hbase/regionserver/HLog.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HLog.java (revision 668256)
+++ src/java/org/apache/hadoop/hbase/regionserver/HLog.java (working copy)
@@ -171,7 +171,7 @@
* Accessor for tests.
* @return Current state of the monotonically increasing file id.
*/
- long getFilenum() {
+ public long getFilenum() {
return this.filenum;
}
@@ -204,6 +204,10 @@
}
}
}
+
+ public long getSequenceNumber() {
+ return logSeqNum;
+ }
/**
* Roll the log writer. That is, start writing log messages to a new file.
@@ -310,7 +314,7 @@
* This is a convenience method that computes a new filename with a given
* file-number.
*/
- Path computeFilename(final long fn) {
+ public Path computeFilename(final long fn) {
return new Path(dir, HLOG_DATFILE + fn);
}
@@ -329,7 +333,7 @@
*
* @throws IOException
*/
- void close() throws IOException {
+ public void close() throws IOException {
cacheFlushLock.lock();
try {
synchronized (updateLock) {
@@ -390,19 +394,7 @@
new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
HLogEdit logEdit =
new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
- try {
- this.writer.append(logKey, logEdit);
- } catch (IOException e) {
- LOG.error("Could not append to log. Opening new log. Exception: ", e);
- rollWriter();
- try {
- this.writer.append(logKey, logEdit);
- } catch (IOException e2) {
- LOG.fatal("Could not append to log the second time because " +
- e2.toString() + ", aborting.");
- throw e2;
- }
- }
+ doWrite(logKey, logEdit);
this.numEntries++;
}
}
@@ -412,7 +404,70 @@
}
}
}
+
+ private void doWrite(HLogKey logKey, HLogEdit logEdit) throws IOException {
+ try {
+ this.writer.append(logKey, logEdit);
+ } catch (IOException e) {
+ LOG.error("Could not append to log. Opening new log. Exception: ", e);
+ rollWriter();
+ try {
+ this.writer.append(logKey, logEdit);
+ } catch (IOException e2) {
+ LOG.fatal("Could not append to log the second time because " +
+ e2.toString() + ", aborting.");
+ throw e2;
+ }
+ }
+ }
+
+ /** Append an entry without a row to the log.
+ *
+ * @param regionInfo
+ * @param logEdit
+ * @throws IOException
+ */
+ public void append(HRegionInfo regionInfo, HLogEdit logEdit) throws IOException {
+ this.append(regionInfo, new byte[0], logEdit);
+ }
+
+ /** Append an entry to the log.
+ *
+ * @param regionInfo
+ * @param row
+ * @param logEdit
+ * @throws IOException
+ */
+ public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit) throws IOException {
+ if (closed) {
+ throw new IOException("Cannot append; log is closed");
+ }
+ byte [] regionName = regionInfo.getRegionName();
+ byte [] tableName = regionInfo.getTableDesc().getName();
+
+ synchronized (updateLock) {
+ long seqNum = obtainSeqNum();
+ // The 'lastSeqWritten' map holds the sequence number of the oldest
+ // write for each region. When the cache is flushed, the entry for the
+ // region being flushed is removed if the sequence number of the flush
+ // is greater than or equal to the value in lastSeqWritten.
+ if (!this.lastSeqWritten.containsKey(regionName)) {
+ this.lastSeqWritten.put(regionName, Long.valueOf(seqNum));
+ }
+ HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum);
+ doWrite(logKey, logEdit);
+ this.numEntries++;
+ }
+
+ if (this.numEntries > this.maxlogentries) {
+ if (listener != null) {
+ listener.logRollRequested();
+ }
+ }
+ }
+
/** @return How many items have been added to the log */
int getNumEntries() {
return numEntries;
@@ -509,6 +564,10 @@
this.cacheFlushLock.unlock();
}
+ public static boolean isMetaColumn(byte [] column) {
+ return Bytes.equals(METACOLUMN, column);
+ }
+
/**
* Split up a bunch of log files, that are no longer being written to, into
* new files, one per region. Delete the old log files when finished.
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 668256)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -244,8 +244,8 @@
LOG.debug("Files for new region");
listPaths(fs, newRegionDir);
}
- HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo,
- null, null);
+ HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null);
+ dstRegion.initialize(null, null);
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
@@ -358,7 +358,7 @@
private final ReentrantReadWriteLock updatesLock =
new ReentrantReadWriteLock();
private final Integer splitLock = new Integer(0);
- private final long minSequenceId;
+ private long minSequenceId;
final AtomicInteger activeScannerCount = new AtomicInteger(0);
//////////////////////////////////////////////////////////////////////////////
@@ -368,8 +368,6 @@
/**
* HRegion constructor.
*
- * @param basedir qualified path of directory where region should be located,
- * usually the table directory.
* @param log The HLog is the outbound log for any updates to the HRegion
* (There's a single HLog for all the HRegions on a single HRegionServer.)
* The log file is a logfile from the previous execution that's
@@ -377,31 +375,6 @@
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
- * @param fs is the filesystem.
- * @param conf is global configuration settings.
- * @param regionInfo - HRegionInfo that describes the region
- * @param initialFiles If there are initial files (implying that the HRegion
- * is new), then read them from the supplied path.
- * @param flushListener an object that implements CacheFlushListener or null
- * or null
- * @throws IOException
- */
- public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
- HRegionInfo regionInfo, Path initialFiles,
- FlushRequester flushListener) throws IOException {
- this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null);
- }
-
- /**
- * HRegion constructor.
- *
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
- * custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
- * (implying that the HRegion has been written-to before), then read it from
- * the supplied path.
* @param basedir qualified path of directory where region should be located,
* usually the table directory.
* @param fs is the filesystem.
@@ -416,10 +389,8 @@
* @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
- HRegionInfo regionInfo, Path initialFiles,
- FlushRequester flushListener, final Progressable reporter)
- throws IOException {
-
+ HRegionInfo regionInfo,
+ FlushRequester flushListener) {
this.basedir = basedir;
this.log = log;
this.fs = fs;
@@ -430,7 +401,6 @@
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName());
this.regiondir = new Path(basedir, encodedNameStr);
- Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
this.historian = RegionHistorian.getInstance();
if (LOG.isDebugEnabled()) {
@@ -441,6 +411,25 @@
this.regionCompactionDir =
new Path(getCompactionDir(basedir), encodedNameStr);
+ // By default, we flush the cache when 64M.
+ this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
+ 1024*1024*64);
+
+ this.blockingMemcacheSize = this.memcacheFlushSize *
+ conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
+
+ }
+
+ /** Initialize this region and get it ready to roll.
+ *
+ * @param initialFiles
+ * @param reporter
+ * @throws IOException
+ */
+ public void initialize(Path initialFiles,
+ final Progressable reporter) throws IOException {
+ Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
+
// Move prefab HStore files into place (if any). This picks up split files
// and any merges from splits and merges dirs.
if (initialFiles != null && fs.exists(initialFiles)) {
@@ -484,14 +473,7 @@
if (fs.exists(merges)) {
fs.delete(merges, true);
}
-
- // By default, we flush the cache when 64M.
- this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
- 1024*1024*64);
-
- this.blockingMemcacheSize = this.memcacheFlushSize *
- conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
-
+
// HRegion is ready to go!
this.writestate.compacting = false;
this.lastFlushTime = System.currentTimeMillis();
@@ -769,10 +751,12 @@
// Opening the region copies the splits files from the splits directory
// under each region.
HRegion regionA =
- new HRegion(basedir, log, fs, conf, regionAInfo, dirA, null);
+ new HRegion(basedir, log, fs, conf, regionAInfo, null);
+ regionA.initialize(dirA, null);
regionA.close();
HRegion regionB =
- new HRegion(basedir, log, fs, conf, regionBInfo, dirB, null);
+ new HRegion(basedir, log, fs, conf, regionBInfo, null);
+ regionB.initialize(dirB, null);
regionB.close();
// Cleanup
@@ -995,12 +979,14 @@
// goes into the HLog after we've flushed all these snapshots also goes
// into the info file that sits beside the flushed files.
long sequenceId = -1L;
+ long completeSequenceId = -1L;
this.updatesLock.writeLock().lock();
try {
for (HStore s: stores.values()) {
s.snapshot();
}
sequenceId = log.startCacheFlush();
+ completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
} finally {
this.updatesLock.writeLock().unlock();
}
@@ -1045,7 +1031,7 @@
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(getRegionName(),
- regionInfo.getTableDesc().getName(), sequenceId);
+ regionInfo.getTableDesc().getName(), completeSequenceId);
// C. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
@@ -1068,6 +1054,18 @@
return true;
}
+ /**
+ * Get the sequence number to be associated with this cache flush. Used by
+ * TransactionalRegion to keep a flush from completing past the log entries
+ * of still-pending transactions.
+ *
+ * @param currentSequenceId
+ * @return sequence id to complete the cache flush with
+ */
+ protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
+ return currentSequenceId;
+ }
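+
+ // TransactionalRegion overrides this to return the smallest HLog sequence
+ // id still needed by a pending transaction, so a flush cannot retire that
+ // transaction's log entries.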
+
//////////////////////////////////////////////////////////////////////////////
// get() methods for client use.
//////////////////////////////////////////////////////////////////////////////
@@ -1288,7 +1286,16 @@
* @param b
* @throws IOException
*/
- public void batchUpdate(BatchUpdate b)
+ public void batchUpdate(BatchUpdate b) throws IOException {
+ this.batchUpdate(b, true);
+ }
+
+ /**
+ * @param b
+ * @param writeToWAL if true, then we write this update to the log
+ * @throws IOException
+ */
+ public void batchUpdate(BatchUpdate b, boolean writeToWAL)
throws IOException {
// Do a rough check that we have resources to accept a write. The check is
@@ -1335,7 +1342,7 @@
this.targetColumns.remove(lid);
if (edits != null && edits.size() > 0) {
- update(edits);
+ update(edits, writeToWAL);
}
if (deletes != null && deletes.size() > 0) {
@@ -1507,16 +1514,26 @@
}
targets.put(key, val);
}
-
- /*
+
+ /**
* Add updates first to the hlog and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
- * @param row Row to update.
- * @param timestamp Timestamp to record the updates against
+ * @param writeToWAL if true, then we should write to the log
* @param updatesByColumn Cell updates by column
* @throws IOException
*/
- private void update(final TreeMap<HStoreKey, byte []> updatesByColumn)
+ private void update(final TreeMap<HStoreKey, byte []> updatesByColumn) throws IOException {
+ this.update(updatesByColumn, true);
+ }
+
+ /**
+ * Add updates first to the hlog (if writeToWAL) and then add values to memcache.
+ * Warning: Assumption is caller has lock on passed in row.
+ * @param writeToWAL if true, then we should write to the log
+ * @param updatesByColumn Cell updates by column
+ * @throws IOException
+ */
+ private void update(final TreeMap<HStoreKey, byte []> updatesByColumn, boolean writeToWAL)
throws IOException {
if (updatesByColumn == null || updatesByColumn.size() <= 0) {
return;
@@ -1524,8 +1541,10 @@
boolean flush = false;
this.updatesLock.readLock().lock();
try {
- this.log.append(regionInfo.getRegionName(),
- regionInfo.getTableDesc().getName(), updatesByColumn);
+ if (writeToWAL) {
+ this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc()
+ .getName(), updatesByColumn);
+ }
long size = 0;
for (Map.Entry<HStoreKey, byte []> e: updatesByColumn.entrySet()) {
HStoreKey key = e.getKey();
@@ -1960,9 +1979,11 @@
if (!info.isMetaRegion()) {
RegionHistorian.getInstance().addRegionCreation(info);
}
- return new HRegion(tableDir,
- new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
- fs, conf, info, null, null);
+ HRegion region = new HRegion(tableDir,
+ new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
+ fs, conf, info, null);
+ region.initialize(null, null);
+ return region;
}
/**
@@ -1989,7 +2010,8 @@
}
HRegion r = new HRegion(
HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
- log, FileSystem.get(conf), conf, info, null, null);
+ log, FileSystem.get(conf), conf, info, null);
+ r.initialize(null, null);
if (log != null) {
log.setSequenceNumber(r.getMinSequenceId());
}
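
The getCompleteCacheFlushSequenceId() hook added above is what lets a subclass pin the sequence id recorded when a cache flush completes, so that WAL entries still needed by in-progress transactions are not treated as flushed. A minimal sketch of the idea follows; the class and method names here are hypothetical illustrations, not part of this patch:

import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch: cap the flush-completion sequence id at the oldest
// start id of any still-pending transaction.
class PendingAwareFlushTracker {
  private final SortedSet<Long> pendingStartIds = new TreeSet<Long>();

  void transactionStarted(long startSequenceId) {
    pendingStartIds.add(startSequenceId);
  }

  void transactionFinished(long startSequenceId) {
    pendingStartIds.remove(startSequenceId);
  }

  // Mirrors the contract of HRegion.getCompleteCacheFlushSequenceId():
  // never report a completion id past the oldest pending transaction.
  long getCompleteCacheFlushSequenceId(long currentSequenceId) {
    if (pendingStartIds.isEmpty()) {
      return currentSequenceId;
    }
    return Math.min(currentSequenceId, pendingStartIds.first());
  }
}
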
Index: src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (revision 0)
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+
+/** Interface for transactional region servers.
+ *
+ */
+public interface TransactionalRegionInterface extends HRegionInterface {
+ /** Interface version number */
+ public static final long versionID = 1L;
+
+ /** Sent to initiate a transaction.
+ *
+ * @param transactionId
+ * @param regionName name of region
+ * @throws IOException
+ */
+ public void beginTransaction(long transactionId, final byte[] regionName) throws IOException;
+
+ /**
+ * Retrieve a single value from the specified region for the specified row and
+ * column keys
+ *
+ * @param transactionId transaction id
+ * @param regionName name of region
+ * @param row row key
+ * @param column column key
+ * @return value for that region/row/column
+ * @throws IOException
+ */
+ public Cell get(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column) throws IOException;
+
+ /**
+ * Get the specified number of versions of the specified row and column
+ *
+ * @param transactionId transaction id
+ * @param regionName region name
+ * @param row row key
+ * @param column column key
+ * @param numVersions number of versions to return
+ * @return array of values
+ * @throws IOException
+ */
+ public Cell[] get(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final int numVersions)
+ throws IOException;
+
+ /**
+ * Get the specified number of versions of the specified row and column with
+ * the specified timestamp.
+ *
+ * @param transactionId transaction id
+ * @param regionName region name
+ * @param row row key
+ * @param column column key
+ * @param timestamp timestamp
+ * @param numVersions number of versions to return
+ * @return array of values
+ * @throws IOException
+ */
+ public Cell[] get(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final long timestamp,
+ final int numVersions) throws IOException;
+
+ /**
+ * Get all the data for the specified row at a given timestamp
+ *
+ * @param transactionId transaction id
+ * @param regionName region name
+ * @param row row key
+ * @param ts timestamp
+ * @return map of values
+ * @throws IOException
+ */
+ public RowResult getRow(long transactionId, final byte[] regionName,
+ final byte[] row, final long ts) throws IOException;
+
+ /**
+ * Get selected columns for the specified row at a given timestamp.
+ *
+ * @param transactionId transaction id
+ * @param regionName region name
+ * @param row row key
+ * @param columns columns to return
+ * @param ts timestamp
+ * @return map of values
+ * @throws IOException
+ */
+ public RowResult getRow(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns, final long ts)
+ throws IOException;
+
+ /**
+ * Get selected columns for the specified row at the latest timestamp.
+ *
+ * @param transactionId transaction id
+ * @param regionName region name
+ * @param row row key
+ * @param columns columns to return
+ * @return map of values
+ * @throws IOException
+ */
+ public RowResult getRow(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns) throws IOException;
+
+ /**
+ * Opens a remote scanner with a RowFilter.
+ *
+ * @param transactionId
+ * @param regionName name of region to scan
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex for the column family name. A column name is judged to be a
+ * regex if it contains at least one of the following characters:
+ * \+|^&*$[]]}{)(.
+ * @param startRow starting row to scan
+ * @param timestamp only return values whose timestamp is <= this value
+ * @param filter RowFilter for filtering results at the row-level.
+ *
+ * @return scannerId scanner identifier used in other calls
+ * @throws IOException
+ */
+ public long openScanner(final long transactionId, final byte[] regionName,
+ final byte[][] columns, final byte[] startRow, long timestamp,
+ RowFilterInterface filter) throws IOException;
+
+ /**
+ * Applies a batch of updates via one RPC
+ *
+ * @param transactionId transaction id
+ * @param regionName name of the region to update
+ * @param b BatchUpdate
+ * @throws IOException
+ */
+ public void batchUpdate(long transactionId, final byte[] regionName,
+ final BatchUpdate b) throws IOException;
+
+ /**
+ * Ask if we can commit the given transaction.
+ *
+ * @param regionName name of region
+ * @param transactionId transaction id
+ * @return true if we can commit
+ * @throws IOException
+ */
+ public boolean commitRequest(final byte[] regionName, long transactionId)
+ throws IOException;
+
+ /**
+ * Commit the transaction.
+ *
+ * @param regionName name of region
+ * @param transactionId transaction id
+ * @throws IOException
+ */
+ public void commit(final byte[] regionName, long transactionId)
+ throws IOException;
+
+ /**
+ * Abort the transaction.
+ *
+ * @param regionName name of region
+ * @param transactionId transaction id
+ * @throws IOException
+ */
+ public void abort(final byte[] regionName, long transactionId)
+ throws IOException;
+}
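
Taken together, commitRequest(), commit() and abort() are the region-side surface of a two-phase commit. A hedged sketch of how a coordinator might drive a single region through both phases; the helper class is a hypothetical illustration, and the proxy is assumed to have been obtained elsewhere (e.g. via HConnection.getHRegionConnection()):

import java.io.IOException;

// Sketch only: the vote phase followed by the decision phase for one region.
class SingleRegionCommitSketch {
  static void commitOrAbort(TransactionalRegionInterface region,
      byte[] regionName, long transactionId) throws IOException {
    if (region.commitRequest(regionName, transactionId)) {
      region.commit(regionName, transactionId); // region voted to commit
    } else {
      region.abort(regionName, transactionId); // region voted to abort
    }
  }
}

TransactionManager, added below, generalizes this loop across all participating regions and records the decision in the transaction log before the second phase.
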
Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java (revision 0)
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+
+class TransactionScannerCallable extends ScannerCallable {
+
+ private TransactionState transactionState;
+
+ TransactionScannerCallable(final TransactionState transactionState,
+ final HConnection connection, final byte[] tableName,
+ final byte[][] columns, final byte[] startRow, final long timestamp,
+ final RowFilterInterface filter) {
+ super(connection, tableName, columns, startRow, timestamp, filter);
+ this.transactionState = transactionState;
+ }
+
+ @Override
+ protected long openScanner() throws IOException {
+ if (transactionState.addRegion(location)) {
+ ((TransactionalRegionInterface) server).beginTransaction(transactionState
+ .getTransactionId(), location.getRegionInfo().getRegionName());
+ }
+ return ((TransactionalRegionInterface) server).openScanner(transactionState
+ .getTransactionId(), this.location.getRegionInfo().getRegionName(),
+ getColumns(), row, getTimestamp(), getFilter());
+ }
+}
Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (revision 0)
@@ -0,0 +1,135 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+
+/**
+ * Transaction Manager. Responsible for committing transactions.
+ *
+ */
+public class TransactionManager {
+ static final Log LOG = LogFactory.getLog(TransactionManager.class);
+
+ private final HConnection connection;
+ private final TransactionLogger transactionLogger;
+
+ /**
+ * @param transactionLogger the logger used to record transaction status
+ */
+ public TransactionManager(final TransactionLogger transactionLogger) {
+ this(transactionLogger, new HBaseConfiguration());
+ }
+
+ /**
+ * @param transactionLogger the logger used to record transaction status
+ * @param conf configuration
+ */
+ public TransactionManager(final TransactionLogger transactionLogger,
+ final HBaseConfiguration conf) {
+ this.transactionLogger = transactionLogger;
+ connection = HConnectionManager.getConnection(conf);
+ }
+
+ /**
+ * Called to start a transaction.
+ *
+ * @return new transaction state
+ */
+ public TransactionState beginTransaction() {
+ long transactionId = transactionLogger.createNewTransactionLog();
+ LOG.debug("Begining transaction " + transactionId);
+ return new TransactionState(transactionId);
+ }
+
+ /**
+ * Try to commit a transaction.
+ *
+ * @param transactionState the transaction to commit
+ * @throws CommitUnsuccessfulException if a participant votes to abort
+ * @throws IOException
+ */
+ public void tryCommit(final TransactionState transactionState)
+ throws CommitUnsuccessfulException, IOException {
+ LOG.debug("atempting to commit trasaction: " + transactionState.toString());
+
+ for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+ TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+ .getHRegionConnection(location.getServerAddress());
+ boolean canCommit = transactionalRegionServer
+ .commitRequest(location.getRegionInfo().getRegionName(),
+ transactionState.getTransactionId());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Region [" + location.getRegionInfo().getRegionNameAsString()
+ + "] votes " + (canCommit ? "to commit" : "to abort")
+ + " transaction " + transactionState.getTransactionId());
+ }
+
+ if (!canCommit) {
+ LOG.debug("Aborting [" + transactionState.getTransactionId() + "]");
+ abort(transactionState, location);
+ throw new CommitUnsuccessfulException();
+ }
+ }
+
+ LOG.debug("Commiting [" + transactionState.getTransactionId() + "]");
+
+ transactionLogger.setStatusForTransaction(transactionState
+ .getTransactionId(), TransactionLogger.TransactionStatus.COMMITTED);
+
+ for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+ TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+ .getHRegionConnection(location.getServerAddress());
+ transactionalRegionServer.commit(
+ location.getRegionInfo().getRegionName(), transactionState
+ .getTransactionId());
+ }
+ // Transaction log can be deleted now ...
+ }
+
+ /**
+ * Abort a transaction.
+ *
+ * @param transactionState
+ * @throws IOException
+ */
+ public void abort(final TransactionState transactionState) throws IOException {
+ abort(transactionState, null);
+ }
+
+ private void abort(final TransactionState transactionState,
+ final HRegionLocation locationToIgnore) throws IOException {
+ transactionLogger.setStatusForTransaction(transactionState
+ .getTransactionId(), TransactionLogger.TransactionStatus.ABORTED);
+
+ for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+ if (locationToIgnore != null && location.equals(locationToIgnore)) {
+ continue;
+ }
+
+ TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+ .getHRegionConnection(location.getServerAddress());
+
+ transactionalRegionServer.abort(location.getRegionInfo().getRegionName(),
+ transactionState.getTransactionId());
+ }
+ }
+}
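
For orientation, a hedged end-to-end sketch of driving the manager; DummyTransactionLogger is the in-memory logger added below, the wrapper class is hypothetical, and the body of the transaction is elided:

import java.io.IOException;

class TransactionManagerUsageSketch {
  void run() throws IOException {
    TransactionManager manager =
        new TransactionManager(new DummyTransactionLogger());
    TransactionState tx = manager.beginTransaction();
    try {
      // ... transactional reads and writes enlist regions here ...
      manager.tryCommit(tx);
    } catch (CommitUnsuccessfulException e) {
      // A failed vote already caused tryCommit() to abort the other
      // participants before throwing; there is nothing left to roll back.
      System.err.println("Transaction " + tx.getTransactionId() + " aborted");
    }
  }
}
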
Index: src/java/org/apache/hadoop/hbase/client/transactional/DummyTransactionLogger.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/transactional/DummyTransactionLogger.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/client/transactional/DummyTransactionLogger.java (revision 0)
@@ -0,0 +1,52 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A local, in-memory implementation of the transaction logger. Useful only for
+ * non-distributed testing.
+ *
+ */
+public class DummyTransactionLogger implements TransactionLogger {
+
+ private AtomicLong nextTransactionId = new AtomicLong(0);
+ private Map<Long, TransactionStatus> transactionIdToStatusMap = Collections
+ .synchronizedMap(new HashMap<Long, TransactionStatus>());
+
+ public long createNewTransactionLog() {
+ long id = nextTransactionId.getAndIncrement();
+ transactionIdToStatusMap.put(id, TransactionStatus.PENDING);
+ return id;
+ }
+
+ public TransactionStatus getStatusForTransaction(final long transactionId) {
+ return transactionIdToStatusMap.get(transactionId);
+ }
+
+ public void setStatusForTransaction(final long transactionId,
+ final TransactionStatus status) {
+ transactionIdToStatusMap.put(transactionId, status);
+ }
+}
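
A quick sketch of the logger's lifecycle as a test might exercise it; the assertions are illustrative, and the TransactionLogger interface with its TransactionStatus enum is defined below:

// Every new log starts PENDING; the client later moves it to COMMITTED or ABORTED.
TransactionLogger logger = new DummyTransactionLogger();
long id = logger.createNewTransactionLog();
assert logger.getStatusForTransaction(id)
    == TransactionLogger.TransactionStatus.PENDING;
logger.setStatusForTransaction(id,
    TransactionLogger.TransactionStatus.COMMITTED);
assert logger.getStatusForTransaction(id)
    == TransactionLogger.TransactionStatus.COMMITTED;
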
Index: src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java (revision 0)
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Thrown if a region server is passed an unknown transaction id
+ */
+public class UnknownTransactionException extends DoNotRetryIOException {
+
+ /** constructor */
+ public UnknownTransactionException() {
+ super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public UnknownTransactionException(String s) {
+ super(s);
+ }
+}
Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java (revision 0)
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/**
+ * Simple interface used to provide a log about transaction status. Written to
+ * by the client, and read by regionservers in case of failure.
+ *
+ */
+public interface TransactionLogger {
+
+ enum TransactionStatus {
+ PENDING, COMMITTED, ABORTED
+ }
+
+ /**
+ * Create a new transaction log. Return the transaction's globally unique id.
+ * The log's initial status should be PENDING.
+ *
+ * @return transaction id
+ */
+ long createNewTransactionLog();
+
+ /**
+ * @param transactionId transaction id
+ * @return the transaction's current status
+ */
+ TransactionStatus getStatusForTransaction(long transactionId);
+
+ /**
+ * @param transactionId transaction id
+ * @param status the new status
+ */
+ void setStatusForTransaction(long transactionId, TransactionStatus status);
+
+}
Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java (revision 0)
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+
+/**
+ * Holds client-side transaction information. Clients use them as opaque
+ * objects passed around to transaction operations.
+ *
+ */
+public class TransactionState {
+ static final Log LOG = LogFactory.getLog(TransactionState.class);
+
+ private final long transactionId;
+
+ private Set<HRegionLocation> participatingRegions = new HashSet<HRegionLocation>();
+
+ TransactionState(final long transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ boolean addRegion(final HRegionLocation hregion) {
+ boolean added = participatingRegions.add(hregion);
+
+ if (added) {
+ LOG.debug("Adding new hregion ["
+ + hregion.getRegionInfo().getRegionNameAsString()
+ + "] to transaction [" + transactionId + "]");
+ }
+
+ return added;
+ }
+
+ Set<HRegionLocation> getParticipatingRegions() {
+ return participatingRegions;
+ }
+
+ /**
+ * Get the transactionId.
+ *
+ * @return Return the transactionId.
+ */
+ public long getTransactionId() {
+ return transactionId;
+ }
+
+ @Override
+ public String toString() {
+ return "id: " + transactionId + ", particpants: "
+ + participatingRegions.size();
+ }
+}
Index: src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java (revision 0)
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/** Thrown when a transaction cannot be committed.
+ *
+ */
+public class CommitUnsuccessfulException extends Exception {
+
+ /** Default constructor */
+ public CommitUnsuccessfulException() {
+ super();
+ }
+
+ /**
+ * @param message message
+ * @param cause cause
+ */
+ public CommitUnsuccessfulException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param message message
+ */
+ public CommitUnsuccessfulException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause cause
+ */
+ public CommitUnsuccessfulException(Throwable cause) {
+ super(cause);
+ }
+
+}
Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (revision 0)
@@ -0,0 +1,375 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Table with transactional support.
+ *
+ */
+public class TransactionalTable extends HTable {
+
+ /**
+ * @param conf configuration
+ * @param tableName table name
+ * @throws IOException
+ */
+ public TransactionalTable(final HBaseConfiguration conf,
+ final String tableName) throws IOException {
+ super(conf, tableName);
+ }
+
+ /**
+ * @param conf configuration
+ * @param tableName table name
+ * @throws IOException
+ */
+ public TransactionalTable(final HBaseConfiguration conf, final Text tableName)
+ throws IOException {
+ super(conf, tableName);
+ }
+
+ /**
+ * @param conf configuration
+ * @param tableName table name
+ * @throws IOException
+ */
+ public TransactionalTable(final HBaseConfiguration conf,
+ final byte[] tableName) throws IOException {
+ super(conf, tableName);
+ }
+
+ private abstract static class TransactionalServerCallable<T> extends
+ ServerCallable<T> {
+ protected TransactionState transactionState;
+
+ protected TransactionalRegionInterface getTransactionServer() {
+ return (TransactionalRegionInterface) server;
+ }
+
+ protected void recordServer() throws IOException {
+ if (transactionState.addRegion(location)) {
+ getTransactionServer().beginTransaction(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName());
+ }
+ }
+
+ public TransactionalServerCallable(final HConnection connection,
+ final byte[] tableName, final byte[] row,
+ final TransactionState transactionState) {
+ super(connection, tableName, row);
+ this.transactionState = transactionState;
+ }
+
+ }
+
+ /**
+ * Get a single value for the specified row and column
+ *
+ * @param row row key
+ * @param column column name
+ * @return value for specified row/column
+ * @throws IOException
+ */
+ public Cell get(final TransactionState transactionState, final byte[] row,
+ final byte[] column) throws IOException {
+ return super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<Cell>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public Cell call() throws IOException {
+ recordServer();
+ return getTransactionServer().get(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, column);
+ }
+ });
+ }
+
+ /**
+ * Get the specified number of versions of the specified row and column
+ *
+ * @param row - row key
+ * @param column - column name
+ * @param numVersions - number of versions to retrieve
+ * @return - array byte values
+ * @throws IOException
+ */
+ public Cell[] get(final TransactionState transactionState, final byte[] row,
+ final byte[] column, final int numVersions) throws IOException {
+ return super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<Cell[]>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public Cell[] call() throws IOException {
+ recordServer();
+ return getTransactionServer().get(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, column,
+ numVersions);
+ }
+ });
+ }
+
+ /**
+ * Get the specified number of versions of the specified row and column with
+ * the specified timestamp.
+ *
+ * @param row - row key
+ * @param column - column name
+ * @param timestamp - timestamp
+ * @param numVersions - number of versions to retrieve
+ * @return - array of values that match the above criteria
+ * @throws IOException
+ */
+ public Cell[] get(final TransactionState transactionState, final byte[] row,
+ final byte[] column, final long timestamp, final int numVersions)
+ throws IOException {
+ return super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<Cell[]>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public Cell[] call() throws IOException {
+ recordServer();
+ return getTransactionServer().get(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, column,
+ timestamp, numVersions);
+ }
+ });
+ }
+
+ /**
+ * Get all the data for the specified row at the latest timestamp
+ *
+ * @param row row key
+ * @return RowResult is empty if row does not exist.
+ * @throws IOException
+ */
+ public RowResult getRow(final TransactionState transactionState,
+ final byte[] row) throws IOException {
+ return getRow(transactionState, row, HConstants.LATEST_TIMESTAMP);
+ }
+
+ /**
+ * Get all the data for the specified row at a specified timestamp
+ *
+ * @param row row key
+ * @param ts timestamp
+ * @return RowResult is empty if row does not exist.
+ * @throws IOException
+ */
+ public RowResult getRow(final TransactionState transactionState,
+ final byte[] row, final long ts) throws IOException {
+ return super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<RowResult>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public RowResult call() throws IOException {
+ recordServer();
+ return getTransactionServer().getRow(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, ts);
+ }
+ });
+ }
+
+ /**
+ * Get selected columns for the specified row at the latest timestamp
+ *
+ * @param row row key
+ * @param columns Array of column names you want to retrieve.
+ * @return RowResult is empty if row does not exist.
+ * @throws IOException
+ */
+ public RowResult getRow(final TransactionState transactionState,
+ final byte[] row, final byte[][] columns) throws IOException {
+ return getRow(transactionState, row, columns, HConstants.LATEST_TIMESTAMP);
+ }
+
+ /**
+ * Get selected columns for the specified row at a specified timestamp
+ *
+ * @param row row key
+ * @param columns Array of column names you want to retrieve.
+ * @param ts timestamp
+ * @return RowResult is empty if row does not exist.
+ * @throws IOException
+ */
+ public RowResult getRow(final TransactionState transactionState,
+ final byte[] row, final byte[][] columns, final long ts)
+ throws IOException {
+ return super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<RowResult>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public RowResult call() throws IOException {
+ recordServer();
+ return getTransactionServer().getRow(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, columns, ts);
+ }
+ });
+ }
+
+ /**
+ * Get a scanner on the current table starting at first row. Return the
+ * specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * \+|^&*$[]]}{)(.
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns) throws IOException {
+ return getScanner(transactionState, columns, HConstants.EMPTY_START_ROW,
+ HConstants.LATEST_TIMESTAMP, null);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row. Return
+ * the specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * \+|^&*$[]]}{)(.
+ * @param startRow starting row in table to scan
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns, final byte[] startRow) throws IOException {
+ return getScanner(transactionState, columns, startRow,
+ HConstants.LATEST_TIMESTAMP, null);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row. Return
+ * the specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * \+|^&*$[]]}{)(.
+ * @param startRow starting row in table to scan
+ * @param timestamp only return results whose timestamp <= this value
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns, final byte[] startRow, final long timestamp)
+ throws IOException {
+ return getScanner(transactionState, columns, startRow, timestamp, null);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row. Return
+ * the specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * \+|^&*$[]]}{)(.
+ * @param startRow starting row in table to scan
+ * @param filter a row filter using row-key regexp and/or column data filter.
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns, final byte[] startRow,
+ final RowFilterInterface filter) throws IOException {
+ return getScanner(transactionState, columns, startRow,
+ HConstants.LATEST_TIMESTAMP, filter);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row. Return
+ * the specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * \+|^&*$[]]}{)(.
+ * @param startRow starting row in table to scan
+ * @param timestamp only return results whose timestamp <= this value
+ * @param filter a row filter using row-key regexp and/or column data filter.
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns, final byte[] startRow, final long timestamp,
+ final RowFilterInterface filter) throws IOException {
+ return new TransactionalClientScanner(transactionState, columns, startRow,
+ timestamp, filter);
+ }
+
+ /**
+ * Write a BatchUpdate to the table as part of the given transaction. The
+ * update only becomes durable once the transaction commits.
+ *
+ * @param transactionState transaction state
+ * @param batchUpdate the update to apply
+ * @throws IOException
+ */
+ public synchronized void commit(final TransactionState transactionState,
+ final BatchUpdate batchUpdate) throws IOException {
+ super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<Boolean>(super.getConnection(), super
+ .getTableName(), batchUpdate.getRow(), transactionState) {
+ public Boolean call() throws IOException {
+ recordServer();
+ getTransactionServer().batchUpdate(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), batchUpdate);
+ return null;
+ }
+ });
+ }
+
+ protected class TransactionalClientScanner extends HTable.ClientScanner {
+
+ private TransactionState transactionState;
+
+ protected TransactionalClientScanner(
+ final TransactionState transactionState, final byte[][] columns,
+ final byte[] startRow, final long timestamp,
+ final RowFilterInterface filter) throws IOException {
+ super(columns, startRow, timestamp, filter);
+ this.transactionState = transactionState;
+ }
+
+ @Override
+ protected ScannerCallable getScannerCallable(final byte[] localStartKey) {
+ return new TransactionScannerCallable(transactionState, getConnection(),
+ getTableName(), getColumns(), localStartKey, getTimestamp(),
+ getFilter());
+ }
+ }
+
+}
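
Client-side, the pieces above compose as follows. This is a hedged sketch: the wrapper class, table name, column name and row key are placeholders rather than part of this patch, and error handling is elided.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;

class TransactionalClientSketch {
  void run() throws IOException, CommitUnsuccessfulException {
    HBaseConfiguration conf = new HBaseConfiguration();
    TransactionalTable table = new TransactionalTable(conf, "mytable");
    TransactionManager manager =
        new TransactionManager(new DummyTransactionLogger(), conf);

    TransactionState tx = manager.beginTransaction();
    // Reads and writes made under 'tx' enlist each region they touch.
    Cell current = table.get(tx, "row1".getBytes(), "info:balance".getBytes());
    // 'current' could feed a read-modify-write; it is unused in this sketch.
    BatchUpdate update = new BatchUpdate("row1".getBytes());
    update.put("info:balance".getBytes(), "100".getBytes());
    table.commit(tx, update); // stages the write under the transaction
    manager.tryCommit(tx); // two-phase commit across participating regions
  }
}
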