diff --git a/src/java/org/apache/hcatalog/data/DataType.java b/src/java/org/apache/hcatalog/data/DataType.java
index a5947e3..c4642d8 100644
--- a/src/java/org/apache/hcatalog/data/DataType.java
+++ b/src/java/org/apache/hcatalog/data/DataType.java
@@ -36,6 +36,7 @@ public abstract class DataType {
   public static final byte FLOAT = 20;
   public static final byte DOUBLE = 25;
   public static final byte STRING = 55;
+  public static final byte BINARY = 60;
 
   public static final byte MAP = 100;
   public static final byte STRUCT = 110;
@@ -75,6 +76,8 @@ public abstract class DataType {
       return LIST;
     } else if (o instanceof Map) {
       return MAP;
+    } else if (o instanceof byte[]) {
+      return BINARY;
     } else {return ERROR;}
   }
 
@@ -112,6 +115,9 @@ public abstract class DataType {
 
     case SHORT:
       return ((Short)o1).compareTo((Short)o2);
+
+    case BINARY:
+      return compareByteArray((byte[])o1, (byte[])o2);
 
     case LIST:
       List l1 = (List)o1;
@@ -170,4 +176,30 @@ public abstract class DataType {
       return dt1 < dt2 ? -1 : 1;
     }
   }
+
+  private static int compareByteArray(byte[] o1, byte[] o2) {
+
+    for(int i = 0; i < o1.length; i++){
+      if(i == o2.length){
+        return 1;
+      }
+      if(o1[i] == o2[i]){
+        continue;
+      }
+      if(o1[i] > o2[i]){
+        return 1;
+      }
+      else {
+        return -1;
+      }
+    }
+
+    //all bytes of o1 matched the prefix of o2;
+    //if o2 is longer, o1 sorts first
+    if(o2.length > o1.length){
+      return -1;
+    }
+    return 0; //equals
+  }
+
 }
diff --git a/src/java/org/apache/hcatalog/data/HCatRecord.java b/src/java/org/apache/hcatalog/data/HCatRecord.java
index 36fed9e..9404a7d 100644
--- a/src/java/org/apache/hcatalog/data/HCatRecord.java
+++ b/src/java/org/apache/hcatalog/data/HCatRecord.java
@@ -50,6 +50,14 @@ public abstract class HCatRecord implements HCatRecordable {
   public void setBoolean(String fieldName, HCatSchema recordSchema, Boolean value) throws HCatException {
     set(fieldName,recordSchema,value);
   }
+
+  public byte[] getByteArray(String fieldName, HCatSchema recordSchema) throws HCatException {
+    return (byte[]) get(fieldName, recordSchema, byte[].class);
+  }
+
+  public void setByteArray(String fieldName, HCatSchema recordSchema, byte[] value) throws HCatException {
+    set(fieldName,recordSchema,value);
+  }
 
   public Byte getByte(String fieldName, HCatSchema recordSchema) throws HCatException {
     //TINYINT
diff --git a/src/java/org/apache/hcatalog/data/ReaderWriter.java b/src/java/org/apache/hcatalog/data/ReaderWriter.java
index 5f4c483..83856e7 100644
--- a/src/java/org/apache/hcatalog/data/ReaderWriter.java
+++ b/src/java/org/apache/hcatalog/data/ReaderWriter.java
@@ -73,7 +73,13 @@ public abstract class ReaderWriter {
 
     case DataType.NULL:
       return null;
-
+
+    case DataType.BINARY:
+      int len = in.readInt();
+      byte[] ba = new byte[len];
+      in.readFully(ba);
+      return ba;
+
     case DataType.MAP:
       int size = in.readInt();
      Map m = new HashMap(size);
@@ -166,7 +172,13 @@ public abstract class ReaderWriter {
 
       out.write(utfBytes);
       return;
-
+    case DataType.BINARY:
+      byte[] ba = (byte[])val;
+      out.writeByte(DataType.BINARY);
+      out.writeInt(ba.length);
+      out.write(ba);
+      return;
+
     case DataType.NULL:
       out.writeByte(DataType.NULL);
       return;
diff --git a/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java b/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
index 2355dc3..f91db66 100644
--- a/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
+++ b/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -42,7 +41,6 @@ import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreMetadata;
 import org.apache.pig.backend.BackendException;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -247,10 +245,10 @@ public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata
 
     switch(type){
     case BINARY:
-      ByteArrayRef ba = new ByteArrayRef();
-      byte[] bytes = (null == pigObj) ? new byte[0] : ((DataByteArray)pigObj).get();
-      ba.setData(bytes);
-      return ba;
+      if (pigObj == null) {
+        return null;
+      }
+      return ((DataByteArray)pigObj).get();
 
     case STRUCT:
       if (pigObj == null) {
diff --git a/src/java/org/apache/hcatalog/pig/PigHCatUtil.java b/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
index 3ef5763..938507b 100644
--- a/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
+++ b/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -307,7 +306,7 @@ public class PigHCatUtil {
     Type itemType = hfs.getType();
     switch (itemType){
     case BINARY:
-      result = (o == null) ? null : new DataByteArray(((ByteArrayRef)o).getData());
+      result = (o == null) ? null : new DataByteArray((byte[])o);
       break;
     case STRUCT:
       result = transformToTuple((List)o,hfs);
diff --git a/src/test/org/apache/hcatalog/data/TestDefaultHCatRecord.java b/src/test/org/apache/hcatalog/data/TestDefaultHCatRecord.java
index 7093e5e..250c5dc 100644
--- a/src/test/org/apache/hcatalog/data/TestDefaultHCatRecord.java
+++ b/src/test/org/apache/hcatalog/data/TestDefaultHCatRecord.java
@@ -32,9 +32,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
@@ -75,12 +78,82 @@ public class TestDefaultHCatRecord extends TestCase{
   public void testCompareTo() {
     HCatRecord[] recs = getHCatRecords();
     Assert.assertTrue(HCatDataCheckUtil.compareRecords(recs[0],recs[1]) == 0);
+    Assert.assertTrue(HCatDataCheckUtil.compareRecords(recs[4],recs[5]) == 0);
   }
 
   public void testEqualsObject() {
     HCatRecord[] recs = getHCatRecords();
     Assert.assertTrue(HCatDataCheckUtil.recordsEqual(recs[0],recs[1]));
+    Assert.assertTrue(HCatDataCheckUtil.recordsEqual(recs[4],recs[5]));
+  }
+
+  /**
+   * Test get and set calls with type
+   * @throws HCatException
+   */
+  public void testGetSetByType1() throws HCatException{
+    HCatRecord inpRec = getHCatRecords()[0];
+    HCatRecord newRec = new DefaultHCatRecord(inpRec.size());
+    HCatSchema hsch =
+        HCatSchemaUtils.getHCatSchema(
+            "a:tinyint,b:smallint,c:int,d:bigint,e:float,f:double,g:boolean,h:string,i:binary,j:string");
+
+
+    newRec.setByte("a", hsch, inpRec.getByte("a", hsch) );
+    newRec.setShort("b", hsch, inpRec.getShort("b", hsch) );
+    newRec.setInteger("c", hsch, inpRec.getInteger("c", hsch) );
+    newRec.setLong("d", hsch, inpRec.getLong("d", hsch) );
+    newRec.setFloat("e", hsch, inpRec.getFloat("e", hsch) );
+    newRec.setDouble("f", hsch, inpRec.getDouble("f", hsch) );
+    newRec.setBoolean("g", hsch, inpRec.getBoolean("g", hsch) );
+    newRec.setString("h", hsch, inpRec.getString("h", hsch) );
+    newRec.setByteArray("i", hsch, inpRec.getByteArray("i", hsch) );
+    newRec.setString("j", hsch, inpRec.getString("j", hsch) );
+
+    Assert.assertTrue(HCatDataCheckUtil.recordsEqual(newRec,inpRec));
+
+
+  }
+
+  /**
+   * Test get and set calls with type
+   * @throws HCatException
+   */
+  public void testGetSetByType2() throws HCatException{
+    HCatRecord inpRec = getGetSet2InpRec();
+
+    HCatRecord newRec = new DefaultHCatRecord(inpRec.size());
+    HCatSchema hsch =
+        HCatSchemaUtils.getHCatSchema("a:binary,b:map<smallint,string>,c:array<int>,d:struct<i:int,j:int>");
+
+
+    newRec.setByteArray("a", hsch, inpRec.getByteArray("a", hsch) );
+    newRec.setMap("b", hsch, inpRec.getMap("b", hsch) );
+    newRec.setList("c", hsch, inpRec.getList("c", hsch) );
+    newRec.setStruct("d", hsch, inpRec.getStruct("d", hsch) );
+
+    Assert.assertTrue(HCatDataCheckUtil.recordsEqual(newRec,inpRec));
+  }
+
+
+  private HCatRecord getGetSet2InpRec() {
+    List rlist = new ArrayList();
+
+    rlist.add(new byte[]{1,2,3});
+
+    Map mapcol = new HashMap(3);
+    mapcol.put(new Short("2"), "hcat is cool");
+    mapcol.put(new Short("3"), "is it?");
+    mapcol.put(new Short("4"), "or is it not?");
+    rlist.add(mapcol);
+
+    List listcol = new ArrayList();
+    listcol.add(314);
+    listcol.add(007);
+    rlist.add( listcol);//list
+    rlist.add( listcol);//struct
+    return new DefaultHCatRecord(rlist);
   }
 
 
   private HCatRecord[] getHCatRecords(){
@@ -90,7 +163,9 @@ public class TestDefaultHCatRecord extends TestCase{
     rec_1.add(new Short("456"));
     rec_1.add( new Integer(789));
     rec_1.add( new Long(1000L));
+    rec_1.add( new Float(5.3F));
     rec_1.add( new Double(5.3D));
+    rec_1.add( new Boolean(true));
     rec_1.add( new String("hcat and hadoop"));
     rec_1.add( null);
     rec_1.add( "null");
@@ -102,7 +177,9 @@ public class TestDefaultHCatRecord extends TestCase{
     rec_2.add( new Short("456"));
     rec_2.add( new Integer(789));
     rec_2.add( new Long(1000L));
+    rec_2.add( new Float(5.3F));
     rec_2.add( new Double(5.3D));
+    rec_2.add( new Boolean(true));
     rec_2.add( new String("hcat and hadoop"));
     rec_2.add( null);
     rec_2.add( "null");
@@ -149,7 +226,37 @@ public class TestDefaultHCatRecord extends TestCase{
     rec_4.add( innerList2);
     HCatRecord tup_4 = new DefaultHCatRecord(rec_4);
 
-    return new HCatRecord[]{tup_1,tup_2,tup_3,tup_4};
+
+    List rec_5 = new ArrayList(3);
+    rec_5.add( getByteArray());
+    rec_5.add( getStruct());
+    rec_5.add( getList());
+    HCatRecord tup_5 = new DefaultHCatRecord(rec_5);
+
+
+    List rec_6 = new ArrayList(3);
+    rec_6.add( getByteArray());
+    rec_6.add( getStruct());
+    rec_6.add( getList());
+    HCatRecord tup_6 = new DefaultHCatRecord(rec_6);
+
+
+    return new HCatRecord[]{tup_1,tup_2,tup_3,tup_4,tup_5,tup_6};
+
+  }
+
+  private Object getList() {
+    return getStruct();
+  }
+
+  private Object getByteArray() {
+    return new byte[]{1,2,3,4};
+  }
+  private List getStruct() {
+    List struct = new ArrayList();
+    struct.add(new Integer(1));
+    struct.add(new String("x"));
+    return struct;
 
   }
 }
diff --git a/src/test/org/apache/hcatalog/pig/TestHCatStorer.java b/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
index 5ecfc9f..39d82c1 100644
--- a/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
+++ b/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
@@ -473,7 +473,7 @@ public class TestHCatStorer extends TestCase {
 
     driver.getResults(res);
     Iterator<String> itr = res.iterator();
-    assertEquals( "0\tNULL\tNULL\tNULL\tNULL\t\tnull" ,itr.next());
+    assertEquals( "0\tNULL\tNULL\tNULL\tNULL\tnull\tnull" ,itr.next());
     assertEquals( "NULL\t4.2\t2.2\t4\tlets hcat\tbinary-data\tnull" ,itr.next());
     assertEquals( "3\t6.2999997\t3.3000000000000003\t6\tlets hcat\tbinary-data\tnull",itr.next());
     assertFalse(itr.hasNext());
@@ -481,13 +481,19 @@ public class TestHCatStorer extends TestCase {
 
     server.registerQuery("B = load 'junit_unparted' using "+HCatLoader.class.getName()+";");
     Iterator<Tuple> iter = server.openIterator("B");
     int count = 0;
+    int num5nulls = 0;
     while(iter.hasNext()){
       Tuple t = iter.next();
-      assertTrue(t.get(5) instanceof DataByteArray);
+      if(t.get(5) == null){
+        num5nulls++;
+      } else {
+        assertTrue(t.get(5) instanceof DataByteArray);
+      }
       assertNull(t.get(6));
       count++;
     }
     assertEquals(3, count);
+    assertEquals(1, num5nulls);
     driver.run("drop table junit_unparted");
   }
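
For context, a minimal standalone sketch (not part of the patch) of how the new byte[] accessors added above might be exercised. It uses only APIs the patch itself touches or that its tests already rely on (HCatSchemaUtils.getHCatSchema, DefaultHCatRecord, setByteArray/getByteArray); the class name BinaryFieldExample and the column name "payload" are illustrative only.

```java
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;

// Hypothetical example class, not part of the patch.
public class BinaryFieldExample {
  public static void main(String[] args) throws HCatException {
    // A schema with a single binary column, mirroring the "i:binary" field
    // used in TestDefaultHCatRecord above.
    HCatSchema schema = HCatSchemaUtils.getHCatSchema("payload:binary");

    HCatRecord rec = new DefaultHCatRecord(1);
    rec.setByteArray("payload", schema, new byte[]{1, 2, 3, 4});

    // With this patch, a binary field is carried as a plain byte[] rather than
    // a Hive ByteArrayRef (see the HCatBaseStorer and PigHCatUtil changes).
    byte[] payload = rec.getByteArray("payload", schema);
    System.out.println("payload length = " + payload.length);
  }
}
```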