Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 676639)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -71,6 +71,7 @@
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
@@ -1146,6 +1147,7 @@
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
+    validateValuesLength(b, region);
     try {
       cacheFlusher.reclaimMemcacheMemory();
       region.batchUpdate(b);
@@ -1158,6 +1160,30 @@
     }
   }
 
+  /**
+   * Verify that the values in a batch update do not exceed the maximum
+   * length allowed by their column family.
+   * @param batchUpdate The update to verify
+   * @param region The region the update is destined for
+   * @throws IOException Thrown if a value is too long
+   */
+  private void validateValuesLength(BatchUpdate batchUpdate,
+      HRegion region) throws IOException {
+    HTableDescriptor desc = region.getTableDesc();
+    for (Iterator<BatchOperation> iter = batchUpdate.iterator();
+        iter.hasNext();) {
+      BatchOperation operation = iter.next();
+      int maxLength = desc.getFamily(
+          HStoreKey.getFamily(operation.getColumn())).getMaxValueLength();
+      if (operation.getValue() != null
+          && operation.getValue().length > maxLength) {
+        throw new IOException("Value in column " +
+          Bytes.toString(operation.getColumn()) + " is too long. " +
+          operation.getValue().length + " instead of " + maxLength);
+      }
+    }
+  }
+
   //
   // remote scanner interface
   //
Index: src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java
===================================================================
--- src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java	(revision 676639)
+++ src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java	(working copy)
@@ -38,7 +38,11 @@
 public class TestBatchUpdate extends HBaseClusterTestCase {
   private static final String CONTENTS_STR = "contents:";
   private static final byte [] CONTENTS = Bytes.toBytes(CONTENTS_STR);
+  private static final String SMALLFAM_STR = "smallfam:";
+  private static final byte [] SMALLFAM = Bytes.toBytes(SMALLFAM_STR);
+  private static final int SMALL_LENGTH = 1;
   private byte[] value;
+  private byte[] smallValue;
   private HTableDescriptor desc = null;
   private HTable table = null;
 
@@ -49,6 +53,7 @@
   public TestBatchUpdate() throws UnsupportedEncodingException {
     super();
     value = "abcd".getBytes(HConstants.UTF8_ENCODING);
+    smallValue = "a".getBytes(HConstants.UTF8_ENCODING);
   }
 
   /**
@@ -59,6 +64,12 @@
     super.setUp();
     this.desc = new HTableDescriptor("test");
     desc.addFamily(new HColumnDescriptor(CONTENTS_STR));
+    desc.addFamily(new HColumnDescriptor(SMALLFAM,
+        HColumnDescriptor.DEFAULT_VERSIONS,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_IN_MEMORY,
+        HColumnDescriptor.DEFAULT_BLOCKCACHE, SMALL_LENGTH,
+        HColumnDescriptor.DEFAULT_TTL, HColumnDescriptor.DEFAULT_BLOOMFILTER));
     HBaseAdmin admin = new HBaseAdmin(conf);
     admin.createTable(desc);
     table = new HTable(conf, desc.getName());
@@ -86,4 +97,32 @@
       }
     }
   }
+
+  public void testBatchUpdateMaxLength() {
+    // Try to put a value that is too long for the column family
+    BatchUpdate batchUpdate = new BatchUpdate("row1");
+    batchUpdate.put(SMALLFAM, value);
+    try {
+      table.commit(batchUpdate);
+      fail("Value is too long, should throw exception");
+    } catch (IOException e) {
+      // This is expected
+    }
+    // Verify that the rejected value was not inserted
+    try {
+      Cell cell = table.get("row1", SMALLFAM_STR);
+      assertNull(cell);
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("This is unexpected");
+    }
+    // Now put a value that fits within the limit
+    batchUpdate = new BatchUpdate("row1");
+    batchUpdate.put(SMALLFAM, smallValue);
+    try {
+      table.commit(batchUpdate);
+    } catch (IOException e) {
+      fail("Value fits within the limit, should not throw exception");
+    }
+  }
 }