diff --git ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java
index b9b151a..3d93a40 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java
@@ -44,12 +44,13 @@
  * An {@link org.apache.hadoop.mapred.InputFormat} for Plain files with
  * {@link Deserializer} records.
  */
+@Deprecated
 public class FlatFileInputFormat<T> extends
     FileInputFormat<Void, FlatFileInputFormat.RowContainer<T>> {
 
   /**
    * A work-around until HADOOP-1230 is fixed.
-   * 
+   *
    * Allows boolean next(k,v) to be called by reference but still allow the
    * deserializer to create a new object (i.e., row) on every call to next.
    */
@@ -61,7 +62,7 @@
    * An implementation of SerializationContext is responsible for looking up the
    * Serialization implementation for the given RecordReader. Potentially based
    * on the Configuration or some other mechanism
-   * 
+   *
    * The SerializationFactory does not give this functionality since: 1.
    * Requires Serialization implementations to be specified in the Configuration
    * a-priori (although same as setting a SerializationContext) 2. Does not
@@ -73,7 +74,7 @@
 
     /**
      * An {@link Serialization} object for objects of type S.
-     * 
+     *
      * @return a serialization object for this context
      */
     Serialization<S> getSerialization() throws IOException;
@@ -93,7 +94,7 @@
    * An implementation of {@link SerializationContext} that reads the
    * Serialization class and specific subclass to be deserialized from the
    * JobConf.
-   * 
+   *
    */
   public static class SerializationContextFromConf<S> implements
       FlatFileInputFormat.SerializationContext<S> {
@@ -110,10 +111,12 @@
      */
     private Configuration conf;
 
+    @Override
     public void setConf(Configuration conf) {
       this.conf = conf;
     }
 
+    @Override
     public Configuration getConf() {
       return conf;
     }
@@ -123,6 +126,7 @@ public Configuration getConf() {
      * @exception does
      *              not currently throw IOException
      */
+    @Override
     public Class<S> getRealClass() throws IOException {
       return (Class<S>) conf.getClass(SerializationSubclassKey, null,
           Object.class);
@@ -130,7 +134,7 @@ public Configuration getConf() {
 
     /**
      * Looks up and instantiates the Serialization Object
-     * 
+     *
      * Important to note here that we are not relying on the Hadoop
      * SerializationFactory part of the Serialization framework. This is because
      * in the case of Non-Writable Objects, we cannot make any assumptions about
@@ -139,11 +143,12 @@ public Configuration getConf() {
      * Serialization classes. The SerializationFactory currently returns the
      * first (de)serializer that is compatible with the class to be
      * deserialized; in this context, that assumption isn't necessarily true.
-     * 
+     *
      * @return the serialization object for this context
      * @exception does
      *              not currently throw any IOException
      */
+    @Override
     public Serialization<S> getSerialization() throws IOException {
       Class<? extends Serialization<S>> tClass = (Class<? extends Serialization<S>>) conf.getClass(
           SerializationImplKey, null, Serialization.class);
@@ -154,11 +159,12 @@ public Configuration getConf() {
 
   /**
    * An {@link RecordReader} for plain files with {@link Deserializer} records
-   * 
+   *
    * Reads one row at a time of type R. R is intended to be a base class of
    * something such as: Record, Writable, Text, ...
-   * 
+   *
    */
+  @Deprecated
   public class FlatFileRecordReader<R> implements
       RecordReader<Void, FlatFileInputFormat.RowContainer<R>> {
 
@@ -211,7 +217,7 @@ public Configuration getConf() {
     /**
      * FlatFileRecordReader constructor constructs the underlying stream
      * (potentially decompressed) and creates the deserializer.
-     * 
+     *
      * @param conf
      *          the jobconf
      * @param split
@@ -245,7 +251,7 @@ public FlatFileRecordReader(Configuration conf, FileSplit split) throws IOExcept
           .getClass(SerializationContextImplKey,
               SerializationContextFromConf.class);
 
-      sinfo = (SerializationContext)ReflectionUtils.newInstance(sinfoClass, conf);
+      sinfo = ReflectionUtils.newInstance(sinfoClass, conf);
 
       // Get the Serialization object and the class being deserialized
       Serialization<R> serialization = sinfo.getSerialization();
@@ -264,6 +270,7 @@ public FlatFileRecordReader(Configuration conf, FileSplit split) throws IOExcept
     /**
      * @return null
      */
+    @Override
     public Void createKey() {
       return null;
     }
@@ -271,15 +278,16 @@ public Void createKey() {
     /**
      * @return a new R instance.
      */
+    @Override
     public RowContainer<R> createValue() {
       RowContainer<R> r = new RowContainer<R>();
-      r.row = (R)ReflectionUtils.newInstance(realRowClass, conf);
+      r.row = ReflectionUtils.newInstance(realRowClass, conf);
       return r;
     }
 
     /**
      * Returns the next row # and value.
-     * 
+     *
      * @param key
      *          - void as these files have a value only
      * @param value
@@ -290,6 +298,7 @@ public Void createKey() {
      * @exception IOException
      *              from the deserializer
      */
+    @Override
     public synchronized boolean next(Void key, RowContainer<R> value) throws IOException {
       if (isEOF || in.available() == 0) {
         isEOF = true;
@@ -311,6 +320,7 @@
       }
     }
 
+    @Override
     public synchronized float getProgress() throws IOException {
       // this assumes no splitting
       if (end == 0) {
@@ -322,6 +332,7 @@ public synchronized float getProgress() throws IOException {
       }
     }
 
+    @Override
     public synchronized long getPos() throws IOException {
       // assumes deserializer is not buffering itself
       // position over uncompressed stream. not sure what
@@ -329,6 +340,7 @@ public synchronized long getPos() throws IOException {
       return fsin.getPos();
     }
 
+    @Override
     public synchronized void close() throws IOException {
       // assuming that this closes the underlying streams
       deserializer.close();
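A note on the wiring above: `SerializationContextFromConf` is driven entirely by two `JobConf` keys — `SerializationImplKey` picks the Hadoop `Serialization` implementation, and `SerializationSubclassKey` names the concrete row class the deserializer instantiates. A minimal sketch of a caller setting both, mirroring the deleted tests further down (`MyRow` is a hypothetical `java.io.Serializable` row class, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    JobConf job = new JobConf(new Configuration());

    // Which Hadoop Serialization implementation handles rows.
    job.setClass(FlatFileInputFormat.SerializationImplKey,
        org.apache.hadoop.io.serializer.JavaSerialization.class,
        org.apache.hadoop.io.serializer.Serialization.class);

    // Which concrete row subclass SerializationContextFromConf should report.
    job.setClass(
        FlatFileInputFormat.SerializationContextFromConf.SerializationSubclassKey,
        MyRow.class, java.io.Serializable.class); // MyRow is hypothetical
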
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/RecordTestObj.java ql/src/test/org/apache/hadoop/hive/ql/io/RecordTestObj.java
deleted file mode 100644
index 1bb846b..0000000
--- ql/src/test/org/apache/hadoop/hive/ql/io/RecordTestObj.java
+++ /dev/null
@@ -1,276 +0,0 @@
-// File generated by hadoop record compiler. Do not edit.
-package org.apache.hadoop.hive.ql.io;
-
-public class RecordTestObj extends org.apache.hadoop.record.Record {
-  private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo;
-  private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter;
-  private static int[] _rio_rtiFilterFields;
-  static {
-    _rio_recTypeInfo = new org.apache.hadoop.record.meta.RecordTypeInfo(
-        "RecordTestObj");
-    _rio_recTypeInfo.addField("s",
-        org.apache.hadoop.record.meta.TypeID.StringTypeID);
-    _rio_recTypeInfo.addField("num",
-        org.apache.hadoop.record.meta.TypeID.LongTypeID);
-  }
-
-  private String s;
-  private long num;
-
-  public RecordTestObj() {
-  }
-
-  public RecordTestObj(final String s, final long num) {
-    this.s = s;
-    this.num = num;
-  }
-
-  public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() {
-    return _rio_recTypeInfo;
-  }
-
-  public static void setTypeFilter(
-      org.apache.hadoop.record.meta.RecordTypeInfo rti) {
-    if (null == rti) {
-      return;
-    }
-    _rio_rtiFilter = rti;
-    _rio_rtiFilterFields = null;
-  }
-
-  private static void setupRtiFields() {
-    if (null == _rio_rtiFilter) {
-      return;
-    }
-    // we may already have done this
-    if (null != _rio_rtiFilterFields) {
-      return;
-    }
-    int _rio_i, _rio_j;
-    _rio_rtiFilterFields = new int[_rio_rtiFilter.getFieldTypeInfos().size()];
-    for (_rio_i = 0; _rio_i < _rio_rtiFilterFields.length; _rio_i++) {
-      _rio_rtiFilterFields[_rio_i] = 0;
-    }
-    java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_itFilter = _rio_rtiFilter
-        .getFieldTypeInfos().iterator();
-    _rio_i = 0;
-    while (_rio_itFilter.hasNext()) {
-      org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfoFilter = _rio_itFilter
-          .next();
-      java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_it = _rio_recTypeInfo
-          .getFieldTypeInfos().iterator();
-      _rio_j = 1;
-      while (_rio_it.hasNext()) {
-        org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfo = _rio_it.next();
-        if (_rio_tInfo.equals(_rio_tInfoFilter)) {
-          _rio_rtiFilterFields[_rio_i] = _rio_j;
-          break;
-        }
-        _rio_j++;
-      }
-      _rio_i++;
-    }
-  }
-
-  public String getS() {
-    return s;
-  }
-
-  public void setS(final String s) {
-    this.s = s;
-  }
-
-  public long getNum() {
-    return num;
-  }
-
-  public void setNum(final long num) {
-    this.num = num;
-  }
-
-  @Override
-  public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a,
-      final String _rio_tag) throws java.io.IOException {
-    _rio_a.startRecord(this, _rio_tag);
-    _rio_a.writeString(s, "s");
-    _rio_a.writeLong(num, "num");
-    _rio_a.endRecord(this, _rio_tag);
-  }
-
-  private void deserializeWithoutFilter(
-      final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
-      throws java.io.IOException {
-    _rio_a.startRecord(_rio_tag);
-    s = _rio_a.readString("s");
-    num = _rio_a.readLong("num");
-    _rio_a.endRecord(_rio_tag);
-  }
-
-  @Override
-  public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a,
-      final String _rio_tag) throws java.io.IOException {
-    if (null == _rio_rtiFilter) {
-      deserializeWithoutFilter(_rio_a, _rio_tag);
-      return;
-    }
-    // if we're here, we need to read based on version info
-    _rio_a.startRecord(_rio_tag);
-    setupRtiFields();
-    for (int _rio_i = 0; _rio_i < _rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) {
-      if (1 == _rio_rtiFilterFields[_rio_i]) {
-        s = _rio_a.readString("s");
-      } else if (2 == _rio_rtiFilterFields[_rio_i]) {
-        num = _rio_a.readLong("num");
-      } else {
-        java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo> typeInfos = (java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo>) (_rio_rtiFilter
-            .getFieldTypeInfos());
-        org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i)
-            .getFieldID(), typeInfos.get(_rio_i).getTypeID());
-      }
-    }
-    _rio_a.endRecord(_rio_tag);
-  }
-
-  @Override
-  public int compareTo(final Object _rio_peer_) throws ClassCastException {
-    if (!(_rio_peer_ instanceof RecordTestObj)) {
-      throw new ClassCastException("Comparing different types of records.");
-    }
-    RecordTestObj _rio_peer = (RecordTestObj) _rio_peer_;
-    int _rio_ret = 0;
-    _rio_ret = s.compareTo(_rio_peer.s);
-    if (_rio_ret != 0) {
-      return _rio_ret;
-    }
-    _rio_ret = (num == _rio_peer.num) ? 0 : ((num < _rio_peer.num) ? -1 : 1);
-    if (_rio_ret != 0) {
-      return _rio_ret;
-    }
-    return _rio_ret;
-  }
-
-  @Override
-  public boolean equals(final Object _rio_peer_) {
-    if (!(_rio_peer_ instanceof RecordTestObj)) {
-      return false;
-    }
-    if (_rio_peer_ == this) {
-      return true;
-    }
-    RecordTestObj _rio_peer = (RecordTestObj) _rio_peer_;
-    boolean _rio_ret = false;
-    _rio_ret = s.equals(_rio_peer.s);
-    if (!_rio_ret) {
-      return _rio_ret;
-    }
-    _rio_ret = (num == _rio_peer.num);
-    if (!_rio_ret) {
-      return _rio_ret;
-    }
-    return _rio_ret;
-  }
-
-  @Override
-  public Object clone() throws CloneNotSupportedException {
-    RecordTestObj _rio_other = new RecordTestObj();
-    _rio_other.s = s;
-    _rio_other.num = num;
-    return _rio_other;
-  }
-
-  @Override
-  public int hashCode() {
-    int _rio_result = 17;
-    int _rio_ret;
-    _rio_ret = s.hashCode();
-    _rio_result = 37 * _rio_result + _rio_ret;
-    _rio_ret = (int) (num ^ (num >>> 32));
-    _rio_result = 37 * _rio_result + _rio_ret;
-    return _rio_result;
-  }
-
-  public static String signature() {
-    return "LRecordTestObj(sl)";
-  }
-
-  public static class Comparator extends
-      org.apache.hadoop.record.RecordComparator {
-    public Comparator() {
-      super(RecordTestObj.class);
-    }
-
-    static public int slurpRaw(byte[] b, int s, int l) {
-      try {
-        int os = s;
-        {
-          int i = org.apache.hadoop.record.Utils.readVInt(b, s);
-          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
-          s += (z + i);
-          l -= (z + i);
-        }
-        {
-          long i = org.apache.hadoop.record.Utils.readVLong(b, s);
-          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
-          s += z;
-          l -= z;
-        }
-        return (os - s);
-      } catch (java.io.IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    static public int compareRaw(byte[] b1, int s1, int l1, byte[] b2, int s2,
-        int l2) {
-      try {
-        int os1 = s1;
-        {
-          int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
-          int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
-          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
-          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
-          s1 += z1;
-          s2 += z2;
-          l1 -= z1;
-          l2 -= z2;
-          int r1 = org.apache.hadoop.record.Utils.compareBytes(b1, s1, i1, b2,
-              s2, i2);
-          if (r1 != 0) {
-            return (r1 < 0) ? -1 : 0;
-          }
-          s1 += i1;
-          s2 += i2;
-          l1 -= i1;
-          l1 -= i2;
-        }
-        {
-          long i1 = org.apache.hadoop.record.Utils.readVLong(b1, s1);
-          long i2 = org.apache.hadoop.record.Utils.readVLong(b2, s2);
-          if (i1 != i2) {
-            return ((i1 - i2) < 0) ? -1 : 0;
-          }
-          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
-          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
-          s1 += z1;
-          s2 += z2;
-          l1 -= z1;
-          l2 -= z2;
-        }
-        return (os1 - s1);
-      } catch (java.io.IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      int ret = compareRaw(b1, s1, l1, b2, s2, l2);
-      return (ret == -1) ? -1 : ((ret == 0) ? 1 : 0);
-    }
-  }
-
-  static {
-    org.apache.hadoop.record.RecordComparator.define(RecordTestObj.class,
-        new Comparator());
-  }
-}
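Worth recording as this generated comparator is deleted: `compareRaw` above contains two apparent bugs. A non-zero byte comparison returns `(r1 < 0) ? -1 : 0`, collapsing "greater than" into "equal" (the long field's `((i1 - i2) < 0) ? -1 : 0` has the same shape), and the length bookkeeping decrements `l1` twice (`l1 -= i2`) where `l2 -= i2` was presumably intended. A corrected fragment of the first block would read (a sketch against the deleted code, not part of this patch):

    int r1 = org.apache.hadoop.record.Utils.compareBytes(b1, s1, i1, b2, s2, i2);
    if (r1 != 0) {
      return (r1 < 0) ? -1 : 1; // was: (r1 < 0) ? -1 : 0
    }
    s1 += i1;
    s2 += i2;
    l1 -= i1;
    l2 -= i2;                   // was: l1 -= i2

Since the file is compiler-generated ("Do not edit") and unused after this change, deleting it wholesale rather than patching it is the right call.
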
" - + String.valueOf(i), i)); - } - serializer.close(); - - // - // Construct the reader - // - FileInputFormat> format = - new FlatFileInputFormat(); - InputSplit[] splits = format.getSplits(job, 1); - - // construct the record reader - RecordReader> reader = format - .getRecordReader(splits[0], job, reporter); - - // create key/value - Void key = reader.createKey(); - FlatFileInputFormat.RowContainer value = reader - .createValue(); - - // - // read back the data using the FlatFileRecordReader - // - int count = 0; - while (reader.next(key, value)) { - assertTrue(key == null); - assertTrue(((JavaTestObjFlatFileInputFormat) value.row).s - .equals("Hello World! " + String.valueOf(count))); - assertTrue(((JavaTestObjFlatFileInputFormat) value.row).num == count); - count++; - } - reader.close(); - - } catch (Exception e) { - System.err.println("caught: " + e); - e.printStackTrace(); - } finally { - } - - } - - public void testFlatFileInputRecord() throws Exception { - Configuration conf; - JobConf job; - FileSystem fs; - Path dir; - Path file; - Reporter reporter; - FSDataOutputStream ds; - - try { - // - // create job and filesystem and reporter and such. - // - conf = new Configuration(); - job = new JobConf(conf); - fs = FileSystem.getLocal(conf); - dir = new Path(System.getProperty("test.tmp.dir", ".") + "/mapred"); - file = new Path(dir, "test.txt"); - reporter = Reporter.NULL; - fs.delete(dir, true); - - job.setClass(FlatFileInputFormat.SerializationImplKey, - org.apache.hadoop.io.serializer.WritableSerialization.class, - org.apache.hadoop.io.serializer.Serialization.class); - - job - .setClass( - FlatFileInputFormat.SerializationContextFromConf.SerializationSubclassKey, - RecordTestObj.class, Writable.class); - - // - // Write some data out to a flat file - // - FileInputFormat.setInputPaths(job, dir); - ds = fs.create(file); - Serializer serializer = new WritableSerialization() - .getSerializer(Writable.class); - - // construct some data and write it - serializer.open(ds); - for (int i = 0; i < 10; i++) { - serializer.serialize(new RecordTestObj("Hello World! " - + String.valueOf(i), i)); - } - serializer.close(); - - // - // Construct the reader - // - FileInputFormat> format = - new FlatFileInputFormat(); - InputSplit[] splits = format.getSplits(job, 1); - - // construct the record reader - RecordReader> reader = format - .getRecordReader(splits[0], job, reporter); - - // create key/value - Void key = reader.createKey(); - FlatFileInputFormat.RowContainer value = reader.createValue(); - - // - // read back the data using the FlatFileRecordReader - // - int count = 0; - while (reader.next(key, value)) { - assertTrue(key == null); - assertTrue(((RecordTestObj) value.row).getS().equals( - "Hello World! " + String.valueOf(count))); - assertTrue(((RecordTestObj) value.row).getNum() == count); - count++; - } - reader.close(); - - } catch (Exception e) { - System.err.println("caught: " + e); - e.printStackTrace(); - } finally { - } - - } - - /* - * public void testFlatFileInputThrift() throws Exception { Configuration - * conf; JobConf job ; FileSystem fs; Path dir ; Path file; Reporter reporter; - * FSDataOutputStream ds; - * - * try { // // create job and filesystem and reporter and such. 
// conf = new - * Configuration(); job = new JobConf(conf); fs = FileSystem.getLocal(conf); - * dir = new Path(System.getProperty("test.tmp.dir",".") + "/mapred"); file = - * new Path(dir, "test.txt"); reporter = Reporter.NULL; fs.delete(dir, true); - * - * job.setClass(FlatFileInputFormat.SerializationContextFromConf. - * SerializationImplKey, - * org.apache.hadoop.contrib.serialization.thrift.ThriftSerialization.class, - * org.apache.hadoop.io.serializer.Serialization.class); - * - * job.setClass(FlatFileInputFormat.SerializationContextFromConf. - * SerializationSubclassKey, FlatFileThriftTestObj.class, TBase.class); - * - * // // Write some data out to a flat file // - * FileInputFormat.setInputPaths(job, dir); ds = fs.create(file); Serializer - * serializer = new ThriftSerialization().getSerializer(TBase.class); - * - * // construct some data and write it serializer.open(ds); for (int i = 0; i - * < 10; i++) { serializer.serialize(new FlatFileThriftTestObj("Hello World! " - * + String.valueOf(i), i)); } serializer.close(); - * - * // // Construct the reader // FileInputFormat> format = new - * FlatFileInputFormat(); InputSplit[] splits = format.getSplits(job, - * 1); - * - * // construct the record reader RecordReader> reader = - * format.getRecordReader(splits[0], job, reporter); - * - * // create key/value Void key = reader.createKey(); - * FlatFileInputFormat.RowContainer value = reader.createValue(); - * - * // // read back the data using the FlatFileRecordReader // int count = 0; - * while (reader.next(key, value)) { assertTrue(key == null); - * assertTrue(((FlatFileThriftTestObj)value.row).s.equals("Hello World! " - * +String.valueOf(count))); assertTrue(((FlatFileThriftTestObj)value.row).num - * == count); count++; } reader.close(); - * - * } catch(Exception e) { System.err.println("caught: " + e); - * e.printStackTrace(); } finally { } - * - * } - */ - - public static void main(String[] args) throws Exception { - new TestFlatFileInputFormat().testFlatFileInputJava(); - new TestFlatFileInputFormat().testFlatFileInputRecord(); - // new TestFlatFileInputFormat().testFlatFileInputThrift(); - } -}
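Both tests are removed without a replacement, so the read-back pattern they exercised is preserved here as a sketch. It assumes a `JobConf` configured with the two serialization keys as in the note above and `Serializable` rows as in `testFlatFileInputJava`; it is a fragment for a test body, not a drop-in test:

    // Construct the input format and split the (single) flat file.
    FileInputFormat<Void, FlatFileInputFormat.RowContainer<Serializable>> format =
        new FlatFileInputFormat<Serializable>();
    InputSplit[] splits = format.getSplits(job, 1);

    // The reader hands back value-only records; the key is always null.
    RecordReader<Void, FlatFileInputFormat.RowContainer<Serializable>> reader =
        format.getRecordReader(splits[0], job, Reporter.NULL);
    Void key = reader.createKey();
    FlatFileInputFormat.RowContainer<Serializable> value = reader.createValue();
    while (reader.next(key, value)) {
      // value.row holds a freshly deserialized row on each call to next()
    }
    reader.close();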