diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f002c6e931..76fc210abb 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2900,6 +2900,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_HBASE_SNAPSHOT_RESTORE_DIR("hive.hbase.snapshot.restoredir", "/tmp", "The directory in which to " + "restore the HBase table snapshot."), + // For Kudu storage handler + HIVE_KUDU_MASTER_ADDRESSES_DEFAULT("hive.kudu.master.addresses.default", "localhost:7050", + "Comma-separated list of all of the Kudu master addresses.\n" + + "If we are querying Kudu from Hive, this address needs to be declared"), + // For har files HIVEARCHIVEENABLED("hive.archive.enabled", false, "Whether archiving operations are permitted"), diff --git itests/pom.xml itests/pom.xml index 345e9220df..9c20ad65fd 100644 --- itests/pom.xml +++ itests/pom.xml @@ -47,6 +47,7 @@ hive-unit-hadoop2 hive-minikdc qtest-druid + qtest-kudu diff --git itests/qtest-kudu/pom.xml itests/qtest-kudu/pom.xml new file mode 100644 index 0000000000..af8508b288 --- /dev/null +++ itests/qtest-kudu/pom.xml @@ -0,0 +1,494 @@ + + + + 4.0.0 + + + org.apache.hive + hive-it + 4.0.0-SNAPSHOT + ../pom.xml + + + hive-it-qfile-kudu + jar + Hive Integration - QFile Kudu Tests + + + ../.. + None + + + + false + + -mkdir -p + + + + + + + org.apache.hive + hive-common + ${project.version} + test + + + org.apache.hive + hive-contrib + ${project.version} + test + + + org.apache.hive + hive-exec + + + + + org.apache.hive + hive-standalone-metastore-common + ${project.version} + test + + + org.apache.hive + hive-standalone-metastore-common + ${project.version} + tests + test + + + org.apache.hive + hive-standalone-metastore-server + ${project.version} + test + + + org.apache.hive + hive-standalone-metastore-server + ${project.version} + tests + test + + + org.apache.hive + hive-it-custom-serde + ${project.version} + test + + + org.apache.hive + hive-it-util + ${project.version} + test + + + org.apache.hive + hive-exec + + + org.apache.hive + hive-it-druid + + + + + org.apache.hive + hive-serde + ${project.version} + test + + + org.apache.hive + hive-exec + ${project.version} + test + core + + + org.apache.hive + hive-exec + ${project.version} + test + tests + + + + junit + junit + ${junit.version} + test + + + + com.esotericsoftware + kryo + ${kryo.version} + test + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + test + + + javolution + javolution + ${javolution.version} + test + + + com.sun.jersey + jersey-servlet + ${jersey.version} + test + + + org.apache.hadoop + hadoop-archives + ${hadoop.version} + test + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + test + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + tests + test + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + tests + test + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + test + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + ${hadoop.version} + tests + test + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + org.apache.hadoop + hadoop-mapreduce-client-hs + ${hadoop.version} + test + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + 
org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + test + + + org.apache.hadoop + hadoop-yarn-server-tests + ${hadoop.version} + test + tests + + + org.apache.hadoop + hadoop-yarn-client + ${hadoop.version} + test + + + org.apache.hbase + hbase-common + ${hbase.version} + test + + + org.apache.hbase + hbase-hadoop-compat + ${hbase.version} + test + + + org.apache.hbase + hbase-hadoop2-compat + ${hbase.version} + test + + + org.apache.hbase + hbase-mapreduce + ${hbase.version} + test + + + org.apache.hbase + hbase-server + ${hbase.version} + test + + + org.glassfish.web + javax.servlet.jsp + + + + + org.apache.kudu + kudu-test-utils + ${kudu.version} + test + + + org.apache.tez + tez-tests + ${tez.version} + test-jar + + + org.apache.tez + tez-api + ${tez.version} + test + + + org.apache.tez + tez-runtime-library + ${tez.version} + test + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + org.apache.tez + tez-mapreduce + ${tez.version} + test + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + org.apache.tez + tez-dag + ${tez.version} + test + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + + + + kudu-linux + + + Unix + + + + + org.apache.kudu + kudu-binary + ${kudu.version} + linux-x86_64 + test + + + + + kudu-mac + + + Mac + + + + + org.apache.kudu + kudu-binary + ${kudu.version} + osx-x86_64 + test + + + + + kudu-windows + + + Windows + + + + + **/*.java + + + + + + + + org.codehaus.mojo + properties-maven-plugin + 1.0-alpha-2 + + + initialize + + read-project-properties + + + + ${basedir}/../src/test/resources/testconfiguration.properties + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + generate-tests-sources + generate-test-sources + + + + + + + + + + run + + + + + + org.codehaus.mojo + build-helper-maven-plugin + ${maven.build-helper.plugin.version} + + + add-test-sources + generate-test-sources + + add-test-source + + + + target/generated-test-sources/java + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${exclude.tests} + + + + + + diff --git itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/TestKuduCliDriver.java itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/TestKuduCliDriver.java new file mode 100644 index 0000000000..bd8068feaa --- /dev/null +++ itests/qtest-kudu/src/test/java/org/apache/hadoop/hive/cli/TestKuduCliDriver.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cli; + +import java.io.File; +import java.util.List; + +import org.apache.hadoop.hive.cli.control.CliAdapter; +import org.apache.hadoop.hive.cli.control.CliConfigs; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TestKuduCliDriver { + + static CliAdapter adapter = new CliConfigs.KuduCliConfig().getCliAdapter(); + + @Parameters(name = "{0}") + public static List getParameters() throws Exception { + return adapter.getParameters(); + } + + @ClassRule + public static TestRule cliClassRule = adapter.buildClassRule(); + + @Rule + public TestRule cliTestRule = adapter.buildTestRule(); + + private String name; + private File qfile; + + public TestKuduCliDriver(String name, File qfile) { + this.name = name; + this.qfile = qfile; + } + + @Test + public void testCliDriver() throws Exception { + adapter.runTest(name, qfile); + } + +} diff --git itests/qtest-kudu/src/test/java/org/apache/hive/TestDummy.java itests/qtest-kudu/src/test/java/org/apache/hive/TestDummy.java new file mode 100644 index 0000000000..1f1e7e3f25 --- /dev/null +++ itests/qtest-kudu/src/test/java/org/apache/hive/TestDummy.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive; + + +import org.junit.Test; + + +public class TestDummy { + + @Test + public void testDummy() throws Exception { + + } +} diff --git itests/util/pom.xml itests/util/pom.xml index 607fd4724e..bc14d1821f 100644 --- itests/util/pom.xml +++ itests/util/pom.xml @@ -89,6 +89,17 @@ ${project.version} tests + + org.apache.hive + hive-kudu-handler + ${project.version} + + + org.apache.hive + hive-kudu-handler + ${project.version} + tests + org.apache.hive hive-metastore @@ -182,6 +193,11 @@ hbase-mapreduce ${hbase.version} + + org.apache.kudu + kudu-test-utils + ${kudu.version} + org.apache.tez tez-api diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 5c17e1ade6..a2b0618a62 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -778,4 +778,24 @@ public MiniDruidLlapLocalCliConfig() { } } + public static class KuduCliConfig extends AbstractCliConfig { + public KuduCliConfig() { + super(CoreKuduCliDriver.class); + try { + setQueryDir("kudu-handler/src/test/queries/positive"); + + setResultsDir("kudu-handler/src/test/results/positive"); + setLogDir("itests/qtest/target/qfile-results/kudu-handler/positive"); + + setInitScript("q_test_init_src.sql"); + setCleanupScript("q_test_cleanup_src.sql"); + + setHiveConfDir(""); + setClusterType(MiniClusterType.NONE); + } catch (Exception e) { + throw new RuntimeException("can't construct cliconfig", e); + } + } + } + } diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreKuduCliDriver.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreKuduCliDriver.java new file mode 100644 index 0000000000..c287281150 --- /dev/null +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreKuduCliDriver.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cli.control; + +import org.apache.hadoop.hive.kudu.KuduQTestUtil; +import org.apache.hadoop.hive.kudu.KuduTestSetup; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestProcessExecResult; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.File; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class CoreKuduCliDriver extends CliAdapter { + + private KuduQTestUtil qt; + + public CoreKuduCliDriver(AbstractCliConfig cliConfig) { + super(cliConfig); + } + + @Override + @BeforeClass + public void beforeClass() { + MiniClusterType miniMR = cliConfig.getClusterType(); + String initScript = cliConfig.getInitScript(); + String cleanupScript = cliConfig.getCleanupScript(); + + try { + qt = new KuduQTestUtil(cliConfig.getResultsDir(), cliConfig.getLogDir(), miniMR, + new KuduTestSetup(), initScript, cleanupScript); + + // do a one time initialization + qt.newSession(); + qt.cleanUp(); + qt.createSources(); + } catch (Exception e) { + throw new RuntimeException("Unexpected exception in setUp", e); + } + } + + @Override + @AfterClass + public void shutdown() { + try { + qt.shutdown(); + } catch (Exception e) { + throw new RuntimeException("Unexpected exception in tearDown", e); + } + } + + @Override + @Before + public void setUp() { + try { + qt.newSession(); + + } catch (Exception e) { + System.err.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.err.flush(); + fail("Unexpected exception in setup"); + } + } + + @Override + @After + public void tearDown() { + try { + qt.clearPostTestEffects(); + qt.clearTestSideEffects(); + + } catch (Exception e) { + System.err.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.err.flush(); + fail("Unexpected exception in tearDown"); + } + } + + @Override + public void runTest(String tname, String fname, String fpath) { + long startTime = System.currentTimeMillis(); + try { + System.err.println("Begin query: " + fname); + + qt.addFile(fpath); + qt.cliInit(new File(fpath)); + + CommandProcessorResponse response = qt.executeClient(fname); + if (response.getResponseCode() != 0) { + qt.failedQuery(response.getException(), response.getResponseCode(), fname, null); + } + + QTestProcessExecResult result = qt.checkCliDriverResults(fname); + if (result.getReturnCode() != 0) { + qt.failedDiff(result.getReturnCode(), fname, result.getCapturedOutput()); + } + qt.clearPostTestEffects(); + + } catch (Exception e) { + qt.failedWithException(e, fname, null); + } + + long elapsedTime = System.currentTimeMillis() - startTime; + System.err.println("Done query: " + fname + " elapsedTime=" + elapsedTime/1000 + "s"); + assertTrue("Test passed", true); + } +} + diff --git itests/util/src/main/java/org/apache/hadoop/hive/kudu/KuduQTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/kudu/KuduQTestUtil.java new file mode 100644 index 0000000000..49332f3bcb --- /dev/null +++ itests/util/src/main/java/org/apache/hadoop/hive/kudu/KuduQTestUtil.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.kudu; + +import org.apache.hadoop.hive.ql.QTestArguments; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestUtil; + +/** + * KuduQTestUtil initializes Kudu-specific test fixtures. + */ +public class KuduQTestUtil extends QTestUtil { + + public KuduQTestUtil(String outDir, String logDir, MiniClusterType miniMr, + KuduTestSetup setup, String initScript, String cleanupScript) throws Exception { + + super( + QTestArguments.QTestArgumentsBuilder.instance() + .withOutDir(outDir) + .withLogDir(logDir) + .withClusterType(miniMr) + .withConfDir(null) + .withInitScript(initScript) + .withCleanupScript(cleanupScript) + .withLlapIo(false) + .withQTestSetup(setup) + .build()); + } +} diff --git itests/util/src/main/java/org/apache/hadoop/hive/kudu/KuduTestSetup.java itests/util/src/main/java/org/apache/hadoop/hive/kudu/KuduTestSetup.java new file mode 100644 index 0000000000..1a6b8e8a95 --- /dev/null +++ itests/util/src/main/java/org/apache/hadoop/hive/kudu/KuduTestSetup.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.kudu; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QTestMiniClusters; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.shaded.com.google.common.collect.ImmutableList; +import org.apache.kudu.test.cluster.MiniKuduCluster; + +import java.io.File; +import java.util.Arrays; + +/** + * Start and stop a Kudu MiniCluster for testing purposes. 
+ */ +public class KuduTestSetup extends QTestMiniClusters.QTestSetup { + + public static final String KV_TABLE_NAME = "default.kudu_kv"; + public static final String ALL_TYPES_TABLE_NAME = "default.kudu_all_types"; + + public static final Schema ALL_TYPES_SCHEMA = KuduTestUtils.getAllTypesSchema(); + public static final Schema KV_SCHEMA = new Schema(Arrays.asList( + new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(), + new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build()) + ); + + private MiniKuduCluster miniCluster; + + public KuduTestSetup() { + } + + @Override + public void preTest(HiveConf conf) throws Exception { + super.preTest(conf); + setupWithHiveConf(conf); + } + + private void setupWithHiveConf(HiveConf conf) throws Exception { + if (null == miniCluster) { + String testTmpDir = System.getProperty("test.tmp.dir"); + File tmpDir = new File(testTmpDir, "kudu"); + + if (tmpDir.exists()) { + FileUtils.deleteDirectory(tmpDir); + } + + miniCluster = new MiniKuduCluster.MiniKuduClusterBuilder() + .numMasterServers(3) + .numTabletServers(3) + .build(); + + createKuduTables(miniCluster.getMasterAddressesAsString()); + } + + updateConf(conf); + } + + /** + * Update hiveConf with the Kudu specific parameters + * @param conf The hiveconf to update + */ + private void updateConf(HiveConf conf) { + if (miniCluster != null) { + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_KUDU_MASTER_ADDRESSES_DEFAULT, + miniCluster.getMasterAddressesAsString()); + } + } + + private void createKuduTables(String masterAddresses) throws KuduException { + + KuduClient client = new KuduClient.KuduClientBuilder(masterAddresses).build(); + try { + createKVTable(client); + createAllTypesTable(client); + } finally { + client.close(); + } + } + + public void createKVTable(KuduClient client) throws KuduException { + if (client.tableExists(KV_TABLE_NAME)) { + client.deleteTable(KV_TABLE_NAME); + } + CreateTableOptions options = new CreateTableOptions() + .addHashPartitions(Arrays.asList("key"), 4); + client.createTable(KV_TABLE_NAME, KV_SCHEMA, options); + } + + public void createAllTypesTable(KuduClient client) throws KuduException { + if (client.tableExists(ALL_TYPES_TABLE_NAME)) { + client.deleteTable(ALL_TYPES_TABLE_NAME); + } + CreateTableOptions options = new CreateTableOptions() + .setRangePartitionColumns(ImmutableList.of("key")); + client.createTable(ALL_TYPES_TABLE_NAME, ALL_TYPES_SCHEMA, options); + } + + @Override + public void tearDown() throws Exception { + if (null != miniCluster) { + miniCluster.shutdown(); + miniCluster = null; + } + super.tearDown(); + } +} diff --git kudu-handler/pom.xml kudu-handler/pom.xml new file mode 100644 index 0000000000..4082840592 --- /dev/null +++ kudu-handler/pom.xml @@ -0,0 +1,155 @@ + + + + 4.0.0 + + org.apache.hive + hive + 4.0.0-SNAPSHOT + ../pom.xml + + + hive-kudu-handler + jar + Hive Kudu Handler + + + .. 
+ None + + + + + + + org.apache.hive + hive-exec + ${project.version} + + + + org.apache.hadoop + hadoop-common + provided + ${hadoop.version} + true + + + org.apache.hadoop + provided + hadoop-mapreduce-client-core + ${hadoop.version} + true + + + org.apache.kudu + kudu-client + ${kudu.version} + + + + junit + junit + ${junit.version} + test + + + org.apache.kudu + kudu-test-utils + ${kudu.version} + test + + + + + ${basedir}/src/java + ${basedir}/src/test + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${exclude.tests} + + + + + + + + + + kudu-linux + + + Unix + + + + + org.apache.kudu + kudu-binary + ${kudu.version} + linux-x86_64 + test + + + + + kudu-mac + + + mac + + + + + org.apache.kudu + kudu-binary + ${kudu.version} + osx-x86_64 + test + + + + + kudu-windows + + + Windows + + + + + **/*.java + + + + diff --git kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduHiveUtils.java kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduHiveUtils.java new file mode 100644 index 0000000000..6240fe5159 --- /dev/null +++ kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduHiveUtils.java @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.hadoop.hive.kudu; + +import java.security.AccessController; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import javax.security.auth.Subject; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.kudu.ColumnTypeAttributes; +import org.apache.kudu.Type; +import org.apache.kudu.client.KuduClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY; + +/** + * A collection of static utility methods for the Kudu Hive integration. + * This is useful for code sharing. + */ +public class KuduHiveUtils { + + private static final Logger LOG = LoggerFactory.getLogger(KuduHiveUtils.class); + + private static final Text KUDU_TOKEN_KIND = new Text("kudu-authn-data"); + + /** + * Returns the union of the configuration and table properties with the + * table properties taking precedence. 
+ */ + public static Configuration createOverlayedConf(Configuration conf, Properties tblProps) { + Configuration newConf = new Configuration(conf); + for (Map.Entry prop : tblProps.entrySet()) { + newConf.set((String) prop.getKey(), (String) prop.getValue()); + } + return newConf; + } + + public static String getMasterAddresses(Configuration conf) { + // Load the default configuration. + String masterAddresses = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_KUDU_MASTER_ADDRESSES_DEFAULT); + if (StringUtils.isEmpty(masterAddresses)) { + throw new IllegalStateException("Kudu master addresses not specified in configuration"); + } + // Override with the table configuration if it exists. + if (!StringUtils.isEmpty(conf.get(KUDU_MASTER_ADDRS_KEY))) { + masterAddresses = conf.get(KUDU_MASTER_ADDRS_KEY); + } + return masterAddresses; + } + + // TODO: Support more configurations. Consider a Properties based setter on the KuduClientBuilder. + public static KuduClient getKuduClient(Configuration conf) { + String masterAddresses = getMasterAddresses(conf); + if (StringUtils.isEmpty(masterAddresses)) { + throw new IllegalArgumentException(KUDU_MASTER_ADDRS_KEY + " is not set."); + } + KuduClient client = new KuduClient.KuduClientBuilder(masterAddresses).build(); + importCredentialsFromCurrentSubject(client); + return client; + } + + public static void importCredentialsFromCurrentSubject(KuduClient client) { + Subject subj = Subject.getSubject(AccessController.getContext()); + if (subj == null) { + return; + } + Text service = new Text(client.getMasterAddressesAsString()); + // Find the Hadoop credentials stored within the JAAS subject. + Set credSet = subj.getPrivateCredentials(Credentials.class); + for (Credentials creds : credSet) { + for (Token tok : creds.getAllTokens()) { + if (!tok.getKind().equals(KUDU_TOKEN_KIND)) { + continue; + } + // Only import credentials relevant to the service corresponding to + // 'client'. This is necessary if we want to support a job which + // reads from one cluster and writes to another. 
+ if (!tok.getService().equals(service)) { + LOG.debug("Not importing credentials for service " + service + + "(expecting service " + service + ")"); + continue; + } + LOG.debug("Importing credentials for service " + service); + client.importAuthenticationCredentials(tok.getPassword()); + return; + } + } + } + + /* This method converts a Kudu type to to the corresponding Hive type */ + public static PrimitiveTypeInfo toHiveType(Type kuduType, ColumnTypeAttributes attributes) + throws SerDeException { + switch (kuduType) { + case BOOL: return TypeInfoFactory.booleanTypeInfo; + case INT8: return TypeInfoFactory.byteTypeInfo; + case INT16: return TypeInfoFactory.shortTypeInfo; + case INT32: return TypeInfoFactory.intTypeInfo; + case INT64: return TypeInfoFactory.longTypeInfo; + case UNIXTIME_MICROS: return TypeInfoFactory.timestampTypeInfo; + case DECIMAL: + return TypeInfoFactory.getDecimalTypeInfo(attributes.getPrecision(), attributes.getScale()); + case FLOAT: return TypeInfoFactory.floatTypeInfo; + case DOUBLE: return TypeInfoFactory.doubleTypeInfo; + case STRING: return TypeInfoFactory.stringTypeInfo; + case BINARY: return TypeInfoFactory.binaryTypeInfo; + default: + throw new SerDeException("Unsupported column type: " + kuduType); + } + } +} diff --git kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduInputFormat.java kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduInputFormat.java new file mode 100644 index 0000000000..135172ea1b --- /dev/null +++ kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduInputFormat.java @@ -0,0 +1,309 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.hadoop.hive.kudu; + +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.kudu.client.Bytes; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduScanToken; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.LocatedTablet; +import org.apache.kudu.client.RowResult; + +public class KuduInputFormat extends InputFormat + implements org.apache.hadoop.mapred.InputFormat, + VectorizedInputFormatInterface { + + @Override + public List getSplits(JobContext context) throws IOException { + return computeSplits(context.getConfiguration()).stream() + .map(is -> (InputSplit) is) + .collect(Collectors.toList()); + } + + @Override + public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf conf, int numSplits) + throws IOException { + List splits = computeSplits(conf); + return splits.toArray(new org.apache.hadoop.mapred.InputSplit[0]); + } + + private List computeSplits(Configuration conf) throws IOException { + try (KuduClient client = KuduHiveUtils.getKuduClient(conf)) { + // Hive depends on FileSplits so we get the dummy Path for the Splits. + Job job = Job.getInstance(conf); + JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); + Path[] paths = FileInputFormat.getInputPaths(jobContext); + Path dummyPath = paths[0]; + + String tableName = conf.get(KUDU_TABLE_NAME_KEY); + if (StringUtils.isEmpty(tableName)) { + throw new IllegalArgumentException(KUDU_TABLE_NAME_KEY + " is not set."); + } + if (!client.tableExists(tableName)) { + throw new IllegalArgumentException("Kudu table does not exist: " + tableName); + } + + KuduTable table = client.openTable(tableName); + List predicates = KuduPredicateHandler.getPredicates(conf, table.getSchema()); + + // TODO: Support more configurations. 
+ KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table) + .setProjectedColumnNames(getProjectedColumns(conf)); + + for (KuduPredicate predicate : predicates) { + tokenBuilder.addPredicate(predicate); + } + List tokens = tokenBuilder.build(); + + List splits = new ArrayList<>(tokens.size()); + for (KuduScanToken token : tokens) { + List locations = new ArrayList<>(token.getTablet().getReplicas().size()); + for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) { + locations.add(replica.getRpcHost()); + } + splits.add(new KuduInputSplit(token, dummyPath, locations.toArray(new String[0]))); + } + return splits; + } + } + + private List getProjectedColumns(Configuration conf) throws IOException { + String[] columnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS); + if (null == columnNamesArr) { + throw new IOException( + "Hive column names must be provided to InputFormat in the Configuration"); + } + List columns = new ArrayList<>(Arrays.asList(columnNamesArr)); + VirtualColumn.removeVirtualColumns(columns); + return columns; + } + + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) + { + Preconditions.checkArgument(split instanceof KuduInputSplit); + // Will be initialized via the initialize method. + return new KuduRecordReader(); + } + + @Override + public org.apache.hadoop.mapred.RecordReader getRecordReader( + org.apache.hadoop.mapred.InputSplit split, JobConf conf, Reporter reporter) + throws IOException { + Preconditions.checkArgument(split instanceof KuduInputSplit); + KuduRecordReader recordReader = new KuduRecordReader(); + recordReader.initialize((KuduInputSplit) split, conf); + return recordReader; + } + + @Override + public VectorizedSupport.Support[] getSupportedFeatures() { + return new VectorizedSupport.Support[0]; + } + + /** + * An InputSplit represents the data to be processed by an individual Mapper. + * This is effectively a wrapper around a Kudu scan token. + */ + static class KuduInputSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit { + /** The scan token that the split will use to scan the Kudu table. */ + private byte[] serializedScanToken; + + /** Tablet server locations which host the tablet to be scanned. */ + private String[] locations; + + @SuppressWarnings("unused") // required for deserialization. 
+ public KuduInputSplit() { + super(null, 0, 0, (String[]) null); + } + + KuduInputSplit(KuduScanToken token, Path dummyPath, String[] locations) throws IOException { + super(dummyPath, 0, 0, locations); + this.serializedScanToken = token.serialize(); + this.locations = locations; + } + + byte[] getSerializedScanToken() { + return serializedScanToken; + } + + @Override + public long getLength() { + return 0; + } + + @Override + public String[] getLocations() { + return locations; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + serializedScanToken = Bytes.readByteArray(in); + locations = new String[in.readInt()]; + for (int i = 0; i < locations.length; i++) { + byte[] str = Bytes.readByteArray(in); + locations[i] = Bytes.getString(str); + } + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Bytes.writeByteArray(out, serializedScanToken); + out.writeInt(locations.length); + for (String location : locations) { + byte[] str = Bytes.fromString(location); + Bytes.writeByteArray(out, str); + } + } + } + + /** + * A RecordReader that reads the Kudu rows from a KuduInputSplit. + */ + static class KuduRecordReader extends RecordReader + implements org.apache.hadoop.mapred.RecordReader { + + private volatile boolean initialized = false; + private KuduClient client; + private KuduScanner scanner; + private Iterator iterator; + private RowResult currentValue; + private KuduWritable currentWritable; + private long pos; + + KuduRecordReader() {} + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException { + Preconditions.checkArgument(split instanceof KuduInputSplit); + initialize((KuduInputSplit) split, context.getConfiguration()); + } + + private synchronized void initialize(KuduInputSplit split, Configuration conf) + throws IOException { + if (!initialized) { + byte[] serializedScanToken = split.getSerializedScanToken(); + client = KuduHiveUtils.getKuduClient(conf); + scanner = KuduScanToken.deserializeIntoScanner(serializedScanToken, client); + iterator = scanner.iterator(); + currentValue = null; + currentWritable = new KuduWritable(scanner.getProjectionSchema().newPartialRow()); + pos = 0; + initialized = true; + } + } + + @Override + public boolean nextKeyValue() { + if (iterator.hasNext()) { + currentValue = iterator.next(); + currentWritable.setRow(currentValue); + pos++; + return true; + } + currentValue = null; + return false; + } + + @Override + public NullWritable getCurrentKey() { + return NullWritable.get(); + } + + @Override + public KuduWritable getCurrentValue() { + Preconditions.checkNotNull(currentValue); + return currentWritable; + } + + @Override + public boolean next(NullWritable nullWritable, KuduWritable kuduWritable) { + if (nextKeyValue()) { + kuduWritable.setRow(currentValue); + return true; + } + return false; + } + + @Override public NullWritable createKey() { + return NullWritable.get(); + } + + @Override public KuduWritable createValue() { + return new KuduWritable(scanner.getProjectionSchema().newPartialRow()); + } + + @Override + public void close() throws IOException { + try { + scanner.close(); + } catch (KuduException e) { + throw new IOException(e); + } + client.shutdown(); + } + + @Override + public long getPos() { + return pos; + } + + @Override + public float getProgress() { + return 0; + } + } +} diff --git kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduOutputFormat.java 
kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduOutputFormat.java new file mode 100644 index 0000000000..e1a539ec72 --- /dev/null +++ kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduOutputFormat.java @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.hadoop.hive.kudu; + +import java.io.IOException; +import java.util.Properties; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.Progressable; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.RowError; +import org.apache.kudu.client.RowErrorsAndOverflowStatus; +import org.apache.kudu.client.SessionConfiguration.FlushMode; + +import static org.apache.hadoop.hive.kudu.KuduHiveUtils.createOverlayedConf; +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY; + +// TODO: Support AcidOutputFormat to fully support updates and deletes. +public class KuduOutputFormat extends OutputFormat + implements HiveOutputFormat { + + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, boolean isCompressed, + Properties tableProperties, + Progressable progress) + throws IOException { + return new KuduRecordWriter(createOverlayedConf(jc, tableProperties)); + } + + + @Override + public org.apache.hadoop.mapred.RecordWriter getRecordWriter( + FileSystem ignored, JobConf job, String name, Progressable progress) + throws IOException { + return new KuduRecordWriter(job); + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + return new KuduRecordWriter(context.getConfiguration()); + } + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) { + // Not doing any check. + } + + @Override + public void checkOutputSpecs(JobContext context) { + // Not doing any check. 
+ } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return new KuduOuputComitter(); + } + + static class KuduRecordWriter extends RecordWriter + implements FileSinkOperator.RecordWriter, + org.apache.hadoop.mapred.RecordWriter { + private KuduClient client; + private KuduTable table; + private KuduSession session; + + KuduRecordWriter(Configuration conf) throws IOException { + this.client = KuduHiveUtils.getKuduClient(conf); + + String tableName = conf.get(KUDU_TABLE_NAME_KEY); + if (StringUtils.isEmpty(tableName)) { + throw new IllegalArgumentException(KUDU_TABLE_NAME_KEY + " is not set."); + } + if (!client.tableExists(tableName)) { + throw new IllegalArgumentException("Kudu table does not exist: " + tableName); + } + + this.table = client.openTable(tableName); + // TODO: Support more configurations. Consider a Properties based setter on the session. + this.session = client.newSession(); + this.session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND); + } + + @Override + public void write(Writable row) throws IOException { + Preconditions.checkArgument(row instanceof KuduWritable); + // TODO: Support configurable operation type. + Operation op = table.newUpsert(); + ((KuduWritable) row).populateRow(op.getRow()); + session.apply(op); + } + + @Override + public void write(NullWritable key, KuduWritable value) throws IOException { + write(value); + } + + @Override + public void close(boolean abort) throws IOException { + session.close(); + processErrors(); + client.close(); + } + + @Override + public void close(TaskAttemptContext context) throws IOException { + close(false); + } + + @Override + public void close(Reporter reporter) throws IOException { + close(false); + } + + private void processErrors() throws IOException { + RowErrorsAndOverflowStatus pendingErrors = session.getPendingErrors(); + if (pendingErrors.getRowErrors().length != 0) { + RowError[] errors = pendingErrors.getRowErrors(); + // Build a sample of error strings. + int sampleSize = 5; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < errors.length; i++) { + if (i == sampleSize) { break; } + sb.append(errors[i].getErrorStatus().toString()); + } + if (pendingErrors.isOverflowed()) { + throw new IOException( + "PendingErrors overflowed. Failed to write at least " + errors.length + " rows " + + "to Kudu; Sample errors: " + sb.toString()); + } else { + throw new IOException( + "Failed to write " + errors.length + " rows to Kudu; Sample errors: " + + sb.toString()); + } + } + } + } + + /** + * A dummy committer class that does not do anything. + */ + static class KuduOuputComitter extends OutputCommitter { + @Override + public void setupJob(JobContext jobContext) { + // do nothing. + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) { + // do nothing. + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) { + // do nothing. + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) { + // do nothing. 
+ } + } +} diff --git kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduPredicateHandler.java kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduPredicateHandler.java new file mode 100644 index 0000000000..e6537a8d75 --- /dev/null +++ kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduPredicateHandler.java @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.hadoop.hive.kudu; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.KuduPredicate; + +import java.util.ArrayList; +import java.util.List; + +public class KuduPredicateHandler { + + private KuduPredicateHandler() {} + + /** + * Analyzes the predicates and return the portion of it which + * cannot be evaluated by Kudu during table access. + * + * @param predicateExpr predicate to be decomposed + * @param schema the schema of the Kudu table + * @return decomposed form of predicate, or null if no pushdown is possible at all + */ + public static DecomposedPredicate decompose(ExprNodeDesc predicateExpr, Schema schema) { + IndexPredicateAnalyzer analyzer = newAnalyzer(schema); + List sConditions = new ArrayList<>(); + ExprNodeDesc residualPredicate = analyzer.analyzePredicate(predicateExpr, sConditions); + + // Nothing to decompose. 
+ if (sConditions.size() == 0) { + return null; + } + + DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); + decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(sConditions); + decomposedPredicate.residualPredicate = (ExprNodeGenericFuncDesc) residualPredicate; + return decomposedPredicate; + } + + /** + * Returns the list of Kudu predicates based on the + * {@value TableScanDesc#FILTER_EXPR_CONF_STR} configuration property. + * + * @param conf the execution configuration + * @param schema the schema of the Kudu table + * @return the list of Kudu predicates + */ + public static List getPredicates(Configuration conf, Schema schema) { + List predicates = new ArrayList<>(); + for (IndexSearchCondition sc : getSearchConditions(conf, schema)) { + predicates.add(conditionToPredicate(sc, schema)); + } + return predicates; + } + + private static List getSearchConditions(Configuration conf, Schema schema) { + List conditions = new ArrayList<>(); + ExprNodeDesc filterExpr = getExpression(conf); + if (null == filterExpr) { + return conditions; + } + IndexPredicateAnalyzer analyzer = newAnalyzer(schema); + ExprNodeDesc residual = analyzer.analyzePredicate(filterExpr, conditions); + if (residual != null) { + throw new RuntimeException("Unexpected residual predicate: " + residual.getExprString()); + } + return conditions; + } + + private static ExprNodeDesc getExpression(Configuration conf) { + String filteredExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + if (filteredExprSerialized == null) { + return null; + } + return SerializationUtilities.deserializeExpression(filteredExprSerialized); + } + + private static KuduPredicate conditionToPredicate(IndexSearchCondition condition, Schema schema) { + ColumnSchema column = schema.getColumn(condition.getColumnDesc().getColumn()); + GenericUDF genericUDF = condition.getOriginalExpr().getGenericUDF(); + Object value = condition.getConstantDesc().getValue(); + if (genericUDF instanceof GenericUDFOPEqual) { + return KuduPredicate.newComparisonPredicate(column, KuduPredicate.ComparisonOp.EQUAL, value); + } else if (genericUDF instanceof GenericUDFOPGreaterThan) { + return KuduPredicate.newComparisonPredicate(column, KuduPredicate.ComparisonOp.GREATER, value); + } else if (genericUDF instanceof GenericUDFOPEqualOrGreaterThan) { + return KuduPredicate.newComparisonPredicate(column, KuduPredicate.ComparisonOp.GREATER_EQUAL, value); + } else if (genericUDF instanceof GenericUDFOPLessThan) { + return KuduPredicate.newComparisonPredicate(column, KuduPredicate.ComparisonOp.LESS, value); + } else if (genericUDF instanceof GenericUDFOPEqualOrLessThan) { + return KuduPredicate.newComparisonPredicate(column, KuduPredicate.ComparisonOp.LESS_EQUAL, value); + } else if (genericUDF instanceof GenericUDFOPNull) { + return KuduPredicate.newIsNullPredicate(column); + } else if (genericUDF instanceof GenericUDFOPNotNull) { + return KuduPredicate.newIsNotNullPredicate(column); + } else { + throw new RuntimeException("Unhandled Predicate: " + genericUDF); + } + } + + private static IndexPredicateAnalyzer newAnalyzer(Schema schema) { + IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + + // Register comparison operators which can be satisfied by Kudu predicates. 
+ analyzer.addComparisonOp(GenericUDFOPEqual.class.getName()); + analyzer.addComparisonOp(GenericUDFOPNotEqual.class.getName()); + analyzer.addComparisonOp(GenericUDFOPGreaterThan.class.getName()); + analyzer.addComparisonOp(GenericUDFOPEqualOrGreaterThan.class.getName()); + analyzer.addComparisonOp(GenericUDFOPLessThan.class.getName()); + analyzer.addComparisonOp(GenericUDFOPEqualOrLessThan.class.getName()); + analyzer.addComparisonOp(GenericUDFOPAnd.class.getName()); + // TODO: For some reason the analyzer doesn't handle these. + // analyzer.addComparisonOp(GenericUDFOPNull.class.getName()); + // analyzer.addComparisonOp(GenericUDFOPNotNull.class.getName()); + // analyzer.addComparisonOp(GenericUDFIn.class.getName()); + + // Set the column names that can be satisfied. + for (ColumnSchema col : schema.getColumns()) { + analyzer.allowColumnName(col.getName()); + } + + return analyzer; + } +} diff --git kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduSerDe.java kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduSerDe.java new file mode 100644 index 0000000000..127ab60c97 --- /dev/null +++ kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduSerDe.java @@ -0,0 +1,266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.hadoop.hive.kudu; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.Timestamp; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeSpec; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnTypeAttributes; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.PartialRow; + +import static org.apache.hadoop.hive.kudu.KuduHiveUtils.createOverlayedConf; +import static org.apache.hadoop.hive.kudu.KuduHiveUtils.toHiveType; +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY; + +/** + * A Kudu serializer and deserializer to support reading and writing Kudu data from Hive. 
+ */ +@SerDeSpec(schemaProps = { KuduStorageHandler.KUDU_TABLE_NAME_KEY }) +public class KuduSerDe extends AbstractSerDe { + + private ObjectInspector objectInspector; + private Schema schema; + + @SuppressWarnings("unused") + public KuduSerDe() {} + + @Override + public void initialize(Configuration sysConf, Properties tblProps) + throws SerDeException { + Configuration conf = createOverlayedConf(sysConf, tblProps); + String tableName = conf.get(KuduStorageHandler.KUDU_TABLE_NAME_KEY); + if (StringUtils.isEmpty(tableName)) { + throw new SerDeException(KUDU_TABLE_NAME_KEY + " is not set."); + } + try (KuduClient client = KuduHiveUtils.getKuduClient(conf)) { + if (!client.tableExists(tableName)) { + throw new SerDeException("Kudu table does not exist: " + tableName); + } + schema = client.openTable(tableName).getSchema(); + } catch (KuduException|IllegalArgumentException ex) { + throw new SerDeException(ex); + } + this.objectInspector = createObjectInspector(schema); + } + + private static ObjectInspector createObjectInspector(Schema schema) throws SerDeException { + Preconditions.checkNotNull(schema); + List fieldNames = new ArrayList<>(); + List fieldInspectors = new ArrayList<>(); + List fieldComments = new ArrayList<>(); + for (int i = 0; i < schema.getColumnCount(); i++) { + ColumnSchema col = schema.getColumnByIndex(i); + PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes()); + fieldNames.add(col.getName()); + fieldInspectors.add( + PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo)); + fieldComments.add(col.getComment()); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldInspectors, + fieldComments); + } + + public Schema getSchema() { + return schema; + } + + @Override + public ObjectInspector getObjectInspector() { + return objectInspector; + } + + @Override + public Class getSerializedClass() { + return KuduWritable.class; + } + + /** + * Serialize an object by navigating inside the Object with the ObjectInspector. 
+ */ + @Override + public KuduWritable serialize(Object obj, ObjectInspector objectInspector) throws SerDeException { + Preconditions.checkArgument(objectInspector.getCategory() == Category.STRUCT); + + StructObjectInspector soi = (StructObjectInspector) objectInspector; + List writableObj = soi.getStructFieldsDataAsList(obj); + List fields = soi.getAllStructFieldRefs(); + + PartialRow row = schema.newPartialRow(); + for (int i = 0; i < fields.size(); i++) { + StructField field = fields.get(i); + String columnName = field.getFieldName(); + Object value = writableObj.get(i); + if (value instanceof NullWritable) { + row.setNull(field.getFieldName()); + } else { + Type type = schema.getColumn(columnName).getType(); + ObjectInspector inspector = field.getFieldObjectInspector(); + switch (type) { + case BOOL: + boolean boolVal = ((BooleanObjectInspector) inspector).get(value); + row.addBoolean(columnName, boolVal); + break; + case INT8: + byte byteVal = ((ByteObjectInspector) inspector).get(value); + row.addByte(columnName, byteVal); + break; + case INT16: + short shortVal = ((ShortObjectInspector) inspector).get(value); + row.addShort(columnName, shortVal); + break; + case INT32: + int intVal = ((IntObjectInspector) inspector).get(value); + row.addInt(columnName, intVal); + break; + case INT64: + long longVal = ((LongObjectInspector) inspector).get(value); + row.addLong(columnName, longVal); + break; + case UNIXTIME_MICROS: + java.sql.Timestamp timestampVal = ((TimestampObjectInspector) inspector) + .getPrimitiveJavaObject(value).toSqlTimestamp(); + row.addTimestamp(columnName, timestampVal); + break; + case DECIMAL: + HiveDecimal decimalVal = ((HiveDecimalObjectInspector) inspector) + .getPrimitiveJavaObject(value); + row.addDecimal(columnName, decimalVal.bigDecimalValue()); + break; + case FLOAT: + float floatVal = ((FloatObjectInspector) inspector).get(value); + row.addFloat(columnName, floatVal); + break; + case DOUBLE: + double doubleVal = ((DoubleObjectInspector) inspector).get(value); + row.addDouble(columnName, doubleVal); + break; + case STRING: + String stringVal = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value); + row.addString(columnName, stringVal); + break; + case BINARY: + byte[] bytesVal = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value); + row.addBinary(columnName, bytesVal); + break; + default: + throw new SerDeException("Unsupported column type: " + type.name()); + } + } + } + return new KuduWritable(row); + } + + /** + * Deserialize an object out of a Writable blob. + */ + @Override + public Object deserialize(Writable writable) throws SerDeException { + KuduWritable input = (KuduWritable) writable; + List output = new ArrayList<>(); + for(int i = 0; i < schema.getColumnCount(); i++) { + // If the column isn't set, skip it. 
+ if (!input.isSet(i)) { + continue; + } + Object javaObj = input.getValueObject(i); + ColumnSchema col = schema.getColumnByIndex(i); + PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes()); + if (javaObj == null) { + output.add(NullWritable.get()); + } else { + switch (typeInfo.getPrimitiveCategory()) { + case BOOLEAN: output.add(new BooleanWritable((boolean) javaObj)); break; + case BYTE: output.add(new ByteWritable((byte) javaObj)); break; + case SHORT: output.add(new ShortWritable((short) javaObj)); break; + case INT: output.add(new IntWritable((int) javaObj)); break; + case LONG: output.add(new LongWritable((long) javaObj)); break; + case TIMESTAMP: + java.sql.Timestamp sqlTs = (java.sql.Timestamp) javaObj; + Timestamp hiveTs = Timestamp.ofEpochMilli(sqlTs.getTime(), sqlTs.getNanos()); + output.add(new TimestampWritableV2(hiveTs)); break; + case DECIMAL: + HiveDecimal hiveDecimal = HiveDecimal.create((BigDecimal) javaObj); + output.add(new HiveDecimalWritable(hiveDecimal)); break; + case FLOAT: output.add(new FloatWritable((float) javaObj)); break; + case DOUBLE: output.add(new DoubleWritable((double) javaObj)); break; + case STRING: output.add(new Text((String) javaObj)); break; + case BINARY: output.add(new BytesWritable((byte[]) javaObj)); break; + default: + throw new SerDeException("Unsupported type: " + typeInfo.getPrimitiveCategory()); + } + } + } + return output; + } + + @Override + public SerDeStats getSerDeStats() { + // No support for statistics. That seems to be a popular answer. + return null; + } +} + + diff --git kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduStorageHandler.java kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduStorageHandler.java new file mode 100644 index 0000000000..6ce5618091 --- /dev/null +++ kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduStorageHandler.java @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
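For context, a minimal sketch of driving the SerDe above outside of Hive. The master address, table name, and schema ("key" INT8 primary key plus a "string" column) are assumptions, and initialize() requires a reachable Kudu cluster because it opens the table to read its schema.

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.kudu.KuduSerDe;
import org.apache.hadoop.hive.kudu.KuduWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.Text;

public class KuduSerDeUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("kudu.master_addresses", "localhost:7051"); // hypothetical address
    Properties tblProps = new Properties();
    tblProps.setProperty("kudu.table_name", "default.example"); // hypothetical table

    KuduSerDe serDe = new KuduSerDe();
    serDe.initialize(conf, tblProps);

    // Field order must line up with the Kudu schema, since serialize() walks
    // the struct fields positionally and looks each column up by name.
    StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("key", "string"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.writableByteObjectInspector,
            PrimitiveObjectInspectorFactory.writableStringObjectInspector));
    List<Object> row = Arrays.asList(new ByteWritable((byte) 1), new Text("one"));

    KuduWritable out = serDe.serialize(row, rowOI); // wraps a PartialRow for the output format
    Object back = serDe.deserialize(out);           // list of Hive Writables
    System.out.println(back);
  }
}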
+ +package org.apache.hadoop.hive.kudu; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.kudu.KuduOutputFormat.KuduRecordWriter; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider; +import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; +import org.apache.kudu.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides a HiveStorageHandler implementation for Apache Kudu. + */ +public class KuduStorageHandler extends DefaultStorageHandler implements HiveStoragePredicateHandler { + + private static final Logger LOG = LoggerFactory.getLogger(KuduStorageHandler.class); + + private static final String KUDU_PROPERTY_PREFIX = "kudu."; + + /** Table Properties. Used in the hive table definition when creating a new table. */ + public static final String KUDU_TABLE_ID_KEY = KUDU_PROPERTY_PREFIX + "table_id"; + public static final String KUDU_TABLE_NAME_KEY = KUDU_PROPERTY_PREFIX + "table_name"; + public static final String KUDU_MASTER_ADDRS_KEY = KUDU_PROPERTY_PREFIX + "master_addresses"; + public static final List KUDU_TABLE_PROPERTIES = + Arrays.asList(KUDU_TABLE_ID_KEY, KUDU_TABLE_NAME_KEY, KUDU_MASTER_ADDRS_KEY); + + private Configuration conf; + + @Override + public Class getInputFormatClass() { + return KuduInputFormat.class; + } + + @Override + public Class getOutputFormatClass() { + return KuduOutputFormat.class; + } + + @Override + public Class getSerDeClass() { + return KuduSerDe.class; + } + + // TODO: Support HiveMetaHook interface and handle HMS sync enabled/disabled. 
+ @Override + public HiveMetaHook getMetaHook() { + return null; + } + + @Override + public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException { + return new DefaultHiveAuthorizationProvider(); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public void configureInputJobProperties(TableDesc tableDesc, + Map jobProperties) { + configureJobProperties(tableDesc, jobProperties); + } + + @Override + public void configureOutputJobProperties(TableDesc tableDesc, + Map jobProperties) { + configureJobProperties(tableDesc, jobProperties); + } + + @Override + public void configureTableJobProperties(TableDesc tableDesc, + Map jobProperties) { + configureJobProperties(tableDesc, jobProperties); + } + + @Override + public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { + // Copied from the DruidStorageHandler. + if (UserGroupInformation.isSecurityEnabled()) { + // AM can not do Kerberos Auth so will do the input split generation in the HS2 + LOG.debug("Setting {} to {} to enable split generation on HS2", + HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(), + Boolean.FALSE.toString()); + jobConf.set(HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(), Boolean.FALSE.toString()); + } + try { + addDependencyJars(jobConf, KuduRecordWriter.class); + } catch (IOException e) { + Throwables.propagate(e); + } + } + + // Copied from the DruidStorageHandler. + private static void addDependencyJars(Configuration conf, Class... classes) + throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + Set jars = new HashSet<>(conf.getStringCollection("tmpjars")); + for (Class clazz : classes) { + if (clazz == null) { + continue; + } + final String path = Utilities.jarFinderGetJar(clazz); + if (path == null) { + throw new RuntimeException("Could not find jar for class " + clazz + + " in order to ship it to the cluster."); + } + if (!localFs.exists(new Path(path))) { + throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz); + } + jars.add(path); + } + if (jars.isEmpty()) { + return; + } + //noinspection ToArrayCallWithZeroLengthArrayArgument + conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); + } + + private void configureJobProperties(TableDesc tableDesc, + Map jobProperties) { + + Properties tblProps = tableDesc.getProperties(); + copyPropertiesFromTable(jobProperties, tblProps); + } + + private void copyPropertiesFromTable(Map jobProperties, Properties tblProps) { + for (String propToCopy : KUDU_TABLE_PROPERTIES) { + if (tblProps.containsKey(propToCopy)) { + String value = tblProps.getProperty(propToCopy); + conf.set(propToCopy, value); + jobProperties.put(propToCopy, value); + } + } + } + + /** + * Gives the storage handler a chance to decompose a predicate. + * The storage handler should analyze the predicate and return the portion of it which + * cannot be evaluated during table access. 
+ * + * @param jobConf contains a job configuration matching the one that will later be passed + * to getRecordReader and getSplits + * @param deserializer deserializer which will be used when fetching rows + * @param predicate predicate to be decomposed + * @return decomposed form of predicate, or null if no pushdown is possible at all + */ + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + ExprNodeDesc predicate) { + Preconditions.checkArgument(deserializer instanceof KuduSerDe); + KuduSerDe serDe = (KuduSerDe) deserializer; + Schema schema = serDe.getSchema(); + return KuduPredicateHandler.decompose(predicate, schema); + } + + /** + * Used to fetch runtime information about storage handler during DESCRIBE EXTENDED statement. + */ + @Override + public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException { + // TODO: Implement KuduStorageHandlerInfo. + return null; + } +} diff --git kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduWritable.java kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduWritable.java new file mode 100644 index 0000000000..e67b5984c6 --- /dev/null +++ kudu-handler/src/java/org/apache/hadoop/hive/kudu/KuduWritable.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.hadoop.hive.kudu; + +import java.io.DataInput; +import java.io.DataOutput; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.io.Writable; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RowResult; + +/** + * A Writable representation of a Kudu Row. + * The row may be either a PartialRow or a RowResult. 
+ */ +public class KuduWritable implements Writable { + + private PartialRow partialRow; + private RowResult rowResult; + + public KuduWritable(PartialRow row) { + this.partialRow = row; + this.rowResult = null; + } + + public void setRow(RowResult rowResult) { + this.rowResult = rowResult; + this.partialRow = null; + } + + public PartialRow getPartialRow() { + Preconditions.checkNotNull(partialRow); + return partialRow; + } + + public RowResult getRowResult() { + Preconditions.checkNotNull(rowResult); + return rowResult; + } + + public Object getValueObject(int colIndex) { + if (partialRow != null) { + return partialRow.getObject(colIndex); + } else { + return rowResult.getObject(colIndex); + } + } + + public Object getValueObject(String colName) { + if (partialRow != null) { + return partialRow.getObject(colName); + } else { + return rowResult.getObject(colName); + } + } + + public boolean isSet(int colIndex) { + if (partialRow != null) { + return partialRow.isSet(colIndex); + } else { + // RowResult columns are always set. + return true; + } + } + + public boolean isSet(String colName) { + if (partialRow != null) { + return partialRow.isSet(colName); + } else { + // RowResult columns are always set. + return true; + } + } + + public void populateRow(PartialRow row) { + Schema schema = row.getSchema(); + for (int i = 0; i < schema.getColumnCount(); i++) { + ColumnSchema col = schema.getColumnByIndex(i); + if (isSet(col.getName())) { + Object value = getValueObject(col.getName()); + row.addObject(i, value); + } + } + } + + @Override + public void readFields(DataInput in) { + throw new UnsupportedOperationException(); + } + + @Override + public void write(DataOutput out) { + throw new UnsupportedOperationException(); + } +} diff --git kudu-handler/src/java/org/apache/hadoop/hive/kudu/package-info.java kudu-handler/src/java/org/apache/hadoop/hive/kudu/package-info.java new file mode 100644 index 0000000000..6491013f8a --- /dev/null +++ kudu-handler/src/java/org/apache/hadoop/hive/kudu/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package info file. + */ + +package org.apache.hadoop.hive.kudu; diff --git kudu-handler/src/test/org/apache/hadoop/hive/kudu/KuduTestUtils.java kudu-handler/src/test/org/apache/hadoop/hive/kudu/KuduTestUtils.java new file mode 100644 index 0000000000..3d9b751d6d --- /dev/null +++ kudu-handler/src/test/org/apache/hadoop/hive/kudu/KuduTestUtils.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.hadoop.hive.kudu; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnTypeAttributes; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; + +import java.util.Arrays; + +public class KuduTestUtils { + + private KuduTestUtils() {} + + public static Schema getAllTypesSchema() { + return new org.apache.kudu.Schema(Arrays.asList( + new ColumnSchema.ColumnSchemaBuilder("key", Type.INT8).key(true).build(), + new ColumnSchema.ColumnSchemaBuilder("int16", Type.INT16).build(), + new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build(), + new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build(), + new ColumnSchema.ColumnSchemaBuilder("bool", Type.BOOL).build(), + new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build(), + new ColumnSchema.ColumnSchemaBuilder("double", Type.DOUBLE).build(), + new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build(), + new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build(), + new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS).build(), + new ColumnSchema.ColumnSchemaBuilder("decimal", Type.DECIMAL) + .typeAttributes(new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(5).scale(3).build()) + .build(), + new ColumnSchema.ColumnSchemaBuilder("null", Type.STRING).nullable(true).build(), + new ColumnSchema.ColumnSchemaBuilder("default", Type.INT32).defaultValue(1).build() + )); + } + +} diff --git kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduInputFormat.java kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduInputFormat.java new file mode 100644 index 0000000000..8f47bd9114 --- /dev/null +++ kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduInputFormat.java @@ -0,0 +1,331 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
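The all-types test schema above pairs one column with each Kudu type, plus a nullable column and a column with a default value. The Hive primitive categories those types surface as, as implied by the serialize/deserialize switches in KuduSerDe, can be summarized in a small lookup; the class below is purely a reference sketch.

import java.util.EnumMap;
import java.util.Map;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.kudu.Type;

public class KuduHiveTypeMapSketch {
  // Kudu column type -> Hive primitive category exercised by these tests.
  // DECIMAL carries precision/scale (5,3 in the test schema) and
  // UNIXTIME_MICROS maps to Hive TIMESTAMP.
  public static final Map<Type, PrimitiveCategory> MAPPING = new EnumMap<>(Type.class);
  static {
    MAPPING.put(Type.BOOL, PrimitiveCategory.BOOLEAN);
    MAPPING.put(Type.INT8, PrimitiveCategory.BYTE);
    MAPPING.put(Type.INT16, PrimitiveCategory.SHORT);
    MAPPING.put(Type.INT32, PrimitiveCategory.INT);
    MAPPING.put(Type.INT64, PrimitiveCategory.LONG);
    MAPPING.put(Type.UNIXTIME_MICROS, PrimitiveCategory.TIMESTAMP);
    MAPPING.put(Type.DECIMAL, PrimitiveCategory.DECIMAL);
    MAPPING.put(Type.FLOAT, PrimitiveCategory.FLOAT);
    MAPPING.put(Type.DOUBLE, PrimitiveCategory.DOUBLE);
    MAPPING.put(Type.STRING, PrimitiveCategory.STRING);
    MAPPING.put(Type.BINARY, PrimitiveCategory.BINARY);
  }
}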
+ +package org.apache.hadoop.hive.kudu; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.kudu.KuduInputFormat.KuduInputSplit; +import org.apache.hadoop.hive.kudu.KuduInputFormat.KuduRecordReader; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.shaded.com.google.common.collect.ImmutableList; +import org.apache.kudu.test.KuduTestHarness; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hive.kudu.KuduHiveUtils.toHiveType; +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY; +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY; +import static org.apache.hadoop.hive.kudu.KuduTestUtils.getAllTypesSchema; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestKuduInputFormat { + + private static final String TABLE_NAME = "default.TestKuduInputFormat"; + + private static final Schema SCHEMA = getAllTypesSchema(); + + private static final Configuration BASE_CONF = new Configuration(); + + private static final long nowMs = System.currentTimeMillis(); + + private static final PartialRow ROW; + static { + ROW = SCHEMA.newPartialRow(); + ROW.addByte("key", (byte) 1); + ROW.addShort("int16", (short) 1); + ROW.addInt("int32", 1); + ROW.addLong("int64", 1L); + ROW.addBoolean("bool", true); + ROW.addFloat("float", 1.1f); + ROW.addDouble("double", 1.1d); + ROW.addString("string", "one"); + ROW.addBinary("binary", "one".getBytes(UTF_8)); + ROW.addTimestamp("timestamp", new Timestamp(nowMs)); + ROW.addDecimal("decimal", new BigDecimal("1.111")); + ROW.setNull("null"); + // Not setting the "default" column. + } + + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + + @Before + public void setUp() throws Exception { + // Set the base configuration values. 
+ BASE_CONF.set(KUDU_MASTER_ADDRS_KEY, harness.getMasterAddressesAsString()); + BASE_CONF.set(KUDU_TABLE_NAME_KEY, TABLE_NAME); + BASE_CONF.set(FileInputFormat.INPUT_DIR, "dummy"); + + // Create the test Kudu table. + CreateTableOptions options = new CreateTableOptions() + .setRangePartitionColumns(ImmutableList.of("key")); + harness.getClient().createTable(TABLE_NAME, SCHEMA, options); + + // Insert a test row. + KuduTable table = harness.getClient().openTable(TABLE_NAME); + KuduSession session = harness.getClient().newSession(); + Insert insert = table.newInsert(); + PartialRow insertRow = insert.getRow(); + // Use KuduWritable, to populate the insert row. + new KuduWritable(ROW).populateRow(insertRow); + session.apply(insert); + session.close(); + } + + @Test + public void testAllColumns() throws Exception { + KuduInputFormat input = new KuduInputFormat(); + + JobConf jobConf = new JobConf(BASE_CONF); + String columnsStr = SCHEMA.getColumns().stream() + .map(ColumnSchema::getName) + .collect(Collectors.joining( "," )); + jobConf.set(serdeConstants.LIST_COLUMNS, columnsStr); + + InputSplit[] splits = input.getSplits(jobConf, 1); + assertEquals(1, splits.length); + KuduInputSplit split = (KuduInputSplit) splits[0]; + + KuduRecordReader reader = + (KuduRecordReader) input.getRecordReader(split, jobConf, null); + assertTrue(reader.nextKeyValue()); + RowResult value = reader.getCurrentValue().getRowResult(); + verfiyRow(value); + assertFalse(reader.nextKeyValue()); + } + + @Test + public void testProjection() throws Exception { + KuduInputFormat input = new KuduInputFormat(); + + + JobConf jobConf = new JobConf(BASE_CONF); + jobConf.set(serdeConstants.LIST_COLUMNS, "bool,key"); + + InputSplit[] splits = input.getSplits(jobConf, 1); + assertEquals(1, splits.length); + KuduInputSplit split = (KuduInputSplit) splits[0]; + + KuduRecordReader reader = + (KuduRecordReader) input.getRecordReader(split, jobConf, null); + assertTrue(reader.nextKeyValue()); + RowResult value = reader.getCurrentValue().getRowResult(); + assertEquals(2, value.getSchema().getColumnCount()); + assertTrue(value.getBoolean(0)); + assertEquals((byte) 1, value.getByte(1)); + assertFalse(reader.nextKeyValue()); + } + + @Test + public void testMissingTable() throws Exception { + KuduInputFormat input = new KuduInputFormat(); + + JobConf jobConf = new JobConf(BASE_CONF); + jobConf.unset(KUDU_TABLE_NAME_KEY); + jobConf.set(serdeConstants.LIST_COLUMNS, "key"); + + try { + input.getSplits(jobConf, 1); + fail("Should fail on missing master addresses"); + } catch (IllegalArgumentException ex) { + assertThat(ex.getMessage(), containsString("kudu.table_name is not set")); + } + } + + @Test + public void testBadTable() throws Exception { + KuduInputFormat input = new KuduInputFormat(); + + JobConf jobConf = new JobConf(BASE_CONF); + jobConf.set(KUDU_TABLE_NAME_KEY, "default.notatable"); + jobConf.set(serdeConstants.LIST_COLUMNS, "key"); + + try { + input.getSplits(jobConf, 1); + fail("Should fail on a bad table"); + } catch (IllegalArgumentException ex) { + assertThat(ex.getMessage(), + containsString("Kudu table does not exist: default.notatable")); + } + } + + @Test + public void testMissingColumn() throws Exception { + KuduInputFormat input = new KuduInputFormat(); + + JobConf jobConf = new JobConf(BASE_CONF); + jobConf.set(serdeConstants.LIST_COLUMNS, "missing"); + + try { + input.getSplits(jobConf, 1); + fail("Should fail on missing column"); + } catch (IllegalArgumentException ex) { + assertThat(ex.getMessage(), 
containsString("Unknown column: missing")); + } + } + + @Test + public void testMultipleSplits() throws Exception { + String tableName = "default.twoPartitionTable"; + Schema schema = new Schema(Arrays.asList( + new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(), + new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build() + )); + CreateTableOptions options = new CreateTableOptions() + .addHashPartitions(Collections.singletonList("key"), 2); + harness.getClient().createTable(tableName, schema, options); + + // Insert multiple test rows. + KuduTable table = harness.getClient().openTable(tableName); + KuduSession session = harness.getClient().newSession(); + Insert insert1 = table.newInsert(); + PartialRow row1 = insert1.getRow(); + row1.addInt("key", 1); + row1.addString("string", "one"); + session.apply(insert1); + Insert insert2 = table.newInsert(); + PartialRow row2 = insert2.getRow(); + row2.addInt("key", 2); + row2.addString("string", "two"); + session.apply(insert2); + session.close(); + + KuduInputFormat input = new KuduInputFormat(); + + JobConf jobConf = new JobConf(BASE_CONF); + jobConf.set(KUDU_TABLE_NAME_KEY, tableName); + jobConf.set(serdeConstants.LIST_COLUMNS, "key"); + + InputSplit[] splits = input.getSplits(jobConf, 1); + assertEquals(2, splits.length); + } + + @Test + public void testPredicate() throws Exception { + // Insert a second test row that will be filtered out. + KuduTable table = harness.getClient().openTable(TABLE_NAME); + KuduSession session = harness.getClient().newSession(); + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addByte("key", (byte) 2); + row.addShort("int16", (short) 2); + row.addInt("int32", 2); + row.addLong("int64", 2L); + row.addBoolean("bool", false); + row.addFloat("float", 2.2f); + row.addDouble("double", 2.2d); + row.addString("string", "two"); + row.addBinary("binary", "two".getBytes(UTF_8)); + row.addTimestamp("timestamp", new Timestamp(nowMs + 1)); + row.addDecimal("decimal", new BigDecimal("2.222")); + row.setNull("null"); + // Not setting the "default" column. + session.apply(insert); + session.close(); + + KuduInputFormat input = new KuduInputFormat(); + + // Test an equality predicate for each column. + for (ColumnSchema col : SCHEMA.getColumns()) { + // Skip null and default columns because they don't have a value to use. 
+ if (col.getName().equals("null") || col.getName().equals("default")) { + continue; + } + + JobConf jobConf = new JobConf(BASE_CONF); + String columnsStr = SCHEMA.getColumns().stream() + .map(ColumnSchema::getName) + .collect(Collectors.joining( "," )); + jobConf.set(serdeConstants.LIST_COLUMNS, columnsStr); + + PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes()); + ExprNodeDesc colExpr = new ExprNodeColumnDesc(typeInfo, col.getName(), null, false); + ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName())); + List children = Lists.newArrayList(); + children.add(colExpr); + children.add(constExpr); + ExprNodeGenericFuncDesc predicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqual(), children); + + String filterExpr = SerializationUtilities.serializeExpression(predicateExpr); + jobConf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); + + InputSplit[] splits = input.getSplits(jobConf, 1); + assertEquals(1, splits.length); + KuduInputSplit split = (KuduInputSplit) splits[0]; + + KuduRecordReader reader = + (KuduRecordReader) input.getRecordReader(split, jobConf, null); + assertTrue(reader.nextKeyValue()); + RowResult value = reader.getCurrentValue().getRowResult(); + verfiyRow(value); + assertFalse(reader.nextKeyValue()); + } + } + private void verfiyRow(RowResult value) { + assertEquals(SCHEMA.getColumnCount(), value.getSchema().getColumnCount()); + assertEquals(ROW.getByte(0), value.getByte(0)); + assertEquals(ROW.getShort(1), value.getShort(1)); + assertEquals(ROW.getInt(2), value.getInt(2)); + assertEquals(ROW.getLong(3), value.getLong(3)); + assertEquals(ROW.getBoolean(4), value.getBoolean(4)); + assertEquals(ROW.getFloat(5), value.getFloat(5), 0); + assertEquals(ROW.getDouble(6), value.getDouble(6), 0); + assertEquals(ROW.getString(7), value.getString(7)); + assertArrayEquals(ROW.getBinaryCopy(8), value.getBinaryCopy(8)); + assertEquals(ROW.getTimestamp(9), value.getTimestamp(9)); + assertEquals(ROW.getDecimal(10), value.getDecimal(10)); + assertTrue(value.isNull(11)); + assertEquals(1, value.getInt(12)); // default. + } +} diff --git kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduOutputFormat.java kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduOutputFormat.java new file mode 100644 index 0000000000..afd6570c8b --- /dev/null +++ kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduOutputFormat.java @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
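The input format test above exercises predicate pushdown by serializing the filter expression into the job configuration under TableScanDesc.FILTER_EXPR_CONF_STR. A rough sketch of that configuration round trip is shown below; Hive's SerializationUtilities and TableScanDesc are real APIs, while the wrapper class and method names are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;

public class FilterExprRoundTripSketch {
  // Planner side: store the pushed predicate in the job conf as a string.
  public static void push(Configuration conf, ExprNodeGenericFuncDesc pushedPredicate) {
    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR,
        SerializationUtilities.serializeExpression(pushedPredicate));
  }

  // Reader side: recover the expression before translating it into scan predicates.
  public static ExprNodeGenericFuncDesc pull(Configuration conf) {
    String serialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    return serialized == null ? null : SerializationUtilities.deserializeExpression(serialized);
  }
}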
+ +package org.apache.hadoop.hive.kudu; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.kudu.KuduOutputFormat.KuduRecordWriter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.kudu.Schema; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.shaded.com.google.common.collect.ImmutableList; +import org.apache.kudu.test.KuduTestHarness; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY; +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY; +import static org.apache.hadoop.hive.kudu.KuduTestUtils.getAllTypesSchema; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestKuduOutputFormat { + + private static final String TABLE_NAME = "default.TestKuduOutputFormat"; + + private static final Schema SCHEMA = getAllTypesSchema(); + + private static final Configuration BASE_CONF = new Configuration(); + + private static final Properties TBL_PROPS = new Properties(); + + private static final long nowMs = System.currentTimeMillis(); + + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + + @Before + public void setUp() throws Exception { + // Set the base configuration values. + BASE_CONF.set(KUDU_MASTER_ADDRS_KEY, harness.getMasterAddressesAsString()); + TBL_PROPS.setProperty(KUDU_TABLE_NAME_KEY, TABLE_NAME); + + // Create the test Kudu table. + CreateTableOptions options = new CreateTableOptions() + .setRangePartitionColumns(ImmutableList.of("key")); + harness.getClient().createTable(TABLE_NAME, SCHEMA, options); + } + + @Test + public void testGoodRow() throws Exception { + KuduOutputFormat outputFormat = new KuduOutputFormat(); + + KuduRecordWriter writer = + (KuduRecordWriter) outputFormat.getHiveRecordWriter(new JobConf(BASE_CONF), + null, null, false, TBL_PROPS, null); + + // Write a good row. + try { + PartialRow row = SCHEMA.newPartialRow(); + row.addByte("key", (byte) 1); + row.addShort("int16", (short) 1); + row.addInt("int32", 1); + row.addLong("int64", 1L); + row.addBoolean("bool", true); + row.addFloat("float", 1.1f); + row.addDouble("double", 1.1d); + row.addString("string", "one"); + row.addBinary("binary", "one".getBytes(UTF_8)); + row.addTimestamp("timestamp", new Timestamp(nowMs)); + row.addDecimal("decimal", new BigDecimal("1.111")); + row.setNull("null"); + // Not setting the "default" column. + KuduWritable writable = new KuduWritable(row); + writer.write(writable); + } finally { + writer.close(false); + } + + // Verify the written row. 
+ KuduClient client = harness.getClient(); + KuduTable table = client.openTable(TABLE_NAME); + KuduScanner scanner = client.newScannerBuilder(table).build(); + + List results = new ArrayList<>(); + for (RowResult result : scanner) { + results.add(result); + } + assertEquals(1, results.size()); + RowResult result = results.get(0); + + assertEquals((byte) 1, result.getByte(0)); + assertEquals((short) 1, result.getShort(1)); + assertEquals(1, result.getInt(2)); + assertEquals(1L, result.getLong(3)); + assertTrue(result.getBoolean(4)); + assertEquals(1.1f, result.getFloat(5), 0); + assertEquals(1.1d, result.getDouble(6), 0); + assertEquals("one", result.getString(7)); + assertEquals("one", new String(result.getBinaryCopy(8), UTF_8)); + assertEquals(nowMs, result.getTimestamp(9).getTime()); + assertEquals(new BigDecimal("1.111"), result.getDecimal(10)); + assertTrue(result.isNull(11)); + assertEquals(1, result.getInt(12)); // default. + } + + @Test + public void testBadRow() throws Exception { + KuduOutputFormat outputFormat = new KuduOutputFormat(); + + KuduRecordWriter writer = + (KuduRecordWriter) outputFormat.getHiveRecordWriter(new JobConf(BASE_CONF), + null, null, false, TBL_PROPS, null); + + // Write an empty row. + try { + PartialRow row = SCHEMA.newPartialRow(); + KuduWritable writable = new KuduWritable(row); + writer.write(writable); + } catch (KuduException ex) { + assertThat(ex.getMessage(), containsString("Primary key column key is not set")); + } finally { + writer.close(false); + } + } + + @Test + public void testMissingTable() throws Exception { + KuduOutputFormat outputFormat = new KuduOutputFormat(); + + Properties tblProps = new Properties(); + + try { + outputFormat.getHiveRecordWriter(new JobConf(BASE_CONF), + null, null, false, tblProps, null); + fail("Should fail on missing table"); + } catch (IllegalArgumentException ex) { + assertThat(ex.getMessage(), containsString("kudu.table_name is not set")); + } + } + + @Test + public void testBadTable() throws Exception { + KuduOutputFormat outputFormat = new KuduOutputFormat(); + + Properties tblProps = new Properties(); + tblProps.setProperty(KUDU_TABLE_NAME_KEY, "default.notatable"); + + try { + outputFormat.getHiveRecordWriter(new JobConf(BASE_CONF), + null, null, false, tblProps, null); + fail("Should fail on a bad table"); + } catch (IllegalArgumentException ex) { + assertThat(ex.getMessage(), + containsString("Kudu table does not exist: default.notatable")); + } + } +} diff --git kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduPredicateHandler.java kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduPredicateHandler.java new file mode 100644 index 0000000000..5566518aa2 --- /dev/null +++ kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduPredicateHandler.java @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.hadoop.hive.kudu; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.shaded.com.google.common.collect.ImmutableList; +import org.apache.kudu.test.KuduTestHarness; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hive.kudu.KuduHiveUtils.toHiveType; +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY; +import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY; +import static org.apache.hadoop.hive.kudu.KuduTestUtils.getAllTypesSchema; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class TestKuduPredicateHandler { + + private static final String TABLE_NAME = "default.TestKuduPredicateHandler"; + + private static final Schema SCHEMA = getAllTypesSchema(); + + private static final Configuration BASE_CONF = new Configuration(); + + private static final long nowMs = System.currentTimeMillis(); + + private static final PartialRow ROW; + static { + ROW = SCHEMA.newPartialRow(); + ROW.addByte("key", (byte) 1); + ROW.addShort("int16", (short) 1); + ROW.addInt("int32", 1); + ROW.addLong("int64", 1L); + ROW.addBoolean("bool", true); + ROW.addFloat("float", 1.1f); + ROW.addDouble("double", 1.1d); + ROW.addString("string", "one"); + ROW.addBinary("binary", "one".getBytes(UTF_8)); + 
ROW.addTimestamp("timestamp", new Timestamp(nowMs)); + ROW.addDecimal("decimal", new BigDecimal("1.111")); + ROW.setNull("null"); + // Not setting the "default" column. + } + + private static final List comparisonUDFs = Arrays.asList( + new GenericUDFOPEqual(), + new GenericUDFOPGreaterThan(), + new GenericUDFOPEqualOrGreaterThan(), + new GenericUDFOPLessThan(), + new GenericUDFOPEqualOrLessThan() + ); + + private static final List nullableUDFs = Arrays.asList( + new GenericUDFOPNull(), + new GenericUDFOPNotNull() + ); + + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + + @Before + public void setUp() throws Exception { + // Set the base configuration values. + BASE_CONF.set(KUDU_MASTER_ADDRS_KEY, harness.getMasterAddressesAsString()); + BASE_CONF.set(KUDU_TABLE_NAME_KEY, TABLE_NAME); + BASE_CONF.set(FileInputFormat.INPUT_DIR, "dummy"); + + // Create the test Kudu table. + CreateTableOptions options = new CreateTableOptions() + .setRangePartitionColumns(ImmutableList.of("key")); + harness.getClient().createTable(TABLE_NAME, SCHEMA, options); + } + + @Test + public void testComparisonPredicates() throws Exception { + for (ColumnSchema col : SCHEMA.getColumns()) { + // Skip null and default columns because they don't have a value to use. + if (col.getName().equals("null") || col.getName().equals("default")) { + continue; + } + PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes()); + ExprNodeDesc colExpr = new ExprNodeColumnDesc(typeInfo, col.getName(), null, false); + ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName())); + List children = Lists.newArrayList(); + children.add(colExpr); + children.add(constExpr); + for (GenericUDF udf : comparisonUDFs) { + ExprNodeGenericFuncDesc predicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, udf, children); + + // Verify KuduPredicateHandler.decompose + HiveStoragePredicateHandler.DecomposedPredicate decompose = + KuduPredicateHandler.decompose(predicateExpr, SCHEMA); + assertNotNull(String.format("Unsupported comparison UDF and type (%s, %s)", udf, typeInfo), + decompose); + assertNotNull(String.format("Unsupported comparison UDF and type (%s, %s)", udf, typeInfo), + decompose.pushedPredicate); + assertNull(String.format("Unsupported comparison UDF and type (%s, %s)", udf, typeInfo), + decompose.residualPredicate); + + // Verify KuduPredicateHandler.getPredicates + String filterExpr = SerializationUtilities.serializeExpression(predicateExpr); + Configuration conf = new Configuration(); + conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); + List predicates = KuduPredicateHandler.getPredicates(conf, SCHEMA); + assertEquals(1, predicates.size()); + KuduPredicate predicate = predicates.get(0); + + // Scan the table with the predicate to be sure there are no exceptions. + KuduClient client = harness.getClient(); + KuduTable table = client.openTable(TABLE_NAME); + KuduScanner scanner = client.newScannerBuilder(table) + .addPredicate(predicate) + .build(); + while (scanner.hasMoreRows()) { + scanner.nextRows(); + } + } + } + } + + @Test + public void testAndPredicates() throws Exception { + for (ColumnSchema col : SCHEMA.getColumns()) { + // Skip null and default columns because they don't have a value to use. 
+ if (col.getName().equals("null") || col.getName().equals("default")) { + continue; + } + PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes()); + ExprNodeDesc colExpr = new ExprNodeColumnDesc(typeInfo, col.getName(), null, false); + ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName())); + List children = Lists.newArrayList(); + children.add(colExpr); + children.add(constExpr); + + ExprNodeGenericFuncDesc gePredicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrGreaterThan(), children); + ExprNodeGenericFuncDesc lePredicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrGreaterThan(), children); + + List andChildren = Lists.newArrayList(); + andChildren.add(gePredicateExpr); + andChildren.add(lePredicateExpr); + ExprNodeGenericFuncDesc andPredicateExpr = new ExprNodeGenericFuncDesc(typeInfo, + new GenericUDFOPAnd(), andChildren); + + // Verify KuduPredicateHandler.decompose + HiveStoragePredicateHandler.DecomposedPredicate decompose = + KuduPredicateHandler.decompose(andPredicateExpr, SCHEMA); + assertNotNull(decompose); + assertNotNull(decompose.pushedPredicate); + assertNull(decompose.residualPredicate); + + // Verify KuduPredicateHandler.getPredicates + String filterExpr = SerializationUtilities.serializeExpression(andPredicateExpr); + Configuration conf = new Configuration(); + conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); + List predicates = KuduPredicateHandler.getPredicates(conf, SCHEMA); + assertEquals(2, predicates.size()); + + // Scan the table with the predicate to be sure there are no exceptions. + KuduClient client = harness.getClient(); + KuduTable table = client.openTable(TABLE_NAME); + KuduScanner scanner = client.newScannerBuilder(table) + .addPredicate(predicates.get(0)) + .addPredicate(predicates.get(1)) + .build(); + while (scanner.hasMoreRows()) { + scanner.nextRows(); + } + } + } + + @Test + public void testMixedPredicates() throws Exception { + for (ColumnSchema col : SCHEMA.getColumns()) { + // Skip null and default columns because they don't have a value to use. 
+ if (col.getName().equals("null") || col.getName().equals("default")) { + continue; + } + PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes()); + ExprNodeDesc colExpr = new ExprNodeColumnDesc(typeInfo, col.getName(), null, false); + ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName())); + List children = Lists.newArrayList(); + children.add(colExpr); + children.add(constExpr); + + ExprNodeGenericFuncDesc supportedPredicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrGreaterThan(), children); + ExprNodeGenericFuncDesc unsupportedPredicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPUnsupported(), children); + + List andChildren = Lists.newArrayList(); + andChildren.add(supportedPredicateExpr); + andChildren.add(unsupportedPredicateExpr); + ExprNodeGenericFuncDesc andPredicateExpr = new ExprNodeGenericFuncDesc(typeInfo, + new GenericUDFOPAnd(), andChildren); + + // Verify KuduPredicateHandler.decompose + HiveStoragePredicateHandler.DecomposedPredicate decompose = + KuduPredicateHandler.decompose(andPredicateExpr, SCHEMA); + assertNotNull(decompose); + assertNotNull(decompose.pushedPredicate); + assertNotNull(decompose.residualPredicate); + + // Verify KuduPredicateHandler.getPredicates + String filterExpr = SerializationUtilities.serializeExpression(decompose.pushedPredicate); + Configuration conf = new Configuration(); + conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); + List predicates = KuduPredicateHandler.getPredicates(conf, SCHEMA); + assertEquals(1, predicates.size()); + KuduPredicate predicate = predicates.get(0); + + // Scan the table with the predicate to be sure there are no exceptions. + KuduClient client = harness.getClient(); + KuduTable table = client.openTable(TABLE_NAME); + KuduScanner scanner = client.newScannerBuilder(table) + .addPredicate(predicate) + .build(); + while (scanner.hasMoreRows()) { + scanner.nextRows(); + } + } + } + + @Test + public void testInPredicates() throws Exception { + PrimitiveTypeInfo typeInfo = toHiveType(Type.STRING, null); + ExprNodeDesc colExpr = new ExprNodeColumnDesc(typeInfo, "string", null, false); + ExprNodeConstantDesc constDesc = new ExprNodeConstantDesc("Alpha"); + ExprNodeConstantDesc constDesc2 = new ExprNodeConstantDesc("Bravo"); + List children = Lists.newArrayList(); + children.add(colExpr); + children.add(constDesc); + children.add(constDesc2); + + ExprNodeGenericFuncDesc predicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFIn(), children); + + // Verify KuduPredicateHandler.decompose + HiveStoragePredicateHandler.DecomposedPredicate decompose = + KuduPredicateHandler.decompose(predicateExpr, SCHEMA); + // TODO: IN predicates are not supported. + assertNull(decompose); + } + + @Test + public void testNullablePredicates() throws Exception { + PrimitiveTypeInfo typeInfo = toHiveType(Type.STRING, null); + ExprNodeDesc colExpr = new ExprNodeColumnDesc(typeInfo, "null", null, false); + List children = Lists.newArrayList(); + children.add(colExpr); + + for (GenericUDF udf : nullableUDFs) { + ExprNodeGenericFuncDesc predicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, udf, children); + + // Verify KuduPredicateHandler.decompose + HiveStoragePredicateHandler.DecomposedPredicate decompose = + KuduPredicateHandler.decompose(predicateExpr, SCHEMA); + // TODO: Null and NotNull predicates are not supported. 
+ assertNull(decompose); + } + } + + @Test + public void testOrPredicates() throws Exception { + for (ColumnSchema col : SCHEMA.getColumns()) { + // Skip null and default columns because they don't have a value to use. + if (col.getName().equals("null") || col.getName().equals("default")) { + continue; + } + PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes()); + ExprNodeDesc colExpr = new ExprNodeColumnDesc(typeInfo, col.getName(), null, false); + ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName())); + List children = Lists.newArrayList(); + children.add(colExpr); + children.add(constExpr); + + ExprNodeGenericFuncDesc gePredicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrGreaterThan(), children); + ExprNodeGenericFuncDesc lePredicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, new GenericUDFOPEqualOrGreaterThan(), children); + + List orChildren = Lists.newArrayList(); + orChildren.add(gePredicateExpr); + orChildren.add(lePredicateExpr); + ExprNodeGenericFuncDesc orPredicateExpr = new ExprNodeGenericFuncDesc(typeInfo, + new GenericUDFOPOr(), orChildren); + + // Verify KuduPredicateHandler.decompose + HiveStoragePredicateHandler.DecomposedPredicate decompose = + KuduPredicateHandler.decompose(orPredicateExpr, SCHEMA); + // TODO: OR predicates are not supported. + assertNull(decompose); + } + } + + @Test + public void testNotPredicates() throws Exception { + for (ColumnSchema col : SCHEMA.getColumns()) { + // Skip null and default columns because they don't have a value to use. + if (col.getName().equals("null") || col.getName().equals("default")) { + continue; + } + PrimitiveTypeInfo typeInfo = toHiveType(col.getType(), col.getTypeAttributes()); + ExprNodeDesc colExpr = new ExprNodeColumnDesc(typeInfo, col.getName(), null, false); + ExprNodeDesc constExpr = new ExprNodeConstantDesc(typeInfo, ROW.getObject(col.getName())); + List children = Lists.newArrayList(); + children.add(colExpr); + children.add(constExpr); + + for (GenericUDF udf : comparisonUDFs) { + ExprNodeGenericFuncDesc predicateExpr = + new ExprNodeGenericFuncDesc(typeInfo, udf, children); + + List notChildren = Lists.newArrayList(); + notChildren.add(predicateExpr); + ExprNodeGenericFuncDesc andPredicateExpr = new ExprNodeGenericFuncDesc(typeInfo, + new GenericUDFOPNot(), notChildren); + + // Verify KuduPredicateHandler.decompose + HiveStoragePredicateHandler.DecomposedPredicate decompose = + KuduPredicateHandler.decompose(andPredicateExpr, SCHEMA); + // TODO: NOT predicates are not supported. + assertNull(decompose); + } + } + } + + // Wrapper implementation to simplify testing unsupported UDFs. 
+  private class GenericUDFOPUnsupported extends GenericUDFBaseCompare {
+    public GenericUDFOPUnsupported() {
+      this.opName = "UNSUPPORTED";
+      this.opDisplayName = "UNSUPPORTED";
+    }
+
+    @Override
+    public Object evaluate(DeferredObject[] arguments) {
+      return null;
+    }
+
+    @Override
+    public GenericUDF flip() {
+      return new GenericUDFOPUnsupported();
+    }
+
+    @Override
+    public GenericUDF negative() {
+      return new GenericUDFOPUnsupported();
+    }
+  }
+}
diff --git kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduSerDe.java kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduSerDe.java
new file mode 100644
index 0000000000..d7250ae0b3
--- /dev/null
+++ kudu-handler/src/test/org/apache/hadoop/hive/kudu/TestKuduSerDe.java
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.hadoop.hive.kudu;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.shaded.com.google.common.collect.ImmutableList;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY;
+import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_TABLE_NAME_KEY;
+import static org.apache.hadoop.hive.kudu.KuduTestUtils.getAllTypesSchema;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public class TestKuduSerDe {
+
+  private static final String TABLE_NAME = "default.TestKuduSerDe";
+
+  private static final Schema SCHEMA = getAllTypesSchema();
+
+  private static final Configuration BASE_CONF = new Configuration();
+
+  private static final Properties TBL_PROPS = new Properties();
+
+  private static final long nowMs = System.currentTimeMillis();
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Before
+  public void setUp() throws Exception {
+    // Set the base configuration values.
+    BASE_CONF.set(KUDU_MASTER_ADDRS_KEY, harness.getMasterAddressesAsString());
+    TBL_PROPS.setProperty(KUDU_TABLE_NAME_KEY, TABLE_NAME);
+
+    // Create the test Kudu table.
+    CreateTableOptions options = new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of("key"));
+    harness.getClient().createTable(TABLE_NAME, SCHEMA, options);
+  }
+
+  @Test
+  public void testSerDeRoundTrip() throws Exception {
+    KuduSerDe serDe = new KuduSerDe();
+    serDe.initialize(BASE_CONF, TBL_PROPS);
+
+    PartialRow before = SCHEMA.newPartialRow();
+    before.addByte("key", (byte) 1);
+    before.addShort("int16", (short) 1);
+    before.addInt("int32", 1);
+    before.addLong("int64", 1L);
+    before.addBoolean("bool", true);
+    before.addFloat("float", 1.1f);
+    before.addDouble("double", 1.1d);
+    before.addString("string", "one");
+    before.addBinary("binary", "one".getBytes(UTF_8));
+    before.addTimestamp("timestamp", new Timestamp(nowMs));
+    before.addDecimal("decimal", new BigDecimal("1.111"));
+    before.setNull("null");
+    // Not setting the "default" column.
+    KuduWritable beforeWritable = new KuduWritable(before);
+    Object object = serDe.deserialize(beforeWritable);
+
+    // Capitalized `key` field to check for field case insensitivity.
+    List<String> fieldNames = Arrays.asList("KEY", "int16", "int32", "int64", "bool", "float",
+        "double", "string", "binary", "timestamp", "decimal", "null");
+    List<ObjectInspector> ois = Arrays.asList(
+        PrimitiveObjectInspectorFactory.writableByteObjectInspector,
+        PrimitiveObjectInspectorFactory.writableShortObjectInspector,
+        PrimitiveObjectInspectorFactory.writableIntObjectInspector,
+        PrimitiveObjectInspectorFactory.writableLongObjectInspector,
+        PrimitiveObjectInspectorFactory.writableBooleanObjectInspector,
+        PrimitiveObjectInspectorFactory.writableFloatObjectInspector,
+        PrimitiveObjectInspectorFactory.writableDoubleObjectInspector,
+        PrimitiveObjectInspectorFactory.writableStringObjectInspector,
+        PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
+        PrimitiveObjectInspectorFactory.writableTimestampObjectInspector,
+        PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector,
+        PrimitiveObjectInspectorFactory.writableStringObjectInspector
+        // the "default" column is not set.
+    );
+    StandardStructObjectInspector objectInspector =
+        ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, ois);
+    KuduWritable afterWritable = serDe.serialize(object, objectInspector);
+    PartialRow after = afterWritable.getPartialRow();
+
+    for (int i = 0; i < SCHEMA.getColumnCount(); i++) {
+      if (SCHEMA.getColumnByIndex(i).getType() == Type.BINARY) {
+        assertArrayEquals("Columns not equal at index: " + i,
+            before.getBinaryCopy(i), after.getBinaryCopy(i));
+      } else {
+        assertEquals("Columns not equal at index: " + i,
+            before.getObject(i), after.getObject(i));
+      }
+    }
+  }
+
+  @Test
+  public void testMissingTable() throws Exception {
+    KuduSerDe serDe = new KuduSerDe();
+
+    Properties tblProps = new Properties();
+
+    try {
+      serDe.initialize(BASE_CONF, tblProps);
+      fail("Should fail on missing table");
+    } catch (SerDeException ex) {
+      assertThat(ex.getMessage(), containsString("kudu.table_name is not set"));
+    }
+  }
+
+  @Test
+  public void testBadTable() throws Exception {
+    KuduSerDe serDe = new KuduSerDe();
+
+    Properties tblProps = new Properties();
+    tblProps.setProperty(KUDU_TABLE_NAME_KEY, "default.notatable");
+
+    try {
+      serDe.initialize(BASE_CONF, tblProps);
+      fail("Should fail on a bad table");
+    } catch (SerDeException ex) {
+      assertThat(ex.getMessage(),
+          containsString("Kudu table does not exist: default.notatable"));
+    }
+  }
+}
diff --git kudu-handler/src/test/queries/positive/kudu_queries.q kudu-handler/src/test/queries/positive/kudu_queries.q
new file mode 100644
index 0000000000..5e55b6d958
--- /dev/null
+++ kudu-handler/src/test/queries/positive/kudu_queries.q
@@ -0,0 +1,10 @@
+--! qt:dataset:src
+
+DROP TABLE IF EXISTS kudu_kv_table;
+CREATE EXTERNAL TABLE kudu_kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv");
+
+DESCRIBE EXTENDED kudu_kv_table;
+
+SELECT * FROM kudu_kv_table;
diff --git kudu-handler/src/test/results/positive/kudu_queries.q.out kudu-handler/src/test/results/positive/kudu_queries.q.out
new file mode 100644
index 0000000000..c704745d34
--- /dev/null
+++ kudu-handler/src/test/results/positive/kudu_queries.q.out
@@ -0,0 +1,34 @@
+PREHOOK: query: DROP TABLE IF EXISTS kudu_kv_table
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS kudu_kv_table
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE kudu_kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@kudu_kv_table
+POSTHOOK: query: CREATE EXTERNAL TABLE kudu_kv_table(key int, value string)
+STORED BY 'org.apache.hadoop.hive.kudu.KuduStorageHandler'
+TBLPROPERTIES ("kudu.table_name" = "default.kudu_kv")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@kudu_kv_table
+PREHOOK: query: DESCRIBE EXTENDED kudu_kv_table
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@kudu_kv_table
+POSTHOOK: query: DESCRIBE EXTENDED kudu_kv_table
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@kudu_kv_table
+key	int
+value	string
+
+#### A masked pattern was here ####
+PREHOOK: query: SELECT * FROM kudu_kv_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kudu_kv_table
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM kudu_kv_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kudu_kv_table
+#### A masked pattern was here ####
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
index 7b2e32bea2..c338842e9f 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
@@ -46,7 +46,8 @@ private static final String[] DEFAULT_AUX_CLASSES =
       new String[] {"org.apache.hive.hcatalog.data.JsonSerDe", "org.apache.hadoop.hive.druid.DruidStorageHandler",
           "org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory",
-          "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler"};
+          "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler",
+          "org.apache.hadoop.hive.kudu.KuduStorageHandler"};

   private static final String HBASE_SERDE_CLASS = "org.apache.hadoop.hive.hbase.HBaseSerDe";

   private final LlapServiceCommandLine cl;
diff --git pom.xml pom.xml
index 7649af10a7..28ec3de458 100644
--- pom.xml
+++ pom.xml
@@ -59,6 +59,7 @@
     shims
     spark-client
     kryo-registrator
+    kudu-handler
     testutils
     packaging
     standalone-metastore
@@ -183,6 +184,7 @@
    1.8
    4.11
    4.0.2
+    1.10.0
    0.9.3
    0.9.3
    2.10.0