From 5fe5e609e51adcfde7e919ea99cd20d82924a6a5 Mon Sep 17 00:00:00 2001 From: Ashutosh Chauhan Date: Sat, 26 Sep 2015 12:19:00 -0800 Subject: [PATCH] HIVE-11972 : [Refactor] Improve determination of dynamic partitioning columns in FileSink Operator --- .../hadoop/hive/ql/exec/FileSinkOperator.java | 19 +- .../org/apache/hadoop/hive/ql/exec/Utilities.java | 17 ++ .../ql/optimizer/ConstantPropagateProcFactory.java | 11 +- .../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 10 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 30 +- .../hadoop/hive/ql/plan/DynamicPartitionCtx.java | 27 -- .../hadoop/hive/ql/exec/TestFileSinkOperator.java | 306 +++++++++++++++++---- 7 files changed, 273 insertions(+), 147 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 2604d5d..39944a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -493,24 +493,7 @@ private void dpSetup() { assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has " + inputObjInspectors.length; StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0]; - // remove the last dpMapping.size() columns from the OI - List fieldOI = soi.getAllStructFieldRefs(); - ArrayList newFieldsOI = new ArrayList(); - ArrayList newFieldsName = new ArrayList(); - this.dpStartCol = 0; - for (StructField sf : fieldOI) { - String fn = sf.getFieldName(); - if (!dpCtx.getInputToDPCols().containsKey(fn)) { - newFieldsOI.add(sf.getFieldObjectInspector()); - newFieldsName.add(sf.getFieldName()); - this.dpStartCol++; - } else { - // once we found the start column for partition column we are done - break; - } - } - assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty"; - + this.dpStartCol = Utilities.getDPColOffset(conf); this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol); this.dpVals = new ArrayList(numDynParts); this.dpWritables = new ArrayList(numDynParts); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index bcf85a4..5b21af9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -119,6 +119,7 @@ import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; @@ -3916,4 +3917,20 @@ public static void stripHivePasswordDetails(Configuration conf) { HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD, ""); } } + + public static int getDPColOffset(FileSinkDesc conf) { + + if (conf.getWriteType() == AcidUtils.Operation.DELETE) { + // For deletes, there is only ROW__ID in non-partitioning, non-bucketing columns. + //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details. + return 1; + } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE) { + // For updates, ROW__ID is an extra column at index 0. + //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details. 
+ return getColumnNames(conf.getTableInfo().getProperties()).size() + 1; + } else { + return getColumnNames(conf.getTableInfo().getProperties()).size(); + } + + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java index 5c6a6df..25156b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java @@ -843,7 +843,7 @@ private static ExprNodeDesc evaluateFunction(GenericUDF udf, List } } if (constant.getTypeInfo().getCategory() != Category.PRIMITIVE) { - // nested complex types cannot be folded cleanly + // nested complex types cannot be folded cleanly return null; } Object value = constant.getValue(); @@ -1163,16 +1163,15 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. DynamicPartitionCtx dpCtx = fsdesc.getDynPartCtx(); if (dpCtx != null) { - // If all dynamic partitions are propagated as constant, remove DP. - Set inputs = dpCtx.getInputToDPCols().keySet(); - // Assume only 1 parent for FS operator Operator parent = op.getParentOperators().get(0); Map parentConstants = cppCtx.getPropagatedConstants(parent); RowSchema rs = parent.getSchema(); boolean allConstant = true; - for (String input : inputs) { - ColumnInfo ci = rs.getColumnInfo(input); + int dpColStartIdx = Utilities.getDPColOffset(fsdesc); + List colInfos = rs.getSignature(); + for (int i = dpColStartIdx; i < colInfos.size(); i++) { + ColumnInfo ci = colInfos.get(i); if (parentConstants.get(ci) == null) { allConstant = false; break; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 02fbdfe..c696fd5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -497,9 +497,6 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set signature = inputRS.getSignature(); String tblAlias = fsInputDesc.getTableInfo().getTableName(); - LinkedHashMap colMap = new LinkedHashMap(); for (String dpCol : dpCtx.getDPColNames()) { ColumnInfo colInfo = new ColumnInfo(dpCol, TypeInfoFactory.stringTypeInfo, // all partition column type should be string tblAlias, true); // partition column is virtual column signature.add(colInfo); - colMap.put(dpCol, dpCol); // input and output have the same column name } inputRS.setSignature(signature); // create another DynamicPartitionCtx, which has a different input-to-DP column mapping DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx); - dpCtx2.setInputToDPCols(colMap); fsOutputDesc.setDynPartCtx(dpCtx2); // update the FileSinkOperator to include partition columns @@ -1896,7 +1890,7 @@ static void usePartitionColumns(Properties properties, List partColNames "Partition Names, " + Arrays.toString(partNames) + " don't match partition Types, " + Arrays.toString(partTypes)); - Map typeMap = new HashMap(); + Map typeMap = new HashMap<>(); for (int i = 0; i < partNames.length; i++) { String previousValue = typeMap.put(partNames[i], partTypes[i]); Preconditions.checkArgument(previousValue == null, "Partition columns configuration is inconsistent. 
" diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index dda28b0..a4959d0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -736,7 +736,7 @@ private ASTNode genValuesTempTable(ASTNode originalFrom, QB qb) throws SemanticE Path dataDir = null; if(!qb.getEncryptedTargetTablePaths().isEmpty()) { //currently only Insert into T values(...) is supported thus only 1 values clause - //and only 1 target table are possible. If/when support for + //and only 1 target table are possible. If/when support for //select ... from values(...) is added an insert statement may have multiple //encrypted target tables. dataDir = ctx.getMRTmpPath(qb.getEncryptedTargetTablePaths().get(0).toUri()); @@ -1556,7 +1556,7 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException for (String alias : tabAliases) { String tab_name = qb.getTabNameForAlias(alias); - + // we first look for this alias from CTE, and then from catalog. /* * if this s a CTE reference: Add its AST as a SubQuery to this QB. @@ -6830,30 +6830,6 @@ Operator genConversionSelectOperator(String dest, QB qb, Operator input, .getColumnInfos()), input), rowResolver); input.setColumnExprMap(colExprMap); } - - rowFields = opParseCtx.get(input).getRowResolver() - .getColumnInfos(); - if (deleting()) { - // Figure out if we have partition columns in the list or not. If so, - // add them into the mapping. Partition columns will be located after the row id. - if (rowFields.size() > 1) { - // This means we have partition columns to deal with, so set up the mapping from the - // input to the partition columns. - dpCtx.mapInputToDP(rowFields.subList(1, rowFields.size())); - } - } else if (updating()) { - // In this case we expect the number of in fields to exceed the number of out fields by one - // (for the ROW__ID virtual column). 
If there are more columns than this, - // then the extras are for dynamic partitioning - if (dynPart && dpCtx != null) { - dpCtx.mapInputToDP(rowFields.subList(tableFields.size() + 1, rowFields.size())); - } - } else { - if (dynPart && dpCtx != null) { - // create the mapping from input ExprNode to dest table DP column - dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size())); - } - } return input; } @@ -10105,7 +10081,7 @@ private void preProcessForInsert(ASTNode node, QB qb) throws SemanticException { return; } for (Node child : node.getChildren()) { - //each insert of multi insert looks like + //each insert of multi insert looks like //(TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME T1))) if (((ASTNode) child).getToken().getType() != HiveParser.TOK_INSERT) { continue; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java index 24db7d0..95d5635 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java @@ -19,14 +19,11 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.metadata.Table; public class DynamicPartitionCtx implements Serializable { @@ -43,8 +40,6 @@ private Path rootPath; // the root path DP columns paths start from private int numBuckets; // number of buckets in each partition - private Map inputToDPCols; // mapping from input column names to DP columns - private List spNames; // sp column names private List dpNames; // dp column names private String defaultPartName; // default partition name in case of null or empty value @@ -71,7 +66,6 @@ public DynamicPartitionCtx(Table tbl, Map partSpec, String defau } this.numDPCols = dpNames.size(); this.numSPCols = spNames.size(); - this.inputToDPCols = new HashMap(); if (this.numSPCols > 0) { this.spPath = Warehouse.makeDynamicPartName(partSpec); } else { @@ -86,25 +80,12 @@ public DynamicPartitionCtx(DynamicPartitionCtx dp) { this.spPath = dp.spPath; this.rootPath = dp.rootPath; this.numBuckets = dp.numBuckets; - this.inputToDPCols = dp.inputToDPCols; this.spNames = dp.spNames; this.dpNames = dp.dpNames; this.defaultPartName = dp.defaultPartName; this.maxPartsPerNode = dp.maxPartsPerNode; } - public void mapInputToDP(List fs) { - - assert fs.size() == this.numDPCols: "input DP column size != numDPCols"; - - Iterator itr1 = fs.iterator(); - Iterator itr2 = dpNames.iterator(); - - while (itr1.hasNext() && itr2.hasNext()) { - inputToDPCols.put(itr1.next().getInternalName(), itr2.next()); - } - } - public int getMaxPartitionsPerNode() { return this.maxPartsPerNode; } @@ -161,14 +142,6 @@ public void setSPColNames(List sp) { this.spNames = sp; } - public Map getInputToDPCols() { - return this.inputToDPCols; - } - - public void setInputToDPCols(Map map) { - this.inputToDPCols = map; - } - public void setNumDPCols(int dp) { this.numDPCols = dp; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index c6ae030..a84c33a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.AcidInputFormat; @@ -37,6 +36,7 @@ import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -50,7 +50,6 @@ import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -75,9 +74,10 @@ import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -94,8 +94,7 @@ private static TableDesc nonAcidTableDescriptor; private static TableDesc acidTableDescriptor; private static ObjectInspector inspector; - private static List rows; - private static ValidTxnList txnList; + private static List rows; private Path basePath; private JobConf jc; @@ -105,98 +104,97 @@ public static void classSetup() { Properties properties = new Properties(); properties.setProperty(serdeConstants.SERIALIZATION_LIB, TFSOSerDe.class.getName()); nonAcidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties); + properties.setProperty(serdeConstants.LIST_COLUMNS,"data"); properties = new Properties(properties); properties.setProperty(hive_metastoreConstants.BUCKET_COUNT, "1"); acidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties); - tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") + "testFileSinkOperator"); tmpdir.mkdir(); tmpdir.deleteOnExit(); - txnList = new ValidReadTxnList(new long[]{}, 2); } @Test public void testNonAcidWrite() throws Exception { setBasePath("write"); - setupData(DataFormat.SIMPLE); + setupData(DataFormat.SIMPLE, RowType.RegRow); FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, false, 0); processRows(op); - confirmOutput(); + confirmOutput(RowType.RegRow); } @Test public void testInsert() throws Exception { setBasePath("insert"); - setupData(DataFormat.SIMPLE); + setupData(DataFormat.SIMPLE, RowType.RegRow); FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, false, 1); processRows(op); Assert.assertEquals("10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(RowType.RegRow); } @Test public void testUpdate() throws Exception { setBasePath("update"); - setupData(DataFormat.WITH_RECORD_ID); + setupData(DataFormat.WITH_RECORD_ID, RowType.RegRow); FileSinkOperator op 
= getFileSink(AcidUtils.Operation.UPDATE, false, 2); processRows(op); Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(RowType.RegRow); } @Test public void testDelete() throws Exception { setBasePath("delete"); - setupData(DataFormat.WITH_RECORD_ID); + setupData(DataFormat.WITH_RECORD_ID, RowType.RegRow); FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, false, 2); processRows(op); Assert.assertEquals("-10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(RowType.RegRow); } @Test public void testNonAcidDynamicPartitioning() throws Exception { setBasePath("writeDP"); - setupData(DataFormat.WITH_PARTITION_VALUE); + setupData(DataFormat.WITH_PARTITION_VALUE, RowType.RegRow); FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, true, 0); processRows(op); - confirmOutput(); + confirmOutput(RowType.RegRow); } @Test public void testInsertDynamicPartitioning() throws Exception { setBasePath("insertDP"); - setupData(DataFormat.WITH_PARTITION_VALUE); + setupData(DataFormat.WITH_PARTITION_VALUE, RowType.InsertRow); FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, true, 1); processRows(op); // We only expect 5 here because we'll get whichever of the partitions published its stats // last. Assert.assertEquals("5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(RowType.InsertRow); } @Test public void testUpdateDynamicPartitioning() throws Exception { setBasePath("updateDP"); - setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE); + setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE, RowType.RegRow); FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, true, 2); processRows(op); Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(RowType.RegRow); } @Test public void testDeleteDynamicPartitioning() throws Exception { setBasePath("deleteDP"); - setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE); + setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE, RowType.DelRow); FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, true, 2); processRows(op); // We only expect -5 here because we'll get whichever of the partitions published its stats // last. Assert.assertEquals("-5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(RowType.DelRow); } @@ -220,12 +218,41 @@ private void setBasePath(String testName) { private enum DataFormat {SIMPLE, WITH_RECORD_ID, WITH_PARTITION_VALUE, WITH_RECORD_ID_AND_PARTITION_VALUE}; - private void setupData(DataFormat format) { + private void setupData(DataFormat format, RowType rType) { + + if (rType == RowType.InsertRow) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (InsertRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + rows = new ArrayList(); + for (int i = 0; i < 10; i++) { + rows.add( + new InsertRow( + new Text("its fleect was white as snow"), + (i < 5) ? new Text("Monday") : new Text("Tuesday") + ) + ); + } + return; + } + + if (rType == RowType.DelRow) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (DeleteRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + rows = new ArrayList(); + for (int i = 0; i < 10; i++) { + rows.add( + new DeleteRow(new RecordIdentifier(1, 1, i), + (i < 5) ? 
new Text("Monday") : new Text("Tuesday") + ) + ); + } + return; + } // Build object inspector inspector = ObjectInspectorFactory.getReflectionObjectInspector (TFSORow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - rows = new ArrayList(); + rows = new ArrayList(); switch (format) { case SIMPLE: @@ -300,9 +327,6 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType, Map partColMap= new LinkedHashMap(1); partColMap.put(PARTCOL_NAME, null); DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100); - Map partColNames = new HashMap(1); - partColNames.put(PARTCOL_NAME, PARTCOL_NAME); - dpCtx.setInputToDPCols(partColNames); //todo: does this need the finalDestination? desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null); } else { @@ -320,27 +344,43 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType, } private void processRows(FileSinkOperator op) throws HiveException { - for (TFSORow r : rows) op.process(r, 0); + for (Object r : rows) op.process(r, 0); op.jobCloseOp(jc, true); op.close(false); } - private void confirmOutput() throws IOException, SerDeException { + private enum RowType {DelRow, InsertRow, RegRow}; + + private void confirmOutput(RowType rType) throws IOException, SerDeException { Path[] paths = findFilesInBasePath(); - TFSOInputFormat input = new TFSOInputFormat(); + TFSOInputFormat input = new TFSOInputFormat(rType); FileInputFormat.setInputPaths(jc, paths); InputSplit[] splits = input.getSplits(jc, 1); - RecordReader reader = input.getRecordReader(splits[0], jc, + RecordReader reader = input.getRecordReader(splits[0], jc, Mockito.mock(Reporter.class)); NullWritable key = reader.createKey(); - TFSORow value = reader.createValue(); - List results = new ArrayList(rows.size()); - List sortedRows = new ArrayList(rows.size()); + WritableComparable value = reader.createValue(); + List results = new ArrayList(rows.size()); + List sortedRows = new ArrayList(rows.size()); for (int i = 0; i < rows.size(); i++) { Assert.assertTrue(reader.next(key, value)); - results.add(new TFSORow(value)); - sortedRows.add(new TFSORow(rows.get(i))); + + switch (rType) { + case RegRow: + results.add(new TFSORow((TFSORow)value)); + break; + case InsertRow: + results.add(new InsertRow((InsertRow)value)); + break; + case DelRow: + results.add(new DeleteRow((DeleteRow)value)); + break; + default: + throw new RuntimeException("Unexpected RowType"); + } + + sortedRows.add(rows.get(i)); } Assert.assertFalse(reader.next(key, value)); Collections.sort(results); @@ -370,6 +410,143 @@ private void recurseOnPath(Path p, FileSystem fs, List paths) throws IOExc } } + private static class DeleteRow implements WritableComparable { + + private RecordIdentifier recId; + private Text partVal; + + public DeleteRow() { + } + public DeleteRow(RecordIdentifier recId, Text partVal) { + super(); + this.recId = recId; + this.partVal = partVal; + } + public DeleteRow(DeleteRow value) { + this(value.recId, value.partVal); + } + @Override + public void write(DataOutput dataOutput) throws IOException { + if (partVal == null) { + dataOutput.writeBoolean(false); + } else { + dataOutput.writeBoolean(true); + partVal.write(dataOutput); + } + if (recId == null) { + dataOutput.writeBoolean(false); + } else { + dataOutput.writeBoolean(true); + recId.write(dataOutput); + } + } + @Override + public void readFields(DataInput dataInput) throws IOException { + boolean notNull = dataInput.readBoolean(); + if (notNull) { + partVal 
= new Text(); + partVal.readFields(dataInput); + } + notNull = dataInput.readBoolean(); + if (notNull) { + recId = new RecordIdentifier(); + recId.readFields(dataInput); + } + + } + @Override + public int compareTo(DeleteRow other) { + if (recId == null && other.recId == null) { + return comparePartVal(other); + } else if (recId == null) { + return -1; + } else { + int rc = recId.compareTo(other.recId); + if (rc == 0) return comparePartVal(other); + else return rc; + } + } + private int comparePartVal(DeleteRow other) { + + return partVal.compareTo(other.partVal); + } + + @Override + public boolean equals(Object obj) { + return compareTo((DeleteRow)obj) == 0; + } + } + private static class InsertRow implements WritableComparable { + + public InsertRow(Text data, Text partVal) { + super(); + this.data = data; + this.partVal = partVal; + } + + public InsertRow(InsertRow other) { + this(other.data, other.partVal); + } + + public InsertRow() { + } + + private Text data; + private Text partVal; + + + @Override + public void write(DataOutput dataOutput) throws IOException { + data.write(dataOutput); + if (partVal == null) { + dataOutput.writeBoolean(false); + } else { + dataOutput.writeBoolean(true); + partVal.write(dataOutput); + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + data = new Text(); + data.readFields(dataInput); + boolean notNull = dataInput.readBoolean(); + if (notNull) { + partVal = new Text(); + partVal.readFields(dataInput); + } + } + + @Override + public int compareTo(InsertRow other) { + if (partVal == null && other.partVal == null) { + return compareData(other); + } else if (partVal == null) { + return -1; + } else { + int rc = partVal.compareTo(other.partVal); + if (rc == 0) return compareData(other); + else return rc; + } + } + + private int compareData(InsertRow other) { + if (data == null && other.data == null) return 0; + else if (data == null) return -1; + else return data.compareTo(other.data); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof InsertRow) { + InsertRow other = (InsertRow) obj; + return compareTo(other) == 0; + + } else { + return false; + } + } + } private static class TFSORow implements WritableComparable { private RecordIdentifier recId; private Text data; @@ -493,14 +670,19 @@ private int compareData(TFSORow other) { } } - private static class TFSOInputFormat extends FileInputFormat - implements AcidInputFormat { + private static class TFSOInputFormat extends FileInputFormat + implements AcidInputFormat { FSDataInputStream in[] = null; int readingFrom = -1; + RowType rType; + + public TFSOInputFormat(RowType rType) { + this.rType = rType; + } @Override - public RecordReader getRecordReader( + public RecordReader getRecordReader( InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { if (in == null) { Path paths[] = FileInputFormat.getInputPaths(entries); @@ -511,13 +693,13 @@ private int compareData(TFSORow other) { } readingFrom = 0; } - return new RecordReader() { + return new RecordReader() { @Override - public boolean next(NullWritable nullWritable, TFSORow tfsoRecord) throws + public boolean next(NullWritable nullWritable, V tfsoRecord) throws IOException { try { - tfsoRecord.readFields(in[readingFrom]); + ((WritableComparable)tfsoRecord).readFields(in[readingFrom]); return true; } catch (EOFException e) { in[readingFrom].close(); @@ -532,8 +714,18 @@ public NullWritable createKey() { } @Override - public TFSORow createValue() { - return new TFSORow(); 
+ public V createValue() { + switch (rType) { + case RegRow: + return (V)new TFSORow(); + case InsertRow: + return (V)new InsertRow(); + case DelRow: + return (V)new DeleteRow(); + + default: + throw new RuntimeException("Unknown row Type"); + } } @Override @@ -554,14 +746,14 @@ public float getProgress() throws IOException { } @Override - public RowReader getReader(InputSplit split, + public RowReader getReader(InputSplit split, Options options) throws IOException { return null; } @Override - public RawReader getRawReader(Configuration conf, + public RawReader getRawReader(Configuration conf, boolean collapseEvents, int bucket, ValidTxnList validTxnList, @@ -580,7 +772,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList public static class TFSOOutputFormat extends FileOutputFormat implements AcidOutputFormat { - List records = new ArrayList(); + List records = new ArrayList<>(); long numRecordsAdded = 0; FSDataOutputStream out = null; @@ -588,7 +780,6 @@ public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList public RecordUpdater getRecordUpdater(final Path path, final Options options) throws IOException { - final StructObjectInspector inspector = (StructObjectInspector)options.getInspector(); return new RecordUpdater() { @Override public void insert(long currentTransaction, Object row) throws IOException { @@ -608,9 +799,7 @@ public void delete(long currentTransaction, Object row) throws IOException { } private void addRow(Object row) { - assert row instanceof TFSORow : "Expected TFSORow but got " + - row.getClass().getName(); - records.add((TFSORow)row); + records.add((Writable)row); } @Override @@ -619,7 +808,7 @@ public void flush() throws IOException { FileSystem fs = path.getFileSystem(options.getConfiguration()); out = fs.create(path); } - for (TFSORow r : records) r.write(out); + for (Writable r : records) r.write(out); records.clear(); out.flush(); } @@ -657,8 +846,7 @@ public SerDeStats getStats() { return new FileSinkOperator.RecordWriter() { @Override public void write(Writable w) throws IOException { - Assert.assertTrue(w instanceof TFSORow); - records.add((TFSORow) w); + records.add(w); } @Override @@ -667,7 +855,7 @@ public void close(boolean abort) throws IOException { FileSystem fs = finalOutPath.getFileSystem(jc); out = fs.create(finalOutPath); } - for (TFSORow r : records) r.write(out); + for (Writable r : records) r.write(out); records.clear(); out.flush(); out.close(); @@ -702,15 +890,11 @@ public void initialize(Configuration conf, Properties tbl) throws SerDeException @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - assert obj instanceof TFSORow : "Expected TFSORow or decendent, got " - + obj.getClass().getName(); - return (TFSORow)obj; + return (Writable)obj; } @Override public Object deserialize(Writable blob) throws SerDeException { - assert blob instanceof TFSORow : "Expected TFSORow or decendent, got " - + blob.getClass().getName(); return blob; } -- 1.7.12.4 (Apple Git-37)