diff --git common/src/java/org/apache/hadoop/hive/conf/Constants.java common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 51408b1..10aaee1 100644
--- common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -33,7 +33,10 @@
   public static final String DRUID_DATA_SOURCE = "druid.datasource";
   public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity";
   public static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity";
+  public static final String DRUID_TARGET_SHARDS_PER_GRANULARITY =
+      "druid.segment.targetShardsPerGranularity";
   public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity";
+  public static final String DRUID_SHARD_KEY_COL_NAME = "__druid_extra_partition_key";
   public static final String DRUID_QUERY_JSON = "druid.query.json";
   public static final String DRUID_QUERY_TYPE = "druid.query.type";
   public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 0977329..1d136ad 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -92,6 +92,10 @@
             tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ?
                     tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) :
                     HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
+    final int targetNumShardsPerGranularity = Integer.parseUnsignedInt(
+        tableProperties.getProperty(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY, "0"));
+    final int maxPartitionSize = targetNumShardsPerGranularity > 0 ? -1 : HiveConf
+        .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
     // If datasource is in the table properties, it is an INSERT/INSERT OVERWRITE as the datasource
     // name was already persisted. Otherwise, it is a CT/CTAS and we need to get the name from the
     // job properties that are set by configureOutputJobProperties in the DruidStorageHandler
@@ -181,8 +185,9 @@
     List aggregatorFactories = aggregatorFactoryBuilder.build();
     final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
             new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
-            new DimensionsSpec(dimensions,
-                    Lists.newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME), null
+            new DimensionsSpec(dimensions, Lists
+                    .newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME
+                    ), null
             )
     ));
@@ -199,8 +204,6 @@
     final String workingPath = jc.get(Constants.DRUID_JOB_WORKING_DIRECTORY);
     final String version = jc.get(Constants.DRUID_SEGMENT_VERSION);
-    Integer maxPartitionSize = HiveConf
-        .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
     String basePersistDirectory = HiveConf
         .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
     if (Strings.isNullOrEmpty(basePersistDirectory)) {
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index cf4dad6..07f9e2c 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -24,6 +24,7 @@
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
@@ -78,12 +79,14 @@
   private SegmentIdentifier currentOpenSegment = null;

-  private final Integer maxPartitionSize;
+  private final int maxPartitionSize;

   private final FileSystem fileSystem;

   private final Supplier committerSupplier;

+  private final Granularity segmentGranularity;
+
   public DruidRecordWriter(
           DataSchema dataSchema,
           RealtimeTuningConfig realtimeTuningConfig,
@@ -106,12 +109,13 @@ public DruidRecordWriter(
             dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER,
             DruidStorageHandlerUtils.INDEX_IO, DruidStorageHandlerUtils.INDEX_MERGER_V9
     );
-    Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize need to be greater than 0");
    this.maxPartitionSize = maxPartitionSize;
    appenderator.startJob(); // maybe we need to move this out of the constructor
    this.segmentsDescriptorDir = Preconditions
            .checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
    this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
+    this.segmentGranularity = this.dataSchema.getGranularitySpec()
+        .getSegmentGranularity();
    committerSupplier = Suppliers.ofInstance(Committers.nil());
  }
@@ -125,10 +129,6 @@ public DruidRecordWriter(
   * @return segmentIdentifier with of the truncatedTime and maybe push the current open segment.
   */
  private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
-
-    final Granularity segmentGranularity = dataSchema.getGranularitySpec()
-        .getSegmentGranularity();
-
    final Interval interval = new Interval(
            new DateTime(truncatedTime),
            segmentGranularity.increment(new DateTime(truncatedTime))
@@ -136,14 +136,13 @@ private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
    SegmentIdentifier retVal;
    if (currentOpenSegment == null) {
-      retVal = new SegmentIdentifier(
+      currentOpenSegment = new SegmentIdentifier(
              dataSchema.getDataSource(),
              interval,
              tuningConfig.getVersioningPolicy().getVersion(interval),
              new LinearShardSpec(0)
      );
-      currentOpenSegment = retVal;
-      return retVal;
+      return currentOpenSegment;
    } else if (currentOpenSegment.getInterval().equals(interval)) {
      retVal = currentOpenSegment;
      int rowCount = appenderator.getRowCount(retVal);
@@ -238,22 +237,50 @@ public String apply(
  @Override
  public void write(Writable w) throws IOException {
    DruidWritable record = (DruidWritable) w;
-    final long timestamp = (long) record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
-    final long truncatedTime = (long) record.getValue()
-        .get(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
-
-    InputRow inputRow = new MapBasedInputRow(
-        timestamp,
-        dataSchema.getParser()
-            .getParseSpec()
-            .getDimensionsSpec()
-            .getDimensionNames(),
-        record.getValue()
+    final long timestamp =
+        (long) record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN);
+    final long truncatedTime =
+        (long) record.getValue().get(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
+    final int partitionNumber = Math.toIntExact(
+        (long) record.getValue().getOrDefault(Constants.DRUID_SHARD_KEY_COL_NAME, -1));
+    final InputRow inputRow = new MapBasedInputRow(timestamp,
+        dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
+        record.getValue()
    );
    try {
-      appenderator
-          .add(getSegmentIdentifierAndMaybePush(truncatedTime), inputRow, committerSupplier);
+      if (partitionNumber != -1 && maxPartitionSize == -1) {
+        final Interval interval = new Interval(new DateTime(truncatedTime),
+            segmentGranularity.increment(new DateTime(truncatedTime))
+        );
+
+        if (currentOpenSegment != null) {
+          if (currentOpenSegment.getShardSpec().getPartitionNum() != partitionNumber) {
+            pushSegments(ImmutableList.of(currentOpenSegment));
+            currentOpenSegment = new SegmentIdentifier(dataSchema.getDataSource(), interval,
+                tuningConfig.getVersioningPolicy().getVersion(interval),
+                new LinearShardSpec(Math.toIntExact(partitionNumber))
+            );
+          }
+        } else if (currentOpenSegment == null) {
+          currentOpenSegment = new SegmentIdentifier(dataSchema.getDataSource(), interval,
+              tuningConfig.getVersioningPolicy().getVersion(interval),
+              new LinearShardSpec(Math.toIntExact(partitionNumber))
+          );
+
+        }
+        appenderator.add(currentOpenSegment, inputRow, committerSupplier);
+
+      } else if (partitionNumber == -1 && maxPartitionSize != -1) {
+        appenderator
+            .add(getSegmentIdentifierAndMaybePush(truncatedTime), inputRow, committerSupplier);
+      } else {
+        throw new IllegalArgumentException(String.format(
+            "partitionNumber and maxPartitionSize should be mutually exclusive got partitionNum [%s] and maxPartitionSize [%s]",
+            partitionNumber, maxPartitionSize
+        ));
+      }
    } catch (SegmentNotWritableException e) {
      throw new IOException(e);
    }
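The write() change above is the heart of the patch, so a compact illustration may help: a row either arrives with a pre-computed shard key in __druid_extra_partition_key (in which case DruidOutputFormat has already disabled size-based splitting by passing maxPartitionSize == -1), or it arrives without one and the writer keeps using getSegmentIdentifierAndMaybePush() with the configured row-count cap. Below is a minimal, self-contained sketch of that rule; the class and method names are hypothetical, and only the -1 sentinel convention comes from the patch.

// ShardRoutingSketch.java -- illustrative model of the routing rule in DruidRecordWriter.write();
// names are hypothetical, only the -1 sentinel convention is taken from the patch.
import java.util.OptionalInt;

public class ShardRoutingSketch {

  // Returns the shard number to write to when the row carries a key, an empty OptionalInt when
  // the writer should fall back to size-based segment rollover, and throws on misconfiguration.
  static OptionalInt resolveShard(int partitionNumber, int maxPartitionSize) {
    if (partitionNumber != -1 && maxPartitionSize == -1) {
      // shard chosen upstream by the optimizer: keep/open a segment with that partition number
      return OptionalInt.of(partitionNumber);
    } else if (partitionNumber == -1 && maxPartitionSize != -1) {
      // no shard key in the row: split segments by row count instead
      return OptionalInt.empty();
    }
    throw new IllegalArgumentException(String.format(
        "partitionNumber [%s] and maxPartitionSize [%s] should be mutually exclusive",
        partitionNumber, maxPartitionSize));
  }

  public static void main(String[] args) {
    System.out.println(resolveShard(3, -1));         // OptionalInt[3]
    System.out.println(resolveShard(-1, 5_000_000)); // OptionalInt.empty
  }
}

Any other combination is a misconfiguration, which is exactly what the IllegalArgumentException branch in write() guards against.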
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 3899bff..2e01900 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -514,10 +514,31 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD
      }
      value.put(columns[i], res);
    }
+    //Extract the partitions keys segments granularity and partition key if any
+    // First Segment Granularity has to be here.
+    final int granularityFieldIndex = columns.length;
+    assert values.size() > granularityFieldIndex;
+    Preconditions.checkArgument(fields.get(granularityFieldIndex).getFieldName()
+        .equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME));
    value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
-            ((TimestampObjectInspector) fields.get(columns.length).getFieldObjectInspector())
-                    .getPrimitiveJavaObject(values.get(columns.length)).getTime()
+            ((TimestampObjectInspector) fields.get(granularityFieldIndex).getFieldObjectInspector())
+                    .getPrimitiveJavaObject(values.get(granularityFieldIndex)).getTime()
    );
+    if (values.size() == columns.length + 2) {
+      // Then partition number if any.
+      final int partitionNumPos = granularityFieldIndex + 1;
+      Preconditions.checkArgument(
+          fields.get(partitionNumPos).getFieldName().equals(Constants.DRUID_SHARD_KEY_COL_NAME),
+          String.format("expecting to encounter %s but was %s", Constants.DRUID_SHARD_KEY_COL_NAME,
+              fields.get(partitionNumPos).getFieldName()
+          )
+      );
+      value.put(Constants.DRUID_SHARD_KEY_COL_NAME,
+          ((LongObjectInspector) fields.get(partitionNumPos).getFieldObjectInspector())
+              .get(values.get(partitionNumPos))
+      );
+    }
+
    return new DruidWritable(value);
  }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
index e3dee93..1b299cb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.optimizer;

+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
@@ -46,6 +47,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -61,9 +63,13 @@
 import org.apache.hadoop.hive.ql.udf.UDFDateFloorSecond;
 import org.apache.hadoop.hive.ql.udf.UDFDateFloorWeek;
 import org.apache.hadoop.hive.ql.udf.UDFDateFloorYear;
+import org.apache.hadoop.hive.ql.udf.UDFRand;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFEpochMilli;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPDivide;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPMod;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -76,8 +82,10 @@
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Stack;
+import java.util.stream.Collectors;

 /**
  * Introduces a RS before FS to partition data by configuration specified
@@ -113,6 +121,9 @@ private NodeProcessor getSortDynPartProc(ParseContext pCtx) {
    private final Logger LOG = LoggerFactory.getLogger(SortedDynPartitionTimeGranularityOptimizer.class);
    protected ParseContext parseCtx;
+    private int targetShardsPerGranularity = 0;
+    private int granularityKeyPos = -1;
+    private int partitionKeyPos = -1;

    public SortedDynamicPartitionProc(ParseContext pCtx) {
      this.parseCtx = pCtx;
@@ -130,66 +141,92 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
        // Bail out, nothing to do
        return null;
      }
-      String segmentGranularity = null;
+      String segmentGranularity;
+      final String targetShardsProperty;
      final Table table = fsOp.getConf().getTable();
      if (table != null) {
        // case the statement is an INSERT
        segmentGranularity = table.getParameters().get(Constants.DRUID_SEGMENT_GRANULARITY);
+        targetShardsProperty =
+            table.getParameters().getOrDefault(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY, "0");
+
      } else if (parseCtx.getCreateViewDesc() != null) {
        // case the statement is a CREATE MATERIALIZED VIEW AS
        segmentGranularity = parseCtx.getCreateViewDesc().getTblProps()
                .get(Constants.DRUID_SEGMENT_GRANULARITY);
+        targetShardsProperty = parseCtx.getCreateViewDesc().getTblProps()
+            .getOrDefault(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY, "0");
      } else if (parseCtx.getCreateTable() != null) {
        // case the statement is a CREATE TABLE AS
        segmentGranularity = parseCtx.getCreateTable().getTblProps()
                .get(Constants.DRUID_SEGMENT_GRANULARITY);
+        targetShardsProperty = parseCtx.getCreateTable().getTblProps()
+            .getOrDefault(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY, "0");
      } else {
        throw new SemanticException("Druid storage handler used but not an INSERT, "
                + "CMVAS or CTAS statement");
      }
-      segmentGranularity = !Strings.isNullOrEmpty(segmentGranularity)
-              ? segmentGranularity
-              : HiveConf.getVar(parseCtx.getConf(),
-                      HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY
-              );
+      segmentGranularity = Strings.isNullOrEmpty(segmentGranularity) ? HiveConf
+          .getVar(parseCtx.getConf(),
+              HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY
+          ) : segmentGranularity;
+      targetShardsPerGranularity = Integer.parseInt(targetShardsProperty);
+
      LOG.info("Sorted dynamic partitioning on time granularity optimization kicked in...");

      // unlink connection between FS and its parent
-      Operator fsParent = fsOp.getParentOperators().get(0);
-      fsParent = fsOp.getParentOperators().get(0);
+      final Operator fsParent = fsOp.getParentOperators().get(0);
      fsParent.getChildOperators().clear();

+      if (targetShardsPerGranularity > 0) {
+        partitionKeyPos = fsParent.getSchema().getSignature().size() + 1;
+      }
+      granularityKeyPos = fsParent.getSchema().getSignature().size();
      // Create SelectOp with granularity column
-      Operator granularitySelOp = getGranularitySelOp(fsParent, segmentGranularity);
+      final Operator granularitySelOp = getGranularitySelOp(fsParent,
+          segmentGranularity
+      );

      // Create ReduceSinkOp operator
-      ArrayList parentCols = Lists.newArrayList(granularitySelOp.getSchema().getSignature());
-      ArrayList allRSCols = Lists.newArrayList();
+      final ArrayList parentCols =
+          Lists.newArrayList(granularitySelOp.getSchema().getSignature());
+      final ArrayList allRSCols = Lists.newArrayList();
      for (ColumnInfo ci : parentCols) {
        allRSCols.add(new ExprNodeColumnDesc(ci));
      }

      // Get the key positions
-      List keyPositions = new ArrayList<>();
-      keyPositions.add(allRSCols.size() - 1);
-      List sortOrder = new ArrayList(1);
-      sortOrder.add(1); // asc
-      List sortNullOrder = new ArrayList(1);
-      sortNullOrder.add(0); // nulls first
+      final List keyPositions;
+      final List sortOrder;
+      final List sortNullOrder;
+      //Order matters, assuming later that __time_granularity comes first then __druidPartitionKey
+      if (targetShardsPerGranularity > 0) {
+        keyPositions = ImmutableList.of(granularityKeyPos, partitionKeyPos);
+        sortOrder = ImmutableList.of(1, 1); // asc
+        sortNullOrder = ImmutableList.of(0, 0); // nulls first
+      } else {
+        keyPositions = ImmutableList.of(granularityKeyPos);
+        sortOrder = ImmutableList.of(1); // asc
+        sortNullOrder = ImmutableList.of(0); // nulls first
+      }
      ReduceSinkOperator rsOp = getReduceSinkOp(keyPositions, sortOrder, sortNullOrder,
          allRSCols, granularitySelOp);

      // Create backtrack SelectOp
-      List descs = new ArrayList(allRSCols.size());
-      List colNames = new ArrayList();
-      String colName;
+      final List descs = new ArrayList<>(allRSCols.size());
+      final List colNames = new ArrayList<>();
      for (int i = 0; i < allRSCols.size(); i++) {
        ExprNodeDesc col = allRSCols.get(i);
-        colName = col.getExprString();
+        final String colName = col.getExprString();
        colNames.add(colName);
        if (keyPositions.contains(i)) {
-          descs.add(new ExprNodeColumnDesc(col.getTypeInfo(), ReduceField.KEY.toString()+"."+colName, null, false));
+          descs.add(
+              new ExprNodeColumnDesc(col.getTypeInfo(), ReduceField.KEY.toString() + "." + colName,
                  null, false
+              ));
        } else {
-          descs.add(new ExprNodeColumnDesc(col.getTypeInfo(), ReduceField.VALUE.toString()+"."+colName, null, false));
+          descs.add(new ExprNodeColumnDesc(col.getTypeInfo(),
+              ReduceField.VALUE.toString() + "." + colName, null, false
+          ));
        }
      }
      RowSchema selRS = new RowSchema(granularitySelOp.getSchema());
@@ -205,24 +242,30 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
      // Update file sink descriptor
      fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED);
      fsOp.getConf().setPartitionCols(rsOp.getConf().getPartitionCols());
-      ColumnInfo ci = new ColumnInfo(granularitySelOp.getSchema().getSignature().get(
-          granularitySelOp.getSchema().getSignature().size() - 1)); // granularity column
-      fsOp.getSchema().getSignature().add(ci);
+      final ColumnInfo granularityColumnInfo =
+          new ColumnInfo(granularitySelOp.getSchema().getSignature().get(granularityKeyPos));
+      fsOp.getSchema().getSignature().add(granularityColumnInfo);
+      if (targetShardsPerGranularity > 0) {
+        final ColumnInfo partitionKeyColumnInfo =
+            new ColumnInfo(granularitySelOp.getSchema().getSignature().get(partitionKeyPos));
+        fsOp.getSchema().getSignature().add(partitionKeyColumnInfo);
+      }
      LOG.info("Inserted " + granularitySelOp.getOperatorId() + ", " + rsOp.getOperatorId() + " and "
          + backtrackSelOp.getOperatorId() + " as parent of " + fsOp.getOperatorId()
          + " and child of " + fsParent.getOperatorId());
-
      parseCtx.setReduceSinkAddedBySortedDynPartition(true);
      return null;
    }

    private Operator getGranularitySelOp(
-        Operator fsParent, String segmentGranularity
+        Operator fsParent,
+        String segmentGranularity
    ) throws SemanticException {
-      ArrayList parentCols = Lists.newArrayList(fsParent.getSchema().getSignature());
-      ArrayList descs = Lists.newArrayList();
-      List colNames = Lists.newArrayList();
+      final ArrayList parentCols =
+          Lists.newArrayList(fsParent.getSchema().getSignature());
+      final ArrayList descs = Lists.newArrayList();
+      final List colNames = Lists.newArrayList();
      int timestampPos = -1;
      for (int i = 0; i < parentCols.size(); i++) {
        ColumnInfo ci = parentCols.get(i);
@@ -242,10 +285,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
        throw new SemanticException("No column with timestamp with local time-zone type on query result; "
            + "one column should be of timestamp with local time-zone type");
      }
-      RowSchema selRS = new RowSchema(fsParent.getSchema());
+      final RowSchema selRS = new RowSchema(fsParent.getSchema());
      // Granularity (partition) column
-      String udfName;
-
+      final String udfName;
      Class udfClass;
      switch (segmentGranularity) {
        case "YEAR":
          udfClass = UDFDateFloorYear.class;
          break;
@@ -277,9 +319,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
          udfClass = UDFDateFloorSecond.class;
          break;
        default:
-          throw new SemanticException("Granularity for Druid segment not recognized");
+          throw new SemanticException(String.format(Locale.ENGLISH,
+              "Unknown Druid Granularity [%s], Accepted values are [YEAR, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND]",
+              segmentGranularity
+          ));
      }
+
      // Timestamp column type in Druid is timestamp with local time-zone, as it represents
      // a specific instant in time. Thus, we have this value and we need to extract the
      // granularity to split the data when we are storing it in Druid. However, Druid stores
@@ -308,15 +354,39 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
      descs.add(f3);
      colNames.add(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
      // Add granularity to the row schema
-      ColumnInfo ci = new ColumnInfo(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, TypeInfoFactory.timestampTypeInfo,
+      final ColumnInfo ci = new ColumnInfo(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, TypeInfoFactory.timestampTypeInfo,
          selRS.getSignature().get(0).getTabAlias(), false, false);
      selRS.getSignature().add(ci);
-
+      if (targetShardsPerGranularity > 0 ) {
+        // add another partitioning key based on floor(1/rand) % targetShardsPerGranularity
+        final ColumnInfo partitionKeyCi =
+            new ColumnInfo(Constants.DRUID_SHARD_KEY_COL_NAME, TypeInfoFactory.longTypeInfo,
+                selRS.getSignature().get(0).getTabAlias(), false, false
+            );
+        final ExprNodeDesc targetNumShardDescNode =
+            new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, targetShardsPerGranularity);
+        final ExprNodeGenericFuncDesc randomFn = ExprNodeGenericFuncDesc
+            .newInstance(new GenericUDFBridge("rand", false, UDFRand.class.getName()),
+                ImmutableList.of()
+            );
+
+        final ExprNodeGenericFuncDesc random = ExprNodeGenericFuncDesc.newInstance(
+            new GenericUDFFloor(), ImmutableList.of(ExprNodeGenericFuncDesc
+                .newInstance(new GenericUDFOPDivide(),
+                    ImmutableList.of(new ExprNodeConstantDesc(TypeInfoFactory.doubleTypeInfo, 1.0), randomFn)
+                )));
+        final ExprNodeGenericFuncDesc randModMax = ExprNodeGenericFuncDesc
+            .newInstance(new GenericUDFOPMod(),
+                ImmutableList.of(random, targetNumShardDescNode)
+            );
+        descs.add(randModMax);
+        colNames.add(Constants.DRUID_SHARD_KEY_COL_NAME);
+        selRS.getSignature().add(partitionKeyCi);
+      }
      // Create SelectDesc
-      SelectDesc selConf = new SelectDesc(descs, colNames);
-
+      final SelectDesc selConf = new SelectDesc(descs, colNames);
      // Create Select Operator
-      SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(
+      final SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(
          selConf, selRS, fsParent);

      return selOp;
@@ -324,72 +394,77 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
    private ReduceSinkOperator getReduceSinkOp(List keyPositions, List sortOrder,
        List sortNullOrder, ArrayList allCols, Operator parent
-    ) throws SemanticException {
-
-      ArrayList keyCols = Lists.newArrayList();
+    ) {
      // we will clone here as RS will update bucket column key with its
      // corresponding with bucket number and hence their OIs
-      for (Integer idx : keyPositions) {
-        keyCols.add(allCols.get(idx).clone());
-      }
-
+      final ArrayList keyCols = keyPositions.stream()
+          .map(id -> allCols.get(id).clone())
+          .collect(Collectors.toCollection(ArrayList::new));
      ArrayList valCols = Lists.newArrayList();
      for (int i = 0; i < allCols.size(); i++) {
-        if (!keyPositions.contains(i)) {
+        if (i != granularityKeyPos && i != partitionKeyPos) {
          valCols.add(allCols.get(i).clone());
        }
      }
-      ArrayList partCols = Lists.newArrayList();
-      for (Integer idx : keyPositions) {
-        partCols.add(allCols.get(idx).clone());
-      }
+      final ArrayList partCols =
+          keyPositions.stream().map(id -> allCols.get(id).clone())
+              .collect(Collectors.toCollection(ArrayList::new));
      // map _col0 to KEY._col0, etc
      Map colExprMap = Maps.newHashMap();
      Map nameMapping = new HashMap<>();
-      ArrayList keyColNames = Lists.newArrayList();
-      for (ExprNodeDesc keyCol : keyCols) {
-        String keyColName = keyCol.getExprString();
-        keyColNames.add(keyColName);
-        colExprMap.put(Utilities.ReduceField.KEY + "." +keyColName, keyCol);
-        nameMapping.put(keyColName, Utilities.ReduceField.KEY + "." + keyColName);
-      }
-      ArrayList valColNames = Lists.newArrayList();
-      for (ExprNodeDesc valCol : valCols) {
-        String colName = valCol.getExprString();
-        valColNames.add(colName);
-        colExprMap.put(Utilities.ReduceField.VALUE + "." + colName, valCol);
-        nameMapping.put(colName, Utilities.ReduceField.VALUE + "." + colName);
-      }
+      final ArrayList keyColNames = Lists.newArrayList();
+      final ArrayList valColNames = Lists.newArrayList();
+      keyCols.stream().forEach(exprNodeDesc -> {
+        keyColNames.add(exprNodeDesc.getExprString());
+        colExprMap
+            .put(Utilities.ReduceField.KEY + "." + exprNodeDesc.getExprString(), exprNodeDesc);
+        nameMapping.put(exprNodeDesc.getExprString(),
+            Utilities.ReduceField.KEY + "." + exprNodeDesc.getName()
+        );
+      });
+      valCols.stream().forEach(exprNodeDesc -> {
+        valColNames.add(exprNodeDesc.getExprString());
+        colExprMap
+            .put(Utilities.ReduceField.VALUE + "." + exprNodeDesc.getExprString(), exprNodeDesc);
+        nameMapping.put(exprNodeDesc.getExprString(),
+            Utilities.ReduceField.VALUE + "." + exprNodeDesc.getName()
+        );
+      });
+
      // order and null order
-      String orderStr = StringUtils.repeat("+", sortOrder.size());
-      String nullOrderStr = StringUtils.repeat("a", sortNullOrder.size());
+      final String orderStr = StringUtils.repeat("+", sortOrder.size());
+      final String nullOrderStr = StringUtils.repeat("a", sortNullOrder.size());
      // Create Key/Value TableDesc. When the operator plan is split into MR tasks,
      // the reduce operator will initialize Extract operator with information
      // from Key and Value TableDesc
-      List fields = PlanUtils.getFieldSchemasFromColumnList(keyCols,
+      final List fields = PlanUtils.getFieldSchemasFromColumnList(keyCols,
          keyColNames, 0, "");
-      TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, orderStr, nullOrderStr);
+      final TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, orderStr, nullOrderStr);
      List valFields = PlanUtils.getFieldSchemasFromColumnList(valCols,
          valColNames, 0, "");
-      TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
-      List<List<Integer>> distinctColumnIndices = Lists.newArrayList();
+      final TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
+      List<List<Integer>> distinctColumnIndices = ImmutableList.of();
      // Number of reducers is set to default (-1)
-      ReduceSinkDesc rsConf = new ReduceSinkDesc(keyCols, keyCols.size(), valCols,
+      final ReduceSinkDesc rsConf = new ReduceSinkDesc(keyCols, keyCols.size(), valCols,
          keyColNames, distinctColumnIndices, valColNames, -1, partCols, -1, keyTable,
          valueTable);
-      ArrayList signature = new ArrayList<>();
-      for (int index = 0; index < parent.getSchema().getSignature().size(); index++) {
-        ColumnInfo colInfo = new ColumnInfo(parent.getSchema().getSignature().get(index));
-        colInfo.setInternalName(nameMapping.get(colInfo.getInternalName()));
-        signature.add(colInfo);
-      }
-      ReduceSinkOperator op = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+      final ArrayList signature =
+          parent.getSchema().getSignature()
+              .stream()
+              .map(e -> new ColumnInfo(e))
+              .map(columnInfo ->
+              {
+                columnInfo.setInternalName(nameMapping.get(columnInfo.getInternalName()));
+                return columnInfo;
+              })
+              .collect(Collectors.toCollection(ArrayList::new));
+      final ReduceSinkOperator op = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
          rsConf, new RowSchema(signature), parent);
      op.setColumnExprMap(colExprMap);
      return op;
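Taken together, the optimizer changes above emit __druid_extra_partition_key as floor(1 / rand()) % targetShardsPerGranularity (wired up from GenericUDFOPDivide, GenericUDFFloor and GenericUDFOPMod around a rand() call), sort and partition the reduce sink on (__time_granularity, __druid_extra_partition_key), and DruidRecordWriter then maps each key value to a LinearShardSpec. The apparent intent is that each time-granularity bucket sees at most targetShardsPerGranularity distinct key values, and hence at most that many shards. Below is a small self-contained sketch of the key expression, assuming rand() behaves like java.util.Random.nextDouble() with values in [0, 1); it is illustrative only, not part of the patch, and the class name and seed are hypothetical.

// DruidShardKeySketch.java -- evaluates the same shape of expression the optimizer builds for
// __druid_extra_partition_key; illustrative only.
import java.util.Random;

public class DruidShardKeySketch {

  // floor(1 / rand()) % targetShardsPerGranularity: for rand() in (0, 1) the floor is >= 1,
  // so the modulo always lands in [0, targetShardsPerGranularity).
  static long shardKey(double rand, int targetShardsPerGranularity) {
    return (long) Math.floor(1.0d / rand) % targetShardsPerGranularity;
  }

  public static void main(String[] args) {
    int targetShards = 4; // would come from druid.segment.targetShardsPerGranularity
    Random random = new Random(42L);
    for (int i = 0; i < 5; i++) {
      double r = random.nextDouble();
      System.out.printf("rand()=%.4f -> __druid_extra_partition_key=%d%n",
          r, shardKey(r, targetShards));
    }
  }
}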