Index: hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java (revision 1413899)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java (working copy)
@@ -23,18 +23,12 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -151,6 +145,8 @@
       HConstants.EMPTY_END_ROW);
   }
 
+  /*
+   * TODO
   @Test public void testPut() throws Exception{
     byte[] row = "row".getBytes();
     byte[] fam = "fam".getBytes();
@@ -254,6 +250,7 @@
       }
     }
   }
+  */
 
   @Test public void testGet() throws Exception{
     byte[] row = "row".getBytes();
@@ -347,6 +344,8 @@
     assertEquals(tr.getMin(), desTr.getMin());
   }
 
+  /*
+   * TODO
   @Test public void testResultEmpty() throws Exception {
     List<KeyValue> keys = new ArrayList<KeyValue>();
     Result r = new Result(keys);
@@ -520,6 +519,7 @@
     assertTrue(deResults.length == 0);
   }
+  */
 
   @Test public void testTimeRange() throws Exception{
     TimeRange tr = new TimeRange(0,5);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAttributes.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAttributes.java (revision 1413899)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAttributes.java (working copy)
@@ -19,12 +19,6 @@
 
 package org.apache.hadoop.hbase.client;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.hbase.SmallTests;
@@ -36,29 +30,6 @@
 @Category(SmallTests.class)
 public class TestAttributes {
   @Test
-  public void testAttributesSerialization() throws IOException {
-    Put put = new Put();
-    put.setAttribute("attribute1", Bytes.toBytes("value1"));
-    put.setAttribute("attribute2", Bytes.toBytes("value2"));
-    put.setAttribute("attribute3", Bytes.toBytes("value3"));
-
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutput out = new DataOutputStream(byteArrayOutputStream);
-    put.write(out);
-
-    Put put2 = new Put();
-    Assert.assertTrue(put2.getAttributesMap().isEmpty());
-
-    put2.readFields(new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())));
-
-    Assert.assertNull(put2.getAttribute("absent"));
-    Assert.assertTrue(Arrays.equals(Bytes.toBytes("value1"), put2.getAttribute("attribute1")));
-    Assert.assertTrue(Arrays.equals(Bytes.toBytes("value2"), put2.getAttribute("attribute2")));
-    Assert.assertTrue(Arrays.equals(Bytes.toBytes("value3"), put2.getAttribute("attribute3")));
-    Assert.assertEquals(3, put2.getAttributesMap().size());
-  }
-
-  @Test
   public void testPutAttributes() {
     Put put = new Put();
     Assert.assertTrue(put.getAttributesMap().isEmpty());
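Note: the deleted testAttributesSerialization exercised the Writable wire format this patch removes from Put. Its closest replacement is a round trip through the protobuf Mutate message that the new MutationSerialization (later in this patch) relies on. A minimal sketch — the test class name is illustrative, and it assumes ProtobufUtil.toMutation yields a Put for a PUT-type proto and that operation attributes survive the conversion:

import java.util.Arrays;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.Test;

public class TestAttributesProtobuf {
  @Test
  public void testAttributesRoundTrip() throws Exception {
    Put put = new Put(Bytes.toBytes("row"));
    put.add(Bytes.toBytes("fam"), Bytes.toBytes("qf"), Bytes.toBytes("v"));
    put.setAttribute("attribute1", Bytes.toBytes("value1"));

    // Convert to the protobuf Mutate message and back instead of
    // Writable write()/readFields().
    Mutate proto = ProtobufUtil.toMutate(MutateType.PUT, put);
    Put copy = (Put) ProtobufUtil.toMutation(proto);

    Assert.assertNull(copy.getAttribute("absent"));
    Assert.assertTrue(Arrays.equals(Bytes.toBytes("value1"),
        copy.getAttribute("attribute1")));
  }
}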
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java (working copy)
@@ -32,7 +32,7 @@
  */
 @Deprecated
 @SuppressWarnings("unchecked")
-public interface TableReduce<K extends WritableComparable, V extends Writable>
+public interface TableReduce<K extends WritableComparable, V>
 extends Reducer<K, V, ImmutableBytesWritable, Put> {
 
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java (working copy)
@@ -32,7 +32,7 @@
  * @param <V> Writable value class
  */
 @Deprecated
-public interface TableMap<K extends WritableComparable<? super K>, V extends Writable>
+public interface TableMap<K extends WritableComparable<? super K>, V>
 extends Mapper<ImmutableBytesWritable, Result, K, V> {
 
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (working copy)
@@ -226,7 +226,7 @@
     if (result != null && result.size() > 0) {
       key.set(result.getRow());
       lastSuccessfulRow = key.get();
-      Writables.copyWritable(result, value);
+      value.copyFrom(result);
       return true;
     }
     return false;
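Note: both deprecated mapred interfaces above drop the "V extends Writable" bound; that is what lets Result and Put, no longer Writable after this patch, keep flowing through the old API. A minimal pass-through mapper as a sketch (class name illustrative):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class PassThroughTableMap extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, Result> {
  public void map(ImmutableBytesWritable row, Result value,
      OutputCollector<ImmutableBytesWritable, Result> output,
      Reporter reporter) throws IOException {
    // pass each row straight through; Result needs no Writable bound now
    output.collect(row, value);
  }
}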
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (working copy)
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * @deprecated Replaced by MultiResponse
- * Response class for MultiPut.
- */
-public class MultiPutResponse implements Writable {
-
-  protected MultiPut request; // used in client code ONLY
-
-  protected Map<byte[], Integer> answers = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
-  public MultiPutResponse() {}
-
-  public void addResult(byte[] regionName, int result) {
-    answers.put(regionName, result);
-  }
-
-  public Integer getAnswer(byte[] region) {
-    return answers.get(region);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(answers.size());
-    for( Map.Entry<byte[], Integer> e : answers.entrySet()) {
-      Bytes.writeByteArray(out, e.getKey());
-      out.writeInt(e.getValue());
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    answers.clear();
-
-    int mapSize = in.readInt();
-    for( int i = 0 ; i < mapSize ; i++ ) {
-      byte[] key = Bytes.readByteArray(in);
-      int value = in.readInt();
-
-      answers.put(key, value);
-    }
-  }
-}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/Delete.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/Delete.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/Delete.java (working copy)
@@ -24,10 +24,7 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -67,10 +64,7 @@
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Delete extends Mutation
-  implements Writable, Comparable<Row> {
-  private static final byte DELETE_VERSION = (byte)3;
-
+public class Delete extends Mutation implements Comparable<Row> {
   /** Constructor for Writable. DO NOT USE */
   public Delete() {
     this((byte [])null);
@@ -264,52 +258,4 @@
     map.put("ts", this.ts);
     return map;
   }
-
-  //Writable
-  public void readFields(final DataInput in) throws IOException {
-    int version = in.readByte();
-    if (version > DELETE_VERSION) {
-      throw new IOException("version not supported");
-    }
-    this.row = Bytes.readByteArray(in);
-    this.ts = in.readLong();
-    this.lockId = in.readLong();
-    if (version > 2) {
-      this.writeToWAL = in.readBoolean();
-    }
-    this.familyMap.clear();
-    int numFamilies = in.readInt();
-    for(int i=0;i<numFamilies;i++) {
-      byte [] family = Bytes.readByteArray(in);
-      int numColumns = in.readInt();
-      List<KeyValue> list = new ArrayList<KeyValue>(numColumns);
-      for(int j=0;j<numColumns;j++) {
-        KeyValue kv = new KeyValue();
-        kv.readFields(in);
-        list.add(kv);
-      }
-      this.familyMap.put(family, list);
-    }
-    if (version > 1) {
-      readAttributes(in);
-    }
-  }
-
-  public void write(final DataOutput out) throws IOException {
-    out.writeByte(DELETE_VERSION);
-    Bytes.writeByteArray(out, this.row);
-    out.writeLong(this.ts);
-    out.writeLong(this.lockId);
-    out.writeBoolean(this.writeToWAL);
-    out.writeInt(familyMap.size());
-    for(Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
-      Bytes.writeByteArray(out, entry.getKey());
-      List<KeyValue> list = entry.getValue();
-      out.writeInt(list.size());
-      for(KeyValue kv : list) {
-        kv.write(out);
-      }
-    }
-    writeAttributes(out);
-  }
 }
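Note: Delete now reaches TableOutputFormat without a Writable implementation; the MutationSerialization class added later in this patch handles it once registered under io.serializations by TableMapReduceUtil. A sketch of a reducer emitting Deletes (table layout and class name illustrative):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class DeleteEmittingReducer extends
    Reducer<ImmutableBytesWritable, NullWritable, ImmutableBytesWritable, Delete> {
  @Override
  protected void reduce(ImmutableBytesWritable row, Iterable<NullWritable> values,
      Context context) throws IOException, InterruptedException {
    // copy out the row key (the writable may be a view into a larger buffer)
    byte[] rowKey = new byte[row.getLength()];
    System.arraycopy(row.get(), row.getOffset(), rowKey, 0, row.getLength());
    context.write(row, new Delete(rowKey));
  }
}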
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java (working copy)
@@ -19,9 +19,6 @@
 
 package org.apache.hadoop.hbase.client;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,7 +27,6 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.WritableUtils;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -87,30 +83,6 @@
     return size;
   }
 
-  protected void writeAttributes(final DataOutput out) throws IOException {
-    if (this.attributes == null) {
-      out.writeInt(0);
-    } else {
-      out.writeInt(this.attributes.size());
-      for (Map.Entry<String, byte[]> attr : this.attributes.entrySet()) {
-        WritableUtils.writeString(out, attr.getKey());
-        Bytes.writeByteArray(out, attr.getValue());
-      }
-    }
-  }
-
-  protected void readAttributes(final DataInput in) throws IOException {
-    int numAttributes = in.readInt();
-    if (numAttributes > 0) {
-      this.attributes = new HashMap<String, byte[]>(numAttributes);
-      for(int i=0; i<numAttributes; i++) {
-        String name = WritableUtils.readString(in);
-        byte[] value = Bytes.readByteArray(in);
-        this.attributes.put(name, value);
-      }
-    }
-  }
-
   public void setId(String id) {
     setAttribute(ID_ATRIBUTE, Bytes.toBytes(id));
   }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/Put.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/Put.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/Put.java (working copy)
@@ ... @@
 package org.apache.hadoop.hbase.client;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
@@ ... @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.Writable;
@@ ... @@
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Put extends Mutation
-  implements HeapSize, Writable, Comparable<Row> {
-  private static final byte PUT_VERSION = (byte)2;
-
+public class Put extends Mutation implements HeapSize, Comparable<Row> {
   private static final long OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + 2 * ClassSize.REFERENCE +
       2 * Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN +
@@ -363,62 +357,4 @@
     return ClassSize.align((int)heapsize);
   }
-
-  //Writable
-  public void readFields(final DataInput in)
-  throws IOException {
-    int version = in.readByte();
-    if (version > PUT_VERSION) {
-      throw new IOException("version not supported");
-    }
-    this.row = Bytes.readByteArray(in);
-    this.ts = in.readLong();
-    this.lockId = in.readLong();
-    this.writeToWAL = in.readBoolean();
-    int numFamilies = in.readInt();
-    if (!this.familyMap.isEmpty()) this.familyMap.clear();
-    for(int i=0;i<numFamilies;i++) {
-      byte [] family = Bytes.readByteArray(in);
-      int numKeys = in.readInt();
-      List<KeyValue> keys = new ArrayList<KeyValue>(numKeys);
-      int totalLen = in.readInt();
-      byte [] buf = new byte[totalLen];
-      int offset = 0;
-      for (int j = 0; j < numKeys; j++) {
-        int keyLength = in.readInt();
-        in.readFully(buf, offset, keyLength);
-        keys.add(new KeyValue(buf, offset, keyLength));
-        offset += keyLength;
-      }
-      this.familyMap.put(family, keys);
-    }
-    if (version > 1) {
-      readAttributes(in);
-    }
-  }
-
-  public void write(final DataOutput out)
-  throws IOException {
-    out.writeByte(PUT_VERSION);
-    Bytes.writeByteArray(out, this.row);
-    out.writeLong(this.ts);
-    out.writeLong(this.lockId);
-    out.writeBoolean(this.writeToWAL);
-    out.writeInt(familyMap.size());
-    for (Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
-      Bytes.writeByteArray(out, entry.getKey());
-      List<KeyValue> keys = entry.getValue();
-      out.writeInt(keys.size());
-      int totalLen = 0;
-      for(KeyValue kv : keys) {
-        totalLen += kv.getLength();
-      }
-      out.writeInt(totalLen);
-      for(KeyValue kv : keys) {
-        out.writeInt(kv.getLength());
-        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
-      }
-    }
-    writeAttributes(out);
-  }
 }
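Note: the removed readFields/write pair was Put's hand-rolled, version-byte-prefixed wire format. Its replacement is the protobuf conversion used by MutationSerialization later in this patch; a short sketch of the new round trip, assuming the cast back to Put holds for a PUT-type message (class name illustrative):

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.util.Bytes;

public class PutRoundTrip {
  public static void main(String[] args) throws Exception {
    Put put = new Put(Bytes.toBytes("row"));
    put.add(Bytes.toBytes("fam"), Bytes.toBytes("qf"), Bytes.toBytes("value"));

    // protobuf replaces the removed version-byte + length-prefixed format
    Mutate proto = ProtobufUtil.toMutate(MutateType.PUT, put);
    Put copy = (Put) ProtobufUtil.toMutation(proto);

    System.out.println("families after round trip: " + copy.getFamilyMap().size());
  }
}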
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/Result.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/Result.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/Result.java (working copy)
@@ -19,9 +19,6 @@
 
 package org.apache.hadoop.hbase.client;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -36,10 +33,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
 
 /**
  * Single row result of a {@link Get} or {@link Scan} query.<p>
@@ -70,17 +64,13 @@
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Result implements Writable, WritableWithSize {
-  private static final byte RESULT_VERSION = (byte)1;
-  private static final int DEFAULT_BUFFER_SIZE = 1024;
-
+public class Result {
   private KeyValue [] kvs = null;
   private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyMap = null;
   // We're not using java serialization.  Transient here is just a marker to say
   // that this is where we cache row if we're ever asked for it.
   private transient byte [] row = null;
-  private ImmutableBytesWritable bytes = null; // never use directly
 
   private static byte [] buffer = null;
@@ -114,23 +104,12 @@
   }
 
   /**
-   * Instantiate a Result from the specified raw binary format.
-   * @param bytes raw binary format of Result
-   */
-  public Result(ImmutableBytesWritable bytes) {
-    this.bytes = bytes;
-  }
-
-  /**
    * Method for retrieving the row key that corresponds to
    * the row from which this Result was created.
    * @return row
    */
   public byte [] getRow() {
     if (this.row == null) {
-      if(this.kvs == null) {
-        readFields();
-      }
       this.row = this.kvs.length == 0? null: this.kvs[0].getRow();
     }
     return this.row;
@@ -157,9 +136,6 @@
    * @return array of KeyValues
    */
   public KeyValue[] raw() {
-    if(this.kvs == null) {
-      readFields();
-    }
     return kvs;
   }
@@ -171,9 +147,6 @@
    * @return The sorted list of KeyValue's.
    */
   public List<KeyValue> list() {
-    if(this.kvs == null) {
-      readFields();
-    }
     return isEmpty()? null: Arrays.asList(raw());
   }
@@ -654,25 +627,10 @@
   }
 
   /**
-   * Returns the raw binary encoding of this Result.<p>
-   *
-   * Please note, there may be an offset into the underlying byte array of the
-   * returned ImmutableBytesWritable. Be sure to use both
-   * {@link ImmutableBytesWritable#get()} and {@link ImmutableBytesWritable#getOffset()}
-   * @return pointer to raw binary of Result
-   */
-  public ImmutableBytesWritable getBytes() {
-    return this.bytes;
-  }
-
-  /**
    * Check if the underlying KeyValue [] is empty or not
    * @return true if empty
    */
   public boolean isEmpty() {
-    if(this.kvs == null) {
-      readFields();
-    }
     return this.kvs == null || this.kvs.length == 0;
   }
@@ -680,9 +638,6 @@
    * @return the size of the underlying KeyValue []
    */
   public int size() {
-    if(this.kvs == null) {
-      readFields();
-    }
     return this.kvs == null? 0: this.kvs.length;
   }
@@ -711,179 +666,6 @@
     return sb.toString();
   }
 
-  //Writable
-  public void readFields(final DataInput in)
-  throws IOException {
-    familyMap = null;
-    row = null;
-    kvs = null;
-    int totalBuffer = in.readInt();
-    if(totalBuffer == 0) {
-      bytes = null;
-      return;
-    }
-    byte [] raw = new byte[totalBuffer];
-    readChunked(in, raw, 0, totalBuffer);
-    bytes = new ImmutableBytesWritable(raw, 0, totalBuffer);
-  }
-
-  private void readChunked(final DataInput in, byte[] dest, int ofs, int len)
-  throws IOException {
-    int maxRead = 8192;
-
-    for (; ofs < len; ofs += maxRead)
-      in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
-  }
-
-  //Create KeyValue[] when needed
-  private void readFields() {
-    if (bytes == null) {
-      this.kvs = new KeyValue[0];
-      return;
-    }
-    byte [] buf = bytes.get();
-    int offset = bytes.getOffset();
-    int finalOffset = bytes.getSize() + offset;
-    List<KeyValue> kvs = new ArrayList<KeyValue>();
-    while(offset < finalOffset) {
-      int keyLength = Bytes.toInt(buf, offset);
-      offset += Bytes.SIZEOF_INT;
-      kvs.add(new KeyValue(buf, offset, keyLength));
-      offset += keyLength;
-    }
-    this.kvs = kvs.toArray(new KeyValue[kvs.size()]);
-  }
-
-  public long getWritableSize() {
-    if (isEmpty())
-      return Bytes.SIZEOF_INT; // int size = 0
-
-    long size = Bytes.SIZEOF_INT; // totalLen
-
-    for (KeyValue kv : kvs) {
-      size += kv.getLength();
-      size += Bytes.SIZEOF_INT; // kv.getLength
-    }
-
-    return size;
-  }
-
-  public void write(final DataOutput out)
-  throws IOException {
-    if(isEmpty()) {
-      out.writeInt(0);
-    } else {
-      int totalLen = 0;
-      for(KeyValue kv : kvs) {
-        totalLen += kv.getLength() + Bytes.SIZEOF_INT;
-      }
-      out.writeInt(totalLen);
-      for(KeyValue kv : kvs) {
-        out.writeInt(kv.getLength());
-        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
-      }
-    }
-  }
-
-  public static long getWriteArraySize(Result [] results) {
-    long size = Bytes.SIZEOF_BYTE; // RESULT_VERSION
-    if (results == null || results.length == 0) {
-      size += Bytes.SIZEOF_INT;
-      return size;
-    }
-
-    size += Bytes.SIZEOF_INT; // results.length
-    size += Bytes.SIZEOF_INT; // bufLen
-    for (Result result : results) {
-      size += Bytes.SIZEOF_INT; // either 0 or result.size()
-      if (result == null || result.isEmpty())
-        continue;
-
-      for (KeyValue kv : result.raw()) {
-        size += Bytes.SIZEOF_INT; // kv.getLength();
-        size += kv.getLength();
-      }
-    }
-
-    return size;
-  }
-
-  public static void writeArray(final DataOutput out, Result [] results)
-  throws IOException {
-    // Write version when writing array form.
-    // This assumes that results are sent to the client as Result[], so we
-    // have an opportunity to handle version differences without affecting
-    // efficiency.
-    out.writeByte(RESULT_VERSION);
-    if(results == null || results.length == 0) {
-      out.writeInt(0);
-      return;
-    }
-    out.writeInt(results.length);
-    int bufLen = 0;
-    for(Result result : results) {
-      bufLen += Bytes.SIZEOF_INT;
-      if(result == null || result.isEmpty()) {
-        continue;
-      }
-      for(KeyValue key : result.raw()) {
-        bufLen += key.getLength() + Bytes.SIZEOF_INT;
-      }
-    }
-    out.writeInt(bufLen);
-    for(Result result : results) {
-      if(result == null || result.isEmpty()) {
-        out.writeInt(0);
-        continue;
-      }
-      out.writeInt(result.size());
-      for(KeyValue kv : result.raw()) {
-        out.writeInt(kv.getLength());
-        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
-      }
-    }
-  }
-
-  public static Result [] readArray(final DataInput in)
-  throws IOException {
-    // Read version for array form.
-    // This assumes that results are sent to the client as Result[], so we
-    // have an opportunity to handle version differences without affecting
-    // efficiency.
-    int version = in.readByte();
-    if (version > RESULT_VERSION) {
-      throw new IOException("version not supported");
-    }
-    int numResults = in.readInt();
-    if(numResults == 0) {
-      return new Result[0];
-    }
-    Result [] results = new Result[numResults];
-    int bufSize = in.readInt();
-    byte [] buf = new byte[bufSize];
-    int offset = 0;
-    for(int i=0;i<numResults;i++) {
-      int numKeys = in.readInt();
-      offset += Bytes.SIZEOF_INT;
-      if(numKeys == 0) {
-        results[i] = new Result((ImmutableBytesWritable)null);
-        continue;
-      }
-      int initialOffset = offset;
-      for(int j=0;j<numKeys;j++) {
-        int keyLen = in.readInt();
-        Bytes.putInt(buf, offset, keyLen);
-        offset += Bytes.SIZEOF_INT;
-        in.readFully(buf, offset, keyLen);
-        offset += keyLen;
-      }
-      int totalLength = offset - initialOffset;
-      results[i] = new Result(new ImmutableBytesWritable(buf, initialOffset, totalLength));
-    }
-    return results;
-  }
 }
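Note: the lazy decode path deleted above (readFields() plus the cached ImmutableBytesWritable) is superseded by ResultSerialization, added later in this patch. A sketch of a Result round trip through plain byte streams using that class's Serializer/Deserializer contract:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;

public class ResultRoundTrip {
  public static void main(String[] args) throws Exception {
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"),
        Bytes.toBytes("qf"), Bytes.toBytes("value"));
    Result result = new Result(new KeyValue[] { kv });

    ResultSerialization serde = new ResultSerialization();

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    Serializer<Result> ser = serde.getSerializer(Result.class);
    ser.open(baos);
    ser.serialize(result);   // writes the protobuf ClientProtos.Result
    ser.close();

    Deserializer<Result> deser = serde.getDeserializer(Result.class);
    deser.open(new ByteArrayInputStream(baos.toByteArray()));
    Result copy = deser.deserialize(null);
    deser.close();

    System.out.println("cells after round trip: " + copy.size());
  }
}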
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java (working copy)
@@ ... @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * Data type class for putting multiple regions worth of puts in one RPC.
- */
-public class MultiPut extends Operation implements Writable {
-  public HServerAddress address; // client code ONLY
-
-  private static final int DEFAULT_MAX_PUT_OUTPUT = 10;
-
-  public Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
-
-  /**
-   * Writable constructor only.
-   */
-  public MultiPut() {}
-
-  /**
-   * MultiPut for putting multiple regions worth of puts in one RPC.
-   * @param a address
-   */
-  public MultiPut(HServerAddress a) {
-    address = a;
-  }
-
-  public int size() {
-    int size = 0;
-    for( List<Put> l : puts.values()) {
-      size += l.size();
-    }
-    return size;
-  }
-
-  public void add(byte[] regionName, Put aPut) {
-    List<Put> rsput = puts.get(regionName);
-    if (rsput == null) {
-      rsput = new ArrayList<Put>();
-      puts.put(regionName, rsput);
-    }
-    rsput.add(aPut);
-  }
-
-  public Collection<Put> allPuts() {
-    List<Put> res = new ArrayList<Put>();
-    for ( List<Put> pp : puts.values() ) {
-      res.addAll(pp);
-    }
-    return res;
-  }
-
-  /**
-   * Compile the table and column family (i.e. schema) information
-   * into a String. Useful for parsing and aggregation by debugging,
-   * logging, and administration tools.
-   * @return Map
-   */
-  @Override
-  public Map<String, Object> getFingerprint() {
-    Map<String, Object> map = new HashMap<String, Object>();
-    // for extensibility, we have a map of table information that we will
-    // populate with only family information for each table
-    Map<String, Map> tableInfo =
-      new HashMap<String, Map>();
-    map.put("tables", tableInfo);
-    for (Map.Entry<byte[], List<Put>> entry : puts.entrySet()) {
-      // our fingerprint only concerns itself with which families are touched,
-      // not how many Puts touch them, so we use this Set to do just that.
-      Set<String> familySet;
-      try {
-        // since the puts are stored by region, we may have already
-        // recorded families for this region. if that is the case,
-        // we want to add to the existing Set. if not, we make a new Set.
-        String tableName = Bytes.toStringBinary(
-            HRegionInfo.parseRegionName(entry.getKey())[0]);
-        if (tableInfo.get(tableName) == null) {
-          Map<String, Object> table = new HashMap<String, Object>();
-          familySet = new TreeSet<String>();
-          table.put("families", familySet);
-          tableInfo.put(tableName, table);
-        } else {
-          familySet = (Set<String>) tableInfo.get(tableName).get("families");
-        }
-      } catch (IOException ioe) {
-        // in the case of parse error, default to labeling by region
-        Map<String, Object> table = new HashMap<String, Object>();
-        familySet = new TreeSet<String>();
-        table.put("families", familySet);
-        tableInfo.put(Bytes.toStringBinary(entry.getKey()), table);
-      }
-      // we now iterate through each Put and keep track of which families
-      // are affected in this table.
-      for (Put p : entry.getValue()) {
-        for (byte[] fam : p.getFamilyMap().keySet()) {
-          familySet.add(Bytes.toStringBinary(fam));
-        }
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Compile the details beyond the scope of getFingerprint (mostly
-   * toMap from the Puts) into a Map along with the fingerprinted
-   * information. Useful for debugging, logging, and administration tools.
-   * @param maxCols a limit on the number of columns output prior to truncation
-   * @return Map
-   */
-  @Override
-  public Map<String, Object> toMap(int maxCols) {
-    Map<String, Object> map = getFingerprint();
-    Map<String, Map> tableInfo = (Map<String, Map>) map.get("tables");
-    int putCount = 0;
-    for (Map.Entry<byte[], List<Put>> entry : puts.entrySet()) {
-      // If the limit has been hit for put output, just adjust our counter
-      if (putCount >= DEFAULT_MAX_PUT_OUTPUT) {
-        putCount += entry.getValue().size();
-        continue;
-      }
-      List<Put> regionPuts = entry.getValue();
-      List<Map<String, Object>> putSummaries =
-        new ArrayList<Map<String, Object>>();
-      // find out how many of this region's puts we can add without busting
-      // the maximum
-      int regionPutsToAdd = regionPuts.size();
-      putCount += regionPutsToAdd;
-      if (putCount > DEFAULT_MAX_PUT_OUTPUT) {
-        regionPutsToAdd -= putCount - DEFAULT_MAX_PUT_OUTPUT;
-      }
-      for (Iterator<Put> iter = regionPuts.iterator(); regionPutsToAdd-- > 0;) {
-        putSummaries.add(iter.next().toMap(maxCols));
-      }
-      // attempt to extract the table name from the region name
-      String tableName = "";
-      try {
-        tableName = Bytes.toStringBinary(
-            HRegionInfo.parseRegionName(entry.getKey())[0]);
-      } catch (IOException ioe) {
-        // in the case of parse error, default to labeling by region
-        tableName = Bytes.toStringBinary(entry.getKey());
-      }
-      // since the puts are stored by region, we may have already
-      // recorded puts for this table. if that is the case,
-      // we want to add to the existing List. if not, we place a new list
-      // in the map
-      Map<String, Object> table =
-        (Map<String, Object>) tableInfo.get(tableName);
-      if (table == null) {
-        // in case the Put has changed since getFingerprint's map was built
-        table = new HashMap<String, Object>();
-        tableInfo.put(tableName, table);
-        table.put("puts", putSummaries);
-      } else if (table.get("puts") == null) {
-        table.put("puts", putSummaries);
-      } else {
-        ((List<Map<String, Object>>) table.get("puts")).addAll(putSummaries);
-      }
-    }
-    map.put("totalPuts", putCount);
-    return map;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(puts.size());
-    for( Map.Entry<byte[], List<Put>> e : puts.entrySet()) {
-      Bytes.writeByteArray(out, e.getKey());
-
-      List<Put> ps = e.getValue();
-      out.writeInt(ps.size());
-      for( Put p : ps ) {
-        p.write(out);
-      }
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    puts.clear();
-
-    int mapSize = in.readInt();
-
-    for (int i = 0 ; i < mapSize; i++) {
-      byte[] key = Bytes.readByteArray(in);
-
-      int listSize = in.readInt();
-      List<Put> ps = new ArrayList<Put>(listSize);
-      for ( int j = 0 ; j < listSize; j++ ) {
-        Put put = new Put();
-        put.readFields(in);
-        ps.add(put);
-      }
-      puts.put(key, ps);
-    }
-  }
-}
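Note: MultiPut hand-grouped Puts by region for a single RPC; on the client the equivalent is the generic batch path, which does that grouping internally. A sketch, assuming a reachable cluster and an existing table named "mytable" with family "fam":

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchPuts {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "mytable");
    try {
      List<Row> actions = new ArrayList<Row>();
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i));
        put.add(Bytes.toBytes("fam"), Bytes.toBytes("qf"), Bytes.toBytes("v" + i));
        actions.add(put);
      }
      Object[] results = new Object[actions.size()];
      table.batch(actions, results); // regions/servers are grouped internally
    } finally {
      table.close();
    }
  }
}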
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/Increment.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/Increment.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/Increment.java (working copy)
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -30,7 +28,6 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
 
 /**
  * Used to perform Increment operations on a single row.
@@ -47,8 +44,6 @@
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class Increment implements Row {
-  private static final byte INCREMENT_VERSION = (byte)2;
-
   private byte [] row = null;
   private long lockId = -1L;
   private boolean writeToWAL = true;
@@ -275,72 +270,6 @@
     return sb.toString();
   }
 
-  //Writable
-  public void readFields(final DataInput in)
-  throws IOException {
-    int version = in.readByte();
-    if (version > INCREMENT_VERSION) {
-      throw new IOException("unsupported version");
-    }
-    this.row = Bytes.readByteArray(in);
-    this.tr = new TimeRange();
-    tr.readFields(in);
-    this.lockId = in.readLong();
-    int numFamilies = in.readInt();
-    if (numFamilies == 0) {
-      throw new IOException("At least one column required");
-    }
-    this.familyMap =
-      new TreeMap<byte [], NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
-    for(int i=0; i<numFamilies; i++) {
-      byte [] family = Bytes.readByteArray(in);
-      boolean hasColumns = in.readBoolean();
-      NavigableMap<byte [], Long> set = null;
-      if(hasColumns) {
-        int numColumns = in.readInt();
-        set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
-        for(int j=0; j<numColumns; j++) {
-          byte [] qualifier = Bytes.readByteArray(in);
-          set.put(qualifier, in.readLong());
-        }
-      }
-      this.familyMap.put(family, set);
-    }
-    if (version > 1) {
-      this.writeToWAL = in.readBoolean();
-    }
-  }
-
-  public void write(final DataOutput out)
-  throws IOException {
-    out.writeByte(INCREMENT_VERSION);
-    Bytes.writeByteArray(out, this.row);
-    tr.write(out);
-    out.writeLong(this.lockId);
-    if (familyMap.size() == 0) {
-      throw new IOException("At least one column required");
-    }
-    out.writeInt(familyMap.size());
-    for(Map.Entry<byte [], NavigableMap<byte [], Long>> entry :
-      familyMap.entrySet()) {
-      Bytes.writeByteArray(out, entry.getKey());
-      NavigableMap<byte [], Long> columnSet = entry.getValue();
-      if(columnSet == null) {
-        throw new IOException("At least one column required per family");
-      } else {
-        out.writeBoolean(true);
-        out.writeInt(columnSet.size());
-        for(Map.Entry<byte [], Long> qualifier : columnSet.entrySet()) {
-          Bytes.writeByteArray(out, qualifier.getKey());
-          out.writeLong(qualifier.getValue());
-        }
-      }
-    }
-    out.writeBoolean(writeToWAL);
-  }
-
   @Override
   public int compareTo(Row i) {
     return Bytes.compareTo(this.getRow(), i.getRow());
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -2365,7 +2365,7 @@
    * @return true if the new put was executed, false otherwise
    */
   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
-      CompareOp compareOp, ByteArrayComparable comparator, Writable w,
+      CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
       Integer lockId, boolean writeToWAL)
   throws IOException{
     checkReadOnly();
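Note: the HRegion hunk above narrows checkAndMutate's mutation parameter from Writable to Mutation, matching the serialization cleanup. From client code the same path is exercised via checkAndPut/checkAndDelete; a sketch (table and values illustrative):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndPutExample {
  public static void main(String[] args) throws Exception {
    HTable table = new HTable(HBaseConfiguration.create(), "mytable");
    try {
      Put put = new Put(Bytes.toBytes("row"));
      put.add(Bytes.toBytes("fam"), Bytes.toBytes("qf"), Bytes.toBytes("new"));
      // apply the Put only if fam:qf currently holds "old"
      boolean applied = table.checkAndPut(Bytes.toBytes("row"),
          Bytes.toBytes("fam"), Bytes.toBytes("qf"), Bytes.toBytes("old"), put);
      System.out.println("applied: " + applied);
    } finally {
      table.close();
    }
  }
}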
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java (revision 0)
@@ -0,0 +1,79 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+public class MutationSerialization implements Serialization<Mutation> {
+  @Override
+  public boolean accept(Class<?> c) {
+    return Mutation.class.isAssignableFrom(c);
+  }
+
+  @Override
+  public Deserializer<Mutation> getDeserializer(Class<Mutation> c) {
+    return new MutationDeserializer();
+  }
+
+  @Override
+  public Serializer<Mutation> getSerializer(Class<Mutation> c) {
+    return new MutationSerializer();
+  }
+
+  private static class MutationDeserializer implements Deserializer<Mutation> {
+    private InputStream in;
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public Mutation deserialize(Mutation mutation) throws IOException {
+      Mutate proto = Mutate.parseFrom(in);
+      return ProtobufUtil.toMutation(proto);
+    }
+
+    @Override
+    public void open(InputStream in) throws IOException {
+      this.in = in;
+    }
+
+  }
+  private static class MutationSerializer implements Serializer<Mutation> {
+    private OutputStream out;
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    @Override
+    public void open(OutputStream out) throws IOException {
+      this.out = out;
+    }
+
+    @Override
+    public void serialize(Mutation mutation) throws IOException {
+      MutateType type;
+      if (mutation instanceof Put) {
+        type = MutateType.PUT;
+      } else if (mutation instanceof Delete) {
+        type = MutateType.DELETE;
+      } else {
+        throw new IllegalArgumentException("Only Put and Delete are supported");
+      }
+      ProtobufUtil.toMutate(type, mutation).writeTo(out);
+    }
+  }
+}
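Note: Hadoop resolves serializers through SerializationFactory by scanning the classes named in io.serializations, which is why the TableMapReduceUtil hunks below append MutationSerialization and ResultSerialization to that list. A sketch showing the lookup resolving a Put to the new serializer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;

public class SerializationLookup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // same registration the patch adds to TableMapReduceUtil
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());

    SerializationFactory factory = new SerializationFactory(conf);
    // Put is no longer Writable, so WritableSerialization declines it and
    // MutationSerialization.accept() picks it up instead
    Serializer<Put> serializer = factory.getSerializer(Put.class);
    System.out.println(serializer.getClass().getName());
  }
}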
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (working copy)
@@ -18,10 +18,6 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.lang.reflect.InvocationTargetException;
@@ -36,9 +32,9 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
@@ -49,9 +45,7 @@
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
@@ -141,6 +135,8 @@
     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
     conf.set(TableInputFormat.INPUT_TABLE, table);
     conf.set(TableInputFormat.SCAN, convertScanToString(scan));
+    conf.setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName());
     if (addDependencyJars) {
       addDependencyJars(job);
     }
@@ -363,6 +359,8 @@
     job.setOutputFormatClass(TableOutputFormat.class);
     if (reducer != null) job.setReducerClass(reducer);
     conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    conf.setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName());
     // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
     if (quorumAddress != null) {
       // Calling this will validate the format
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java (revision 0)
@@ -0,0 +1,68 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+public class ResultSerialization implements Serialization<Result> {
+  @Override
+  public boolean accept(Class<?> c) {
+    return Result.class.isAssignableFrom(c);
+  }
+
+  @Override
+  public Deserializer<Result> getDeserializer(Class<Result> c) {
+    return new ResultDeserializer();
+  }
+
+  @Override
+  public Serializer<Result> getSerializer(Class<Result> c) {
+    return new ResultSerializer();
+  }
+
+  private static class ResultDeserializer implements Deserializer<Result> {
+    private InputStream in;
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public Result deserialize(Result mutation) throws IOException {
+      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result proto =
+        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result.parseFrom(in);
+      return ProtobufUtil.toResult(proto);
+    }
+
+    @Override
+    public void open(InputStream in) throws IOException {
+      this.in = in;
+    }
+
+  }
+  private static class ResultSerializer implements Serializer<Result> {
+    private OutputStream out;
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    @Override
+    public void open(OutputStream out) throws IOException {
+      this.out = out;
+    }
+
+    @Override
+    public void serialize(Result result) throws IOException {
+      ProtobufUtil.toResult(result).writeTo(out);
+    }
+  }
+}
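Note: with the serializations registered by the init methods above, a job wires up as before; Result values and emitted Mutations now cross the MapReduce boundary as protobuf rather than Writable. A sketch of a minimal scan job (table name illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class JobSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "scan-mytable");
    job.setJarByClass(JobSetup.class);
    // registers MutationSerialization/ResultSerialization on the job conf
    TableMapReduceUtil.initTableMapperJob("mytable", new Scan(),
        IdentityTableMapper.class, ImmutableBytesWritable.class,
        Result.class, job);
    job.waitForCompletion(true);
  }
}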
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (revision 1413899)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (working copy)
@@ -414,22 +414,6 @@
   public static long getWritableSize(Object instance, Class declaredClass,
                                      Configuration conf) {
-    long size = Bytes.SIZEOF_BYTE; // code
-    if (instance == null) {
-      return 0L;
-    }
-
-    if (declaredClass.isArray()) {
-      if (declaredClass.equals(Result[].class)) {
-
-        return size + Result.getWriteArraySize((Result[])instance);
-      }
-    }
-    if (declaredClass.equals(Result.class)) {
-      Result r = (Result) instance;
-      // one extra class code for writable instance.
-      return r.getWritableSize() + size + Bytes.SIZEOF_BYTE;
-    }
     return 0L; // no hint is the default.
   }
   /**
@@ -460,8 +444,6 @@
       // byte-at-a-time we were previously doing.
       if (declClass.equals(byte [].class)) {
         Bytes.writeByteArray(out, (byte [])instanceObj);
-      } else if(declClass.equals(Result [].class)) {
-        Result.writeArray(out, (Result [])instanceObj);
       } else {
         //if it is a Generic array, write the element's type
         if (getClassCode(declaredClass) == GENERIC_ARRAY_CODE) {
@@ -641,8 +623,6 @@
     } else if (declaredClass.isArray()) { // array
       if (declaredClass.equals(byte [].class)) {
         instance = Bytes.readByteArray(in);
-      } else if(declaredClass.equals(Result [].class)) {
-        instance = Result.readArray(in);
       } else {
         int length = in.readInt();
         instance = Array.newInstance(declaredClass.getComponentType(), length);
Index: hbase-examples/src/main/java/org/apache/hadoop/hbase/mapreduce/IndexBuilder.java
===================================================================
--- hbase-examples/src/main/java/org/apache/hadoop/hbase/mapreduce/IndexBuilder.java (revision 1413899)
+++ hbase-examples/src/main/java/org/apache/hadoop/hbase/mapreduce/IndexBuilder.java (working copy)
@@ -28,7 +28,6 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -73,7 +72,7 @@
    * Internal Mapper to be run by Hadoop.
    */
   public static class Map extends
-      Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
+      Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put> {
     private byte[] family;
     private HashMap<byte[], ImmutableBytesWritable> indexes;
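Note: the IndexBuilder hunk types the example mapper's output value as Put now that the client classes are no longer Writable. A self-contained mapper of the same shape as a closing sketch (class, family, and qualifier names illustrative):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;

public class InvertColumnMapper
    extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put> {
  private final byte[] family = Bytes.toBytes("attributes");
  private final byte[] qualifier = Bytes.toBytes("data");

  @Override
  protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
      throws IOException, InterruptedException {
    byte[] value = result.getValue(family, qualifier);
    if (value != null) {
      // copy out the original row key and index it under the secondary value
      byte[] origRow = new byte[rowKey.getLength()];
      System.arraycopy(rowKey.get(), rowKey.getOffset(), origRow, 0, rowKey.getLength());
      Put put = new Put(value);
      put.add(family, qualifier, origRow);
      context.write(new ImmutableBytesWritable(value), put);
    }
  }
}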