Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (working copy) @@ -74,6 +74,9 @@ this.serverAddress = serverAddress; } + /** + * {@inheritDoc} + */ @Override public String toString() { return "address: " + this.serverAddress.toString() + ", regioninfo: " + @@ -312,7 +315,7 @@ long scannerId = -1L; try { scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, - REGIONINFO, tableName); + REGIONINFO, tableName, System.currentTimeMillis(), null); KeyedData[] values = server.next(scannerId); if(values == null || values.length == 0) { break; @@ -417,7 +420,7 @@ long scannerId = -1L; try { scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, - REGIONINFO, tableName); + REGIONINFO, tableName, System.currentTimeMillis(), null); boolean isenabled = false; while(true) { KeyedData[] values = server.next(scannerId); @@ -500,7 +503,7 @@ long scannerId = -1L; try { scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, - REGIONINFO, tableName); + REGIONINFO, tableName, System.currentTimeMillis(), null); boolean disabled = false; while(true) { KeyedData[] values = server.next(scannerId); @@ -807,7 +810,8 @@ long scannerId = -1L; try { scannerId = - server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName); + server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName, + System.currentTimeMillis(), null); DataInputBuffer inbuf = new DataInputBuffer(); while(true) { @@ -963,7 +967,7 @@ long scannerId = -1L; try { scannerId = server.openScanner(t.regionInfo.regionName, - META_COLUMNS, EMPTY_START_ROW); + META_COLUMNS, EMPTY_START_ROW, System.currentTimeMillis(), null); DataInputBuffer inbuf = new DataInputBuffer(); while(true) { @@ -1180,9 +1184,38 @@ * @throws IOException */ public synchronized HScannerInterface obtainScanner(Text[] columns, - Text startRow) - throws IOException { - return obtainScanner(columns, startRow, null); + Text startRow) throws IOException { + return obtainScanner(columns, startRow, System.currentTimeMillis(), null); + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns. + * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @param timestamp only return results whose timestamp <= this value + * @return scanner + * @throws IOException + */ + public synchronized HScannerInterface obtainScanner(Text[] columns, + Text startRow, long timestamp) throws IOException { + return obtainScanner(columns, startRow, timestamp, null); + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns. + * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @param filter a row filter using row-key regexp and/or column data filter. 
+ * @return scanner + * @throws IOException + */ + public synchronized HScannerInterface obtainScanner(Text[] columns, + Text startRow, RowFilterInterface filter) throws IOException { + return obtainScanner(columns, startRow, System.currentTimeMillis(), filter); } /** @@ -1191,6 +1224,7 @@ * * @param columns array of columns to return * @param startRow starting row in table to scan + * @param timestamp only return results whose timestamp <= this value * @param filter a row filter using row-key regexp and/or column data filter. * @return scanner * @throws IOException @@ -1196,14 +1230,14 @@ * @throws IOException */ public synchronized HScannerInterface obtainScanner(Text[] columns, - Text startRow, RowFilterInterface filter) - throws IOException { + Text startRow, long timestamp, RowFilterInterface filter) + throws IOException { if(this.tableServers == null) { throw new IllegalStateException("Must open table first"); } - return new ClientScanner(columns, startRow, filter); + return new ClientScanner(columns, startRow, timestamp, filter); } - + /* * @return General HClient RetryPolicy instance. */ @@ -1361,8 +1395,21 @@ * @throws IOException */ public void commit(long lockid) throws IOException { + commit(lockid, System.currentTimeMillis()); + } + + /** + * Finalize a row mutation + * + * @param lockid - lock id returned from startUpdate + * @param timestamp - time to associate with the change + * @throws IOException + */ + public void commit(long lockid, long timestamp) throws IOException { try { - this.currentServer.commit(this.currentRegion, this.clientid, lockid); + this.currentServer.commit(this.currentRegion, this.clientid, lockid, + timestamp); + } finally { this.currentServer = null; this.currentRegion = null; @@ -1399,6 +1446,7 @@ private final Text EMPTY_COLUMN = new Text(); private Text[] columns; private Text startRow; + private long scanTime; private boolean closed; private RegionLocation[] regions; @SuppressWarnings("hiding") @@ -1422,10 +1470,11 @@ this.regions = info.toArray(new RegionLocation[info.size()]); } - ClientScanner(Text[] columns, Text startRow, RowFilterInterface filter) - throws IOException { + ClientScanner(Text[] columns, Text startRow, long timestamp, + RowFilterInterface filter) throws IOException { this.columns = columns; this.startRow = startRow; + this.scanTime = timestamp; this.closed = false; this.filter = filter; if (filter != null) { @@ -1462,11 +1511,12 @@ if (this.filter == null) { this.scannerId = this.server.openScanner(info.regionInfo.regionName, this.columns, currentRegion == 0 ? this.startRow - : EMPTY_START_ROW); + : EMPTY_START_ROW, scanTime, null); } else { - this.scannerId = this.server.openScanner(info.regionInfo.regionName, - this.columns, currentRegion == 0 ? this.startRow - : EMPTY_START_ROW, filter); + this.scannerId = + this.server.openScanner(info.regionInfo.regionName, + this.columns, currentRegion == 0 ? this.startRow + : EMPTY_START_ROW, scanTime, filter); } break; @@ -1488,8 +1538,8 @@ return true; } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap) + /** + * {@inheritDoc} */ public boolean next(HStoreKey key, TreeMap results) throws IOException { if(this.closed) { @@ -1511,8 +1561,8 @@ return values == null ? 
false : values.length != 0; } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HScannerInterface#close() + /** + * {@inheritDoc} */ public void close() throws IOException { if(this.scannerId != -1L) { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java (working copy) @@ -26,10 +26,22 @@ * is specified) or if multiple members of the same column family were * specified. If so, we need to ignore the timestamp to ensure that we get all * the family members, as they may have been last updated at different times. - * This interface exposes two APIs for querying the scanner. */ public interface HInternalScannerInterface { + /** + * Grab the next row's worth of values. The HScanner will return the most + * recent data value for each row that is not newer than the target time. + * + * If a dataFilter is defined, it will be used to skip rows that do not + * match its criteria. It may cause the scanner to stop prematurely if it + * knows that it will no longer accept the remaining results. + * + * @param key HStoreKey containing row and timestamp + * @param results Map of column/value pairs + * @return true if a value was found + * @throws IOException + */ public boolean next(HStoreKey key, TreeMap results) throws IOException; @@ -38,9 +50,9 @@ */ public void close(); - /** Returns true if the scanner is matching a column family or regex */ + /** @return true if the scanner is matching a column family or regex */ public boolean isWildcardScanner(); - /** Returns true if the scanner is matching multiple column family members */ + /** @return true if the scanner is matching multiple column family members */ public boolean isMultipleMatchScanner(); } \ No newline at end of file Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -51,8 +51,8 @@ public class HMaster implements HConstants, HMasterInterface, HMasterRegionInterface, Runnable { - /* (non-Javadoc) - * @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long) + /** + * {@inheritDoc} */ public long getProtocolVersion(String protocol, @SuppressWarnings("unused") long clientVersion) @@ -169,7 +169,7 @@ try { regionServer = client.getHRegionConnection(region.server); scannerId = regionServer.openScanner(region.regionName, METACOLUMNS, - FIRST_ROW); + FIRST_ROW, System.currentTimeMillis(), null); while (true) { TreeMap results = new TreeMap(); @@ -278,6 +278,9 @@ * Scanner for the ROOT HRegion. 
*/ class RootScanner extends BaseScanner { + /** + * {@inheritDoc} + */ public void run() { if (LOG.isDebugEnabled()) { LOG.debug("Running ROOT scanner"); @@ -338,6 +341,9 @@ Text regionName; Text startKey; + /** + * {@inheritDoc} + */ @Override public boolean equals(Object o) { return this.compareTo(o) == 0; @@ -343,6 +349,9 @@ return this.compareTo(o) == 0; } + /** + * {@inheritDoc} + */ @Override public int hashCode() { int result = this.regionName.hashCode(); @@ -351,6 +360,10 @@ } // Comparable + + /** + * {@inheritDoc} + */ public int compareTo(Object o) { MetaRegion other = (MetaRegion)o; @@ -380,6 +393,9 @@ * action would prevent other work from getting done. */ class MetaScanner extends BaseScanner { + /** + * {@inheritDoc} + */ @SuppressWarnings("null") public void run() { while (!closed) { @@ -804,8 +820,8 @@ // HMasterRegionInterface ////////////////////////////////////////////////////////////////////////////// - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerStartup(org.apache.hadoop.hbase.HServerInfo) + /** + * {@inheritDoc} */ @SuppressWarnings("unused") public void regionServerStartup(HServerInfo serverInfo) @@ -838,8 +854,8 @@ return s.hashCode(); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerReport(org.apache.hadoop.hbase.HServerInfo, org.apache.hadoop.hbase.HMsg[]) + /** + * {@inheritDoc} */ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException { @@ -1336,7 +1352,7 @@ } server.delete(regionName, clientId, lockid, COL_SERVER); server.delete(regionName, clientId, lockid, COL_STARTCODE); - server.commit(regionName, clientId, lockid); + server.commit(regionName, clientId, lockid, System.currentTimeMillis()); } // Get regions reassigned @@ -1384,7 +1400,8 @@ try { LOG.debug("scanning root region"); - scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, columns, startRow); + scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, + columns, startRow, System.currentTimeMillis(), null); scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName); break; @@ -1413,7 +1430,8 @@ server = client.getHRegionConnection(r.server); - scannerId = server.openScanner(r.regionName, columns, startRow); + scannerId = server.openScanner(r.regionName, columns, startRow, + System.currentTimeMillis(), null); scanMetaRegion(server, scannerId, r.regionName); } @@ -1513,7 +1531,9 @@ } server.delete(metaRegionName, clientId, lockid, COL_SERVER); server.delete(metaRegionName, clientId, lockid, COL_STARTCODE); - server.commit(metaRegionName, clientId, lockid); + server.commit(metaRegionName, clientId, lockid, + System.currentTimeMillis()); + break; } catch(NotServingRegionException e) { @@ -1622,7 +1642,9 @@ long lockid = server.startUpdate(metaRegionName, clientId, regionName); server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress); server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode); - server.commit(metaRegionName, clientId, lockid); + server.commit(metaRegionName, clientId, lockid, + System.currentTimeMillis()); + break; } catch(NotServingRegionException e) { @@ -1639,8 +1661,8 @@ // HMasterInterface ////////////////////////////////////////////////////////////////////////////// - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterInterface#isMasterRunning() + /** + * {@inheritDoc} */ public boolean isMasterRunning() { return !closed; @@ -1646,8 +1668,8 @@ return !closed; } - /* (non-Javadoc) - * @see 
org.apache.hadoop.hbase.HMasterInterface#shutdown() + /** + * {@inheritDoc} */ public void shutdown() { TimerTask tt = new TimerTask() { @@ -1664,8 +1686,8 @@ t.schedule(tt, 10); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterInterface#createTable(org.apache.hadoop.hbase.HTableDescriptor) + /** + * {@inheritDoc} */ public void createTable(HTableDescriptor desc) throws IOException { if (!isMasterRunning()) { @@ -1717,7 +1739,8 @@ long lockid = server.startUpdate(metaRegionName, clientId, regionName); server.put(metaRegionName, clientId, lockid, COL_REGIONINFO, byteValue.toByteArray()); - server.commit(metaRegionName, clientId, lockid); + server.commit(metaRegionName, clientId, lockid, + System.currentTimeMillis()); // 4. Close the new region to flush it to disk @@ -1741,8 +1764,8 @@ } } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterInterface#deleteTable(org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public void deleteTable(Text tableName) throws IOException { new TableDelete(tableName).process(); @@ -1751,8 +1774,8 @@ } } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterInterface#addColumn(org.apache.hadoop.io.Text, org.apache.hadoop.hbase.HColumnDescriptor) + /** + * {@inheritDoc} */ public void addColumn(Text tableName, HColumnDescriptor column) throws IOException { new AddColumn(tableName, column).process(); @@ -1758,8 +1781,8 @@ new AddColumn(tableName, column).process(); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterInterface#deleteColumn(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public void deleteColumn(Text tableName, Text columnName) throws IOException { new DeleteColumn(tableName, HStoreKey.extractFamily(columnName)).process(); @@ -1765,8 +1788,8 @@ new DeleteColumn(tableName, HStoreKey.extractFamily(columnName)).process(); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterInterface#enableTable(org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public void enableTable(Text tableName) throws IOException { new ChangeTableState(tableName, true).process(); @@ -1772,8 +1795,8 @@ new ChangeTableState(tableName, true).process(); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterInterface#findRootRegion() + /** + * {@inheritDoc} */ public HServerAddress findRootRegion() { return rootRegionLocation; @@ -1779,8 +1802,8 @@ return rootRegionLocation; } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HMasterInterface#disableTable(org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public void disableTable(Text tableName) throws IOException { new ChangeTableState(tableName, false).process(); @@ -1837,7 +1860,8 @@ // Open a scanner on the meta region long scannerId = - server.openScanner(m.regionName, METACOLUMNS, tableName); + server.openScanner(m.regionName, METACOLUMNS, tableName, + System.currentTimeMillis(), null); try { DataInputBuffer inbuf = new DataInputBuffer(); @@ -1995,7 +2019,9 @@ updateRegionInfo(server, m.regionName, i); server.delete(m.regionName, clientId, lockid, COL_SERVER); server.delete(m.regionName, clientId, lockid, COL_STARTCODE); - server.commit(m.regionName, clientId, lockid); + server.commit(m.regionName, clientId, lockid, + System.currentTimeMillis()); + lockid = -1L; if(LOG.isDebugEnabled()) { @@ -2151,7 +2177,7 @@ lockid = server.startUpdate(regionName, clientId, i.regionName); server.put(regionName, clientId, lockid, COL_REGIONINFO, byteValue.toByteArray()); - server.commit(regionName, clientId, lockid); + server.commit(regionName, 
clientId, lockid, System.currentTimeMillis()); lockid = -1L; if(LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + i.regionName); @@ -2252,8 +2278,8 @@ this.server = server; } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.LeaseListener#leaseExpired() + /** + * {@inheritDoc} */ public void leaseExpired() { LOG.info(server + " lease expired"); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (working copy) @@ -46,6 +46,7 @@ final HLocking lock = new HLocking(); + /** constructor */ public HMemcache() { super(); } @@ -50,11 +51,12 @@ super(); } + /** represents the state of the memcache at a specified point in time */ public static class Snapshot { - public TreeMap memcacheSnapshot = null; - public long sequenceId = 0; + TreeMap memcacheSnapshot = null; + long sequenceId = 0; - public Snapshot() { + Snapshot() { super(); } } @@ -72,7 +74,7 @@ * * @return frozen HMemcache TreeMap and HLog sequence number. */ - public Snapshot snapshotMemcacheForLog(HLog log) throws IOException { + Snapshot snapshotMemcacheForLog(HLog log) throws IOException { Snapshot retval = new Snapshot(); this.lock.obtainWriteLock(); @@ -112,6 +114,7 @@ * Delete the snapshot, remove from history. * * Modifying the structure means we need to obtain a writelock. + * @throws IOException */ public void deleteSnapshot() throws IOException { this.lock.obtainWriteLock(); @@ -251,6 +254,9 @@ for (Map.Entry es: tailMap.entrySet()) { HStoreKey itKey = es.getKey(); if (itKey.matchesRowCol(curKey)) { + if(HConstants.DELETE_BYTES.compareTo(es.getValue()) == 0) { + break; + } result.add(tailMap.get(itKey)); curKey.setVersion(itKey.getTimestamp() - 1); } @@ -264,7 +270,7 @@ /** * Return a scanner over the keys in the HMemcache */ - public HInternalScannerInterface getScanner(long timestamp, + HInternalScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow) throws IOException { return new HMemcacheScanner(timestamp, targetCols, firstRow); @@ -280,7 +286,7 @@ final Iterator keyIterators[]; @SuppressWarnings("unchecked") - public HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow) + HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow) throws IOException { super(timestamp, targetCols); @@ -331,6 +337,7 @@ * @param firstRow seek to this row * @return true if this is the first row */ + @Override boolean findFirstRow(int i, Text firstRow) { return firstRow.getLength() == 0 || keys[i].getRow().compareTo(firstRow) >= 0; @@ -342,6 +349,7 @@ * @param i Which iterator to fetch next value from * @return true if there is more data available */ + @Override boolean getNext(int i) { if (!keyIterators[i].hasNext()) { closeSubScanner(i); @@ -353,6 +361,7 @@ } /** Shut down an individual map iterator. */ + @Override void closeSubScanner(int i) { keyIterators[i] = null; keys[i] = null; @@ -361,6 +370,7 @@ } /** Shut down map iterators, and release the lock */ + @Override public void close() { if(! 
scannerClosed) { try { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (working copy) @@ -316,7 +316,6 @@ /** Instantiated to compact the meta region */ private static class OfflineMerger extends Merger { - private Path dir; private TreeSet metaRegions; private TreeMap results; @@ -324,7 +323,6 @@ throws IOException { super(conf, fs, tableName); - this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); this.metaRegions = new TreeSet(); this.results = new TreeMap(); @@ -334,7 +332,7 @@ new HRegion(dir, hlog,fs, conf, HGlobals.rootRegionInfo, null); HInternalScannerInterface rootScanner = - root.getScanner(META_COLS, new Text()); + root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null); try { while(rootScanner.next(key, results)) { @@ -357,7 +355,7 @@ } @Override - protected TreeSet next() throws IOException { + protected TreeSet next() { more = false; return metaRegions; } @@ -380,7 +378,7 @@ root.delete(lockid, COL_REGIONINFO); root.delete(lockid, COL_SERVER); root.delete(lockid, COL_STARTCODE); - root.commit(lockid); + root.commit(lockid, System.currentTimeMillis()); lockid = -1L; if(LOG.isDebugEnabled()) { @@ -405,7 +403,7 @@ try { lockid = root.startUpdate(newRegion.getRegionName()); root.put(lockid, COL_REGIONINFO, byteValue.toByteArray()); - root.commit(lockid); + root.commit(lockid, System.currentTimeMillis()); lockid = -1L; if(LOG.isDebugEnabled()) { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -95,7 +95,11 @@ * HRegionServer. Returns a brand-new active HRegion, also * running on the current HRegionServer. */ - static HRegion closeAndMerge(HRegion srcA, HRegion srcB) throws IOException { + static HRegion closeAndMerge(final HRegion srcA, final HRegion srcB) + throws IOException { + + HRegion a = srcA; + HRegion b = srcB; // Make sure that srcA comes first; important for key-ordering during // write of the merged file. @@ -109,25 +113,24 @@ } else if((srcB.getStartKey() == null) // A is not null but B is || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B - HRegion tmp = srcA; - srcA = srcB; - srcB = tmp; + a = srcB; + b = srcA; } - if (! srcA.getEndKey().equals(srcB.getStartKey())) { + if (! a.getEndKey().equals(b.getStartKey())) { throw new IOException("Cannot merge non-adjacent regions"); } - FileSystem fs = srcA.getFilesystem(); - Configuration conf = srcA.getConf(); - HTableDescriptor tabledesc = srcA.getTableDesc(); - HLog log = srcA.getLog(); - Path rootDir = srcA.getRootDir(); + FileSystem fs = a.getFilesystem(); + Configuration conf = a.getConf(); + HTableDescriptor tabledesc = a.getTableDesc(); + HLog log = a.getLog(); + Path rootDir = a.getRootDir(); - Text startKey = srcA.getStartKey(); - Text endKey = srcB.getEndKey(); + Text startKey = a.getStartKey(); + Text endKey = b.getEndKey(); - Path merges = new Path(srcA.getRegionDir(), MERGEDIR); + Path merges = new Path(a.getRegionDir(), MERGEDIR); if(! 
fs.exists(merges)) { fs.mkdirs(merges); } @@ -141,8 +144,8 @@ throw new IOException("Cannot merge; target file collision at " + newRegionDir); } - LOG.info("starting merge of regions: " + srcA.getRegionName() + " and " - + srcB.getRegionName() + " new region start key is '" + LOG.info("starting merge of regions: " + a.getRegionName() + " and " + + b.getRegionName() + " new region start key is '" + (startKey == null ? "" : startKey) + "', end key is '" + (endKey == null ? "" : endKey) + "'"); @@ -151,7 +154,7 @@ TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>(); TreeMap<Text, Vector<HStoreFile>> filesToMerge = new TreeMap<Text, Vector<HStoreFile>>(); - for(HStoreFile src: srcA.flushcache(true)) { + for(HStoreFile src: a.flushcache(true)) { Vector<HStoreFile> v = filesToMerge.get(src.getColFamily()); if(v == null) { v = new Vector<HStoreFile>(); @@ -160,7 +163,7 @@ v.add(src); } - for(HStoreFile src: srcB.flushcache(true)) { + for(HStoreFile src: b.flushcache(true)) { Vector<HStoreFile> v = filesToMerge.get(src.getColFamily()); if(v == null) { v = new Vector<HStoreFile>(); @@ -188,7 +191,7 @@ if(LOG.isDebugEnabled()) { LOG.debug("flushing changes since start of merge for region " - + srcA.getRegionName()); + + a.getRegionName()); } filesToMerge.clear(); @@ -193,7 +196,7 @@ filesToMerge.clear(); - for(HStoreFile src: srcA.close()) { + for(HStoreFile src: a.close()) { if(! alreadyMerged.contains(src)) { Vector<HStoreFile> v = filesToMerge.get(src.getColFamily()); if(v == null) { @@ -206,10 +209,10 @@ if(LOG.isDebugEnabled()) { LOG.debug("flushing changes since start of merge for region " - + srcB.getRegionName()); + + b.getRegionName()); } - for(HStoreFile src: srcB.close()) { + for(HStoreFile src: b.close()) { if(! alreadyMerged.contains(src)) { Vector<HStoreFile> v = filesToMerge.get(src.getColFamily()); if(v == null) { @@ -391,11 +394,13 @@ * Close down this HRegion. Flush the cache, shut down each HStore, don't * service any more calls. * - * The returned Vector is a list of all the storage files that the HRegion's - * component HStores make use of. It's a list of HStoreFile objects. - * * This method could take some time to execute, so don't call it from a * time-sensitive thread. + * + * @return Vector of all the storage files that the HRegion's component + * HStores make use of. It's a list of HStoreFile objects. 
+ * + * @throws IOException */ public Vector<HStoreFile> close() throws IOException { lock.obtainWriteLock(); @@ -559,14 +564,17 @@ // HRegion accessors ////////////////////////////////////////////////////////////////////////////// + /** @return start key for region */ public Text getStartKey() { return regionInfo.startKey; } - + + /** @return end key for region */ public Text getEndKey() { return regionInfo.endKey; } - + + /** @return region id */ public long getRegionId() { return regionInfo.regionId; } @@ -571,30 +579,37 @@ return regionInfo.regionId; } + /** @return region name */ public Text getRegionName() { return regionInfo.regionName; } - + + /** @return root directory path */ public Path getRootDir() { return rootDir; } - + + /** @return HTableDescriptor for this region */ public HTableDescriptor getTableDesc() { return regionInfo.tableDesc; } - + + /** @return HLog in use for this region */ public HLog getLog() { return log; } - + + /** @return Configuration object */ public Configuration getConf() { return conf; } - + + /** @return region directory Path */ public Path getRegionDir() { return regiondir; } - + + /** @return FileSystem being used by this region */ public FileSystem getFilesystem() { return fs; } @@ -982,19 +997,17 @@ /** * Return an iterator that scans over the HRegion, returning the indicated - * columns. This Iterator must be closed by the caller. - */ - public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) - throws IOException { - return getScanner(cols, firstRow, null); - } - - /** - * Return an iterator that scans over the HRegion, returning the indicated * columns for only the rows that match the data filter. This Iterator must be closed by the caller. + * + * @param cols columns desired in result set + * @param firstRow row which is the starting point of the scan + * @param timestamp only return rows whose timestamp is <= this value + * @param filter row filter + * @return HInternalScannerInterface + * @throws IOException */ - public HInternalScannerInterface getScanner(Text[] cols, Text firstRow, RowFilterInterface filter) - throws IOException { + public HInternalScannerInterface getScanner(Text[] cols, Text firstRow, + long timestamp, RowFilterInterface filter) throws IOException { lock.obtainReadLock(); try { TreeSet<Text> families = new TreeSet<Text>(); @@ -1007,7 +1020,7 @@ for (Text family: families) { storelist[i++] = stores.get(family); } - return new HScanner(cols, firstRow, memcache, storelist, filter); + return new HScanner(cols, firstRow, timestamp, memcache, storelist, filter); } finally { lock.releaseReadLock(); } @@ -1029,6 +1042,7 @@ * * @param row Row to update * @return lockid + * @throws IOException * @see #put(long, Text, byte[]) */ public long startUpdate(Text row) throws IOException { @@ -1048,6 +1062,11 @@ * * This method really just tests the input, then calls an internal localput() * method. + * + * @param lockid lock id obtained from startUpdate + * @param targetCol name of column to be updated + * @param val new value for column + * @throws IOException */ public void put(long lockid, Text targetCol, byte [] val) throws IOException { if (DELETE_BYTES.compareTo(val) == 0) { @@ -1058,6 +1077,10 @@ /** * Delete a value or write a value. This is a just a convenience method for put(). 
+ * + * @param lockid lock id obtained from startUpdate + * @param targetCol name of column to be deleted + * @throws IOException */ public void delete(long lockid, Text targetCol) throws IOException { localput(lockid, targetCol, DELETE_BYTES.get()); @@ -1109,6 +1132,9 @@ * Abort a pending set of writes. This dumps from memory all in-progress * writes associated with the given row-lock. These values have not yet * been placed in memcache or written to the log. + * + * @param lockid lock id obtained from startUpdate + * @throws IOException */ public void abort(long lockid) throws IOException { Text row = getRowFromLock(lockid); @@ -1142,9 +1168,10 @@ * Once updates hit the change log, they are safe. They will either be moved * into an HStore in the future, or they will be recovered from the log. * @param lockid Lock for row we're to commit. + * @param timestamp the time to associate with this change * @throws IOException */ - public void commit(final long lockid) throws IOException { + public void commit(final long lockid, long timestamp) throws IOException { // Remove the row from the pendingWrites list so // that repeated executions won't screw this up. Text row = getRowFromLock(lockid); @@ -1156,12 +1183,11 @@ // hasn't aborted/committed the write-operation synchronized(row) { // Add updates to the log and add values to the memcache. - long commitTimestamp = System.currentTimeMillis(); TreeMap columns = this.targetColumns.get(lockid); if (columns != null && columns.size() > 0) { log.append(regionInfo.regionName, regionInfo.tableDesc.getName(), - row, columns, commitTimestamp); - memcache.add(row, columns, commitTimestamp); + row, columns, timestamp); + memcache.add(row, columns, timestamp); // OK, all done! } targetColumns.remove(lockid); @@ -1287,9 +1313,8 @@ /** Create an HScanner with a handle on many HStores. */ @SuppressWarnings("unchecked") - HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores, RowFilterInterface filter) - throws IOException { - long scanTime = System.currentTimeMillis(); + HScanner(Text[] cols, Text firstRow, long timestamp, HMemcache memcache, + HStore[] stores, RowFilterInterface filter) throws IOException { this.dataFilter = filter; if (null != dataFilter) { dataFilter.reset(); @@ -1310,7 +1335,7 @@ // NOTE: the memcache scanner should be the first scanner try { HInternalScannerInterface scanner = - memcache.getScanner(scanTime, cols, firstRow); + memcache.getScanner(timestamp, cols, firstRow); if(scanner.isWildcardScanner()) { this.wildcardMatch = true; } @@ -1320,7 +1345,7 @@ scanners[0] = scanner; for(int i = 0; i < stores.length; i++) { - scanner = stores[i].getScanner(scanTime, cols, firstRow); + scanner = stores[i].getScanner(timestamp, cols, firstRow); if(scanner.isWildcardScanner()) { this.wildcardMatch = true; } @@ -1347,8 +1372,8 @@ } } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HInternalScannerInterface#isWildcardScanner() + /** + * {@inheritDoc} */ public boolean isWildcardScanner() { return wildcardMatch; @@ -1354,8 +1379,8 @@ return wildcardMatch; } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HInternalScannerInterface#isMultipleMatchScanner() + /** + * {@inheritDoc} */ public boolean isMultipleMatchScanner() { return multipleMatchers; @@ -1361,18 +1386,8 @@ return multipleMatchers; } - /* - * (non-Javadoc) - * - * Grab the next row's worth of values. The HScanner will return the most - * recent data value for each row that is not newer than the target time. 
- * - * If a dataFilter is defined, it will be used to skip rows that do not - * match its criteria. It may cause the scanner to stop prematurely if it - * knows that it will no longer accept the remaining results. - * - * @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, - * java.util.TreeMap) + /** + * {@inheritDoc} */ public boolean next(HStoreKey key, TreeMap results) throws IOException { @@ -1501,8 +1516,8 @@ } } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HInternalScannerInterface#close() + /** + * {@inheritDoc} */ public void close() { for(int i = 0; i < scanners.length; i++) { @@ -1573,7 +1588,7 @@ DataOutputStream s = new DataOutputStream(bytes); r.getRegionInfo().write(s); meta.put(writeid, COL_REGIONINFO, bytes.toByteArray()); - meta.commit(writeid); + meta.commit(writeid, System.currentTimeMillis()); } static void addRegionToMETA(final HClient client, Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegiondirReader.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegiondirReader.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegiondirReader.java (working copy) @@ -165,7 +165,9 @@ HRegion r = new HRegion(this.parentdir, null, FileSystem.get(this.conf), conf, info, null); Text [] families = info.tableDesc.families().keySet().toArray(new Text [] {}); - HInternalScannerInterface scanner = r.getScanner(families, new Text()); + HInternalScannerInterface scanner = + r.getScanner(families, new Text(), System.currentTimeMillis(), null); + HStoreKey key = new HStoreKey(); TreeMap results = new TreeMap(); // Print out table header line. Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (working copy) @@ -171,10 +171,11 @@ * @param regionName region name * @param clientid a unique value to identify the client * @param lockid lock id returned from startUpdate + * @param timestamp the time (in milliseconds) to associate with this change * @throws IOException */ public void commit(final Text regionName, final long clientid, - final long lockid) + final long lockid, final long timestamp) throws IOException; /** @@ -191,19 +192,6 @@ ////////////////////////////////////////////////////////////////////////////// /** - * Opens a remote scanner. - * - * @param regionName name of region to scan - * @param columns columns to scan - * @param startRow starting row to scan - * - * @return scannerId scanner identifier used in other calls - * @throws IOException - */ - public long openScanner(Text regionName, Text[] columns, Text startRow) - throws IOException; - - /** * Opens a remote scanner with a RowFilter. * * @param regionName name of region to scan @@ -209,6 +197,7 @@ * @param regionName name of region to scan * @param columns columns to scan * @param startRow starting row to scan + * @param timestamp only return values whose timestamp is <= this value * @param filter RowFilter for filtering results at the row-level. 
* * @return scannerId scanner identifier used in other calls @@ -214,7 +203,8 @@ * @return scannerId scanner identifier used in other calls * @throws IOException */ - public long openScanner(Text regionName, Text[] columns, Text startRow, RowFilterInterface filter) + public long openScanner(Text regionName, Text[] columns, Text startRow, + long timestamp, RowFilterInterface filter) throws IOException; /** Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -48,8 +48,8 @@ ******************************************************************************/ public class HRegionServer implements HConstants, HRegionInterface, Runnable { - /* (non-Javadoc) - * @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long) + /** + * {@inheritDoc} */ public long getProtocolVersion(final String protocol, @SuppressWarnings("unused") final long clientVersion) @@ -107,8 +107,8 @@ class SplitOrCompactChecker implements Runnable, RegionUnavailableListener { HClient client = new HClient(conf); - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.RegionUnavailableListener#closing(org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public void closing(final Text regionName) { lock.writeLock().lock(); @@ -124,8 +124,8 @@ } } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.RegionUnavailableListener#closed(org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public void closed(final Text regionName) { lock.writeLock().lock(); @@ -139,8 +139,8 @@ } } - /* (non-Javadoc) - * @see java.lang.Runnable#run() + /** + * {@inheritDoc} */ public void run() { while(! stopRequested) { @@ -261,8 +261,8 @@ protected final Integer cacheFlusherLock = new Integer(0); /** Runs periodically to flush the memcache */ class Flusher implements Runnable { - /* (non-Javadoc) - * @see java.lang.Runnable#run() + /** + * {@inheritDoc} */ public void run() { while(! stopRequested) { @@ -327,8 +327,8 @@ private int maxLogEntries = conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000); - /* (non-Javadoc) - * @see java.lang.Runnable#run() + /** + * {@inheritDoc} */ public void run() { while(! 
stopRequested) { @@ -769,8 +769,8 @@ } } - /* (non-Javadoc) - * @see java.lang.Runnable#run() + /** + * {@inheritDoc} */ public void run() { for(ToDoEntry e = null; !stopRequested; ) { @@ -895,8 +895,8 @@ // HRegionInterface ////////////////////////////////////////////////////////////////////////////// - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#getRegionInfo(org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public HRegionInfo getRegionInfo(final Text regionName) throws NotServingRegionException { @@ -903,8 +903,8 @@ return getRegion(regionName).getRegionInfo(); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public byte [] get(final Text regionName, final Text row, final Text column) @@ -912,8 +912,8 @@ return getRegion(regionName).get(row, column); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, int) + /** + * {@inheritDoc} */ public byte [][] get(final Text regionName, final Text row, final Text column, final int numVersions) @@ -921,8 +921,8 @@ return getRegion(regionName).get(row, column, numVersions); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, long, int) + /** + * {@inheritDoc} */ public byte [][] get(final Text regionName, final Text row, final Text column, final long timestamp, final int numVersions) throws IOException { @@ -929,8 +929,8 @@ return getRegion(regionName).get(row, column, timestamp, numVersions); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#getRow(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public KeyedData[] getRow(final Text regionName, final Text row) throws IOException { HRegion region = getRegion(regionName); @@ -944,8 +944,8 @@ return result; } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#next(long) + /** + * {@inheritDoc} */ public KeyedData[] next(final long scannerId) throws IOException { @@ -993,8 +993,8 @@ return values.toArray(new KeyedData[values.size()]); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#startUpdate(org.apache.hadoop.io.Text, long, org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public long startUpdate(Text regionName, long clientid, Text row) throws IOException { @@ -1015,6 +1015,9 @@ this.localLockId = lockId; } + /** + * {@inheritDoc} + */ public void leaseExpired() { try { localRegion.abort(localLockId); @@ -1024,8 +1027,8 @@ } } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#put(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text, org.apache.hadoop.io.BytesWritable) + /** + * {@inheritDoc} */ public void put(final Text regionName, final long clientid, final long lockid, final Text column, final byte [] val) @@ -1035,8 +1038,8 @@ region.put(lockid, column, val); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#delete(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public void delete(Text regionName, long clientid, long lockid, Text column) throws IOException { @@ -1045,8 +1048,8 @@ region.delete(lockid, column); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#abort(org.apache.hadoop.io.Text, long, long) + /** + * 
{@inheritDoc} */ public void abort(Text regionName, long clientid, long lockid) throws IOException { @@ -1055,18 +1058,18 @@ region.abort(lockid); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#commit(org.apache.hadoop.io.Text, long, long) + /** + * {@inheritDoc} */ - public void commit(Text regionName, long clientid, long lockid) - throws IOException { + public void commit(Text regionName, final long clientid, final long lockid, + final long timestamp) throws IOException { HRegion region = getRegion(regionName, true); leases.cancelLease(clientid, lockid); - region.commit(lockid); + region.commit(lockid, timestamp); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#renewLease(long, long) + /** + * {@inheritDoc} */ public void renewLease(long lockid, long clientid) throws IOException { leases.renewLease(clientid, lockid); @@ -1136,8 +1139,8 @@ this.scannerName = n; } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.LeaseListener#leaseExpired() + /** + * {@inheritDoc} */ public void leaseExpired() { LOG.info("Scanner " + this.scannerName + " lease expired"); @@ -1154,17 +1157,8 @@ /** * {@inheritDoc} */ - public long openScanner(final Text regionName, final Text[] cols, - final Text firstRow) - throws IOException{ - return openScanner(regionName, cols, firstRow, null); - } - - /** - * {@inheritDoc} - */ public long openScanner(Text regionName, Text[] cols, Text firstRow, - final RowFilterInterface filter) + final long timestamp, final RowFilterInterface filter) throws IOException { HRegion r = getRegion(regionName); long scannerId = -1L; @@ -1169,7 +1163,8 @@ HRegion r = getRegion(regionName); long scannerId = -1L; try { - HInternalScannerInterface s = r.getScanner(cols, firstRow, filter); + HInternalScannerInterface s = + r.getScanner(cols, firstRow, timestamp, filter); scannerId = rand.nextLong(); String scannerName = String.valueOf(scannerId); synchronized(scanners) { @@ -1184,8 +1179,8 @@ return scannerId; } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#close(long) + /** + * {@inheritDoc} */ public void close(final long scannerId) throws IOException { String scannerName = String.valueOf(scannerId); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (revision 554801) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (working copy) @@ -391,6 +391,7 @@ super(fs, dirName, conf); } + /** {@inheritDoc} */ @Override public Writable get(WritableComparable key, Writable val) throws IOException { // Note - the key being passed to us is always a HStoreKey @@ -407,6 +408,7 @@ return null; } + /** {@inheritDoc} */ @Override public WritableComparable getClosest(WritableComparable key, Writable val) throws IOException { @@ -438,6 +440,7 @@ super(conf, fs, dirName, keyClass, valClass, compression); } + /** {@inheritDoc} */ @Override public void append(WritableComparable key, Writable val) throws IOException { // Note - the key being passed to us is always a HStoreKey @@ -1031,6 +1034,9 @@ Text readcol = readkey.getColumn(); if (results.get(readcol) == null && key.matchesWithoutColumn(readkey)) { + if(readval.equals(HConstants.DELETE_BYTES)) { + break; + } results.put(new Text(readcol), readval.get()); readval = new ImmutableBytesWritable(); } else if(key.getRow().compareTo(readkey.getRow()) > 0) { @@ -1078,10 +1084,14 @@ continue; } if (readkey.matchesRowCol(key)) { 
+ if(readval.equals(HConstants.DELETE_BYTES)) { + break; + } results.add(readval.get()); readval = new ImmutableBytesWritable(); while(map.next(readkey, readval) && readkey.matchesRowCol(key)) { - if (numVersions > 0 && (results.size() >= numVersions)) { + if ((numVersions > 0 && (results.size() >= numVersions)) + || readval.equals(HConstants.DELETE_BYTES)) { break; } results.add(readval.get()); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (revision 554801) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (working copy) @@ -36,6 +36,9 @@ protected FileSystem fs; protected Path dir; + /** + * {@inheritDoc} + */ @Override public void setUp() throws Exception { super.setUp(); @@ -111,6 +114,9 @@ } } + /** + * {@inheritDoc} + */ @Override public void tearDown() throws Exception { super.tearDown(); @@ -128,7 +134,7 @@ + String.format("%1$05d", i))); region.put(lockid, COLUMN_NAME, value.get()); - region.commit(lockid); + region.commit(lockid, System.currentTimeMillis()); if(i % 10000 == 0) { System.out.println("Flushing write #" + i); region.flushcache(false); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java (revision 554801) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java (working copy) @@ -104,7 +104,7 @@ r.put(lockid, HConstants.COL_REGIONINFO, bytes.toByteArray()); - r.commit(lockid); + r.commit(lockid, System.currentTimeMillis()); lockid = r.startUpdate(ROW_KEY); @@ -120,7 +120,7 @@ r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "region"), "region".getBytes(HConstants.UTF8_ENCODING)); - r.commit(lockid); + r.commit(lockid, System.currentTimeMillis()); // Verify that get works the same from memcache as when reading from disk // NOTE dumpRegion won't work here because it only reads from disk. @@ -152,7 +152,7 @@ r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "junk"), "junk".getBytes()); - r.commit(lockid); + r.commit(lockid, System.currentTimeMillis()); verifyGet(r, otherServerName); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (revision 554801) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (working copy) @@ -120,7 +120,7 @@ long writeid = region.startUpdate(new Text("row_" + k)); region.put(writeid, CONTENTS_BASIC, (CONTENTSTR + k).getBytes()); region.put(writeid, new Text(ANCHORNUM + k), (ANCHORSTR + k).getBytes()); - region.commit(writeid); + region.commit(writeid, System.currentTimeMillis()); } System.out.println("Write " + NUM_VALS + " rows. 
Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -275,7 +275,7 @@ long lockid = region.startUpdate(new Text("row_vals1_" + kLabel)); region.put(lockid, cols[0], vals1[k].getBytes()); region.put(lockid, cols[1], vals1[k].getBytes()); - region.commit(lockid); + region.commit(lockid, System.currentTimeMillis()); numInserted += 2; } @@ -286,7 +286,8 @@ startTime = System.currentTimeMillis(); - HInternalScannerInterface s = region.getScanner(cols, new Text()); + HInternalScannerInterface s = + region.getScanner(cols, new Text(), System.currentTimeMillis(), null); int numFetched = 0; try { HStoreKey curKey = new HStoreKey(); @@ -331,7 +332,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text()); + s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); numFetched = 0; try { HStoreKey curKey = new HStoreKey(); @@ -373,7 +374,7 @@ long lockid = region.startUpdate(new Text("row_vals1_" + kLabel)); region.put(lockid, cols[0], vals1[k].getBytes()); region.put(lockid, cols[1], vals1[k].getBytes()); - region.commit(lockid); + region.commit(lockid, System.currentTimeMillis()); numInserted += 2; } @@ -384,7 +385,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text()); + s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); numFetched = 0; try { HStoreKey curKey = new HStoreKey(); @@ -429,7 +430,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text()); + s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); numFetched = 0; try { HStoreKey curKey = new HStoreKey(); @@ -464,7 +465,9 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text("row_vals1_500")); + s = region.getScanner(cols, new Text("row_vals1_500"), + System.currentTimeMillis(), null); + numFetched = 0; try { HStoreKey curKey = new HStoreKey(); @@ -524,7 +527,7 @@ // Write to the HRegion long writeid = region.startUpdate(new Text("row_" + k)); region.put(writeid, CONTENTS_BODY, buf1.toString().getBytes()); - region.commit(writeid); + region.commit(writeid, System.currentTimeMillis()); if (k > 0 && k % (N_ROWS / 100) == 0) { System.out.println("Flushing write #" + k); @@ -609,8 +612,8 @@ fs.delete(oldRegion2); } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ public void closing(@SuppressWarnings("unused") final Text regionName) { // We don't use this here. It is only for the HRegionServer @@ -616,6 +619,9 @@ // We don't use this here. It is only for the HRegionServer } + /** + * {@inheritDoc} + */ public void closed(@SuppressWarnings("unused") final Text regionName) { // We don't use this here. 
It is only for the HRegionServer } @@ -633,7 +639,8 @@ long startTime = System.currentTimeMillis(); - HInternalScannerInterface s = region.getScanner(cols, new Text()); + HInternalScannerInterface s = + region.getScanner(cols, new Text(), System.currentTimeMillis(), null); try { @@ -689,7 +696,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text()); + s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); try { int numFetched = 0; HStoreKey curKey = new HStoreKey(); @@ -726,7 +733,9 @@ if(StaticTestEnvironment.debugging) { startTime = System.currentTimeMillis(); - s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text()); + s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text(), + System.currentTimeMillis(), null); + try { int numFetched = 0; HStoreKey curKey = new HStoreKey(); @@ -762,7 +771,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text()); + s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); try { int fetched = 0; Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (revision 554801) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (working copy) @@ -78,7 +78,9 @@ for(int i = 0; i < scanColumns.length; i++) { try { - scanner = region.getScanner(scanColumns[i], FIRST_ROW); + scanner = region.getScanner(scanColumns[i], FIRST_ROW, + System.currentTimeMillis(), null); + while(scanner.next(key, results)) { assertTrue(results.containsKey(HConstants.COL_REGIONINFO)); byte [] val = results.get(HConstants.COL_REGIONINFO); @@ -155,7 +157,7 @@ DataOutputStream s = new DataOutputStream(byteStream); HGlobals.rootRegionInfo.write(s); region.put(lockid, HConstants.COL_REGIONINFO, byteStream.toByteArray()); - region.commit(lockid); + region.commit(lockid, System.currentTimeMillis()); // What we just committed is in the memcache. 
Verify that we can get // it back both with scanning and get @@ -186,7 +188,7 @@ region.put(lockid, HConstants.COL_STARTCODE, String.valueOf(START_CODE).getBytes(HConstants.UTF8_ENCODING)); - region.commit(lockid); + region.commit(lockid, System.currentTimeMillis()); // Validate that we can still get the HRegionInfo, even though it is in // an older row on disk and there is a newer row in the memcache @@ -223,7 +225,7 @@ region.put(lockid, HConstants.COL_SERVER, address.toString().getBytes(HConstants.UTF8_ENCODING)); - region.commit(lockid); + region.commit(lockid, System.currentTimeMillis()); // Validate again Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (revision 554801) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (working copy) @@ -81,7 +81,7 @@ HClient.RegionLocation rl = client.getRegionLocation(table); regionServer = client.getHRegionConnection(rl.serverAddress); scannerId = regionServer.openScanner(rl.regionInfo.regionName, - HMaster.METACOLUMNS, new Text()); + HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null); while (true) { TreeMap results = new TreeMap(); KeyedData[] values = regionServer.next(scannerId); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java (revision 554801) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java (working copy) @@ -66,6 +66,9 @@ "6789".getBytes() }; + /** + * {@inheritDoc} + */ @Override public void setUp() throws Exception { super.setUp(); @@ -96,7 +99,7 @@ + String.format("%1$05d", i))); region.put(lockid, TEXT_INPUT_COLUMN, values[i]); - region.commit(lockid); + region.commit(lockid, System.currentTimeMillis()); } region.close(); @@ -117,6 +120,9 @@ } } + /** + * {@inheritDoc} + */ @Override public void tearDown() throws Exception { super.tearDown(); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (revision 0) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (revision 0) @@ -0,0 +1,175 @@ +package org.apache.hadoop.hbase; + +import java.util.TreeMap; +import org.apache.hadoop.io.Text; + +/** Tests user-specifiable timestamps */ +public class TestTimestamp extends HBaseClusterTestCase { + private static final long T0 = 10L; + private static final long T1 = 100L; + + private static final String COLUMN_NAME = "contents:"; + private static final String TABLE_NAME = "test"; + private static final String VERSION1 = "version1"; + private static final String LATEST = "latest"; + + private static final Text COLUMN = new Text(COLUMN_NAME); + private static final Text[] COLUMNS = { + COLUMN + }; + private static final Text TABLE = new Text(TABLE_NAME); + private static final Text ROW = new Text("row"); + + private HClient client; + + /** constructor */ + public TestTimestamp() { + super(); + client = new HClient(conf); + } + + /** {@inheritDoc} */ + @Override + public void setUp() throws Exception { + super.setUp(); + + HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); + desc.addFamily(new HColumnDescriptor(COLUMN_NAME)); + + try { + 
client.createTable(desc); + + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + /** the test */ + public void testTimestamp() { + try { + client.openTable(TABLE); + + // store a value specifying an update time + + long lockid = client.startUpdate(ROW); + client.put(lockid, COLUMN, VERSION1.getBytes(HConstants.UTF8_ENCODING)); + client.commit(lockid, T0); + + // store a value specifying 'now' as the update time + + lockid = client.startUpdate(ROW); + client.put(lockid, COLUMN, LATEST.getBytes(HConstants.UTF8_ENCODING)); + client.commit(lockid); + + // delete values older than T1 + + lockid = client.startUpdate(ROW); + client.delete(lockid, COLUMN); + client.commit(lockid, T1); + + // now retrieve... + + // the most recent version: + + byte[] bytes = client.get(ROW, COLUMN); + assertTrue(bytes != null && bytes.length != 0); + assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING))); + + // any version <= time T1 + + byte[][] values = client.get(ROW, COLUMN, T1, 3); + assertNull(values); + + // the version from T0 + + values = client.get(ROW, COLUMN, T0, 3); + assertTrue(values.length == 1 + && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING))); + + // flush everything out to disk + + HRegionServer s = cluster.regionServers.get(0); + for(HRegion r: s.onlineRegions.values()) { + r.flushcache(false); + } + + // now retrieve... + + // the most recent version: + + bytes = client.get(ROW, COLUMN); + assertTrue(bytes != null && bytes.length != 0); + assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING))); + + // any version <= time T1 + + values = client.get(ROW, COLUMN, T1, 3); + assertNull(values); + + // the version from T0 + + values = client.get(ROW, COLUMN, T0, 3); + assertTrue(values.length == 1 + && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING))); + + // three versions older than now + + values = client.get(ROW, COLUMN, 3); + assertTrue(values.length == 1 + && LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING))); + + // Test scanners + + HScannerInterface scanner = + client.obtainScanner(COLUMNS, HClient.EMPTY_START_ROW); + try { + HStoreKey key = new HStoreKey(); + TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>(); + int count = 0; + while(scanner.next(key, results)) { + count++; + } + assertEquals(1, count); + assertEquals(1, results.size()); + + } finally { + scanner.close(); + } + + scanner = client.obtainScanner(COLUMNS, HClient.EMPTY_START_ROW, T1); + try { + HStoreKey key = new HStoreKey(); + TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>(); + int count = 0; + while(scanner.next(key, results)) { + count++; + } + assertEquals(0, count); + assertEquals(0, results.size()); + + } finally { + scanner.close(); + } + + scanner = client.obtainScanner(COLUMNS, HClient.EMPTY_START_ROW, T0); + try { + HStoreKey key = new HStoreKey(); + TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>(); + int count = 0; + while(scanner.next(key, results)) { + count++; + } + assertEquals(0, count); + assertEquals(0, results.size()); + + } finally { + scanner.close(); + } + + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } +}
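
Usage sketch (minimal, not part of the patch): the timestamp-aware client API added above comes down to two calls. commit(lockid, timestamp) stamps a mutation with an explicit time instead of 'now', and obtainScanner(columns, startRow, timestamp) returns only cells whose timestamp is <= the given value. The code below mirrors the calls exercised in TestTimestamp; the class name, table name "demo", row key, and literal times are illustrative assumptions, and the class is placed in the org.apache.hadoop.hbase package, like TestTimestamp, so it can see package members.

package org.apache.hadoop.hbase;

import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

/** Illustrative sketch of the timestamp-aware API; names and times are made up. */
public class TimestampSketch {
  public static void demo(Configuration conf) throws Exception {
    HClient client = new HClient(conf);
    client.openTable(new Text("demo"));          // illustrative table
    Text column = new Text("contents:");         // illustrative column

    // Commit with an explicit timestamp rather than System.currentTimeMillis().
    long lockid = client.startUpdate(new Text("row_0"));
    client.put(lockid, column, "v1".getBytes(HConstants.UTF8_ENCODING));
    client.commit(lockid, 10L);                  // new commit(lockid, timestamp)

    // Scan as of time 100: only cells whose timestamp is <= 100 are returned.
    // An empty Text serves as the 'start at the beginning' row key, as elsewhere above.
    HScannerInterface scanner =
      client.obtainScanner(new Text[] { column }, new Text(), 100L);
    try {
      HStoreKey key = new HStoreKey();
      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
      while (scanner.next(key, results)) {
        // key.getRow() is the row; results maps column name to value bytes.
      }
    } finally {
      scanner.close();
    }
  }
}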