diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 60be4ef..da6d493 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -281,7 +281,6 @@ public void publishSegments(Table table, boolean overwrite) throws MetaException
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
-    Lifecycle lifecycle = new Lifecycle();
     LOG.info("Committing table {} to the druid metastore", table.getDbName());
     final Path tableDir = getSegmentDescriptorDir();
     try {
@@ -310,19 +309,9 @@ public void publishSegments(Table table, boolean overwrite) throws MetaException
       String coordinatorResponse = null;
       try {
-        coordinatorResponse = RetryUtils.retry(new Callable() {
-          @Override
-          public String call() throws Exception {
-            return DruidStorageHandlerUtils.getURL(getHttpClient(),
-                    new URL(String.format("http://%s/status", coordinatorAddress))
-            );
-          }
-        }, new Predicate() {
-          @Override
-          public boolean apply(@Nullable Throwable input) {
-            return input instanceof IOException;
-          }
-        }, maxTries);
+        coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(),
+                new URL(String.format("http://%s/status", coordinatorAddress))
+        ), input -> input instanceof IOException, maxTries);
       } catch (Exception e) {
         console.printInfo(
                 "Will skip waiting for data loading");
@@ -338,28 +327,25 @@ public boolean apply(@Nullable Throwable input) {
       long passiveWaitTimeMs = HiveConf
              .getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME);
       ImmutableSet setOfUrls = FluentIterable.from(segmentList)
-              .transform(new Function() {
-                @Override
-                public URL apply(DataSegment dataSegment) {
-                  try {
-                    //Need to make sure that we are using UTC since most of the druid cluster use UTC by default
-                    return new URL(String
-                            .format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
-                                    coordinatorAddress, dataSourceName, DataSegment
-                                            .makeDataSegmentIdentifier(dataSegment.getDataSource(),
-                                                    new DateTime(dataSegment.getInterval()
-                                                            .getStartMillis(), DateTimeZone.UTC),
-                                                    new DateTime(dataSegment.getInterval()
-                                                            .getEndMillis(), DateTimeZone.UTC),
-                                                    dataSegment.getVersion(),
-                                                    dataSegment.getShardSpec()
-                                            )
-                            ));
-                  } catch (MalformedURLException e) {
-                    Throwables.propagate(e);
-                  }
-                  return null;
+              .transform(dataSegment -> {
+                try {
+                  //Need to make sure that we are using UTC since most of the druid cluster use UTC by default
+                  return new URL(String
+                          .format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
+                                  coordinatorAddress, dataSourceName, DataSegment
+                                          .makeDataSegmentIdentifier(dataSegment.getDataSource(),
+                                                  new DateTime(dataSegment.getInterval()
+                                                          .getStartMillis(), DateTimeZone.UTC),
+                                                  new DateTime(dataSegment.getInterval()
+                                                          .getEndMillis(), DateTimeZone.UTC),
+                                                  dataSegment.getVersion(),
+                                                  dataSegment.getShardSpec()
+                                          )
+                          ));
+                } catch (MalformedURLException e) {
+                  Throwables.propagate(e);
                 }
+                return null;
               }).toSet();

       int numRetries = 0;
@@ -399,7 +385,6 @@ public boolean apply(URL input) {
       Throwables.propagate(e);
     } finally {
       cleanWorkingDir();
-      lifecycle.stop();
     }
   }
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 5dd65b3..3eeb0c3 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -19,6 +19,7 @@
 import io.druid.common.utils.JodaUtils;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.math.expr.ExprMacroTable;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
@@ -28,7 +29,6 @@
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.loading.DataSegmentPusher;
-import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
@@ -77,6 +77,7 @@
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.joda.time.format.ISODateTimeFormat;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
@@ -135,10 +136,9 @@
   static {
     // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig
-    InjectableValues.Std injectableValues = new InjectableValues.Std().addValue(
-            SelectQueryConfig.class,
-            new SelectQueryConfig(false)
-    );
+    InjectableValues.Std injectableValues = new InjectableValues.Std()
+            .addValue(SelectQueryConfig.class, new SelectQueryConfig(false))
+            .addValue(ExprMacroTable.class, ExprMacroTable.nil());
     JSON_MAPPER.setInjectableValues(injectableValues);
     SMILE_MAPPER.setInjectableValues(injectableValues);
     HiveDruidSerializationModule hiveDruidSerializationModule = new HiveDruidSerializationModule();
@@ -720,9 +720,17 @@ public static DataSegment publishSegmentWithShardSpec(DataSegment segment, Shard
   }

   public static Path finalPathForSegment(String segmentDirectory, DataSegment segment) {
-    return new Path(
-            String.format("%s/%s/index.zip", segmentDirectory,
-                    DataSegmentPusherUtil.getHdfsStorageDir(segment)));
+    String path = DataSegmentPusher.JOINER.join(
+            segment.getDataSource(),
+            String.format(
+                    "%s_%s",
+                    segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
+                    segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
+            ),
+            segment.getVersion().replaceAll(":", "_")
+    );
+
+    return new Path(String.format("%s/%s/index.zip", segmentDirectory, path));
   }

   private static ShardSpec getNextPartitionShardSpec(ShardSpec shardSpec) {
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 5e1deac..9d2ec82 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -212,7 +212,8 @@
             0,
             0,
             true,
-            null
+            null,
+            0L
     );

     LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 3ba3196..25f96b3 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -21,7 +21,10 @@
 import io.druid.indexer.JobHelper;
 import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
 import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.storage.hdfs.HdfsDataSegmentPusher;
+import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.LinearShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
@@ -202,8 +205,15 @@ public void testDeleteSegment() throws IOException, SegmentLoadingException {
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
     DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString());
+    HdfsDataSegmentPusherConfig hdfsDSPConfig = new HdfsDataSegmentPusherConfig();
+    hdfsDSPConfig.setStorageDirectory(segmentRootPath);
+    HdfsDataSegmentPusher hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDSPConfig, config,
+        DruidStorageHandlerUtils.JSON_MAPPER
+    );
     Path segmentOutputPath = JobHelper
-        .makeFileNamePath(new Path(segmentRootPath), localFileSystem, dataSegment, JobHelper.INDEX_ZIP);
+        .makeFileNamePath(new Path(segmentRootPath), localFileSystem, dataSegment,
+            JobHelper.INDEX_ZIP, hdfsDataSegmentPusher
+        );
     Path indexPath = new Path(segmentOutputPath, "index.zip");
     DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
         ImmutableMap.of("path", indexPath)).build();
diff --git druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index d5b217a..4962e0b 100644
--- druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -142,7 +142,8 @@ public void testWrite() throws IOException, SegmentLoadingException {
     IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, null, null,
-        temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null
+        temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null,
+        0L
     );
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
@@ -192,8 +193,7 @@ public DruidWritable apply(@Nullable ImmutableMap input
         ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
         ImmutableList.of("host"),
         ImmutableList.of("visited_sum", "unique_hosts"),
-        null,
-        Granularities.NONE
+        null
     );

     List rows = Lists.newArrayList();
diff --git pom.xml pom.xml
index 40699bc..6b59505 100644
--- pom.xml
+++ pom.xml
@@ -138,7 +138,7 @@
     10.10.2.0
     3.1.0
     0.1.2
-    0.10.0
+    0.10.1
     14.0.1
     2.4.11
     1.3.166
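For readers who want to sanity-check the segment layout implied by the rewritten finalPathForSegment above without pulling in the Druid classes, here is a minimal standalone sketch. It assumes DataSegmentPusher.JOINER simply joins path components with "/"; the class and method names (SegmentPathSketch, finalPath) and the example datasource, directory, and version values are illustrative only and are not part of the patch. Only Joda-Time is needed on the classpath.

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;

public class SegmentPathSketch {

  // Approximates the new finalPathForSegment() layout:
  // <segmentDirectory>/<dataSource>/<start>_<end>/<version with ':' -> '_'>/index.zip
  static String finalPath(String segmentDirectory, String dataSource, Interval interval, String version) {
    String segmentDir = String.join("/",
        dataSource,
        String.format("%s_%s",
            interval.getStart().toString(ISODateTimeFormat.basicDateTime()),
            interval.getEnd().toString(ISODateTimeFormat.basicDateTime())),
        version.replaceAll(":", "_"));
    return String.format("%s/%s/index.zip", segmentDirectory, segmentDir);
  }

  public static void main(String[] args) {
    // One UTC day, matching the UTC handling used in the patch above.
    Interval day = new Interval(new DateTime("2017-01-01", DateTimeZone.UTC),
        new DateTime("2017-01-02", DateTimeZone.UTC));
    // Prints the path a segment of a hypothetical "wikipedia" datasource would land under.
    System.out.println(finalPath("/apps/hive/druid-segments", "wikipedia", day, "2017-01-03T00:00:00.000Z"));
  }
}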