diff --git common/src/java/org/apache/hadoop/hive/conf/Constants.java common/src/java/org/apache/hadoop/hive/conf/Constants.java
index ea7864a..7695e02 100644
--- common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -30,6 +30,7 @@
       "org.apache.hadoop.hive.druid.io.DruidOutputFormat";
   public static final String DRUID_DATA_SOURCE = "druid.datasource";
   public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity";
+  public static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity";
   public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity";
   public static final String DRUID_QUERY_JSON = "druid.query.json";
   public static final String DRUID_QUERY_TYPE = "druid.query.type";
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 6c08ca5..51eef40 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -28,6 +28,7 @@
 import io.druid.data.input.impl.StringDimensionSchema;
 import io.druid.data.input.impl.TimeAndDimsParseSpec;
 import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
@@ -102,5 +103,8 @@
     final GranularitySpec granularitySpec = new UniformGranularitySpec(
             Granularity.valueOf(segmentGranularity),
-            null,
+            QueryGranularity.fromString(
+                    tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY) == null
+                            ? "NONE"
+                            : tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY)),
             null
     );
@@ -190,9 +194,22 @@
             .getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_PARTITION_SIZE);
     String basePersistDirectory = HiveConf
             .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY);
-    final RealtimeTuningConfig realtimeTuningConfig = RealtimeTuningConfig
-            .makeDefaultTuningConfig(new File(
-                    basePersistDirectory))
-            .withVersioningPolicy(new CustomVersioningPolicy(version));
+    Integer maxRowInMemory = HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_ROW_IN_MEMORY);
+
+    RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory,
+            null,
+            null, new File(
+            basePersistDirectory),
+            new CustomVersioningPolicy(version),
+            null,
+            null,
+            null,
+            null,
+            true,
+            0,
+            0,
+            true,
+            null
+    );
     LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
     return new DruidRecordWriter(dataSchema, realtimeTuningConfig, hdfsDataSegmentPusher,