diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java new file mode 100644 index 0000000..f9304a5 --- /dev/null +++ druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import io.druid.metadata.MetadataStorageConnectorConfig; +import io.druid.metadata.MetadataStorageTablesConfig; +import io.druid.metadata.storage.derby.DerbyConnector; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; + +import java.sql.SQLException; +import java.util.UUID; + +public class DerbyConnectorTestUtility extends DerbyConnector { + private final String jdbcUri; + + public DerbyConnectorTestUtility( + Supplier config, + Supplier dbTables + ) { + this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID()); + } + + protected DerbyConnectorTestUtility( + Supplier config, + Supplier dbTables, + String jdbcUri + ) { + super(config, dbTables, new DBI(jdbcUri + ";create=true")); + this.jdbcUri = jdbcUri; + } + + public void tearDown() { + try { + new DBI(jdbcUri + ";drop=true").open().close(); + } catch (UnableToObtainConnectionException e) { + SQLException cause = (SQLException) e.getCause(); + // error code "08006" indicates proper shutdown + Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006", + cause.getSQLState() + ); + } + } + + public static String dbSafeUUID() { + return UUID.randomUUID().toString().replace("-", ""); + } + + public String getJdbcUri() { + return jdbcUri; + } + + public static class DerbyConnectorRule extends ExternalResource { + private DerbyConnectorTestUtility connector; + + private final Supplier dbTables; + + private final MetadataStorageConnectorConfig connectorConfig; + + public DerbyConnectorRule() { + this("druidTest" + dbSafeUUID()); + } + + private DerbyConnectorRule( + final String defaultBase + ) { + this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase))); + } + + public DerbyConnectorRule( + Supplier dbTables + ) { + this.dbTables = dbTables; + this.connectorConfig = new MetadataStorageConnectorConfig() { + @Override + public String getConnectURI() { + return connector.getJdbcUri(); + } + }; + } + + @Override + protected void before() throws Throwable { + connector = new DerbyConnectorTestUtility(Suppliers.ofInstance(connectorConfig), dbTables); + connector.getDBI().open().close(); // create db + } + + @Override + protected void after() { + connector.tearDown(); + } + + public DerbyConnectorTestUtility getConnector() { + return connector; + } + + public MetadataStorageConnectorConfig getMetadataConnectorConfig() { + return connectorConfig; + } + + public Supplier metadataTablesConfigSupplier() { + return dbTables; + } + } + +} diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java deleted file mode 100644 index 3573bf9..0000000 --- druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.druid; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import io.druid.indexer.JobHelper; -import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler; -import io.druid.segment.loading.SegmentLoadingException; -import io.druid.timeline.DataSegment; -import io.druid.timeline.partition.NoneShardSpec; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.Constants; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.joda.time.Interval; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.mockito.Mockito; -import org.skife.jdbi.v2.Handle; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Map; -import java.util.UUID; - -public class DruidStorageHandlerTest { - - @Rule - public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); - - @Rule - public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - - private static final String DATA_SOURCE_NAME = "testName"; - - private String segmentsTable; - - private String tableWorkingPath; - - private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1") - .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build(); - - @Before - public void before() throws Throwable { - tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath(); - segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); - Map mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME); - Mockito.when(tableMock.getParameters()).thenReturn(mockMap); - Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0); - StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class); - Mockito.when(storageDes.getBucketColsSize()).thenReturn(0); - Mockito.when(tableMock.getSd()).thenReturn(storageDes); - Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME); - } - - Table tableMock = Mockito.mock(Table.class); - - @Test - public void testPreCreateTableWillCreateSegmentsTable() throws MetaException { - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), - derbyConnectorRule.metadataTablesConfigSupplier().get(), - null - ); - - try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) { - Assert.assertFalse(derbyConnectorRule.getConnector() - .tableExists(handle, - segmentsTable - )); - druidStorageHandler.preCreateTable(tableMock); - Assert.assertTrue(derbyConnectorRule.getConnector() - .tableExists(handle, - segmentsTable - )); - } - - } - - @Test(expected = MetaException.class) - public void testPreCreateTableWhenDataSourceExists() throws MetaException { - derbyConnectorRule.getConnector().createSegmentTable(); - SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler( - derbyConnectorRule.getConnector()); - sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment), - DruidStorageHandlerUtils.JSON_MAPPER - ); - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), - derbyConnectorRule.metadataTablesConfigSupplier().get(), - null - ); - druidStorageHandler.preCreateTable(tableMock); - } - - @Test - public void testCommitCreateTablePlusCommitDropTableWithoutPurge() - throws MetaException, IOException { - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), - derbyConnectorRule.metadataTablesConfigSupplier().get(), - null - ); - druidStorageHandler.preCreateTable(tableMock); - Configuration config = new Configuration(); - config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString()); - config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath); - druidStorageHandler.setConf(config); - LocalFileSystem localFileSystem = FileSystem.getLocal(config); - Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); - Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, - new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) - ); - DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); - druidStorageHandler.commitCreateTable(tableMock); - Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( - DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(), - derbyConnectorRule.metadataTablesConfigSupplier().get() - )).toArray()); - druidStorageHandler.commitDropTable(tableMock, false); - Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList( - DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(), - derbyConnectorRule.metadataTablesConfigSupplier().get() - )).toArray()); - - } - - @Test - public void testCommitInsertTable() throws MetaException, IOException { - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), - derbyConnectorRule.metadataTablesConfigSupplier().get(), - null - ); - druidStorageHandler.preCreateTable(tableMock); - Configuration config = new Configuration(); - config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString()); - config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath); - druidStorageHandler.setConf(config); - LocalFileSystem localFileSystem = FileSystem.getLocal(config); - Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); - Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, - new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) - ); - DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); - druidStorageHandler.commitCreateTable(tableMock); - Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( - DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(), - derbyConnectorRule.metadataTablesConfigSupplier().get() - )).toArray()); - } - - @Test - public void testDeleteSegment() throws IOException, SegmentLoadingException { - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), - derbyConnectorRule.metadataTablesConfigSupplier().get(), - null - ); - - String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath(); - Configuration config = new Configuration(); - druidStorageHandler.setConf(config); - LocalFileSystem localFileSystem = FileSystem.getLocal(config); - - Path segmentOutputPath = JobHelper - .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment); - Path indexPath = new Path(segmentOutputPath, "index.zip"); - DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec( - ImmutableMap.of("path", indexPath)).build(); - OutputStream outputStream = localFileSystem.create(indexPath, true); - outputStream.close(); - Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath)); - Assert.assertTrue(localFileSystem.exists(segmentOutputPath)); - - druidStorageHandler.deleteSegment(dataSegmentWithLoadspect); - // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip - Assert.assertFalse("Index file still there ??", localFileSystem.exists(indexPath)); - // path format of segmentOutputPath -- > .../dataSource/interval/version/partitionNum/ - Assert.assertFalse("PartitionNum directory still there ??", - localFileSystem.exists(segmentOutputPath) - ); - Assert.assertFalse("Version directory still there ??", - localFileSystem.exists(segmentOutputPath.getParent()) - ); - Assert.assertFalse("Interval directory still there ??", - localFileSystem.exists(segmentOutputPath.getParent().getParent()) - ); - Assert.assertFalse("Data source directory still there ??", - localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent()) - ); - } -} diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java deleted file mode 100644 index 1014ab6..0000000 --- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.druid; - -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import io.druid.metadata.MetadataStorageConnectorConfig; -import io.druid.metadata.MetadataStorageTablesConfig; -import io.druid.metadata.storage.derby.DerbyConnector; -import org.junit.Assert; -import org.junit.rules.ExternalResource; -import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; - -import java.sql.SQLException; -import java.util.UUID; - -public class TestDerbyConnector extends DerbyConnector { - private final String jdbcUri; - - public TestDerbyConnector( - Supplier config, - Supplier dbTables - ) { - this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID()); - } - - protected TestDerbyConnector( - Supplier config, - Supplier dbTables, - String jdbcUri - ) { - super(config, dbTables, new DBI(jdbcUri + ";create=true")); - this.jdbcUri = jdbcUri; - } - - public void tearDown() { - try { - new DBI(jdbcUri + ";drop=true").open().close(); - } catch (UnableToObtainConnectionException e) { - SQLException cause = (SQLException) e.getCause(); - // error code "08006" indicates proper shutdown - Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006", - cause.getSQLState() - ); - } - } - - public static String dbSafeUUID() { - return UUID.randomUUID().toString().replace("-", ""); - } - - public String getJdbcUri() { - return jdbcUri; - } - - public static class DerbyConnectorRule extends ExternalResource { - private TestDerbyConnector connector; - - private final Supplier dbTables; - - private final MetadataStorageConnectorConfig connectorConfig; - - public DerbyConnectorRule() { - this("druidTest" + dbSafeUUID()); - } - - private DerbyConnectorRule( - final String defaultBase - ) { - this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase))); - } - - public DerbyConnectorRule( - Supplier dbTables - ) { - this.dbTables = dbTables; - this.connectorConfig = new MetadataStorageConnectorConfig() { - @Override - public String getConnectURI() { - return connector.getJdbcUri(); - } - }; - } - - @Override - protected void before() throws Throwable { - connector = new TestDerbyConnector(Suppliers.ofInstance(connectorConfig), dbTables); - connector.getDBI().open().close(); // create db - } - - @Override - protected void after() { - connector.tearDown(); - } - - public TestDerbyConnector getConnector() { - return connector; - } - - public MetadataStorageConnectorConfig getMetadataConnectorConfig() { - return connectorConfig; - } - - public Supplier metadataTablesConfigSupplier() { - return dbTables; - } - } - -} diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java new file mode 100644 index 0000000..da6610a --- /dev/null +++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.druid.indexer.JobHelper; +import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.skife.jdbi.v2.Handle; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Map; +import java.util.UUID; + +public class TestDruidStorageHandler { + + @Rule + public final DerbyConnectorTestUtility.DerbyConnectorRule derbyConnectorRule = new DerbyConnectorTestUtility.DerbyConnectorRule(); + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final String DATA_SOURCE_NAME = "testName"; + + private String segmentsTable; + + private String tableWorkingPath; + + private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1") + .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build(); + + @Before + public void before() throws Throwable { + tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath(); + segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); + Map mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME); + Mockito.when(tableMock.getParameters()).thenReturn(mockMap); + Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0); + StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class); + Mockito.when(storageDes.getBucketColsSize()).thenReturn(0); + Mockito.when(tableMock.getSd()).thenReturn(storageDes); + Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME); + } + + Table tableMock = Mockito.mock(Table.class); + + @Test + public void testPreCreateTableWillCreateSegmentsTable() throws MetaException { + DruidStorageHandler druidStorageHandler = new DruidStorageHandler( + derbyConnectorRule.getConnector(), + new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), + derbyConnectorRule.metadataTablesConfigSupplier().get(), + null + ); + + try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) { + Assert.assertFalse(derbyConnectorRule.getConnector() + .tableExists(handle, + segmentsTable + )); + druidStorageHandler.preCreateTable(tableMock); + Assert.assertTrue(derbyConnectorRule.getConnector() + .tableExists(handle, + segmentsTable + )); + } + + } + + @Test(expected = MetaException.class) + public void testPreCreateTableWhenDataSourceExists() throws MetaException { + derbyConnectorRule.getConnector().createSegmentTable(); + SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler( + derbyConnectorRule.getConnector()); + sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment), + DruidStorageHandlerUtils.JSON_MAPPER + ); + DruidStorageHandler druidStorageHandler = new DruidStorageHandler( + derbyConnectorRule.getConnector(), + new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), + derbyConnectorRule.metadataTablesConfigSupplier().get(), + null + ); + druidStorageHandler.preCreateTable(tableMock); + } + + @Test + public void testCommitCreateTablePlusCommitDropTableWithoutPurge() + throws MetaException, IOException { + DruidStorageHandler druidStorageHandler = new DruidStorageHandler( + derbyConnectorRule.getConnector(), + new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), + derbyConnectorRule.metadataTablesConfigSupplier().get(), + null + ); + druidStorageHandler.preCreateTable(tableMock); + Configuration config = new Configuration(); + config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString()); + config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath); + druidStorageHandler.setConf(config); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); + druidStorageHandler.commitCreateTable(tableMock); + Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( + DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(), + derbyConnectorRule.metadataTablesConfigSupplier().get() + )).toArray()); + druidStorageHandler.commitDropTable(tableMock, false); + Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList( + DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(), + derbyConnectorRule.metadataTablesConfigSupplier().get() + )).toArray()); + + } + + @Test + public void testCommitInsertTable() throws MetaException, IOException { + DruidStorageHandler druidStorageHandler = new DruidStorageHandler( + derbyConnectorRule.getConnector(), + new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), + derbyConnectorRule.metadataTablesConfigSupplier().get(), + null + ); + druidStorageHandler.preCreateTable(tableMock); + Configuration config = new Configuration(); + config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString()); + config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath); + druidStorageHandler.setConf(config); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); + druidStorageHandler.commitCreateTable(tableMock); + Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( + DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(), + derbyConnectorRule.metadataTablesConfigSupplier().get() + )).toArray()); + } + + @Test + public void testDeleteSegment() throws IOException, SegmentLoadingException { + DruidStorageHandler druidStorageHandler = new DruidStorageHandler( + derbyConnectorRule.getConnector(), + new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()), + derbyConnectorRule.metadataTablesConfigSupplier().get(), + null + ); + + String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath(); + Configuration config = new Configuration(); + druidStorageHandler.setConf(config); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + + Path segmentOutputPath = JobHelper + .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment); + Path indexPath = new Path(segmentOutputPath, "index.zip"); + DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec( + ImmutableMap.of("path", indexPath)).build(); + OutputStream outputStream = localFileSystem.create(indexPath, true); + outputStream.close(); + Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath)); + Assert.assertTrue(localFileSystem.exists(segmentOutputPath)); + + druidStorageHandler.deleteSegment(dataSegmentWithLoadspect); + // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip + Assert.assertFalse("Index file still there ??", localFileSystem.exists(indexPath)); + // path format of segmentOutputPath -- > .../dataSource/interval/version/partitionNum/ + Assert.assertFalse("PartitionNum directory still there ??", + localFileSystem.exists(segmentOutputPath) + ); + Assert.assertFalse("Version directory still there ??", + localFileSystem.exists(segmentOutputPath.getParent()) + ); + Assert.assertFalse("Interval directory still there ??", + localFileSystem.exists(segmentOutputPath.getParent().getParent()) + ); + Assert.assertFalse("Data source directory still there ??", + localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent()) + ); + } +} diff --git druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java deleted file mode 100644 index f72a735..0000000 --- druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.io; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.metamx.common.Granularity; -import io.druid.data.input.Firehose; -import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionSchema; -import io.druid.data.input.impl.DimensionsSpec; -import io.druid.data.input.impl.InputRowParser; -import io.druid.data.input.impl.MapInputRowParser; -import io.druid.data.input.impl.StringDimensionSchema; -import io.druid.data.input.impl.TimeAndDimsParseSpec; -import io.druid.data.input.impl.TimestampSpec; -import io.druid.granularity.QueryGranularities; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.LongSumAggregatorFactory; -import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; -import io.druid.segment.QueryableIndex; -import io.druid.segment.QueryableIndexStorageAdapter; -import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.RealtimeTuningConfig; -import io.druid.segment.indexing.granularity.UniformGranularitySpec; -import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.LocalDataSegmentPuller; -import io.druid.segment.loading.LocalDataSegmentPusher; -import io.druid.segment.loading.LocalDataSegmentPusherConfig; -import io.druid.segment.loading.SegmentLoadingException; -import io.druid.segment.realtime.firehose.IngestSegmentFirehose; -import io.druid.segment.realtime.firehose.WindowedStorageAdapter; -import io.druid.timeline.DataSegment; -import org.apache.calcite.adapter.druid.DruidTable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.Constants; -import org.apache.hadoop.hive.druid.DruidStorageHandler; -import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; -import org.apache.hadoop.hive.druid.io.DruidRecordWriter; -import org.apache.hadoop.hive.druid.serde.DruidWritable; -import org.joda.time.DateTime; -import org.joda.time.Interval; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import javax.annotation.Nullable; -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -public class DruidRecordWriterTest { - private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER; - - private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D"); - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - private DruidRecordWriter druidRecordWriter; - - final List> expectedRows = ImmutableList.of( - ImmutableMap.of( - DruidTable.DEFAULT_TIMESTAMP_COLUMN, - DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(), - "host", ImmutableList.of("a.example.com"), - "visited_sum", 190L, - "unique_hosts", 1.0d - ), - ImmutableMap.of( - DruidTable.DEFAULT_TIMESTAMP_COLUMN, - DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(), - "host", ImmutableList.of("b.example.com"), - "visited_sum", 175L, - "unique_hosts", 1.0d - ), - ImmutableMap.of( - DruidTable.DEFAULT_TIMESTAMP_COLUMN, - DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(), - "host", ImmutableList.of("c.example.com"), - "visited_sum", 270L, - "unique_hosts", 1.0d - ) - ); - - // This test need this patch https://github.com/druid-io/druid/pull/3483 - @Ignore - @Test - public void testWrite() throws IOException, SegmentLoadingException { - - final String dataSourceName = "testDataSource"; - final File segmentOutputDir = temporaryFolder.newFolder(); - final File workingDir = temporaryFolder.newFolder(); - Configuration config = new Configuration(); - - final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec( - new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null), - new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("host")), - null, null - ) - )); - final Map parserMap = objectMapper.convertValue(inputRowParser, Map.class); - - DataSchema dataSchema = new DataSchema( - dataSourceName, - parserMap, - new AggregatorFactory[] { - new LongSumAggregatorFactory("visited_sum", "visited_sum"), - new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts") - }, - new UniformGranularitySpec( - Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(INTERVAL_FULL) - ), - objectMapper - ); - - RealtimeTuningConfig tuningConfig = RealtimeTuningConfig - .makeDefaultTuningConfig(temporaryFolder.newFolder()); - LocalFileSystem localFileSystem = FileSystem.getLocal(config); - DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher( - new LocalDataSegmentPusherConfig() { - @Override - public File getStorageDirectory() {return segmentOutputDir;} - }, objectMapper); - - Path segmentDescriptroPath = new Path(workingDir.getAbsolutePath(), - DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME - ); - druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20, - segmentDescriptroPath, localFileSystem - ); - - List druidWritables = Lists.transform(expectedRows, - new Function, DruidWritable>() { - @Nullable - @Override - public DruidWritable apply(@Nullable ImmutableMap input - ) { - return new DruidWritable(ImmutableMap.builder().putAll(input) - .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, - Granularity.DAY.truncate( - new DateTime((long) input - .get(DruidTable.DEFAULT_TIMESTAMP_COLUMN))) - .getMillis() - ).build()); - } - } - ); - for (DruidWritable druidWritable : druidWritables) { - druidRecordWriter.write(druidWritable); - } - druidRecordWriter.close(false); - List dataSegmentList = DruidStorageHandlerUtils - .getPublishedSegments(segmentDescriptroPath, config); - Assert.assertEquals(1, dataSegmentList.size()); - File tmpUnzippedSegmentDir = temporaryFolder.newFolder(); - new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir); - final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO - .loadIndex(tmpUnzippedSegmentDir); - - QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex); - - Firehose firehose = new IngestSegmentFirehose( - ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())), - ImmutableList.of("host"), - ImmutableList.of("visited_sum", "unique_hosts"), - null, - QueryGranularities.NONE - ); - - List rows = Lists.newArrayList(); - while (firehose.hasMore()) { - rows.add(firehose.nextRow()); - } - - verifyRows(expectedRows, rows); - - } - - private void verifyRows(List> expectedRows, - List actualRows - ) { - System.out.println("actualRows = " + actualRows); - Assert.assertEquals(expectedRows.size(), actualRows.size()); - - for (int i = 0; i < expectedRows.size(); i++) { - Map expected = expectedRows.get(i); - InputRow actual = actualRows.get(i); - - Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions()); - - Assert.assertEquals(expected.get(DruidTable.DEFAULT_TIMESTAMP_COLUMN), - actual.getTimestamp().getMillis() - ); - Assert.assertEquals(expected.get("host"), actual.getDimension("host")); - Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum")); - Assert.assertEquals( - (Double) expected.get("unique_hosts"), - (Double) HyperUniquesAggregatorFactory - .estimateCardinality(actual.getRaw("unique_hosts")), - 0.001 - ); - } - } - - @Test - public void testSerDesr() throws IOException { - String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}"; - DataSegment dataSegment = objectMapper.reader(DataSegment.class) - .readValue(segment); - Assert.assertTrue(dataSegment.getDataSource().equals("datasource2015")); - } - -} diff --git druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java new file mode 100644 index 0000000..9ec82c0 --- /dev/null +++ druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.common.Granularity; +import io.druid.data.input.Firehose; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.DimensionSchema; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.StringDimensionSchema; +import io.druid.data.input.impl.TimeAndDimsParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularities; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexStorageAdapter; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.loading.LocalDataSegmentPuller; +import io.druid.segment.loading.LocalDataSegmentPusher; +import io.druid.segment.loading.LocalDataSegmentPusherConfig; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; +import io.druid.segment.realtime.firehose.WindowedStorageAdapter; +import io.druid.timeline.DataSegment; +import org.apache.calcite.adapter.druid.DruidTable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.druid.DruidStorageHandler; +import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.io.DruidRecordWriter; +import org.apache.hadoop.hive.druid.serde.DruidWritable; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class TestDruidRecordWriter { + private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER; + + private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D"); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private DruidRecordWriter druidRecordWriter; + + final List> expectedRows = ImmutableList.of( + ImmutableMap.of( + DruidTable.DEFAULT_TIMESTAMP_COLUMN, + DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(), + "host", ImmutableList.of("a.example.com"), + "visited_sum", 190L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + DruidTable.DEFAULT_TIMESTAMP_COLUMN, + DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(), + "host", ImmutableList.of("b.example.com"), + "visited_sum", 175L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + DruidTable.DEFAULT_TIMESTAMP_COLUMN, + DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(), + "host", ImmutableList.of("c.example.com"), + "visited_sum", 270L, + "unique_hosts", 1.0d + ) + ); + + // This test need this patch https://github.com/druid-io/druid/pull/3483 + @Ignore + @Test + public void testWrite() throws IOException, SegmentLoadingException { + + final String dataSourceName = "testDataSource"; + final File segmentOutputDir = temporaryFolder.newFolder(); + final File workingDir = temporaryFolder.newFolder(); + Configuration config = new Configuration(); + + final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec( + new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null), + new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("host")), + null, null + ) + )); + final Map parserMap = objectMapper.convertValue(inputRowParser, Map.class); + + DataSchema dataSchema = new DataSchema( + dataSourceName, + parserMap, + new AggregatorFactory[] { + new LongSumAggregatorFactory("visited_sum", "visited_sum"), + new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts") + }, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(INTERVAL_FULL) + ), + objectMapper + ); + + RealtimeTuningConfig tuningConfig = RealtimeTuningConfig + .makeDefaultTuningConfig(temporaryFolder.newFolder()); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher( + new LocalDataSegmentPusherConfig() { + @Override + public File getStorageDirectory() {return segmentOutputDir;} + }, objectMapper); + + Path segmentDescriptroPath = new Path(workingDir.getAbsolutePath(), + DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME + ); + druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20, + segmentDescriptroPath, localFileSystem + ); + + List druidWritables = Lists.transform(expectedRows, + new Function, DruidWritable>() { + @Nullable + @Override + public DruidWritable apply(@Nullable ImmutableMap input + ) { + return new DruidWritable(ImmutableMap.builder().putAll(input) + .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, + Granularity.DAY.truncate( + new DateTime((long) input + .get(DruidTable.DEFAULT_TIMESTAMP_COLUMN))) + .getMillis() + ).build()); + } + } + ); + for (DruidWritable druidWritable : druidWritables) { + druidRecordWriter.write(druidWritable); + } + druidRecordWriter.close(false); + List dataSegmentList = DruidStorageHandlerUtils + .getPublishedSegments(segmentDescriptroPath, config); + Assert.assertEquals(1, dataSegmentList.size()); + File tmpUnzippedSegmentDir = temporaryFolder.newFolder(); + new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir); + final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO + .loadIndex(tmpUnzippedSegmentDir); + + QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex); + + Firehose firehose = new IngestSegmentFirehose( + ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + null, + QueryGranularities.NONE + ); + + List rows = Lists.newArrayList(); + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + + verifyRows(expectedRows, rows); + + } + + private void verifyRows(List> expectedRows, + List actualRows + ) { + System.out.println("actualRows = " + actualRows); + Assert.assertEquals(expectedRows.size(), actualRows.size()); + + for (int i = 0; i < expectedRows.size(); i++) { + Map expected = expectedRows.get(i); + InputRow actual = actualRows.get(i); + + Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions()); + + Assert.assertEquals(expected.get(DruidTable.DEFAULT_TIMESTAMP_COLUMN), + actual.getTimestamp().getMillis() + ); + Assert.assertEquals(expected.get("host"), actual.getDimension("host")); + Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum")); + Assert.assertEquals( + (Double) expected.get("unique_hosts"), + (Double) HyperUniquesAggregatorFactory + .estimateCardinality(actual.getRaw("unique_hosts")), + 0.001 + ); + } + } + + @Test + public void testSerDesr() throws IOException { + String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}"; + DataSegment dataSegment = objectMapper.reader(DataSegment.class) + .readValue(segment); + Assert.assertTrue(dataSegment.getDataSource().equals("datasource2015")); + } + +}