Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java (working copy) @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.NoSuchElementException; +import java.util.Map; import java.util.SortedMap; import org.apache.commons.logging.Log; @@ -25,12 +26,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Writables; /** @@ -178,15 +182,17 @@ scannerId = server.openScanner(firstMetaServer.getRegionInfo().getRegionName(), COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null); - KeyedData[] values = server.next(scannerId); - if (values == null || values.length == 0) { + MapWritable values = server.next(scannerId); + if (values == null || values.size() == 0) { break; } boolean found = false; - for (int j = 0; j < values.length; j++) { - if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - info = - (HRegionInfo) Writables.getWritable(values[j].getData(), info); + for (Map.Entry e: values.entrySet()) { + HStoreKey key = (HStoreKey) e.getKey(); + if (key.getColumn().equals(COL_REGIONINFO)) { + info = (HRegionInfo) Writables.getWritable( + ((ImmutableBytesWritable) e.getValue()).get(), info); + if (info.tableDesc.getName().equals(tableName)) { found = true; } @@ -260,8 +266,8 @@ boolean isenabled = false; while (true) { - KeyedData[] values = server.next(scannerId); - if (values == null || values.length == 0) { + MapWritable values = server.next(scannerId); + if (values == null || values.size() == 0) { if (valuesfound == 0) { throw new NoSuchElementException( "table " + tableName + " not found"); @@ -269,10 +275,12 @@ break; } valuesfound += 1; - for (int j = 0; j < values.length; j++) { - if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - info = - (HRegionInfo) Writables.getWritable(values[j].getData(), info); + for (Map.Entry e: values.entrySet()) { + HStoreKey key = (HStoreKey) e.getKey(); + if (key.getColumn().equals(COL_REGIONINFO)) { + info = (HRegionInfo) Writables.getWritable( + ((ImmutableBytesWritable) e.getValue()).get(), info); + isenabled = !info.offLine; break; } @@ -359,8 +367,8 @@ boolean disabled = false; while (true) { - KeyedData[] values = server.next(scannerId); - if (values == null || values.length == 0) { + MapWritable values = server.next(scannerId); + if (values == null || values.size() == 0) { if (valuesfound == 0) { throw new NoSuchElementException("table " + tableName + " not found"); } @@ -367,10 +375,12 @@ break; } valuesfound += 1; - for (int j = 0; j < values.length; j++) { - if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - info = - (HRegionInfo) Writables.getWritable(values[j].getData(), info); + for (Map.Entry e: values.entrySet()) { + HStoreKey key = (HStoreKey) e.getKey(); + if (key.getColumn().equals(COL_REGIONINFO)) { + info = (HRegionInfo) Writables.getWritable( + ((ImmutableBytesWritable) e.getValue()).get(), info); + disabled = 
info.offLine; break; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (working copy) @@ -27,7 +27,6 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,8 +34,11 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.hadoop.hbase.util.Writables; /** @@ -228,7 +230,7 @@ /** {@inheritDoc} */ public HTableDescriptor[] listTables() throws IOException { - TreeSet uniqueTables = new TreeSet(); + HashSet uniqueTables = new HashSet(); SortedMap metaTables = getTableServers(META_TABLE_NAME); @@ -241,16 +243,17 @@ COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(), null); + HRegionInfo info = new HRegionInfo(); while (true) { - KeyedData[] values = server.next(scannerId); - if (values.length == 0) { + MapWritable values = server.next(scannerId); + if (values == null || values.size() == 0) { break; } - for (int i = 0; i < values.length; i++) { - if (values[i].getKey().getColumn().equals(COL_REGIONINFO)) { - HRegionInfo info = - (HRegionInfo) Writables.getWritable(values[i].getData(), - new HRegionInfo()); + for (Map.Entry e: values.entrySet()) { + HStoreKey key = (HStoreKey) e.getKey(); + if (key.getColumn().equals(COL_REGIONINFO)) { + info = (HRegionInfo) Writables.getWritable( + ((ImmutableBytesWritable) e.getValue()).get(), info); // Only examine the rows where the startKey is zero length if (info.startKey.getLength() == 0) { @@ -272,9 +275,9 @@ } /** {@inheritDoc} */ - public SortedMap - getTableServers(Text tableName) - throws IOException { + public SortedMap getTableServers(Text tableName) + throws IOException { + if (tableName == null || tableName.getLength() == 0) { throw new IllegalArgumentException( "table name cannot be null or zero length"); @@ -542,7 +545,7 @@ * @return map of first row to TableInfo for all meta regions * @throws IOException */ - private TreeMap loadMetaFromRoot() + private SortedMap loadMetaFromRoot() throws IOException { SortedMap rootRegion = @@ -646,7 +649,7 @@ * @throws NoServerForRegionException - if table can not be found after retrying * @throws IOException */ - private TreeMap scanOneMetaRegion( + private SortedMap scanOneMetaRegion( final HRegionLocation t, final Text tableName) throws IOException { HRegionInterface server = getHRegionConnection(t.getServerAddress()); @@ -660,8 +663,8 @@ COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null); while (true) { - KeyedData[] values = server.next(scannerId); - if (values.length == 0) { + MapWritable values = server.next(scannerId); + if (values == null || values.size() == 0) { if (servers.size() == 0) { // If we didn't find any servers then the table does not exist throw new TableNotFoundException("table '" + tableName + @@ -676,9 +679,11 @@ break; } - TreeMap results = new TreeMap(); - for (int i = 0; i < values.length; i++) { - 
results.put(values[i].getKey().getColumn(), values[i].getData()); + SortedMap results = new TreeMap(); + for (Map.Entry e: values.entrySet()) { + HStoreKey key = (HStoreKey) e.getKey(); + results.put(key.getColumn(), + ((ImmutableBytesWritable) e.getValue()).get()); } byte[] bytes = results.get(COL_REGIONINFO); @@ -704,8 +709,13 @@ } break; } + + if (regionInfo.isSplit()) { + // Region is a split parent. Skip it. + continue; + } - if (regionInfo.isOffline() && !regionInfo.isSplit()) { + if (regionInfo.isOffline()) { throw new IllegalStateException("table offline: " + tableName); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -45,9 +45,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; @@ -187,8 +190,8 @@ // Array to hold list of split parents found. Scan adds to list. After // scan we go check if parents can be removed. - Map> splitParents = - new HashMap>(); + Map> splitParents = + new HashMap>(); try { regionServer = connection.getHRegionConnection(region.server); scannerId = @@ -197,14 +200,16 @@ int numberOfRegionsFound = 0; while (true) { - TreeMap results = new TreeMap(); - KeyedData[] values = regionServer.next(scannerId); - if (values.length == 0) { + SortedMap results = new TreeMap(); + MapWritable values = regionServer.next(scannerId); + if (values == null || values.size() == 0) { break; } - for (int i = 0; i < values.length; i++) { - results.put(values[i].getKey().getColumn(), values[i].getData()); + for (Map.Entry e: values.entrySet()) { + HStoreKey key = (HStoreKey) e.getKey(); + results.put(key.getColumn(), + ((ImmutableBytesWritable) e.getValue()).get()); } HRegionInfo info = (HRegionInfo) Writables.getWritable( @@ -260,10 +265,10 @@ // Scan is finished. Take a look at split parents to see if any we can clean up. 
if (splitParents.size() > 0) { - for (Map.Entry> e: + for (Map.Entry> e: splitParents.entrySet()) { - TreeMap results = e.getValue(); + SortedMap results = e.getValue(); cleanupSplits(region.regionName, regionServer, e.getKey(), (HRegionInfo) Writables.getWritable(results.get(COL_SPLITA), new HRegionInfo()), @@ -1643,7 +1648,7 @@ try { while (true) { - KeyedData[] values = null; + MapWritable values = null; try { values = server.next(scannerId); @@ -1658,23 +1663,25 @@ break; } - if (values == null || values.length == 0) { + if (values == null || values.size() == 0) { break; } - TreeMap results = new TreeMap(); + SortedMap results = new TreeMap(); Text row = null; - for (int i = 0; i < values.length; i++) { - if(row == null) { - row = values[i].getKey().getRow(); - + for (Map.Entry e: values.entrySet()) { + HStoreKey key = (HStoreKey) e.getKey(); + Text thisRow = key.getRow(); + if (row == null) { + row = thisRow; } else { - if (!row.equals(values[i].getKey().getRow())) { + if (!row.equals(thisRow)) { LOG.error("Multiple rows in same scanner result set. firstRow=" - + row + ", currentRow=" + values[i].getKey().getRow()); + + row + ", currentRow=" + thisRow); } } - results.put(values[i].getKey().getColumn(), values[i].getData()); + results.put(key.getColumn(), + ((ImmutableBytesWritable) e.getValue()).get()); } if (LOG.isDebugEnabled() && row != null) { @@ -2317,7 +2324,7 @@ long scannerid = server.openScanner(metaRegionName, COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null); try { - KeyedData[] data = server.next(scannerid); + MapWritable data = server.next(scannerid); // Test data and that the row for the data is for our table. If table // does not exist, scanner will return row after where our table would @@ -2323,13 +2330,16 @@ // does not exist, scanner will return row after where our table would // be inserted if it exists so look for exact match on table name. - if (data != null && data.length > 0 && - HRegionInfo.getTableNameFromRegionName( - data[0].getKey().getRow()).equals(tableName)) { - - // Then a region for this table already exists. Ergo table exists. - - throw new TableExistsException(tableName.toString()); + if (data != null && data.size() > 0) { + for (WritableComparable k: data.keySet()) { + if (HRegionInfo.getTableNameFromRegionName( + ((HStoreKey) k).getRow()).equals(tableName)) { + + // Then a region for this table already exists. Ergo table exists. 
+ + throw new TableExistsException(tableName.toString()); + } + } } } finally { @@ -2462,20 +2472,23 @@ String serverName = null; long startCode = -1L; - KeyedData[] values = server.next(scannerId); - if(values == null || values.length == 0) { + MapWritable values = server.next(scannerId); + if(values == null || values.size() == 0) { break; } boolean haveRegionInfo = false; - for (int i = 0; i < values.length; i++) { - if (values[i].getData().length == 0) { + for (Map.Entry e: + values.entrySet()) { + + byte[] value = ((ImmutableBytesWritable) e.getValue()).get(); + if (value == null || value.length == 0) { break; } - Text column = values[i].getKey().getColumn(); + HStoreKey key = (HStoreKey) e.getKey(); + Text column = key.getColumn(); if (column.equals(COL_REGIONINFO)) { haveRegionInfo = true; - info = (HRegionInfo) Writables.getWritable( - values[i].getData(), info); + info = (HRegionInfo) Writables.getWritable(value, info); } else if (column.equals(COL_SERVER)) { try { @@ -2480,9 +2493,9 @@ } else if (column.equals(COL_SERVER)) { try { serverName = - Writables.bytesToString(values[i].getData()); + Writables.bytesToString(value); - } catch (UnsupportedEncodingException e) { + } catch (UnsupportedEncodingException ex) { assert(false); } @@ -2488,9 +2501,9 @@ } else if (column.equals(COL_STARTCODE)) { try { - startCode = Writables.bytesToLong(values[i].getData()); + startCode = Writables.bytesToLong(value); - } catch (UnsupportedEncodingException e) { + } catch (UnsupportedEncodingException ex) { assert(false); } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -870,15 +870,6 @@ } } - private Vector getAllStoreFiles() { - Vector allHStoreFiles = new Vector(); - for(HStore hstore: stores.values()) { - Vector hstoreFiles = hstore.getAllStoreFiles(); - allHStoreFiles.addAll(0, hstoreFiles); - } - return allHStoreFiles; - } - ////////////////////////////////////////////////////////////////////////////// // get() methods for client use. ////////////////////////////////////////////////////////////////////////////// Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (working copy) @@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.BatchUpdate; -import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.VersionedProtocol; @@ -93,11 +93,11 @@ * * @param regionName region name * @param row row key - * @return array of values + * @return map of values * @throws IOException */ - public KeyedData[] getRow(final Text regionName, final Text row) - throws IOException; //TODO + public MapWritable getRow(final Text regionName, final Text row) + throws IOException; ////////////////////////////////////////////////////////////////////////////// // Start an atomic row insertion/update. 
No changes are committed until the @@ -244,10 +244,10 @@ * Get the next set of values * * @param scannerId clientId passed to openScanner - * @return array of values + * @return map of values * @throws IOException */ - public KeyedData[] next(long scannerId) throws IOException; //TODO + public MapWritable next(long scannerId) throws IOException; /** * Close a scanner Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -40,12 +40,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.filter.RowFilterInterface; -import org.apache.hadoop.hbase.io.BatchUpdate; -import org.apache.hadoop.hbase.io.BatchOperation; -import org.apache.hadoop.hbase.io.KeyedData; -import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; @@ -52,6 +49,13 @@ import org.apache.hadoop.net.DNS; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.BatchOperation; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.MapWritable; +import org.apache.hadoop.hbase.util.Writables; + /******************************************************************************* * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. 
@@ -1021,16 +1025,18 @@ } /** {@inheritDoc} */ - public KeyedData[] getRow(final Text regionName, final Text row) + public MapWritable getRow(final Text regionName, final Text row) throws IOException { requestCount.incrementAndGet(); HRegion region = getRegion(regionName); + MapWritable result = new MapWritable(HStoreKey.class, + ImmutableBytesWritable.class, + new TreeMap()); + TreeMap map = region.getFull(row); - KeyedData result[] = new KeyedData[map.size()]; - int counter = 0; for (Map.Entry es: map.entrySet()) { - result[counter++] = - new KeyedData(new HStoreKey(row, es.getKey()), es.getValue()); + result.put(new HStoreKey(row, es.getKey()), + new ImmutableBytesWritable(es.getValue())); } return result; } @@ -1036,7 +1042,7 @@ } /** {@inheritDoc} */ - public KeyedData[] next(final long scannerId) + public MapWritable next(final long scannerId) throws IOException { requestCount.incrementAndGet(); String scannerName = String.valueOf(scannerId); @@ -1048,9 +1054,9 @@ // Collect values to be returned here - ArrayList values = new ArrayList(); - - TreeMap results = new TreeMap(); + MapWritable values = new MapWritable(HStoreKey.class, + ImmutableBytesWritable.class, + new TreeMap()); // Keep getting rows until we find one that has at least one non-deleted column value @@ -1055,6 +1061,7 @@ // Keep getting rows until we find one that has at least one non-deleted column value HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); while (s.next(key, results)) { for(Map.Entry e: results.entrySet()) { HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp()); @@ -1063,8 +1070,9 @@ // Column value is deleted. Don't return it. continue; } - values.add(new KeyedData(k, val)); + values.put(k, new ImmutableBytesWritable(val)); } + if(values.size() > 0) { // Row has something in it. Return the value. break; @@ -1074,7 +1082,7 @@ results.clear(); } - return values.toArray(new KeyedData[values.size()]); + return values; } /* Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (working copy) @@ -572,6 +572,7 @@ */ void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID) throws IOException { + long maxId = maxSeenSeqID; synchronized(compactLock) { Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, familyName); @@ -607,12 +608,12 @@ // Compute the max-sequenceID seen in any of the to-be-compacted // TreeMaps if it hasn't been passed in to us. - if (maxSeenSeqID == -1) { + if (maxId == -1) { for (HStoreFile hsf: toCompactFiles) { long seqid = hsf.loadInfo(fs); if(seqid > 0) { - if(seqid > maxSeenSeqID) { - maxSeenSeqID = seqid; + if(seqid > maxId) { + maxId = seqid; } } } @@ -629,8 +630,8 @@ } // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. - if((! deleteSequenceInfo) && maxSeenSeqID >= 0) { - compactedOutputFile.writeInfo(fs, maxSeenSeqID); + if((! 
deleteSequenceInfo) && maxId >= 0) { + compactedOutputFile.writeInfo(fs, maxId); } else { compactedOutputFile.writeInfo(fs, -1); } @@ -710,14 +711,35 @@ } } } - + + /** Interface for generic reader for compactions */ interface CompactionReader { + + /** + * Closes the reader + * @throws IOException + */ public void close() throws IOException; + + /** + * Get the next key/value pair + * + * @param key + * @param val + * @return true if more data was returned + * @throws IOException + */ public boolean next(WritableComparable key, Writable val) - throws IOException; + throws IOException; + + /** + * Resets the reader + * @throws IOException + */ public void reset() throws IOException; } - + + /** A compaction reader for MapFile */ class MapFileCompactionReader implements CompactionReader { final MapFile.Reader reader; @@ -725,6 +747,7 @@ this.reader = r; } + /** {@inheritDoc} */ public void close() throws IOException { this.reader.close(); } @@ -729,6 +752,7 @@ this.reader.close(); } + /** {@inheritDoc} */ public boolean next(WritableComparable key, Writable val) throws IOException { return this.reader.next(key, val); @@ -734,6 +758,7 @@ return this.reader.next(key, val); } + /** {@inheritDoc} */ public void reset() throws IOException { this.reader.reset(); } @@ -1217,6 +1242,7 @@ return new HStoreScanner(timestamp, targetCols, firstRow); } + /** {@inheritDoc} */ @Override public String toString() { return this.storeName; Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (working copy) @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Map; import java.util.Random; import java.util.SortedMap; import java.util.TreeMap; @@ -32,8 +33,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.BatchUpdate; -import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.ipc.RemoteException; /** @@ -183,12 +187,15 @@ break; } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } if (tries == numRetries - 1) { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("reloading table servers because: " + e.getMessage()); + } tableServers = connection.reloadTableServers(tableName); } try { @@ -225,13 +232,16 @@ break; } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } if (tries == numRetries - 1) { // No more tries - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("reloading table servers because: " + e.getMessage()); + } tableServers = connection.reloadTableServers(tableName); } try { @@ -279,13 +289,16 @@ break; } catch (IOException e) { + if (e instanceof RemoteException) { + e = 
RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } if (tries == numRetries - 1) { // No more tries - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("reloading table servers because: " + e.getMessage()); + } tableServers = connection.reloadTableServers(tableName); } try { @@ -315,7 +328,7 @@ */ public SortedMap getRow(Text row) throws IOException { checkClosed(); - KeyedData[] value = null; + MapWritable value = null; for (int tries = 0; tries < numRetries; tries++) { HRegionLocation r = getRegionLocation(row); HRegionInterface server = @@ -326,13 +339,16 @@ break; } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } if (tries == numRetries - 1) { // No more tries - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("reloading table servers because: " + e.getMessage()); + } tableServers = connection.reloadTableServers(tableName); } try { @@ -342,10 +358,12 @@ // continue } } - TreeMap results = new TreeMap(); - if (value != null && value.length != 0) { - for (int i = 0; i < value.length; i++) { - results.put(value[i].getKey().getColumn(), value[i].getData()); + SortedMap results = new TreeMap(); + if (value != null && value.size() != 0) { + for (Map.Entry e: value.entrySet()) { + HStoreKey key = (HStoreKey) e.getKey(); + results.put(key.getColumn(), + ((ImmutableBytesWritable) e.getValue()).get()); } } return results; @@ -574,14 +592,17 @@ break; } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } if (tries < numRetries -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("reloading table servers because: " + e.getMessage()); + } tableServers = connection.reloadTableServers(tableName); } else { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException( - (RemoteException) e); - } throw e; } } @@ -589,6 +610,7 @@ Thread.sleep(pause); } catch (InterruptedException e) { + // continue } } } finally { @@ -702,13 +724,17 @@ break; } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } if (tries == numRetries - 1) { // No more tries - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } throw e; } + if (LOG.isDebugEnabled()) { + LOG.debug("reloading table servers because: " + e.getMessage()); + } tableServers = connection.reloadTableServers(tableName); loadRegions(); } @@ -732,20 +758,22 @@ if (this.closed) { return false; } - KeyedData[] values = null; + MapWritable values = null; do { values = this.server.next(this.scannerId); - } while (values != null && values.length == 0 && nextScanner()); + } while (values != null && values.size() == 0 && nextScanner()); - if (values != null && values.length != 0) { - for (int i = 0; i < values.length; i++) { - key.setRow(values[i].getKey().getRow()); - key.setVersion(values[i].getKey().getTimestamp()); + if (values != null && values.size() != 0) { + for (Map.Entry e: values.entrySet()) { + HStoreKey k = (HStoreKey) e.getKey(); + key.setRow(k.getRow()); + key.setVersion(k.getTimestamp()); key.setColumn(EMPTY_COLUMN); - 
results.put(values[i].getKey().getColumn(), values[i].getData()); + results.put(k.getColumn(), + ((ImmutableBytesWritable) e.getValue()).get()); } } - return values == null ? false : values.length != 0; + return values == null ? false : values.size() != 0; } /** Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java (working copy) @@ -1,74 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; -import org.apache.hadoop.hbase.HStoreKey; -import org.apache.hadoop.io.*; - -import java.io.*; - -/******************************************************************************* - * KeyedData is just a data pair. - * It includes an HStoreKey and some associated data. - ******************************************************************************/ -public class KeyedData implements Writable { - HStoreKey key; - byte [] data; - - /** Default constructor. Used by Writable interface */ - public KeyedData() { - this.key = new HStoreKey(); - } - - /** - * Create a KeyedData object specifying the parts - * @param key HStoreKey - * @param data - */ - public KeyedData(HStoreKey key, byte [] data) { - this.key = key; - this.data = data; - } - - /** @return returns the key */ - public HStoreKey getKey() { - return key; - } - - /** @return - returns the value */ - public byte [] getData() { - return data; - } - - // Writable - - /** {@inheritDoc} */ - public void write(DataOutput out) throws IOException { - key.write(out); - out.writeInt(this.data.length); - out.write(this.data); - } - - /** {@inheritDoc} */ - public void readFields(DataInput in) throws IOException { - key.readFields(in); - this.data = new byte[in.readInt()]; - in.readFully(this.data); - } -} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java (working copy) @@ -1,86 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Writable; - -/** - * Wraps an array of KeyedData items as a Writable. The array elements - * may be null. - */ -public class KeyedDataArrayWritable implements Writable { - - private final static KeyedData NULL_KEYEDDATA = new KeyedData(); - - private KeyedData[] m_data; - - /** - * Make a record of length 0 - */ - public KeyedDataArrayWritable() { - m_data = new KeyedData[0]; - } - - /** @return the array of KeyedData */ - public KeyedData[] get() { - return m_data; - } - - /** - * Sets the KeyedData array - * - * @param data array of KeyedData - */ - public void set(KeyedData[] data) { - if(data == null) { - throw new NullPointerException("KeyedData[] cannot be null"); - } - m_data = data; - } - - // Writable - - /** {@inheritDoc} */ - public void readFields(DataInput in) throws IOException { - int len = in.readInt(); - m_data = new KeyedData[len]; - for(int i = 0; i < len; i++) { - m_data[i] = new KeyedData(); - m_data[i].readFields(in); - } - } - - /** {@inheritDoc} */ - public void write(DataOutput out) throws IOException { - int len = m_data.length; - out.writeInt(len); - for(int i = 0; i < len; i++) { - if(m_data[i] != null) { - m_data[i].write(out); - } else { - NULL_KEYEDDATA.write(out); - } - } - } -} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/MapWritable.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/MapWritable.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/MapWritable.java (revision 0) @@ -0,0 +1,303 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.util.TreeMap; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.HConstants; + +/** + * + */ +public class MapWritable implements Writable, Map { + private String keyClass = null; + private String valueClass = null; + private String mapClass = null; + private Map instance = null; + + /** + * Default constructor used by writable + */ + public MapWritable() {} + + /** + * @param keyClass the class of the keys + * @param valueClass the class of the values + * @param instance the Map to be wrapped in this Writable + */ + @SuppressWarnings("unchecked") + public MapWritable(Class keyClass, Class valueClass, + Map instance) { + + this.keyClass = keyClass.getName(); + this.valueClass = valueClass.getName(); + this.instance = instance; + this.mapClass = instance.getClass().getName(); + } + + private void checkInitialized() { + if (keyClass == null || + valueClass == null || + mapClass == null || + instance == null) { + + throw new IllegalStateException("object has not been properly initialized"); + } + } + + /** {@inheritDoc} */ + public void clear() { + checkInitialized(); + instance.clear(); + } + + /** {@inheritDoc} */ + public boolean containsKey(Object key) { + checkInitialized(); + return instance.containsKey(key); + } + + /** {@inheritDoc} */ + public boolean containsValue(Object value) { + checkInitialized(); + return instance.containsValue(value); + } + + /** {@inheritDoc} */ + public Set> entrySet() { + checkInitialized(); + return instance.entrySet(); + } + + /** {@inheritDoc} */ + public Writable get(Object key) { + checkInitialized(); + return instance.get(key); + } + + /** + * Returns the value to which this map maps the specified key + * @param key + * @return value associated with specified key + */ + public Writable get(WritableComparable key) { + checkInitialized(); + return instance.get(key); + } + + /** {@inheritDoc} */ + public boolean isEmpty() { + checkInitialized(); + return instance.isEmpty(); + } + + /** {@inheritDoc} */ + public Set keySet() { + checkInitialized(); + return instance.keySet(); + } + + /** {@inheritDoc} */ + public Writable put(WritableComparable key, Writable value) { + checkInitialized(); + return instance.put(key, value); + } + + /** {@inheritDoc} */ + public void putAll(Map t) { + checkInitialized(); + instance.putAll(t); + } + + /** {@inheritDoc} */ + public Writable remove(Object key) { + checkInitialized(); + return instance.remove(key); + } + + /** + * Removes the mapping for this key from this map if it is present + * @param key + * @return value corresponding to key + */ + public Writable remove(WritableComparable key) { + checkInitialized(); + return instance.remove(key); + } + + /** {@inheritDoc} */ + public int size() { + checkInitialized(); + return instance.size(); + } + + /** {@inheritDoc} */ + public Collection values() { + checkInitialized(); + return instance.values(); + } + + // Writable + + /** {@inheritDoc} */ + public void write(DataOutput out) throws IOException { + checkInitialized(); + out.writeUTF(mapClass); + 
out.writeUTF(keyClass); + out.writeUTF(valueClass); + out.writeInt(instance.size()); + + for (Map.Entry e: instance.entrySet()) { + e.getKey().write(out); + e.getValue().write(out); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + public void readFields(DataInput in) throws IOException { + mapClass = in.readUTF(); + keyClass = in.readUTF(); + valueClass = in.readUTF(); + + instance = (Map) objectFactory(mapClass); + + int entries = in.readInt(); + for (int i = 0; i < entries; i++) { + WritableComparable key = (WritableComparable) objectFactory(keyClass); + key.readFields(in); + + Writable value = (Writable) objectFactory(valueClass); + value.readFields(in); + + instance.put(key, value); + } + } + + private Object objectFactory(String className) throws IOException { + Object o = null; + String exceptionMessage = null; + try { + o = Class.forName(className).newInstance(); + + } catch (ClassNotFoundException e) { + exceptionMessage = e.getMessage(); + + } catch (InstantiationException e) { + exceptionMessage = e.getMessage(); + + } catch (IllegalAccessException e) { + exceptionMessage = e.getMessage(); + + } finally { + if (exceptionMessage != null) { + throw new IOException("error instantiating " + className + " because " + + exceptionMessage); + } + } + return o; + } + + /** + * A simple main program to test this class. + * + * @param args not used + * @throws IOException + */ + @SuppressWarnings("unchecked") + public static void main(@SuppressWarnings("unused") String[] args) + throws IOException { + + HStoreKey[] keys = { + new HStoreKey(new Text("row1"), HConstants.COL_REGIONINFO), + new HStoreKey(new Text("row2"), HConstants.COL_SERVER), + new HStoreKey(new Text("row3"), HConstants.COL_STARTCODE) + }; + + ImmutableBytesWritable[] values = { + new ImmutableBytesWritable("value1".getBytes()), + new ImmutableBytesWritable("value2".getBytes()), + new ImmutableBytesWritable("value3".getBytes()) + }; + + @SuppressWarnings("unchecked") + MapWritable inMap = new MapWritable(HStoreKey.class, + ImmutableBytesWritable.class, + (Map) new TreeMap()); + + for (int i = 0; i < keys.length; i++) { + inMap.put(keys[i], values[i]); + } + + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(bytes); + try { + inMap.write(out); + + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + + MapWritable outMap = new MapWritable(); + DataInput in = + new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())); + + try { + outMap.readFields(in); + + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + + if (outMap.size() != inMap.size()) { + System.err.println("outMap.size()=" + outMap.size() + " != " + + "inMap.size()=" + inMap.size()); + } + + for (Map.Entry e: inMap.entrySet()) { + if (!outMap.containsKey(e.getKey())) { + System.err.println("outMap does not contain key " + e.getKey().toString()); + continue; + } + if (((WritableComparable) outMap.get(e.getKey())).compareTo( + e.getValue()) != 0) { + System.err.println("output value for " + e.getKey().toString() + " != input value"); + } + } + System.out.println("it worked!"); + } +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (working copy) @@ -21,11 +21,15 @@ import 
java.io.IOException; import java.util.ArrayList; +import java.util.Map; import org.apache.hadoop.hbase.HStoreKey; -import org.apache.hadoop.hbase.io.KeyedData; -import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.MapWritable; + import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; @@ -55,7 +59,8 @@ * * @param table table to be processed * @param columns space separated list of columns to fetch - * @param groupColumns space separated list of columns used to form the key used in collect + * @param groupColumns space separated list of columns used to form the key + * used in collect * @param mapper map class * @param job job configuration object */ @@ -83,11 +88,11 @@ * Pass the new key and value to reduce. * If any of the grouping columns are not found in the value, the record is skipped. * - * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) + * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) */ @Override public void map(@SuppressWarnings("unused") HStoreKey key, - KeyedDataArrayWritable value, TableOutputCollector output, + MapWritable value, TableOutputCollector output, @SuppressWarnings("unused") Reporter reporter) throws IOException { byte[][] keyVals = extractKeyValues(value); @@ -106,7 +111,7 @@ * @param r * @return array of byte values */ - protected byte[][] extractKeyValues(KeyedDataArrayWritable r) { + protected byte[][] extractKeyValues(MapWritable r) { byte[][] keyVals = null; ArrayList foundList = new ArrayList(); int numCols = m_columns.length; @@ -111,15 +116,11 @@ ArrayList foundList = new ArrayList(); int numCols = m_columns.length; if(numCols > 0) { - KeyedData[] recVals = r.get(); - boolean found = true; - for(int i = 0; i < numCols && found; i++) { - found = false; - for(int j = 0; j < recVals.length; j++) { - if(recVals[j].getKey().getColumn().equals(m_columns[i])) { - found = true; - byte[] val = recVals[j].getData(); - foundList.add(val); + for (Map.Entry e: r.entrySet()) { + Text column = (Text) e.getKey(); + for (int i = 0; i < numCols; i++) { + if (column.equals(m_columns[i])) { + foundList.add(((ImmutableBytesWritable) e.getValue()).get()); break; } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (working copy) @@ -22,7 +22,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.HStoreKey; -import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Reporter; @@ -40,10 +40,10 @@ /** * Pass the key, value to reduce * - * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, 
org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) + * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) */ @Override - public void map(HStoreKey key, KeyedDataArrayWritable value, + public void map(HStoreKey key, MapWritable value, TableOutputCollector output, @SuppressWarnings("unused") Reporter reporter) throws IOException { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (working copy) @@ -22,7 +22,7 @@ import java.io.IOException; import java.util.Iterator; -import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Reporter; @@ -48,7 +48,7 @@ @SuppressWarnings("unused") Reporter reporter) throws IOException { while(values.hasNext()) { - KeyedDataArrayWritable r = (KeyedDataArrayWritable)values.next(); + MapWritable r = (MapWritable)values.next(); output.collect(key, r); } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (working copy) @@ -21,13 +21,11 @@ import java.io.IOException; import java.util.Map; +import java.util.SortedMap; import java.util.TreeMap; -import java.util.ArrayList; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputFormat; @@ -40,8 +38,8 @@ import org.apache.hadoop.hbase.HTable; import org.apache.hadoop.hbase.HScannerInterface; import org.apache.hadoop.hbase.HStoreKey; -import org.apache.hadoop.hbase.io.KeyedData; -import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.log4j.Logger; @@ -49,7 +47,7 @@ * Convert HBase tabular data into a format that is consumable by Map/Reduce */ public class TableInputFormat - implements InputFormat, JobConfigurable { +implements InputFormat, JobConfigurable { static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName()); @@ -64,11 +62,12 @@ HTable m_table; /** - * Iterate over an HBase table data, return (HStoreKey, KeyedDataArrayWritable) pairs + * Iterate over an HBase table data, + * return (HStoreKey, MapWritable) pairs */ - class TableRecordReader implements RecordReader { + class TableRecordReader implements RecordReader { private HScannerInterface m_scanner; - private TreeMap m_row; // current buffer + private SortedMap m_row; // current buffer private Text m_endRow; /** @@ -102,12 +101,15 @@ } /** - * @return KeyedDataArrayWritable of KeyedData + * @return MapWritable * * @see org.apache.hadoop.mapred.RecordReader#createValue() */ - public KeyedDataArrayWritable createValue() { - return new KeyedDataArrayWritable(); + @SuppressWarnings("unchecked") + 
public MapWritable createValue() { + return new MapWritable((Class) Text.class, + (Class) ImmutableBytesWritable.class, + (Map) new TreeMap()); } /** {@inheritDoc} */ @@ -125,14 +127,16 @@ /** * @param key HStoreKey as input key. - * @param value KeyedDataArrayWritable as input value + * @param value MapWritable as input value + * + * Converts HScannerInterface.next(HStoreKey, SortedMap) to + * HStoreKey, MapWritable * - * Converts HScannerInterface.next(HStoreKey, TreeMap(Text, byte[])) to - * (HStoreKey, KeyedDataArrayWritable) * @return true if there was more data * @throws IOException */ - public boolean next(HStoreKey key, KeyedDataArrayWritable value) throws IOException { + @SuppressWarnings("unchecked") + public boolean next(HStoreKey key, MapWritable value) throws IOException { LOG.debug("start next"); m_row.clear(); HStoreKey tKey = key; @@ -139,20 +143,15 @@ boolean hasMore = m_scanner.next(tKey, m_row); if(hasMore) { - if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) < 0)) { + if(m_endRow.getLength() > 0 && + (tKey.getRow().compareTo(m_endRow) < 0)) { + hasMore = false; + } else { - KeyedDataArrayWritable rowVal = value; - ArrayList columns = new ArrayList(); - for(Map.Entry e: m_row.entrySet()) { - HStoreKey keyCol = new HStoreKey(tKey); - keyCol.setColumn(e.getKey()); - columns.add(new KeyedData(keyCol, e.getValue())); + value.put(e.getKey(), new ImmutableBytesWritable(e.getValue())); } - - // set the output - rowVal.set(columns.toArray(new KeyedData[columns.size()])); } } LOG.debug("end next"); @@ -161,7 +160,8 @@ } - public RecordReader getRecordReader( + /** {@inheritDoc} */ + public RecordReader getRecordReader( InputSplit split, @SuppressWarnings("unused") JobConf job, @SuppressWarnings("unused") Reporter reporter) throws IOException { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (working copy) @@ -28,7 +28,7 @@ import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.io.Text; @@ -39,6 +39,7 @@ * If the column does not exist, the record is not passed to Reduce. 
* */ +@SuppressWarnings("unchecked") public abstract class TableMap extends MapReduceBase implements Mapper { private static final Logger LOG = Logger.getLogger(TableMap.class.getName()); @@ -64,7 +65,7 @@ job.setInputFormat(TableInputFormat.class); job.setOutputKeyClass(Text.class); - job.setOutputValueClass(KeyedDataArrayWritable.class); + job.setOutputValueClass(MapWritable.class); job.setMapperClass(mapper); job.setInputPath(new Path(table)); job.set(TableInputFormat.COLUMN_LIST, columns); @@ -95,7 +96,7 @@ if(m_collector.collector == null) { m_collector.collector = output; } - map((HStoreKey)key, (KeyedDataArrayWritable)value, m_collector, reporter); + map((HStoreKey)key, (MapWritable)value, m_collector, reporter); LOG.debug("end map"); } @@ -109,6 +110,6 @@ * @param reporter * @throws IOException */ - public abstract void map(HStoreKey key, KeyedDataArrayWritable value, + public abstract void map(HStoreKey key, MapWritable value, TableOutputCollector output, Reporter reporter) throws IOException; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java (working copy) @@ -24,7 +24,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.hbase.io.MapWritable; /** * Refine the types that can be collected from a Table Map/Reduce jobs. @@ -31,6 +31,7 @@ */ public class TableOutputCollector { /** The collector object */ + @SuppressWarnings("unchecked") public OutputCollector collector; /** @@ -40,8 +41,8 @@ * @param value * @throws IOException */ - public void collect(Text key, KeyedDataArrayWritable value) - throws IOException { + @SuppressWarnings("unchecked") + public void collect(Text key, MapWritable value) throws IOException { collector.collect(key, value); } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapred; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; @@ -34,8 +35,8 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.hbase.HTable; -import org.apache.hadoop.hbase.io.KeyedData; -import org.apache.hadoop.hbase.io.KeyedDataArrayWritable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.MapWritable; import org.apache.log4j.Logger; @@ -43,7 +44,7 @@ * Convert Map/Reduce output and write it to an HBase table */ public class TableOutputFormat - extends OutputFormatBase { + extends OutputFormatBase { /** JobConf parameter that specifies the output table */ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; @@ -58,8 +59,7 @@ * and write to an HBase table */ protected class TableRecordWriter - implements RecordWriter { - + implements RecordWriter { private HTable m_table; /** @@ -74,25 +74,17 @@ /** {@inheritDoc} */ public void close(@SuppressWarnings("unused") 
Reporter reporter) {} - /** - * Expect key to be of type Text - * Expect value to be of type KeyedDataArrayWritable - * - * @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable) - */ - public void write(Text key, KeyedDataArrayWritable value) throws IOException { + /** {@inheritDoc} */ + public void write(Text key, MapWritable value) throws IOException { LOG.debug("start write"); - Text tKey = key; - KeyedDataArrayWritable tValue = value; - KeyedData[] columns = tValue.get(); // start transaction - long xid = m_table.startUpdate(tKey); - - for(int i = 0; i < columns.length; i++) { - KeyedData column = columns[i]; - m_table.put(xid, column.getKey().getColumn(), column.getData()); + long xid = m_table.startUpdate(key); + + for (Map.Entry e: value.entrySet()) { + m_table.put(xid, (Text)e.getKey(), + ((ImmutableBytesWritable)e.getValue()).get()); } // end transaction @@ -103,14 +95,14 @@ } } - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.OutputFormatBase#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, java.lang.String, org.apache.hadoop.util.Progressable) - */ /** {@inheritDoc} */ @Override - @SuppressWarnings("unused") - public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, - String name, Progressable progress) throws IOException { + @SuppressWarnings("unchecked") + public RecordWriter getRecordWriter( + @SuppressWarnings("unused") FileSystem ignored, + JobConf job, + @SuppressWarnings("unused") String name, + @SuppressWarnings("unused") Progressable progress) throws IOException { // expecting exactly one path @@ -119,8 +111,9 @@ HTable table = null; try { table = new HTable(job, tableName); - } catch(Exception e) { + } catch(IOException e) { LOG.error(e); + throw e; } LOG.debug("end get writer"); return new TableRecordWriter(table); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (revision 566873) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (working copy) @@ -34,6 +34,7 @@ /** * Write a table, sorting by the input key */ +@SuppressWarnings("unchecked") public abstract class TableReduce extends MapReduceBase implements Reducer { private static final Logger LOG = Logger.getLogger(TableReduce.class.getName()); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (revision 566873) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (working copy) @@ -32,13 +32,13 @@ * Abstract base class for test cases. 
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (revision 566873)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (working copy)
@@ -32,13 +32,13 @@
  * Abstract base class for test cases. Performs all static initialization
  */
 public abstract class HBaseTestCase extends TestCase {
-  public final static String COLFAMILY_NAME1 = "colfamily1:";
-  public final static String COLFAMILY_NAME2 = "colfamily2:";
-  public final static String COLFAMILY_NAME3 = "colfamily3:";
+  protected final static String COLFAMILY_NAME1 = "colfamily1:";
+  protected final static String COLFAMILY_NAME2 = "colfamily2:";
+  protected final static String COLFAMILY_NAME3 = "colfamily3:";
   protected Path testDir = null;
   protected FileSystem localFs = null;
-  public static final char FIRST_CHAR = 'a';
-  public static final char LAST_CHAR = 'z';
+  protected static final char FIRST_CHAR = 'a';
+  protected static final char LAST_CHAR = 'z';
   static {
     StaticTestEnvironment.initialize();
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java (revision 566873)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java (working copy)
@@ -149,6 +149,7 @@
   /**
    * MapReduce job that runs a performance evaluation client in each map task.
    */
+  @SuppressWarnings("unchecked")
   public static class EvaluationMapTask extends MapReduceBase implements Mapper {
     /** configuration parameter name that contains the command */
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (revision 566873)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (working copy)
@@ -30,6 +30,7 @@
 public class TestCompaction extends HBaseTestCase {
   static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
+  @Override
   protected void setUp() throws Exception {
     super.setUp();
   }
@@ -34,6 +35,7 @@
     super.setUp();
   }
+  @Override
   protected void tearDown() throws Exception {
     super.tearDown();
   }
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (revision 566873)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (working copy)
@@ -32,6 +32,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.hbase.filter.RegExpRowFilter;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.filter.RowFilterSet;
@@ -37,7 +39,8 @@
 import org.apache.hadoop.hbase.filter.RowFilterSet;
 import org.apache.hadoop.hbase.filter.StopRowFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
-import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Text;
@@ -211,13 +214,15 @@
         System.currentTimeMillis(), null);
       while (true) {
         TreeMap results = new TreeMap();
-        KeyedData[] values = regionServer.next(scannerId);
-        if (values.length == 0) {
+        MapWritable values = regionServer.next(scannerId);
+        if (values == null || values.size() == 0) {
           break;
         }
-
-        for (int i = 0; i < values.length; i++) {
-          results.put(values[i].getKey().getColumn(), values[i].getData());
+
+        for (Map.Entry e: values.entrySet()) {
+          HStoreKey k = (HStoreKey) e.getKey();
+          results.put(k.getColumn(),
+            ((ImmutableBytesWritable) e.getValue()).get());
         }
         HRegionInfo info = (HRegionInfo) Writables.getWritable(
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java (revision 566873)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java (working copy)
@@ -38,8 +38,8 @@
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.hbase.io.KeyedData;
-import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.hbase.mapred.TableMap;
 import org.apache.hadoop.hbase.mapred.TableOutputCollector;
@@ -150,10 +150,11 @@
     /**
      * Pass the key, and reversed value to reduce
      *
-     * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+     * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
      */
+    @SuppressWarnings("unchecked")
     @Override
-    public void map(HStoreKey key, KeyedDataArrayWritable value,
+    public void map(HStoreKey key, MapWritable value,
       TableOutputCollector output,
       @SuppressWarnings("unused") Reporter reporter) throws IOException {
@@ -158,25 +159,21 @@
       @SuppressWarnings("unused") Reporter reporter) throws IOException {
       Text tKey = key.getRow();
-      KeyedData[] columns = value.get();
-      if(columns.length != 1) {
+      if(value.size() != 1) {
         throw new IOException("There should only be one input column");
       }
-
-      if(!columns[0].getKey().getColumn().equals(TEXT_INPUT_COLUMN)) {
+
+      Text[] keys = value.keySet().toArray(new Text[value.size()]);
+      if(!keys[0].equals(TEXT_INPUT_COLUMN)) {
         throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN
-          + " but got: " + columns[0].getKey().getColumn());
+          + " but got: " + keys[0]);
       }
-      // Get the input column key and change it to the output column key
-
-      HStoreKey column = columns[0].getKey();
-      column.setColumn(TEXT_OUTPUT_COLUMN);
-      // Get the original value and reverse it
-      String originalValue = new String(columns[0].getData());
+      String originalValue =
+        new String(((ImmutableBytesWritable)value.get(keys[0])).get());
       StringBuilder newValue = new StringBuilder();
       for(int i = originalValue.length() - 1; i >= 0; i--) {
         newValue.append(originalValue.charAt(i));
@@ -183,11 +180,14 @@
       }
       // Now set the value to be collected
-
-      columns[0] = new KeyedData(column, newValue.toString().getBytes());
-      value.set(columns);
+
+      MapWritable outval = new MapWritable((Class) Text.class,
+        (Class) ImmutableBytesWritable.class,
+        (Map) new TreeMap());
+      outval.put(TEXT_OUTPUT_COLUMN,
+        new ImmutableBytesWritable(newValue.toString().getBytes()));
 
-      output.collect(tKey, value);
+      output.collect(tKey, outval);
     }
   }
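For illustration only (not part of this patch): a small helper that flattens one row of scanner output, keyed by HStoreKey as returned from regionServer.next(scannerId) in the TestScanner2 hunk above, into a column-to-bytes map. The class and method names are hypothetical; the iteration mirrors the patch's own raw-typed entrySet() loops.

import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapWritable;

public class ScanResultSketch {
  /** Flatten one row of scanner output into column name -> raw cell bytes. */
  public static TreeMap<Text, byte[]> toColumnMap(MapWritable row) {
    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
    if (row == null || row.size() == 0) {
      return results;  // scanner is exhausted or the row is empty
    }
    for (Map.Entry e: row.entrySet()) {  // raw Map.Entry, as in the patch
      HStoreKey k = (HStoreKey) e.getKey();
      results.put(k.getColumn(),
        ((ImmutableBytesWritable) e.getValue()).get());
    }
    return results;
  }
}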