diff --git a/common/pom.xml b/common/pom.xml index 868e14dbc3..aaeecc0955 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -220,6 +220,12 @@ io.dropwizard.metrics metrics-json ${dropwizard.version} + + + com.fasterxml.jackson.core + jackson-databind + + com.fasterxml.jackson.core diff --git a/hbase-handler/pom.xml b/hbase-handler/pom.xml index 7f57b77168..59f9dd8dfb 100644 --- a/hbase-handler/pom.xml +++ b/hbase-handler/pom.xml @@ -81,8 +81,23 @@ org.apache.hbase hbase-server ${hbase.version} - - + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.hbase + hbase-mapreduce + ${hbase.version} + + org.slf4j slf4j-log4j12 @@ -92,6 +107,7 @@ + org.apache.hbase hbase-common @@ -140,7 +156,24 @@ commons-logging - + + + org.apache.hbase + hbase-mapreduce + ${hbase.version} + test-jar + test + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + org.apache.hbase hbase-hadoop-compat @@ -149,6 +182,12 @@ test + org.eclipse.jetty + jetty-runner + ${jetty.version} + test + + com.sun.jersey jersey-servlet ${jersey.version} diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseMetaHook.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseMetaHook.java new file mode 100644 index 0000000000..9fe07afa28 --- /dev/null +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseMetaHook.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.StringUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * MetaHook for HBase. Updates the table data in HBase too. Not thread safe, and cleanup should + * be used after usage. 
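// [Illustrative aside, not part of the patch] The new hook caches an HBase Admin (and its
// Connection) and implements Closeable, so whoever obtains it is expected to close it after use;
// the storage handler creates one per call in getMetaHook() later in this patch. A minimal usage
// sketch, assuming an HBaseConfiguration and a metastore Table object named tbl are in scope:
try (HBaseMetaHook hook = new HBaseMetaHook(HBaseConfiguration.create())) {
  hook.preCreateTable(tbl);       // validates the column mapping and creates the HBase table if needed
  hook.commitCreateTable(tbl);
} catch (IOException e) {
  throw new MetaException(StringUtils.stringifyException(e));
}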
+ */ +public class HBaseMetaHook implements HiveMetaHook, Closeable { + private Configuration hbaseConf; + private Admin admin; + + public HBaseMetaHook(Configuration hbaseConf) { + this.hbaseConf = hbaseConf; + } + + private Admin getHBaseAdmin() throws MetaException { + try { + if (admin == null) { + Connection conn = ConnectionFactory.createConnection(hbaseConf); + admin = conn.getAdmin(); + } + return admin; + } catch (IOException ioe) { + throw new MetaException(StringUtils.stringifyException(ioe)); + } + } + + private String getHBaseTableName(Table tbl) { + // Give preference to TBLPROPERTIES over SERDEPROPERTIES + // (really we should only use TBLPROPERTIES, so this is just + // for backwards compatibility with the original specs). + String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME); + if (tableName == null) { + //convert to lower case in case we are getting from serde + tableName = tbl.getSd().getSerdeInfo().getParameters().get(HBaseSerDe.HBASE_TABLE_NAME); + //standardize to lower case + if (tableName != null) { + tableName = tableName.toLowerCase(); + } + } + if (tableName == null) { + tableName = (tbl.getDbName() + "." + tbl.getTableName()).toLowerCase(); + if (tableName.startsWith(HBaseStorageHandler.DEFAULT_PREFIX)) { + tableName = tableName.substring(HBaseStorageHandler.DEFAULT_PREFIX.length()); + } + } + return tableName; + } + + @Override + public void preDropTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void rollbackDropTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void commitDropTable(Table tbl, boolean deleteData) throws MetaException { + try { + String tableName = getHBaseTableName(tbl); + boolean isExternal = MetaStoreUtils.isExternalTable(tbl); + if (deleteData && !isExternal) { + if (getHBaseAdmin().isTableEnabled(TableName.valueOf(tableName))) { + getHBaseAdmin().disableTable(TableName.valueOf(tableName)); + } + getHBaseAdmin().deleteTable(TableName.valueOf(tableName)); + } + } catch (IOException ie) { + throw new MetaException(StringUtils.stringifyException(ie)); + } + } + + @Override + public void preCreateTable(Table tbl) throws MetaException { + boolean isExternal = MetaStoreUtils.isExternalTable(tbl); + + // We'd like to move this to HiveMetaStore for any non-native table, but + // first we need to support storing NULL for location on a table + if (tbl.getSd().getLocation() != null) { + throw new MetaException("LOCATION may not be specified for HBase."); + } + + org.apache.hadoop.hbase.client.Table htable = null; + + try { + String tableName = getHBaseTableName(tbl); + Map serdeParam = tbl.getSd().getSerdeInfo().getParameters(); + String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + + ColumnMappings columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping); + + HTableDescriptor tableDesc; + + if (!getHBaseAdmin().tableExists(TableName.valueOf(tableName))) { + // if it is not an external table then create one + if (!isExternal) { + // Create the column descriptors + tableDesc = new HTableDescriptor(TableName.valueOf(tableName)); + Set uniqueColumnFamilies = new HashSet(); + + for (ColumnMappings.ColumnMapping colMap : columnMappings) { + if (!colMap.hbaseRowKey && !colMap.hbaseTimestamp) { + uniqueColumnFamilies.add(colMap.familyName); + } + } + + for (String columnFamily : uniqueColumnFamilies) { + tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes(columnFamily))); + } + + getHBaseAdmin().createTable(tableDesc); + 
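// [Illustrative aside, not part of the patch] The block above still uses HTableDescriptor and
// HColumnDescriptor, which are deprecated but retained in HBase 2. If only the HBase 2 client had
// to be supported, the same table could be created with the builder API; a sketch for comparison,
// assuming TableDescriptorBuilder and ColumnFamilyDescriptorBuilder are imported:
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
for (String columnFamily : uniqueColumnFamilies) {
  builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(columnFamily));
}
getHBaseAdmin().createTable(builder.build());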
} else { + // an external table + throw new MetaException("HBase table " + tableName + + " doesn't exist while the table is declared as an external table."); + } + + } else { + if (!isExternal) { + throw new MetaException("Table " + tableName + " already exists within HBase; " + + "use CREATE EXTERNAL TABLE instead to register it in Hive."); + } + // make sure the schema mapping is right + tableDesc = getHBaseAdmin().getTableDescriptor(TableName.valueOf(tableName)); + + for (ColumnMappings.ColumnMapping colMap : columnMappings) { + + if (colMap.hbaseRowKey || colMap.hbaseTimestamp) { + continue; + } + + if (!tableDesc.hasFamily(colMap.familyNameBytes)) { + throw new MetaException("Column Family " + colMap.familyName + + " is not defined in hbase table " + tableName); + } + } + } + + // ensure the table is online + htable = getHBaseAdmin().getConnection().getTable(tableDesc.getTableName()); + } catch (Exception se) { + throw new MetaException(StringUtils.stringifyException(se)); + } finally { + if (htable != null) { + IOUtils.closeQuietly(htable); + } + } + } + + @Override + public void rollbackCreateTable(Table table) throws MetaException { + boolean isExternal = MetaStoreUtils.isExternalTable(table); + String tableName = getHBaseTableName(table); + try { + if (!isExternal && getHBaseAdmin().tableExists(TableName.valueOf(tableName))) { + // we have created an HBase table, so we delete it to roll back; + if (getHBaseAdmin().isTableEnabled(TableName.valueOf(tableName))) { + getHBaseAdmin().disableTable(TableName.valueOf(tableName)); + } + getHBaseAdmin().deleteTable(TableName.valueOf(tableName)); + } + } catch (IOException ie) { + throw new MetaException(StringUtils.stringifyException(ie)); + } + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void close() throws IOException { + if (admin != null) { + Connection connection = admin.getConnection(); + admin.close(); + admin = null; + if (connection != null) { + connection.close(); + } + } + } +} diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java index c6f3b0ff3a..ce7071e534 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java @@ -174,7 +174,7 @@ private void serializeField( continue; } - put.add(colMap.familyNameBytes, columnQualifierBytes, bytes); + put.addColumn(colMap.familyNameBytes, columnQualifierBytes, bytes); } } else { byte[] bytes; @@ -198,7 +198,7 @@ private void serializeField( return; } - put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, bytes); + put.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes, bytes); } } diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index 9cad97ad4b..0a3788fc69 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -28,28 +27,20 @@ import java.util.Properties; import java.util.Set; -import org.apache.commons.io.IOUtils; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.mapred.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.token.TokenUtil; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.metastore.HiveMetaHook; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; @@ -81,14 +72,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.yammer.metrics.core.MetricsRegistry; +import com.codahale.metrics.MetricRegistry; /** * HBaseStorageHandler provides a HiveStorageHandler implementation for * HBase. */ public class HBaseStorageHandler extends DefaultStorageHandler - implements HiveMetaHook, HiveStoragePredicateHandler { + implements HiveStoragePredicateHandler { private static final Logger LOG = LoggerFactory.getLogger(HBaseStorageHandler.class); @@ -117,169 +108,6 @@ private Configuration jobConf; private Configuration hbaseConf; - private HBaseAdmin admin; - - private HBaseAdmin getHBaseAdmin() throws MetaException { - try { - if (admin == null) { - admin = new HBaseAdmin(hbaseConf); - } - return admin; - } catch (IOException ioe) { - throw new MetaException(StringUtils.stringifyException(ioe)); - } - } - - private String getHBaseTableName(Table tbl) { - // Give preference to TBLPROPERTIES over SERDEPROPERTIES - // (really we should only use TBLPROPERTIES, so this is just - // for backwards compatibility with the original specs). - String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME); - if (tableName == null) { - //convert to lower case in case we are getting from serde - tableName = tbl.getSd().getSerdeInfo().getParameters().get( - HBaseSerDe.HBASE_TABLE_NAME); - //standardize to lower case - if (tableName != null) { - tableName = tableName.toLowerCase(); - } - } - if (tableName == null) { - tableName = (tbl.getDbName() + "." 
+ tbl.getTableName()).toLowerCase(); - if (tableName.startsWith(DEFAULT_PREFIX)) { - tableName = tableName.substring(DEFAULT_PREFIX.length()); - } - } - return tableName; - } - - @Override - public void preDropTable(Table table) throws MetaException { - // nothing to do - } - - @Override - public void rollbackDropTable(Table table) throws MetaException { - // nothing to do - } - - @Override - public void commitDropTable( - Table tbl, boolean deleteData) throws MetaException { - - try { - String tableName = getHBaseTableName(tbl); - boolean isExternal = MetaStoreUtils.isExternalTable(tbl); - if (deleteData && !isExternal) { - if (getHBaseAdmin().isTableEnabled(tableName)) { - getHBaseAdmin().disableTable(tableName); - } - getHBaseAdmin().deleteTable(tableName); - } - } catch (IOException ie) { - throw new MetaException(StringUtils.stringifyException(ie)); - } - } - - @Override - public void preCreateTable(Table tbl) throws MetaException { - boolean isExternal = MetaStoreUtils.isExternalTable(tbl); - - // We'd like to move this to HiveMetaStore for any non-native table, but - // first we need to support storing NULL for location on a table - if (tbl.getSd().getLocation() != null) { - throw new MetaException("LOCATION may not be specified for HBase."); - } - - HTable htable = null; - - try { - String tableName = getHBaseTableName(tbl); - Map serdeParam = tbl.getSd().getSerdeInfo().getParameters(); - String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); - - ColumnMappings columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping); - - HTableDescriptor tableDesc; - - if (!getHBaseAdmin().tableExists(tableName)) { - // if it is not an external table then create one - if (!isExternal) { - // Create the column descriptors - tableDesc = new HTableDescriptor(tableName); - Set uniqueColumnFamilies = new HashSet(); - - for (ColumnMapping colMap : columnMappings) { - if (!colMap.hbaseRowKey && !colMap.hbaseTimestamp) { - uniqueColumnFamilies.add(colMap.familyName); - } - } - - for (String columnFamily : uniqueColumnFamilies) { - tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes(columnFamily))); - } - - getHBaseAdmin().createTable(tableDesc); - } else { - // an external table - throw new MetaException("HBase table " + tableName + - " doesn't exist while the table is declared as an external table."); - } - - } else { - if (!isExternal) { - throw new MetaException("Table " + tableName + " already exists" - + " within HBase; use CREATE EXTERNAL TABLE instead to" - + " register it in Hive."); - } - // make sure the schema mapping is right - tableDesc = getHBaseAdmin().getTableDescriptor(Bytes.toBytes(tableName)); - - for (ColumnMapping colMap : columnMappings) { - - if (colMap.hbaseRowKey || colMap.hbaseTimestamp) { - continue; - } - - if (!tableDesc.hasFamily(colMap.familyNameBytes)) { - throw new MetaException("Column Family " + colMap.familyName - + " is not defined in hbase table " + tableName); - } - } - } - - // ensure the table is online - htable = new HTable(hbaseConf, tableDesc.getName()); - } catch (Exception se) { - throw new MetaException(StringUtils.stringifyException(se)); - } finally { - if (htable != null) { - IOUtils.closeQuietly(htable); - } - } - } - - @Override - public void rollbackCreateTable(Table table) throws MetaException { - boolean isExternal = MetaStoreUtils.isExternalTable(table); - String tableName = getHBaseTableName(table); - try { - if (!isExternal && getHBaseAdmin().tableExists(tableName)) { - // we have created an HBase table, so 
we delete it to roll back; - if (getHBaseAdmin().isTableEnabled(tableName)) { - getHBaseAdmin().disableTable(tableName); - } - getHBaseAdmin().deleteTable(tableName); - } - } catch (IOException ie) { - throw new MetaException(StringUtils.stringifyException(ie)); - } - } - - @Override - public void commitCreateTable(Table table) throws MetaException { - // nothing to do - } @Override public Configuration getConf() { @@ -321,7 +149,7 @@ public void setConf(Configuration conf) { @Override public HiveMetaHook getMetaHook() { - return this; + return new HBaseMetaHook(hbaseConf); } @Override @@ -371,12 +199,10 @@ public void configureTableJobProperties( jobProperties.put(HBaseSerDe.HBASE_SCAN_BATCH, scanBatch); } - String tableName = - tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_NAME); + String tableName = tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_NAME); if (tableName == null) { - tableName = - tableProperties.getProperty(hive_metastoreConstants.META_TABLE_NAME); - tableName = tableName.toLowerCase(); + tableName = tableProperties.getProperty(hive_metastoreConstants.META_TABLE_NAME); + tableName = tableName.toLowerCase(); if (tableName.startsWith(DEFAULT_PREFIX)) { tableName = tableName.substring(DEFAULT_PREFIX.length()); } @@ -432,8 +258,7 @@ public void configureTableJobProperties( } try { addHBaseDelegationToken(jobConf); - }//try - catch (IOException e) { + } catch (IOException | MetaException e) { throw new IllegalStateException("Error while configuring input job properties", e); } //input job properties } @@ -480,18 +305,19 @@ private void addHBaseResources(Configuration jobConf, } } - private void addHBaseDelegationToken(Configuration conf) throws IOException { + private void addHBaseDelegationToken(Configuration conf) throws IOException, MetaException { if (User.isHBaseSecurityEnabled(conf)) { - HConnection conn = HConnectionManager.createConnection(conf); + Connection connection = ConnectionFactory.createConnection(hbaseConf); try { User curUser = User.getCurrent(); Job job = new Job(conf); - TokenUtil.addTokenForJob(conn, curUser, job); + TokenUtil.addTokenForJob(connection, curUser, job); } catch (InterruptedException e) { throw new IOException("Error while obtaining hbase delegation token", e); - } - finally { - conn.close(); + } finally { + if (connection != null) { + connection.close(); + } } } } @@ -523,8 +349,9 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { } if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_HBASE_SNAPSHOT_NAME) != null) { // There is an extra dependency on MetricsRegistry for snapshot IF. 
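// [Illustrative aside, not part of the patch] The addHBaseDelegationToken rewrite above keeps an
// explicit try/finally; since Connection is AutoCloseable, an equivalent try-with-resources form
// of the same logic would be:
if (User.isHBaseSecurityEnabled(conf)) {
  try (Connection connection = ConnectionFactory.createConnection(hbaseConf)) {
    TokenUtil.addTokenForJob(connection, User.getCurrent(), new Job(conf));
  } catch (InterruptedException e) {
    throw new IOException("Error while obtaining hbase delegation token", e);
  }
}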
- TableMapReduceUtil.addDependencyJars(jobConf, MetricsRegistry.class); + TableMapReduceUtil.addDependencyJars(jobConf, MetricRegistry.class); } + Set merged = new LinkedHashSet(jobConf.getStringCollection("tmpjars")); Job copy = new Job(jobConf); diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java index 6054d533f0..d42b7ecc8b 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java @@ -24,13 +24,10 @@ import java.util.List; import java.util.Map; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -43,14 +40,6 @@ class HiveHBaseInputFormatUtil { /** - * Parse {@code jobConf} to create the target {@link HTable} instance. - */ - public static HTable getTable(JobConf jobConf) throws IOException { - String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); - return new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)); - } - - /** * Parse {@code jobConf} to create a {@link Scan} instance. */ public static Scan getScan(JobConf jobConf) throws IOException { diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 1ef454572c..8b89817902 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java index 4b8f62c913..7c78d7b25e 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java @@ -20,19 +20,21 @@ import java.io.IOException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; 
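// [Illustrative aside, not part of the patch] The MetricRegistry change above only swaps the
// marker class handed to org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil: addDependencyJars
// locates the jar containing each listed class and adds it to the job's tmpjars, so the HBase
// snapshot input format can load Dropwizard metrics on the task side. Roughly, inside
// configureJobConf() where jobConf is the job configuration:
TableMapReduceUtil.addDependencyJars(jobConf, MetricRegistry.class);   // ships the dropwizard-metrics jar with the job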
import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.hbase.PutWritable; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; @@ -102,9 +104,9 @@ public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException { jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName); final boolean walEnabled = HiveConf.getBoolVar( jobConf, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED); - final HTable table = new HTable(HBaseConfiguration.create(jobConf), hbaseTableName); - table.setAutoFlush(false); - return new MyRecordWriter(table,walEnabled); + final Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf)); + final BufferedMutator table = conn.getBufferedMutator(TableName.valueOf(hbaseTableName)); + return new MyRecordWriter(table, conn, walEnabled); } @Override @@ -115,12 +117,14 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOE static private class MyRecordWriter implements org.apache.hadoop.mapred.RecordWriter { - private final HTable m_table; + private final BufferedMutator m_table; private final boolean m_walEnabled; + private final Connection m_connection; - public MyRecordWriter(HTable table, boolean walEnabled) { + public MyRecordWriter(BufferedMutator table, Connection connection, boolean walEnabled) { m_table = table; m_walEnabled = walEnabled; + m_connection = connection; } public void close(Reporter reporter) @@ -143,13 +147,14 @@ public void write(ImmutableBytesWritable key, } else { put.setDurability(Durability.SKIP_WAL); } - m_table.put(put); + m_table.mutate(put); } @Override protected void finalize() throws Throwable { try { m_table.close(); + m_connection.close(); } finally { super.finalize(); } diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java index a25a96faa9..223dbe192a 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; @@ -60,15 +60,15 @@ * for loading a table with a single column family. 
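// [Illustrative aside, not part of the patch] The MyRecordWriter rewrite above replaces the
// auto-flush-disabled HTable with a BufferedMutator obtained from a shared Connection. The core
// pattern, sketched with hypothetical table/row names and assuming jobConf is in scope:
Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("hive_target"));
try {
  Put put = new Put(Bytes.toBytes("row1"));
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
  mutator.mutate(put);   // buffered client side, like the old HTable with setAutoFlush(false)
} finally {
  mutator.close();       // close() flushes any pending mutations
  conn.close();
}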
*/ public class HiveHFileOutputFormat extends - HFileOutputFormat implements - HiveOutputFormat { + HFileOutputFormat2 implements + HiveOutputFormat { public static final String HFILE_FAMILY_PATH = "hfile.family.path"; static final Logger LOG = LoggerFactory.getLogger(HiveHFileOutputFormat.class.getName()); private - org.apache.hadoop.mapreduce.RecordWriter + org.apache.hadoop.mapreduce.RecordWriter getFileWriter(org.apache.hadoop.mapreduce.TaskAttemptContext tac) throws IOException { try { @@ -118,7 +118,7 @@ public RecordWriter getHiveRecordWriter( final Path outputdir = FileOutputFormat.getOutputPath(tac); final Path taskAttemptOutputdir = new FileOutputCommitter(outputdir, tac).getWorkPath(); final org.apache.hadoop.mapreduce.RecordWriter< - ImmutableBytesWritable, KeyValue> fileWriter = getFileWriter(tac); + ImmutableBytesWritable, Cell> fileWriter = getFileWriter(tac); // Individual columns are going to be pivoted to HBase cells, // and for each row, they need to be written out in order @@ -262,7 +262,7 @@ public void checkOutputSpecs(FileSystem ignored, JobConf jc) throws IOException } @Override - public org.apache.hadoop.mapred.RecordWriter getRecordWriter( + public org.apache.hadoop.mapred.RecordWriter getRecordWriter( FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { throw new NotImplementedException("This will not be invoked"); } diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/ResultWritable.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/ResultWritable.java index b35aea9f60..93c2f96f11 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/ResultWritable.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/ResultWritable.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -65,9 +66,9 @@ public void write(final DataOutput out) throws IOException { ProtobufUtil.toResultNoData(result).writeDelimitedTo(DataOutputOutputStream.from(out)); out.writeInt(result.size()); - for(KeyValue kv : result.list()) { + for(Cell cell : result.listCells()) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); KeyValue.write(kv, out); } } - } diff --git a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java index f244ed6562..14557d3bac 100644 --- a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java +++ b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java @@ -42,6 +42,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -179,7 +181,7 @@ public void testHBaseSerDeI() throws SerDeException { byte [] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); kvs.add(new KeyValue(rowKey, cfa, qualByte, Bytes.toBytes("123"))); kvs.add(new KeyValue(rowKey, cfb, qualShort, Bytes.toBytes("456"))); @@ -191,18 +193,18 @@ public void testHBaseSerDeI() throws 
SerDeException { kvs.add(new KeyValue(rowKey, cfb, qualBool, Bytes.toBytes("true"))); Collections.sort(kvs, KeyValue.COMPARATOR); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); - p.add(cfa, qualByte, Bytes.toBytes("123")); - p.add(cfb, qualShort, Bytes.toBytes("456")); - p.add(cfc, qualInt, Bytes.toBytes("789")); - p.add(cfa, qualLong, Bytes.toBytes("1000")); - p.add(cfb, qualFloat, Bytes.toBytes("-0.01")); - p.add(cfc, qualDouble, Bytes.toBytes("5.3")); - p.add(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive")); - p.add(cfb, qualBool, Bytes.toBytes("true")); + p.addColumn(cfa, qualByte, Bytes.toBytes("123")); + p.addColumn(cfb, qualShort, Bytes.toBytes("456")); + p.addColumn(cfc, qualInt, Bytes.toBytes("789")); + p.addColumn(cfa, qualLong, Bytes.toBytes("1000")); + p.addColumn(cfb, qualFloat, Bytes.toBytes("-0.01")); + p.addColumn(cfc, qualDouble, Bytes.toBytes("5.3")); + p.addColumn(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive")); + p.addColumn(cfb, qualBool, Bytes.toBytes("true")); Object[] expectedFieldsData = { new Text("test-row1"), @@ -273,7 +275,7 @@ public void testHBaseSerDeWithTimestamp() throws SerDeException { byte [] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); kvs.add(new KeyValue(rowKey, cfa, qualByte, Bytes.toBytes("123"))); kvs.add(new KeyValue(rowKey, cfb, qualShort, Bytes.toBytes("456"))); @@ -285,18 +287,18 @@ public void testHBaseSerDeWithTimestamp() throws SerDeException { kvs.add(new KeyValue(rowKey, cfb, qualBool, Bytes.toBytes("true"))); Collections.sort(kvs, KeyValue.COMPARATOR); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey,putTimestamp); - p.add(cfa, qualByte, Bytes.toBytes("123")); - p.add(cfb, qualShort, Bytes.toBytes("456")); - p.add(cfc, qualInt, Bytes.toBytes("789")); - p.add(cfa, qualLong, Bytes.toBytes("1000")); - p.add(cfb, qualFloat, Bytes.toBytes("-0.01")); - p.add(cfc, qualDouble, Bytes.toBytes("5.3")); - p.add(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive")); - p.add(cfb, qualBool, Bytes.toBytes("true")); + p.addColumn(cfa, qualByte, Bytes.toBytes("123")); + p.addColumn(cfb, qualShort, Bytes.toBytes("456")); + p.addColumn(cfc, qualInt, Bytes.toBytes("789")); + p.addColumn(cfa, qualLong, Bytes.toBytes("1000")); + p.addColumn(cfb, qualFloat, Bytes.toBytes("-0.01")); + p.addColumn(cfc, qualDouble, Bytes.toBytes("5.3")); + p.addColumn(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive")); + p.addColumn(cfb, qualBool, Bytes.toBytes("true")); Object[] expectedFieldsData = { new Text("test-row1"), @@ -419,7 +421,7 @@ public void testHBaseSerDeII() throws SerDeException { byte [] rowKey = Bytes.toBytes("test-row-2"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); kvs.add(new KeyValue(rowKey, cfa, qualByte, new byte [] { Byte.MIN_VALUE })); kvs.add(new KeyValue(rowKey, cfb, qualShort, Bytes.toBytes(Short.MIN_VALUE))); @@ -431,19 +433,21 @@ public void testHBaseSerDeII() throws SerDeException { "Hadoop, HBase, and Hive Again!"))); kvs.add(new KeyValue(rowKey, cfb, qualBool, Bytes.toBytes(false))); +// When using only HBase2, then we could change to this +// Collections.sort(kvs, CellComparator.COMPARATOR); Collections.sort(kvs, KeyValue.COMPARATOR); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); - p.add(cfa, qualByte, new byte [] { Byte.MIN_VALUE }); - p.add(cfb, qualShort, Bytes.toBytes(Short.MIN_VALUE)); - p.add(cfc, 
qualInt, Bytes.toBytes(Integer.MIN_VALUE)); - p.add(cfa, qualLong, Bytes.toBytes(Long.MIN_VALUE)); - p.add(cfb, qualFloat, Bytes.toBytes(Float.MIN_VALUE)); - p.add(cfc, qualDouble, Bytes.toBytes(Double.MAX_VALUE)); - p.add(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive Again!")); - p.add(cfb, qualBool, Bytes.toBytes(false)); + p.addColumn(cfa, qualByte, new byte [] { Byte.MIN_VALUE }); + p.addColumn(cfb, qualShort, Bytes.toBytes(Short.MIN_VALUE)); + p.addColumn(cfc, qualInt, Bytes.toBytes(Integer.MIN_VALUE)); + p.addColumn(cfa, qualLong, Bytes.toBytes(Long.MIN_VALUE)); + p.addColumn(cfb, qualFloat, Bytes.toBytes(Float.MIN_VALUE)); + p.addColumn(cfc, qualDouble, Bytes.toBytes(Double.MAX_VALUE)); + p.addColumn(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive Again!")); + p.addColumn(cfb, qualBool, Bytes.toBytes(false)); Object[] expectedFieldsData = { new Text("test-row-2"), @@ -557,7 +561,7 @@ public void testHBaseSerDeWithHiveMapToHBaseColumnFamily() throws SerDeException Bytes.toBytes(true)} }; - List kvs = new ArrayList(); + List kvs = new ArrayList(); Result [] r = new Result [] {null, null, null}; Put [] p = new Put [] {null, null, null}; @@ -568,11 +572,11 @@ public void testHBaseSerDeWithHiveMapToHBaseColumnFamily() throws SerDeException for (int j = 0; j < columnQualifiersAndValues[i].length; j++) { kvs.add(new KeyValue(rowKeys[i], columnFamilies[j], columnQualifiersAndValues[i][j], columnQualifiersAndValues[i][j])); - p[i].add(columnFamilies[j], columnQualifiersAndValues[i][j], + p[i].addColumn(columnFamilies[j], columnQualifiersAndValues[i][j], columnQualifiersAndValues[i][j]); } - r[i] = new Result(kvs); + r[i] = Result.create(kvs); } Object [][] expectedData = { @@ -701,15 +705,15 @@ public void testHBaseSerDeWithHiveMapToHBaseColumnFamilyII() throws SerDeExcepti }; Put p = new Put(rowKey); - List kvs = new ArrayList(); + List kvs = new ArrayList(); for (int j = 0; j < columnQualifiersAndValues.length; j++) { kvs.add(new KeyValue(rowKey, columnFamilies[j], columnQualifiersAndValues[j], columnQualifiersAndValues[j])); - p.add(columnFamilies[j], columnQualifiersAndValues[j], columnQualifiersAndValues[j]); + p.addColumn(columnFamilies[j], columnQualifiersAndValues[j], columnQualifiersAndValues[j]); } - Result r = new Result(kvs); + Result r = Result.create(kvs); Object [] expectedData = { new Text("row-key"), new ByteWritable((byte) 123), new ShortWritable((short) 456), @@ -821,7 +825,7 @@ public void testHBaseSerDeWithColumnPrefixes() byte[] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] dataA = "This is first test data".getBytes(); byte[] dataB = "This is second test data".getBytes(); @@ -833,7 +837,7 @@ public void testHBaseSerDeWithColumnPrefixes() kvs.add(new KeyValue(rowKey, cfa, qualC, dataC)); kvs.add(new KeyValue(rowKey, cfa, qualD, dataD)); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); @@ -928,13 +932,13 @@ public void testHBaseSerDeCompositeKeyWithSeparator() throws SerDeException, TEx byte[] rowKey = testStruct.getBytes(); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] testData = "This is a test data".getBytes(); kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData)); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); @@ -976,13 +980,13 @@ public void testHBaseSerDeCompositeKeyWithoutSeparator() throws SerDeException, byte[] rowKey = testStruct.getBytes(); // Data - List kvs = new 
ArrayList(); + List kvs = new ArrayList(); byte[] testData = "This is a test data".getBytes(); kvs.add(new KeyValue(rowKey, cfa, qualStruct, testData)); - Result r = new Result(kvs); + Result r = Result.create(kvs); byte[] putRowKey = testStruct.getBytesWithDelimiters(); @@ -1047,13 +1051,13 @@ public void testHBaseSerDeWithAvroSchemaInline() throws SerDeException, IOExcept byte[] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] avroData = getTestAvroBytesFromSchema(RECORD_SCHEMA); kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); @@ -1092,13 +1096,13 @@ public void testHBaseSerDeWithForwardEvolvedSchema() throws SerDeException, IOEx byte[] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] avroData = getTestAvroBytesFromSchema(RECORD_SCHEMA); kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); @@ -1138,13 +1142,13 @@ public void testHBaseSerDeWithBackwardEvolvedSchema() throws SerDeException, IOE byte[] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] avroData = getTestAvroBytesFromSchema(RECORD_SCHEMA_EVOLVED); kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); @@ -1183,13 +1187,13 @@ public void testHBaseSerDeWithAvroSerClass() throws SerDeException, IOException byte[] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] avroData = getTestAvroBytesFromClass1(1); kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); @@ -1234,13 +1238,13 @@ public void testHBaseSerDeWithAvroSchemaUrl() throws SerDeException, IOException byte[] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] avroData = getTestAvroBytesFromSchema(RECORD_SCHEMA); kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); @@ -1298,13 +1302,13 @@ public void testHBaseSerDeWithAvroExternalSchema() throws SerDeException, IOExce byte[] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] avroData = getTestAvroBytesFromClass2(1); kvs.add(new KeyValue(rowKey, cfa, qualAvro, avroData)); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); @@ -1362,7 +1366,7 @@ public void testHBaseSerDeWithHiveMapToHBaseAvroColumnFamily() throws Exception byte[] rowKey = Bytes.toBytes("test-row1"); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] avroDataA = getTestAvroBytesFromSchema(RECORD_SCHEMA); byte[] avroDataB = getTestAvroBytesFromClass1(1); @@ -1372,7 +1376,7 @@ public void testHBaseSerDeWithHiveMapToHBaseAvroColumnFamily() throws Exception kvs.add(new KeyValue(rowKey, cfa, qualAvroB, avroDataB)); kvs.add(new KeyValue(rowKey, cfa, qualAvroC, avroDataC)); - Result r = new Result(kvs); + Result r = Result.create(kvs); Put p = new Put(rowKey); @@ -1426,12 +1430,12 @@ public void testHBaseSerDeCustomStructValue() throws IOException, SerDeException TestStruct testStruct = new 
TestStruct("A", "B", "C", false, (byte) 0); byte[] key = testStruct.getBytes(); // Data - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte[] testData = testStruct.getBytes(); kvs.add(new KeyValue(key, cfa, qualStruct, testData)); - Result r = new Result(kvs); + Result r = Result.create(kvs); byte[] putKey = testStruct.getBytesWithDelimiters(); Put p = new Put(putKey); diff --git a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java index b2bdd19964..216d7ae60b 100644 --- a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java +++ b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java @@ -25,6 +25,7 @@ import junit.framework.TestCase; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; @@ -69,7 +70,7 @@ public void testLazyHBaseCellMap1() throws SerDeException { LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi); // Initialize a result - List kvs = new ArrayList(); + List kvs = new ArrayList(); kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfa"), Bytes.toBytes("col1"), Bytes.toBytes("cfacol1"))); @@ -86,7 +87,7 @@ public void testLazyHBaseCellMap1() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfc"), Bytes.toBytes("col3"), Bytes.toBytes("cfccol3"))); - Result r = new Result(kvs); + Result r = Result.create(kvs); List mapBinaryStorage = new ArrayList(); mapBinaryStorage.add(false); @@ -131,7 +132,7 @@ public void testLazyHBaseCellMap2() throws SerDeException { LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi); // Initialize a result - List kvs = new ArrayList(); + List kvs = new ArrayList(); kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfa"), Bytes.toBytes("col1"), Bytes.toBytes("cfacol1"))); @@ -148,7 +149,7 @@ public void testLazyHBaseCellMap2() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfc"), Bytes.toBytes("col3"), Bytes.toBytes("cfccol3"))); - Result r = new Result(kvs); + Result r = Result.create(kvs); List mapBinaryStorage = new ArrayList(); mapBinaryStorage.add(false); mapBinaryStorage.add(false); @@ -192,11 +193,11 @@ public void testLazyHBaseCellMap3() throws SerDeException { mapBinaryIntKeyValue, new byte [] {(byte)1, (byte) 2}, 0, nullSequence, false, (byte) 0); LazyHBaseCellMap hbaseCellMap = new LazyHBaseCellMap((LazyMapObjectInspector) oi); - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte [] rowKey = "row-key".getBytes(); byte [] cfInt = "cf-int".getBytes(); kvs.add(new KeyValue(rowKey, cfInt, Bytes.toBytes(1), Bytes.toBytes(1))); - Result result = new Result(kvs); + Result result = Result.create(kvs); List mapBinaryStorage = new ArrayList(); mapBinaryStorage.add(true); mapBinaryStorage.add(true); @@ -210,7 +211,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue( rowKey, cfInt, Bytes.toBytes(Integer.MIN_VALUE), Bytes.toBytes(Integer.MIN_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfInt, mapBinaryStorage); expectedIntValue = new IntWritable(Integer.MIN_VALUE); lazyPrimitive = @@ -221,7 +222,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue( rowKey, cfInt, 
Bytes.toBytes(Integer.MAX_VALUE), Bytes.toBytes(Integer.MAX_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfInt, mapBinaryStorage); expectedIntValue = new IntWritable(Integer.MAX_VALUE); lazyPrimitive = @@ -237,7 +238,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { byte [] cfByte = "cf-byte".getBytes(); kvs.clear(); kvs.add(new KeyValue(rowKey, cfByte, new byte [] {(byte) 1}, new byte [] {(byte) 1})); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfByte, mapBinaryStorage); ByteWritable expectedByteValue = new ByteWritable((byte) 1); lazyPrimitive = @@ -248,7 +249,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfByte, new byte [] {Byte.MIN_VALUE}, new byte [] {Byte.MIN_VALUE})); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfByte, mapBinaryStorage); expectedByteValue = new ByteWritable(Byte.MIN_VALUE); lazyPrimitive = @@ -259,7 +260,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfByte, new byte [] {Byte.MAX_VALUE}, new byte [] {Byte.MAX_VALUE})); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfByte, mapBinaryStorage); expectedByteValue = new ByteWritable(Byte.MAX_VALUE); lazyPrimitive = @@ -275,7 +276,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { byte [] cfShort = "cf-short".getBytes(); kvs.clear(); kvs.add(new KeyValue(rowKey, cfShort, Bytes.toBytes((short) 1), Bytes.toBytes((short) 1))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfShort, mapBinaryStorage); ShortWritable expectedShortValue = new ShortWritable((short) 1); lazyPrimitive = @@ -286,7 +287,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfShort, Bytes.toBytes(Short.MIN_VALUE), Bytes.toBytes(Short.MIN_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfShort, mapBinaryStorage); expectedShortValue = new ShortWritable(Short.MIN_VALUE); lazyPrimitive = @@ -297,7 +298,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfShort, Bytes.toBytes(Short.MAX_VALUE), Bytes.toBytes(Short.MAX_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfShort, mapBinaryStorage); expectedShortValue = new ShortWritable(Short.MAX_VALUE); lazyPrimitive = @@ -313,7 +314,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { byte [] cfLong = "cf-long".getBytes(); kvs.clear(); kvs.add(new KeyValue(rowKey, cfLong, Bytes.toBytes((long) 1), Bytes.toBytes((long) 1))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfLong, mapBinaryStorage); LongWritable expectedLongValue = new LongWritable(1); lazyPrimitive = @@ -324,7 +325,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfLong, Bytes.toBytes(Long.MIN_VALUE), Bytes.toBytes(Long.MIN_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfLong, mapBinaryStorage); expectedLongValue = new LongWritable(Long.MIN_VALUE); lazyPrimitive = @@ -335,7 +336,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfLong, Bytes.toBytes(Long.MAX_VALUE), 
Bytes.toBytes(Long.MAX_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfLong, mapBinaryStorage); expectedLongValue = new LongWritable(Long.MAX_VALUE); lazyPrimitive = @@ -353,7 +354,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfFloat, Bytes.toBytes((float) 1.0F), Bytes.toBytes((float) 1.0F))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfFloat, mapBinaryStorage); FloatWritable expectedFloatValue = new FloatWritable(1.0F); lazyPrimitive = @@ -364,7 +365,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfFloat, Bytes.toBytes((float) Float.MIN_VALUE), Bytes.toBytes((float) Float.MIN_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfFloat, mapBinaryStorage); expectedFloatValue = new FloatWritable(Float.MIN_VALUE); lazyPrimitive = @@ -375,7 +376,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfFloat, Bytes.toBytes((float) Float.MAX_VALUE), Bytes.toBytes((float) Float.MAX_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfFloat, mapBinaryStorage); expectedFloatValue = new FloatWritable(Float.MAX_VALUE); lazyPrimitive = @@ -392,7 +393,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { byte [] cfDouble = "cf-double".getBytes(); kvs.clear(); kvs.add(new KeyValue(rowKey, cfDouble, Bytes.toBytes(1.0), Bytes.toBytes(1.0))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfDouble, mapBinaryStorage); DoubleWritable expectedDoubleValue = new DoubleWritable(1.0); lazyPrimitive = @@ -403,7 +404,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfDouble, Bytes.toBytes(Double.MIN_VALUE), Bytes.toBytes(Double.MIN_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfDouble, mapBinaryStorage); expectedDoubleValue = new DoubleWritable(Double.MIN_VALUE); lazyPrimitive = @@ -414,7 +415,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfDouble, Bytes.toBytes(Double.MAX_VALUE), Bytes.toBytes(Double.MAX_VALUE))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfDouble, mapBinaryStorage); expectedDoubleValue = new DoubleWritable(Double.MAX_VALUE); lazyPrimitive = @@ -431,7 +432,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { byte [] cfBoolean = "cf-boolean".getBytes(); kvs.clear(); kvs.add(new KeyValue(rowKey, cfBoolean, Bytes.toBytes(false), Bytes.toBytes(false))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfBoolean, mapBinaryStorage); BooleanWritable expectedBooleanValue = new BooleanWritable(false); lazyPrimitive = @@ -441,7 +442,7 @@ public void testLazyHBaseCellMap3() throws SerDeException { kvs.clear(); kvs.add(new KeyValue(rowKey, cfBoolean, Bytes.toBytes(true), Bytes.toBytes(true))); - result = new Result(kvs); + result = Result.create(kvs); hbaseCellMap.init(result, cfBoolean, mapBinaryStorage); expectedBooleanValue = new BooleanWritable(true); lazyPrimitive = @@ -485,7 +486,7 @@ public void testLazyHBaseRow1() throws SerDeException { nullSequence, false, false, (byte)0); LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, 
columnMappings); - List kvs = new ArrayList(); + List kvs = new ArrayList(); kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfa"), Bytes.toBytes("a"), Bytes.toBytes("123"))); @@ -496,7 +497,7 @@ public void testLazyHBaseRow1() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("hi"))); - Result r = new Result(kvs); + Result r = Result.create(kvs); o.init(r); assertEquals( @@ -510,7 +511,7 @@ public void testLazyHBaseRow1() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"), Bytes.toBytes("c"), Bytes.toBytes("d=e:f=g"))); - r = new Result(kvs); + r = Result.create(kvs); o.init(r); assertEquals( @@ -526,7 +527,7 @@ public void testLazyHBaseRow1() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no"))); - r = new Result(kvs); + r = Result.create(kvs); o.init(r); assertEquals( @@ -540,7 +541,7 @@ public void testLazyHBaseRow1() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no"))); - r = new Result(kvs); + r = Result.create(kvs); o.init(r); assertEquals( @@ -564,7 +565,7 @@ public void testLazyHBaseRow1() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes(""))); - r = new Result(kvs); + r = Result.create(kvs); o.init(r); assertEquals( @@ -611,7 +612,7 @@ public void testLazyHBaseRow2() throws SerDeException { nullSequence, false, false, (byte) 0); LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, columnMappings); - List kvs = new ArrayList(); + List kvs = new ArrayList(); kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfa"), Bytes.toBytes("a"), Bytes.toBytes("123"))); kvs.add(new KeyValue(Bytes.toBytes("test-row"), @@ -623,7 +624,7 @@ public void testLazyHBaseRow2() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("hi"))); - Result r = new Result(kvs); + Result r = Result.create(kvs); o.init(r); assertEquals( @@ -639,7 +640,7 @@ public void testLazyHBaseRow2() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"), Bytes.toBytes("f"), Bytes.toBytes("g"))); - r = new Result(kvs); + r = Result.create(kvs); o.init(r); assertEquals( @@ -655,7 +656,7 @@ public void testLazyHBaseRow2() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no"))); - r = new Result(kvs); + r = Result.create(kvs); o.init(r); assertEquals( @@ -669,7 +670,7 @@ public void testLazyHBaseRow2() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no"))); - r = new Result(kvs); + r = Result.create(kvs); o.init(r); assertEquals( @@ -685,7 +686,7 @@ public void testLazyHBaseRow2() throws SerDeException { kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes(""))); - r = new Result(kvs); + r = Result.create(kvs); o.init(r); assertEquals( @@ -736,7 +737,7 @@ public void testLazyHBaseRow3() throws SerDeException { LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, columnMappings); byte [] rowKey = "row-key".getBytes(); - List kvs = new ArrayList(); + List kvs = new ArrayList(); byte [] value; 
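// [Illustrative aside, not part of the patch] The recurring test change in this file and in
// TestHBaseSerDe is new Result(kvs) -> Result.create(kvs), with the lists typed as Cell rather
// than KeyValue. The pattern, using throwaway literals:
List<Cell> cells = new ArrayList<Cell>();
cells.add(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
Collections.sort(cells, KeyValue.COMPARATOR);   // CellComparator could replace this once HBase 1 support is dropped
Result r = Result.create(cells);                // replaces the removed Result(List<KeyValue>) constructor
Cell[] raw = r.rawCells();                      // rawCells()/listCells() replace the old raw()/list()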
for (int i = 1; i < columnsMapping.length; i++) { @@ -784,7 +785,7 @@ public void testLazyHBaseRow3() throws SerDeException { } Collections.sort(kvs, KeyValue.COMPARATOR); - Result result = new Result(kvs); + Result result = Result.create(kvs); o.init(result); List fieldRefs = ((StructObjectInspector) oi).getAllStructFieldRefs(); diff --git a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestPutResultWritable.java b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestPutResultWritable.java index 561b0a8952..cd9afed476 100644 --- a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestPutResultWritable.java +++ b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestPutResultWritable.java @@ -43,9 +43,9 @@ public void testResult() throws Exception { new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfa"), Bytes.toBytes("col2"), Bytes.toBytes("cfacol2")) }; - Result expected = new Result(kvs); + Result expected = Result.create(kvs); ResultWritable actual = copy(new ResultWritable(expected), new ResultWritable()); - Assert.assertArrayEquals(expected.raw(), actual.getResult().raw()); + Assert.assertArrayEquals(expected.rawCells(), actual.getResult().rawCells()); } @@ -65,7 +65,8 @@ public void testPut() throws Exception { } PutWritable actual = copy(new PutWritable(expected), new PutWritable()); Assert.assertArrayEquals(expected.getRow(), actual.getPut().getRow()); - Assert.assertEquals(expected.getFamilyMap(), actual.getPut().getFamilyMap()); + Assert.assertEquals(expected.getFamilyCellMap().keySet(), + actual.getPut().getFamilyCellMap().keySet()); } private T copy(T oldWritable, T newWritable) throws IOException { diff --git a/hbase-handler/src/test/queries/positive/hbase_bulk.q b/hbase-handler/src/test/queries/positive/hbase_bulk.q index 475aafc1ce..5e0c14e08d 100644 --- a/hbase-handler/src/test/queries/positive/hbase_bulk.q +++ b/hbase-handler/src/test/queries/positive/hbase_bulk.q @@ -9,7 +9,7 @@ create table hbsort(key string, val string, val2 string) stored as INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat' -TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf'); +TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf','hbase.mapreduce.hfileoutputformat.table.name'='hbsort'); -- this is a dummy table used for controlling how the input file -- for TotalOrderPartitioner is created diff --git a/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q b/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q index 85581ecdac..ac2fdfade6 100644 --- a/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q +++ b/hbase-handler/src/test/queries/positive/hbase_handler_bulk.q @@ -6,7 +6,7 @@ drop table if exists hb_target; create table hb_target(key int, val string) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties ('hbase.columns.mapping' = ':key,cf:val') -tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk'); +tblproperties ('hbase.mapreduce.hfileoutputformat.table.name' = 'positive_hbase_handler_bulk'); set hive.hbase.generatehfiles=true; set hfile.family.path=/tmp/hb_target/cf; diff --git a/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out b/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out index 1f42567a4b..10e1c0a1e9 100644 --- a/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out +++ b/hbase-handler/src/test/results/positive/hbase_handler_bulk.q.out @@ -5,14 +5,14 @@ POSTHOOK: type: DROPTABLE PREHOOK: 
query: create table hb_target(key int, val string) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties ('hbase.columns.mapping' = ':key,cf:val') -tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk') +tblproperties ('hbase.mapreduce.hfileoutputformat.table.name' = 'positive_hbase_handler_bulk') PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@hb_target POSTHOOK: query: create table hb_target(key int, val string) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties ('hbase.columns.mapping' = ':key,cf:val') -tblproperties ('hbase.table.name' = 'positive_hbase_handler_bulk') +tblproperties ('hbase.mapreduce.hfileoutputformat.table.name' = 'positive_hbase_handler_bulk') POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@hb_target diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index 4ace16ca6f..65bfb9c389 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -739,7 +739,7 @@ public static void copyJobPropertiesToJobConf( public static boolean isHadoop23() { String version = org.apache.hadoop.util.VersionInfo.getVersion(); - if (version.matches("\\b0\\.23\\..+\\b")||version.matches("\\b2\\..*")) + if (version.matches("\\b0\\.23\\..+\\b")||version.matches("\\b2\\..*")||version.matches("\\b3\\..*")) return true; return false; } diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java index bb6c582d28..7e30d98dd4 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -85,6 +86,7 @@ private static BytesRefArrayWritable patialS = new BytesRefArrayWritable(); private static byte[][] bytesArray = null; + private static int numRepeat = 1000; private static BytesRefArrayWritable s = null; @@ -115,6 +117,7 @@ patialS.set(6, new BytesRefWritable("NULL".getBytes("UTF-8"))); patialS.set(7, new BytesRefWritable("NULL".getBytes("UTF-8"))); + numRepeat = (int) Math.ceil((double)SequenceFile.SYNC_INTERVAL / (double)bytesArray.length); } catch (UnsupportedEncodingException e) { } } @@ -182,24 +185,24 @@ public void testSynAndSplit() throws IOException, InterruptedException { } private void splitBeforeSync() throws IOException, InterruptedException { - writeThenReadByRecordReader(600, 1000, 2, 17684, null); + writeThenReadByRecordReader(600, numRepeat, 2, 17684, null); } private void splitRightBeforeSync() throws IOException, InterruptedException { - writeThenReadByRecordReader(500, 1000, 2, 17750, null); + writeThenReadByRecordReader(500, numRepeat, 2, 17750, null); } private void splitInMiddleOfSync() throws IOException, InterruptedException { - writeThenReadByRecordReader(500, 1000, 2, 
17760, null); + writeThenReadByRecordReader(500, numRepeat, 2, 17760, null); } private void splitRightAfterSync() throws IOException, InterruptedException { - writeThenReadByRecordReader(500, 1000, 2, 17770, null); + writeThenReadByRecordReader(500, numRepeat, 2, 17770, null); } private void splitAfterSync() throws IOException, InterruptedException { - writeThenReadByRecordReader(500, 1000, 2, 19950, null); + writeThenReadByRecordReader(500, numRepeat, 2, 19950, null); } private void writeThenReadByRecordReader(int intervalRecordCount, diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestWebHCatE2e.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestWebHCatE2e.java index 22d2cc6bff..341314b218 100644 --- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestWebHCatE2e.java +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestWebHCatE2e.java @@ -238,7 +238,7 @@ public void getHadoopVersion() throws Exception { Map props = JsonBuilder.jsonToMap(p.responseBody); Assert.assertEquals("hadoop", props.get("module")); Assert.assertTrue(p.getAssertMsg(), - ((String)props.get("version")).matches("[1-2].[0-9]+.[0-9]+.*")); + ((String)props.get("version")).matches("[1-3].[0-9]+.[0-9]+.*")); } @Test @@ -356,4 +356,4 @@ private static MethodCallRetVal doHttpCall(String uri, HTTP_METHOD_TYPE type, Ma } return new MethodCallRetVal(-1, "Http " + type + " failed; see log file for details", actualUri, method.getName()); } -} \ No newline at end of file +} diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/mock/MockUriInfo.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/mock/MockUriInfo.java index d69a7339c1..30c4a05a7d 100644 --- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/mock/MockUriInfo.java +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/mock/MockUriInfo.java @@ -136,4 +136,15 @@ public UriBuilder getRequestUriBuilder() { return null; } + @Override + public URI relativize(URI uri) { + // TODO Auto-generated method stub + return null; + } + + @Override + public URI resolve(URI uri) { + // TODO Auto-generated method stub + return null; + } } diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml index bb6b10554f..2a2f8457ff 100644 --- a/itests/hcatalog-unit/pom.xml +++ b/itests/hcatalog-unit/pom.xml @@ -283,6 +283,12 @@ org.apache.hbase + hbase-mapreduce + ${hbase.version} + test + + + org.apache.hbase hbase-server ${hbase.version} tests diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java index 745aa999f9..ad44bc2d62 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/ManyMiniCluster.java @@ -25,8 +25,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; @@ -121,7 +123,6 @@ protected synchronized void start() { protected synchronized void stop() { if (hbaseCluster != null) { - HConnectionManager.deleteAllConnections(true); try { hbaseCluster.shutdown(); } catch (Exception e) { @@ -245,6 +246,8 @@ private void setupZookeeper() { private void setupHBaseCluster() { final int numRegionServers = 1; + Connection connection = null; + Table table = null; try { hbaseDir = new File(workDir, "hbase").getCanonicalPath(); @@ -266,9 +269,25 @@ private void setupHBaseCluster() { hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers); hbaseConf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort()); //opening the META table ensures that cluster is running - new HTable(hbaseConf, HConstants.META_TABLE_NAME); + connection = ConnectionFactory.createConnection(hbaseConf); + table = connection.getTable(TableName.META_TABLE_NAME); } catch (Exception e) { throw new IllegalStateException("Failed to setup HBase Cluster", e); + } finally { + if (table != null) { + try { + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if (connection != null) { + try { + connection.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } } } diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/SkeletonHBaseTest.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/SkeletonHBaseTest.java index 4e1384a634..c8bb4f5ee8 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/SkeletonHBaseTest.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/SkeletonHBaseTest.java @@ -33,6 +33,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hive.conf.HiveConf; import org.junit.AfterClass; @@ -56,10 +60,13 @@ */ protected static Configuration testConf = null; - protected void createTable(String tableName, String[] families) { + protected void createTable(String tableName, String[] families) throws IOException { + Connection connection = null; + Admin admin = null; try { - HBaseAdmin admin = new HBaseAdmin(getHbaseConf()); - HTableDescriptor tableDesc = new HTableDescriptor(tableName); + connection = ConnectionFactory.createConnection(getHbaseConf()); + admin = connection.getAdmin(); + HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName)); for (String family : families) { HColumnDescriptor columnDescriptor = new HColumnDescriptor(family); tableDesc.addFamily(columnDescriptor); @@ -68,8 +75,14 @@ protected void createTable(String tableName, String[] families) { } catch (Exception e) { e.printStackTrace(); throw new IllegalStateException(e); + } finally { + if (admin != null) { + admin.close(); + } + if (connection != null) { + connection.close(); + } } - } protected String newTableName(String prefix) { @@ -90,6 +103,9 @@ protected String newTableName(String prefix) { */ @BeforeClass public static void setup() { + // Fix needed due to dependency for hbase-mapreduce module + System.setProperty("org.apache.hadoop.hbase.shaded.io.netty.packagePrefix", + 
"org.apache.hadoop.hbase.shaded."); if (!contextMap.containsKey(getContextHandle())) contextMap.put(getContextHandle(), new Context(getContextHandle())); diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/TestPigHBaseStorageHandler.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/TestPigHBaseStorageHandler.java index f8f18b3514..120b4af826 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/TestPigHBaseStorageHandler.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/hbase/TestPigHBaseStorageHandler.java @@ -34,12 +34,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; @@ -94,10 +98,17 @@ public void Initialize() throws Exception { } - private void populateHBaseTable(String tName) throws IOException { + private void populateHBaseTable(String tName, Connection connection) throws IOException { List myPuts = generatePuts(tName); - HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tName)); - table.put(myPuts); + Table table = null; + try { + table = connection.getTable(TableName.valueOf(tName)); + table.put(myPuts); + } finally { + if (table != null) { + table.close(); + } + } } private List generatePuts(String tableName) throws IOException { @@ -107,8 +118,8 @@ private void populateHBaseTable(String tName) throws IOException { myPuts = new ArrayList(); for (int i = 1; i <=10; i++) { Put put = new Put(Bytes.toBytes(i)); - put.add(FAMILY, QUALIFIER1, 1, Bytes.toBytes("textA-" + i)); - put.add(FAMILY, QUALIFIER2, 1, Bytes.toBytes("textB-" + i)); + put.addColumn(FAMILY, QUALIFIER1, 1, Bytes.toBytes("textA-" + i)); + put.addColumn(FAMILY, QUALIFIER2, 1, Bytes.toBytes("textB-" + i)); myPuts.add(put); } return myPuts; @@ -165,8 +176,22 @@ public void testPigHBaseSchema() throws Exception { CommandProcessorResponse responseThree = driver.run(tableQuery); - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists(hbaseTableName); + Connection connection = null; + Admin hAdmin = null; + boolean doesTableExist = false; + try { + connection = ConnectionFactory.createConnection(getHbaseConf()); + hAdmin = connection.getAdmin(); + doesTableExist = hAdmin.tableExists(TableName.valueOf(hbaseTableName)); + } finally { + if (hAdmin != null) { + hAdmin.close(); + } + if (connection != null) { + connection.close(); + } + } + assertTrue(doesTableExist); PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); @@ -220,17 +245,39 @@ public void testPigFilterProjection() throws Exception { CommandProcessorResponse responseThree = driver.run(tableQuery); - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists(hbaseTableName); - assertTrue(doesTableExist); + Connection connection = null; + Admin hAdmin = null; + Table table = 
null; + ResultScanner scanner = null; + boolean doesTableExist = false; + try { + connection = ConnectionFactory.createConnection(getHbaseConf()); + hAdmin = connection.getAdmin(); + doesTableExist = hAdmin.tableExists(TableName.valueOf(hbaseTableName)); + + assertTrue(doesTableExist); + + populateHBaseTable(hbaseTableName, connection); - populateHBaseTable(hbaseTableName); + table = connection.getTable(TableName.valueOf(hbaseTableName)); + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("testFamily")); + scanner = table.getScanner(scan); + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null ) { + table.close(); + } + if (hAdmin != null) { + hAdmin.close(); + } + if (connection != null) { + connection.close(); + } + } - Configuration conf = new Configuration(getHbaseConf()); - HTable table = new HTable(conf, hbaseTableName); - Scan scan = new Scan(); - scan.addFamily(Bytes.toBytes("testFamily")); - ResultScanner scanner = table.getScanner(scan); int index=1; PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); @@ -288,59 +335,80 @@ public void testPigPopulation() throws Exception { CommandProcessorResponse responseThree = driver.run(tableQuery); - HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf()); - boolean doesTableExist = hAdmin.tableExists(hbaseTableName); - assertTrue(doesTableExist); - - - createTestDataFile(POPTXT_FILE_NAME); - - PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); - server.registerQuery("A = load '"+POPTXT_FILE_NAME+"' using PigStorage() as (key:int, testqualifier1:float, testqualifier2:chararray);"); - server.registerQuery("B = filter A by (key > 2) AND (key < 8) ;"); - server.registerQuery("store B into '"+databaseName.toLowerCase()+"."+tableName.toLowerCase()+"' using org.apache.hive.hcatalog.pig.HCatStorer();"); - server.registerQuery("C = load '"+databaseName.toLowerCase()+"."+tableName.toLowerCase()+"' using org.apache.hive.hcatalog.pig.HCatLoader();"); - // Schema should be same - Schema dumpedBSchema = server.dumpSchema("C"); - - List fields = dumpedBSchema.getFields(); - assertEquals(3, fields.size()); - - assertEquals(DataType.INTEGER,fields.get(0).type); - assertEquals("key",fields.get(0).alias.toLowerCase()); - - assertEquals( DataType.FLOAT,fields.get(1).type); - assertEquals("testQualifier1".toLowerCase(), fields.get(1).alias.toLowerCase()); - - assertEquals( DataType.CHARARRAY,fields.get(2).type); - assertEquals("testQualifier2".toLowerCase(), fields.get(2).alias.toLowerCase()); - - //Query the hbase table and check the key is valid and only 5 are present - Configuration conf = new Configuration(getHbaseConf()); - HTable table = new HTable(conf, hbaseTableName); - Scan scan = new Scan(); - scan.addFamily(Bytes.toBytes("testFamily")); - byte[] familyNameBytes = Bytes.toBytes("testFamily"); - ResultScanner scanner = table.getScanner(scan); - int index=3; - int count=0; - for(Result result: scanner) { - //key is correct - assertEquals(index,Bytes.toInt(result.getRow())); - //first column exists - assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes("testQualifier1"))); - //value is correct - assertEquals((index+f),Bytes.toFloat(result.getValue(familyNameBytes,Bytes.toBytes("testQualifier1"))),0); - - //second column exists - assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes("testQualifier2"))); - //value is correct - assertEquals(("textB-"+index).toString(),Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes("testQualifier2")))); - 
index++; - count++; + Connection connection = null; + Admin hAdmin = null; + Table table = null; + ResultScanner scanner = null; + boolean doesTableExist = false; + try { + connection = ConnectionFactory.createConnection(getHbaseConf()); + hAdmin = connection.getAdmin(); + doesTableExist = hAdmin.tableExists(TableName.valueOf(hbaseTableName)); + + assertTrue(doesTableExist); + + + createTestDataFile(POPTXT_FILE_NAME); + + PigServer server = new PigServer(ExecType.LOCAL,hcatConf.getAllProperties()); + server.registerQuery("A = load '"+POPTXT_FILE_NAME+"' using PigStorage() as (key:int, testqualifier1:float, testqualifier2:chararray);"); + server.registerQuery("B = filter A by (key > 2) AND (key < 8) ;"); + server.registerQuery("store B into '"+databaseName.toLowerCase()+"."+tableName.toLowerCase()+"' using org.apache.hive.hcatalog.pig.HCatStorer();"); + server.registerQuery("C = load '"+databaseName.toLowerCase()+"."+tableName.toLowerCase()+"' using org.apache.hive.hcatalog.pig.HCatLoader();"); + // Schema should be same + Schema dumpedBSchema = server.dumpSchema("C"); + + List fields = dumpedBSchema.getFields(); + assertEquals(3, fields.size()); + + assertEquals(DataType.INTEGER,fields.get(0).type); + assertEquals("key",fields.get(0).alias.toLowerCase()); + + assertEquals( DataType.FLOAT,fields.get(1).type); + assertEquals("testQualifier1".toLowerCase(), fields.get(1).alias.toLowerCase()); + + assertEquals( DataType.CHARARRAY,fields.get(2).type); + assertEquals("testQualifier2".toLowerCase(), fields.get(2).alias.toLowerCase()); + + //Query the hbase table and check the key is valid and only 5 are present + table = connection.getTable(TableName.valueOf(hbaseTableName)); + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("testFamily")); + byte[] familyNameBytes = Bytes.toBytes("testFamily"); + scanner = table.getScanner(scan); + int index=3; + int count=0; + for(Result result: scanner) { + //key is correct + assertEquals(index,Bytes.toInt(result.getRow())); + //first column exists + assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes("testQualifier1"))); + //value is correct + assertEquals((index+f),Bytes.toFloat(result.getValue(familyNameBytes,Bytes.toBytes("testQualifier1"))),0); + + //second column exists + assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes("testQualifier2"))); + //value is correct + assertEquals(("textB-"+index).toString(),Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes("testQualifier2")))); + index++; + count++; + } + // 5 rows should be returned + assertEquals(count,5); + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null ) { + table.close(); + } + if (hAdmin != null) { + hAdmin.close(); + } + if (connection != null) { + connection.close(); + } } - // 5 rows should be returned - assertEquals(count,5); //Check if hive returns results correctly driver.run(selectQuery); diff --git a/itests/hive-minikdc/pom.xml b/itests/hive-minikdc/pom.xml index 95d2614901..689e67955d 100644 --- a/itests/hive-minikdc/pom.xml +++ b/itests/hive-minikdc/pom.xml @@ -191,7 +191,7 @@ commons-logging - + org.apache.hbase hbase-server @@ -199,6 +199,12 @@ test + org.apache.hbase + hbase-mapreduce + ${hbase.version} + test + + org.apache.hadoop hadoop-minicluster test diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml index 339a194f2d..f862dac5f6 100644 --- a/itests/hive-unit-hadoop2/pom.xml +++ b/itests/hive-unit-hadoop2/pom.xml @@ -198,6 +198,12 @@ test + org.apache.hbase + hbase-mapreduce + 
${hbase.version} + test + + org.apache.hadoop hadoop-minicluster test diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index 14409832c8..ea5b7b9480 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -125,6 +125,18 @@ commmons-logging commons-logging + + org.apache.hadoop + hadoop-mapreduce-client-common + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-common + @@ -227,7 +239,7 @@ commons-logging - + org.apache.hadoop hadoop-mapreduce-client-core @@ -253,6 +265,19 @@ org.apache.hbase + hbase-common + ${hbase.version} + tests + test + + + org.apache.hbase + hbase-procedure + ${hbase.version} + test + + + org.apache.hbase hbase-hadoop-compat ${hbase.version} test-jar @@ -366,6 +391,10 @@ commmons-logging commons-logging + + org.apache.hadoop + hadoop-yarn-server-web-proxy + diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index dbfc23510c..65a1ed110e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -106,6 +107,8 @@ public void setUp() throws Exception { .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); TxnDbUtil.setConfValues(hiveConf); + hiveConf.setInt(MRJobConfig.MAP_MEMORY_MB, 1024); + hiveConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 1024); TxnDbUtil.prepDb(hiveConf); File f = new File(TEST_WAREHOUSE_DIR); if (f.exists()) { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 707bcd10b7..7103fb95b6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1207,7 +1207,7 @@ public void testTableProperties() throws Exception { t.init(stop, looped); t.run(); JobConf job = t.getMrJob(); - Assert.assertEquals("2048", job.get("mapreduce.map.memory.mb")); // 2048 comes from tblproperties + Assert.assertEquals(2048, job.getMemoryForMapTask()); // 2048 comes from tblproperties // Compact ttp1 stop = new AtomicBoolean(true); t = new Worker(); @@ -1217,7 +1217,7 @@ public void testTableProperties() throws Exception { t.init(stop, looped); t.run(); job = t.getMrJob(); - Assert.assertEquals("1024", job.get("mapreduce.map.memory.mb")); // 1024 is the default value + Assert.assertEquals(1024, job.getMemoryForMapTask()); // 1024 is the default value // Clean up runCleaner(conf); rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -1269,7 +1269,7 @@ public void testTableProperties() throws Exception { t.init(stop, looped); t.run(); job = t.getMrJob(); - Assert.assertEquals("3072", job.get("mapreduce.map.memory.mb")); + Assert.assertEquals(3072, job.getMemoryForMapTask()); Assert.assertTrue(job.get("hive.compactor.table.props").contains("orc.compress.size4:8192")); } diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java index cabddea5c6..dd24f0261f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java @@ -70,6 +70,11 @@ private static HiveConf createHiveConf() { conf.set("hive.execution.engine", "spark"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.master", "local-cluster[2,2,1024]"); + // FIXME: Hadoop 3 made an incompatible change to dfs.client.datanode-restart.timeout, + // while Spark 2 still uses Hadoop 2. + // Spark requires Hive to support Hadoop 3 first; then Spark can start + // working on Hadoop 3 support. Remove this once Spark supports Hadoop 3. + conf.set("dfs.client.datanode-restart.timeout", "30"); return conf; } diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java index e3f9646b59..2156f4b4de 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java @@ -79,6 +79,11 @@ private static HiveConf createHiveConf() { conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.master", "local-cluster[2,2,1024]"); conf.set("spark.deploy.defaultCores", "2"); + // FIXME: Hadoop 3 made an incompatible change to dfs.client.datanode-restart.timeout, + // while Spark 2 still uses Hadoop 2. + // Spark requires Hive to support Hadoop 3 first; then Spark can start + // working on Hadoop 3 support. Remove this once Spark supports Hadoop 3.
+ conf.set("dfs.client.datanode-restart.timeout", "30"); return conf; } diff --git a/itests/qtest-accumulo/pom.xml b/itests/qtest-accumulo/pom.xml index 40d0a749c8..9d80bceedf 100644 --- a/itests/qtest-accumulo/pom.xml +++ b/itests/qtest-accumulo/pom.xml @@ -299,9 +299,8 @@ org.apache.hbase - hbase-server + hbase-mapreduce ${hbase.version} - tests test @@ -385,9 +384,15 @@ - org.apache.accumulo - accumulo-minicluster - test + org.apache.accumulo + accumulo-minicluster + test + + + commons-beanutils + commons-beanutils-core + + diff --git a/itests/qtest-spark/pom.xml b/itests/qtest-spark/pom.xml index a506f7f081..b9b17b66a5 100644 --- a/itests/qtest-spark/pom.xml +++ b/itests/qtest-spark/pom.xml @@ -34,7 +34,6 @@ OFF - 8.1.14.v20131031 2.21 @@ -67,25 +66,31 @@ org.eclipse.jetty jetty-util - ${spark.jetty.version} + ${jetty.version} test org.eclipse.jetty jetty-security - ${spark.jetty.version} + ${jetty.version} test org.eclipse.jetty jetty-plus - ${spark.jetty.version} + ${jetty.version} test org.eclipse.jetty jetty-server - ${spark.jetty.version} + ${jetty.version} + test + + + org.eclipse.jetty + jetty-servlet + ${jetty.version} test @@ -316,6 +321,12 @@ test + org.apache.hbase + hbase-mapreduce + ${hbase.version} + test + + junit junit ${junit.version} diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml index 02664f3792..7f7d5f3ddf 100644 --- a/itests/qtest/pom.xml +++ b/itests/qtest/pom.xml @@ -328,6 +328,12 @@ test + org.apache.hbase + hbase-mapreduce + ${hbase.version} + test + + org.apache.tez tez-tests ${tez.version} diff --git a/itests/util/pom.xml b/itests/util/pom.xml index e6dc09fb8f..16118b52cf 100644 --- a/itests/util/pom.xml +++ b/itests/util/pom.xml @@ -38,7 +38,7 @@ org.apache.accumulo accumulo-minicluster - + org.slf4j slf4j-log4j12 @@ -46,6 +46,10 @@ commmons-logging commons-logging + + commons-beanutils + commons-beanutils-core + @@ -143,6 +147,11 @@ hbase-server ${hbase.version} + + org.apache.hbase + hbase-mapreduce + ${hbase.version} + junit @@ -170,12 +179,18 @@ commons-logging - + org.apache.hbase hbase-server ${hbase.version} tests + + org.apache.hbase + hbase-mapreduce + ${hbase.version} + tests + diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java index 0cc9a89085..e5d72e0d7d 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java @@ -17,14 +17,12 @@ */ package org.apache.hadoop.hive.hbase; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.ql.QTestUtil; -import java.util.List; - /** * HBaseQTestUtil initializes HBase-specific test fixtures. */ @@ -37,7 +35,7 @@ public static String HBASE_SRC_SNAPSHOT_NAME = "src_hbase_snapshot"; /** A handle to this harness's cluster */ - private final HConnection conn; + private final Connection conn; private HBaseTestSetup hbaseSetup = null; @@ -53,19 +51,6 @@ public HBaseQTestUtil( super.init(); } - /** return true when HBase table snapshot exists, false otherwise. 
*/ - private static boolean hbaseTableSnapshotExists(HBaseAdmin admin, String snapshotName) throws - Exception { - List snapshots = - admin.listSnapshots(".*" + snapshotName + ".*"); - for (HBaseProtos.SnapshotDescription sn : snapshots) { - if (sn.getName().equals(HBASE_SRC_SNAPSHOT_NAME)) { - return true; - } - } - return false; - } - @Override public void init() throws Exception { // defer @@ -93,10 +78,10 @@ public void createSources(String tname) throws Exception { runCmd("INSERT OVERWRITE TABLE " + HBASE_SRC_NAME + " SELECT * FROM src"); // create a snapshot - HBaseAdmin admin = null; + Admin admin = null; try { - admin = new HBaseAdmin(conn.getConfiguration()); - admin.snapshot(HBASE_SRC_SNAPSHOT_NAME, HBASE_SRC_NAME); + admin = conn.getAdmin(); + admin.snapshot(HBASE_SRC_SNAPSHOT_NAME, TableName.valueOf(HBASE_SRC_NAME)); } finally { if (admin != null) admin.close(); } @@ -111,12 +96,10 @@ public void cleanUp(String tname) throws Exception { // drop in case leftover from unsuccessful run db.dropTable(Warehouse.DEFAULT_DATABASE_NAME, HBASE_SRC_NAME); - HBaseAdmin admin = null; + Admin admin = null; try { - admin = new HBaseAdmin(conn.getConfiguration()); - if (hbaseTableSnapshotExists(admin, HBASE_SRC_SNAPSHOT_NAME)) { - admin.deleteSnapshot(HBASE_SRC_SNAPSHOT_NAME); - } + admin = conn.getAdmin(); + admin.deleteSnapshots(HBASE_SRC_SNAPSHOT_NAME); } finally { if (admin != null) admin.close(); } diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java index 4f8fa05925..5db44d2bc0 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java @@ -28,12 +28,12 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.JobConf; @@ -48,11 +48,11 @@ private MiniHBaseCluster hbaseCluster; private int zooKeeperPort; private String hbaseRoot; - private HConnection hbaseConn; + private Connection hbaseConn; private static final int NUM_REGIONSERVERS = 1; - public HConnection getConnection() { + public Connection getConnection() { return this.hbaseConn; } @@ -94,12 +94,15 @@ private void setUpFixtures(HiveConf conf) throws Exception { hbaseConf.setInt("hbase.master.info.port", -1); hbaseConf.setInt("hbase.regionserver.port", findFreePort()); hbaseConf.setInt("hbase.regionserver.info.port", -1); + // Fix needed due to dependency for hbase-mapreduce module + System.setProperty("org.apache.hadoop.hbase.shaded.io.netty.packagePrefix", + "org.apache.hadoop.hbase.shaded."); hbaseCluster = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS); conf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort()); - hbaseConn = HConnectionManager.createConnection(hbaseConf); + 
hbaseConn = ConnectionFactory.createConnection(hbaseConf); // opening the META table ensures that cluster is running - HTableInterface meta = null; + Table meta = null; try { meta = hbaseConn.getTable(TableName.META_TABLE_NAME); } finally { @@ -110,7 +113,7 @@ private void setUpFixtures(HiveConf conf) throws Exception { private void createHBaseTable() throws IOException { final String HBASE_TABLE_NAME = "HiveExternalTable"; - HTableDescriptor htableDesc = new HTableDescriptor(HBASE_TABLE_NAME.getBytes()); + HTableDescriptor htableDesc = new HTableDescriptor(TableName.valueOf(HBASE_TABLE_NAME)); HColumnDescriptor hcolDesc = new HColumnDescriptor("cf".getBytes()); htableDesc.addFamily(hcolDesc); @@ -123,16 +126,16 @@ private void createHBaseTable() throws IOException { float [] floats = new float [] { Float.MIN_VALUE, -1.0F, Float.MAX_VALUE }; double [] doubles = new double [] { Double.MIN_VALUE, -1.0, Double.MAX_VALUE }; - HBaseAdmin hbaseAdmin = null; - HTableInterface htable = null; + Admin hbaseAdmin = null; + Table htable = null; try { - hbaseAdmin = new HBaseAdmin(hbaseConn.getConfiguration()); + hbaseAdmin = hbaseConn.getAdmin(); if (Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)) { // if table is already in there, don't recreate. return; } hbaseAdmin.createTable(htableDesc); - htable = hbaseConn.getTable(HBASE_TABLE_NAME); + htable = hbaseConn.getTable(TableName.valueOf(HBASE_TABLE_NAME)); // data Put[] puts = new Put[]{ @@ -140,14 +143,14 @@ private void createHBaseTable() throws IOException { // store data for (int i = 0; i < puts.length; i++) { - puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i])); - puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte[]{bytes[i]}); - puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i])); - puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i])); - puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i])); - puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i])); - puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i])); - puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i])); + puts[i].addColumn("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i])); + puts[i].addColumn("cf".getBytes(), "cq-byte".getBytes(), new byte[]{bytes[i]}); + puts[i].addColumn("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i])); + puts[i].addColumn("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i])); + puts[i].addColumn("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i])); + puts[i].addColumn("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i])); + puts[i].addColumn("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i])); + puts[i].addColumn("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i])); htable.put(puts[i]); } @@ -170,7 +173,6 @@ public void tearDown() throws Exception { hbaseConn = null; } if (hbaseCluster != null) { - HConnectionManager.deleteAllConnections(true); hbaseCluster.shutdown(); hbaseCluster = null; } diff --git a/llap-server/pom.xml b/llap-server/pom.xml index 47a04cc310..176110d014 100644 --- a/llap-server/pom.xml +++ b/llap-server/pom.xml @@ -82,8 +82,8 @@ io.netty - netty - 3.6.2.Final + netty-all + ${netty.version} org.apache.avro @@ -153,6 +153,18 @@ commmons-logging commons-logging + + io.netty + netty-all + + + org.apache.hadoop + hadoop-yarn-client + + + org.apache.hadoop + 
hadoop-yarn-api + @@ -169,6 +181,14 @@ commmons-logging commons-logging + + io.netty + netty-all + + + org.apache.hadoop + hadoop-yarn-api + @@ -234,6 +254,16 @@ org.codehaus.jettison jettison + + org.eclipse.jetty + jetty-server + ${jetty.version} + + + org.eclipse.jetty + jetty-util + ${jetty.version} + @@ -266,6 +296,12 @@ hadoop-hdfs ${hadoop.version} test + + + io.netty + netty-all + + org.apache.hive @@ -279,6 +315,12 @@ ${hadoop.version} tests test + + + io.netty + netty-all + + junit @@ -326,6 +368,21 @@ org.apache.hbase + hbase-mapreduce + ${hbase.version} + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.hbase hbase-common ${hbase.version} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index 6b08da66b6..51fc1c564e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -698,9 +698,9 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) } // Check whether the shuffle version is compatible if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( - request.getHeader(ShuffleHeader.HTTP_HEADER_NAME)) + request.headers().get(ShuffleHeader.HTTP_HEADER_NAME)) || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals( - request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) { + request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) { sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST); } final Map> q = @@ -904,12 +904,12 @@ protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) { if (!connectionKeepAliveEnabled && !keepAliveParam) { LOG.info("Setting connection close header..."); - response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE); + response.headers().add(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE); } else { - response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, + response.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength)); - response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + response.headers().add(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + response.headers().add(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut); LOG.debug("Content Length in shuffle : " + contentLength); } @@ -937,7 +937,7 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri); // hash from the fetcher String urlHashStr = - request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH); + request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH); if (urlHashStr == null) { LOG.info("Missing header hash for " + appid); throw new IOException("fetcher cannot be authenticated"); @@ -953,11 +953,11 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8), tokenSecret); - response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); + response.headers().add(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); // Put shuffle version into http header - response.setHeader(ShuffleHeader.HTTP_HEADER_NAME, + response.headers().add(ShuffleHeader.HTTP_HEADER_NAME, 
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); - response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, + response.headers().add(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); if (LOG.isDebugEnabled()) { int len = reply.length(); @@ -1025,11 +1025,11 @@ protected void sendError(ChannelHandlerContext ctx, protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.headers().add(CONTENT_TYPE, "text/plain; charset=UTF-8"); // Put shuffle version into http header - response.setHeader(ShuffleHeader.HTTP_HEADER_NAME, + response.headers().add(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); - response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, + response.headers().add(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); response.setContent( ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); diff --git a/metastore/pom.xml b/metastore/pom.xml index 04c6f47879..c8f919c684 100644 --- a/metastore/pom.xml +++ b/metastore/pom.xml @@ -155,7 +155,7 @@ org.apache.hadoop - hadoop-hdfs + hadoop-hdfs-client ${hadoop.version} true diff --git a/pom.xml b/pom.xml index 006e8f8611..dc31bd5107 100644 --- a/pom.xml +++ b/pom.xml @@ -144,10 +144,10 @@ 14.0.1 2.4.11 1.3.166 - 2.8.1 + 3.0.0-beta1 ${basedir}/${hive.path.to.root}/testutils/hadoop 1.3 - 1.1.1 + 2.0.0-alpha3 3.3.0 2.6.1 @@ -194,7 +194,7 @@ 1.7.10 4.0.4 3.0.0-SNAPSHOT - 0.9.0 + 0.9.1-SNAPSHOT 0.92.0-incubating 2.2.0 2.0.0 @@ -754,6 +754,10 @@ commmons-logging commons-logging + + com.codahale.metrics + metrics-core + @@ -835,6 +839,68 @@ org.apache.hbase hbase-hadoop2-compat ${hbase.version} + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + org.mortbay.jetty + jsp-2.1 + + + org.mortbay.jetty + jsp-api-2.1 + + + org.mortbay.jetty + servlet-api-2.5 + + + org.mortbay.jetty + servlet-api-2.5 + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + com.codahale.metrics + metrics-core + + org.apache.hbase @@ -842,6 +908,11 @@ ${hbase.version} + org.apache.hbase + hbase-mapreduce + ${hbase.version} + + org.apache.hadoop hadoop-minicluster ${hadoop.version} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java index 19b97e4cf2..dc83a4b040 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.CompressionCodec; @@ -101,7 +102,7 @@ private final BytesRefArrayWritable patialS = new BytesRefArrayWritable(); private byte[][] bytesArray; private BytesRefArrayWritable s; - + private int numRepeat = 1000; @Before public void setup() throws Exception { conf = new Configuration(); @@ -143,6 +144,8 @@ public void setup() throws Exception { // LazyString has no so-called NULL sequence. 
The value is empty string if not. patialS.set(7, new BytesRefWritable("".getBytes("UTF-8"))); + numRepeat = (int) Math.ceil((double)SequenceFile.SYNC_INTERVAL / (double)bytesArray.length); + } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } @@ -659,24 +662,24 @@ public void testSync() throws IOException { } private void splitBeforeSync() throws IOException { - writeThenReadByRecordReader(600, 1000, 2, 1, null); + writeThenReadByRecordReader(600, numRepeat, 2, 1, null); } private void splitRightBeforeSync() throws IOException { - writeThenReadByRecordReader(500, 1000, 2, 17750, null); + writeThenReadByRecordReader(500, numRepeat, 2, 17750, null); } private void splitInMiddleOfSync() throws IOException { - writeThenReadByRecordReader(500, 1000, 2, 17760, null); + writeThenReadByRecordReader(500, numRepeat, 2, 17760, null); } private void splitRightAfterSync() throws IOException { - writeThenReadByRecordReader(500, 1000, 2, 17770, null); + writeThenReadByRecordReader(500, numRepeat, 2, 17770, null); } private void splitAfterSync() throws IOException { - writeThenReadByRecordReader(500, 1000, 2, 19950, null); + writeThenReadByRecordReader(500, numRepeat, 2, 19950, null); } private void writeThenReadByRecordReader(int intervalRecordCount, @@ -711,7 +714,7 @@ private void writeThenReadByRecordReader(int intervalRecordCount, jonconf.set("mapred.input.dir", testDir.toString()); HiveConf.setLongVar(jonconf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, minSplitSize); InputSplit[] splits = inputFormat.getSplits(jonconf, splitNumber); - assertEquals("splits length should be " + splitNumber, splits.length, splitNumber); + assertEquals("splits length should be " + splitNumber, splitNumber, splits.length); int readCount = 0; for (int i = 0; i < splits.length; i++) { int previousReadCount = readCount; diff --git a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out index b582471afd..db0ab8d169 100644 --- a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out +++ b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out @@ -19,4 +19,4 @@ POSTHOOK: type: LOAD #### A masked pattern was here #### POSTHOOK: Output: default@exim_department #### A masked pattern was here #### -FAILED: SemanticException [Error 10320]: Error while performing IO operation : No FileSystem for scheme: nosuchschema +FAILED: SemanticException [Error 10320]: Error while performing IO operation : No FileSystem for scheme "nosuchschema" diff --git a/ql/src/test/results/clientnegative/external1.q.out b/ql/src/test/results/clientnegative/external1.q.out index 661d669206..f2bc9c6ea2 100644 --- a/ql/src/test/results/clientnegative/external1.q.out +++ b/ql/src/test/results/clientnegative/external1.q.out @@ -3,4 +3,4 @@ PREHOOK: type: CREATETABLE #### A masked pattern was here #### PREHOOK: Output: database:default PREHOOK: Output: default@external1 -FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.io.IOException: No FileSystem for scheme: invalidscheme +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. 
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "invalidscheme" diff --git a/ql/src/test/results/clientnegative/external2.q.out b/ql/src/test/results/clientnegative/external2.q.out index eb5518c017..05ddc28820 100644 --- a/ql/src/test/results/clientnegative/external2.q.out +++ b/ql/src/test/results/clientnegative/external2.q.out @@ -10,4 +10,4 @@ POSTHOOK: Output: default@external2 PREHOOK: type: ALTERTABLE_ADDPARTS #### A masked pattern was here #### PREHOOK: Output: default@external2 -FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.io.IOException: No FileSystem for scheme: invalidscheme +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "invalidscheme" diff --git a/serde/pom.xml b/serde/pom.xml index 7419cfb17a..0247c32452 100644 --- a/serde/pom.xml +++ b/serde/pom.xml @@ -152,6 +152,12 @@ org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + test + + + org.apache.hadoop hadoop-hdfs ${hadoop.version} test diff --git a/shims/0.23/pom.xml b/shims/0.23/pom.xml index 3ff1d38776..61cf459a35 100644 --- a/shims/0.23/pom.xml +++ b/shims/0.23/pom.xml @@ -64,6 +64,12 @@ org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + true + + + org.apache.hadoop hadoop-hdfs ${hadoop.version} true @@ -198,6 +204,12 @@ ${hadoop.version} true test-jar + + + com.codahale.metrics + metrics-core + + org.apache.hadoop diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index e9445eb11d..1f86e7678b 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -296,8 +296,8 @@ public MiniMrShim(Configuration conf, int numberOfTaskTrackers, JobConf jConf = new JobConf(conf); jConf.set("yarn.scheduler.capacity.root.queues", "default"); jConf.set("yarn.scheduler.capacity.root.default.capacity", "100"); - jConf.setInt(MRJobConfig.MAP_MEMORY_MB, 128); - jConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 128); + jConf.setInt(MRJobConfig.MAP_MEMORY_MB, 512); + jConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 512); jConf.setInt(MRJobConfig.MR_AM_VMEM_MB, 128); jConf.setInt(YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, 512); jConf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); @@ -329,8 +329,8 @@ public void setupConfiguration(Configuration conf) { for (Map.Entry pair: jConf) { conf.set(pair.getKey(), pair.getValue()); } - conf.setInt(MRJobConfig.MAP_MEMORY_MB, 128); - conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 128); + conf.setInt(MRJobConfig.MAP_MEMORY_MB, 512); + conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 512); conf.setInt(MRJobConfig.MR_AM_VMEM_MB, 128); } } @@ -1128,10 +1128,11 @@ public Boolean run() throws Exception { @Override public boolean runDistCp(List srcPaths, Path dst, Configuration conf) throws IOException { - DistCpOptions options = new DistCpOptions(srcPaths, dst); - options.setSyncFolder(true); - options.setSkipCRC(true); - options.preserve(FileAttribute.BLOCKSIZE); + DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst) + .withSyncFolder(true) + .withCRC(true) + .preserve(FileAttribute.BLOCKSIZE) + .build(); // Creates the command-line parameters for distcp List params = constructDistCpParams(srcPaths, dst, conf); @@ -1207,18 +1208,24 @@ public boolean isPathEncrypted(Path path) throws 
IOException { if(!"hdfs".equalsIgnoreCase(path.toUri().getScheme())) { return false; } - try { - return (hdfsAdmin.getEncryptionZoneForPath(fullPath) != null); - } catch (FileNotFoundException fnfe) { - LOG.debug("Failed to get EZ for non-existent path: "+ fullPath, fnfe); - return false; - } + + return (getEncryptionZoneForPath(fullPath) != null); + } + + private EncryptionZone getEncryptionZoneForPath(Path path) throws IOException { + if (path.getFileSystem(conf).exists(path)) { + return hdfsAdmin.getEncryptionZoneForPath(path); + } else if (!path.getParent().equals(path)) { + return getEncryptionZoneForPath(path.getParent()); + } else { + return null; + } } @Override public boolean arePathsOnSameEncryptionZone(Path path1, Path path2) throws IOException { - return equivalentEncryptionZones(hdfsAdmin.getEncryptionZoneForPath(path1), - hdfsAdmin.getEncryptionZoneForPath(path2)); + return equivalentEncryptionZones(getEncryptionZoneForPath(path1), + getEncryptionZoneForPath(path2)); } private boolean equivalentEncryptionZones(EncryptionZone zone1, EncryptionZone zone2) { @@ -1256,8 +1263,8 @@ public boolean arePathsOnSameEncryptionZone(Path path1, Path path2, public int comparePathKeyStrength(Path path1, Path path2) throws IOException { EncryptionZone zone1, zone2; - zone1 = hdfsAdmin.getEncryptionZoneForPath(path1); - zone2 = hdfsAdmin.getEncryptionZoneForPath(path2); + zone1 = getEncryptionZoneForPath(path1); + zone2 = getEncryptionZoneForPath(path2); if (zone1 == null && zone2 == null) { return 0; diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java index 2c37a51cf4..a82b2f0564 100644 --- a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java +++ b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java @@ -23,6 +23,7 @@ import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Shell; @@ -181,6 +182,12 @@ public boolean rename(Path src, Path dst) throws IOException { } @Override + protected void rename(Path src, Path dst, Rename... 
options) + throws IOException { + super.rename(swizzleParamPath(src), swizzleParamPath(dst), options); + } + + @Override public boolean delete(Path f, boolean recursive) throws IOException { return super.delete(swizzleParamPath(f), recursive); } @@ -264,6 +271,11 @@ public ContentSummary getContentSummary(Path f) throws IOException { } @Override + public FileStatus getFileLinkStatus(Path f) throws IOException { + return swizzleFileStatus(super.getFileLinkStatus(swizzleParamPath(f)), false); + } + + @Override public FileStatus getFileStatus(Path f) throws IOException { return swizzleFileStatus(super.getFileStatus(swizzleParamPath(f)), false); } diff --git a/shims/scheduler/pom.xml b/shims/scheduler/pom.xml index 0eadb69435..076e2eed42 100644 --- a/shims/scheduler/pom.xml +++ b/shims/scheduler/pom.xml @@ -87,8 +87,14 @@ org.apache.hadoop hadoop-yarn-server-tests ${hadoop.version} - true + true test-jar + + + com.codahale.metrics + metrics-core + + diff --git a/spark-client/pom.xml b/spark-client/pom.xml index 784d908e4a..b38c6fa7a4 100644 --- a/spark-client/pom.xml +++ b/spark-client/pom.xml @@ -85,6 +85,10 @@ commmons-logging commons-logging + + com.fasterxml.jackson.core + jackson-databind + diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml index 8df622f179..3800d3eb9a 100644 --- a/standalone-metastore/pom.xml +++ b/standalone-metastore/pom.xml @@ -62,7 +62,7 @@ 0.1.2 3.1.0 14.0.1 - 2.8.0 + 3.0.0-beta1 2.6.1 2.6.5 5.5.1 @@ -190,6 +190,22 @@ + + org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + org.apache.hive diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java index c10e36f94a..ccb2086f2f 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.DistCp; import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,10 +148,11 @@ public Boolean run() throws Exception { public static boolean runDistCp(List srcPaths, Path dst, Configuration conf) throws IOException { - DistCpOptions options = new DistCpOptions(srcPaths, dst); - options.setSyncFolder(true); - options.setSkipCRC(true); - options.preserve(DistCpOptions.FileAttribute.BLOCKSIZE); + DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst) + .withSyncFolder(true) + .withCRC(true) + .preserve(FileAttribute.BLOCKSIZE) + .build(); // Creates the command-line parameters for distcp List params = constructDistCpParams(srcPaths, dst, conf); diff --git a/testutils/ptest2/src/main/resources/batch-exec.vm b/testutils/ptest2/src/main/resources/batch-exec.vm index 2d16ca3e8a..4ff74f4c1e 100644 --- a/testutils/ptest2/src/main/resources/batch-exec.vm +++ b/testutils/ptest2/src/main/resources/batch-exec.vm @@ -35,7 +35,7 @@ then export PATH=$JAVA_HOME/bin/:$PATH fi export ANT_OPTS="-Xmx1g -XX:MaxPermSize=256m -Djava.io.tmpdir=$logDir/tmp ${antEnvOpts}" -export MAVEN_OPTS="-Xmx256m -Djava.io.tmpdir=$logDir/tmp ${mavenEnvOpts}" +export MAVEN_OPTS="-Xmx1g -Djava.io.tmpdir=$logDir/tmp ${mavenEnvOpts}" export HADOOP_ROOT_LOGGER=INFO,console 
export HADOOP_OPTS="-Dhive.log.dir=$logDir -Dhive.query.id=hadoop -Djava.io.tmpdir=$logDir/tmp" cd $localDir/$instanceName/${repositoryName}-source || exit 1
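For reference, the test and util hunks above repeatedly swap the HBase 1.x client classes (HConnectionManager, HBaseAdmin, HTable, HTableInterface) for the HBase 2.x Connection-based API, and replace Put.add(...) and Result.raw()/new Result(...) with Put.addColumn(...), Result.rawCells(), and Result.create(...). The sketch below only illustrates that pattern, assuming an HBase 2.x client on the classpath; the table name "hb_demo" and the row and column values are illustrative and not taken from the patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBase2ClientSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    TableName name = TableName.valueOf("hb_demo");   // hypothetical table, not from the patch
    byte[] cf = Bytes.toBytes("cf");

    // Connection, Admin and Table replace HConnectionManager, HBaseAdmin and HTable.
    // All three are Closeable, so try-with-resources can stand in for the explicit
    // finally blocks added in the test changes above.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
         Table table = conn.getTable(name)) {
      if (admin.tableExists(name)) {
        // Put.add(...) is gone in HBase 2; addColumn(...) is the replacement used throughout the patch.
        Put put = new Put(Bytes.toBytes("row-1"));
        put.addColumn(cf, Bytes.toBytes("col1"), Bytes.toBytes("value-1"));
        table.put(put);

        // Result.raw() became rawCells(), mirroring the TestPutResultWritable change.
        Result result = table.get(new Get(Bytes.toBytes("row-1")));
        for (Cell cell : result.rawCells()) {
          System.out.println(Bytes.toString(CellUtil.cloneQualifier(cell)));
        }
      }
    }
  }
}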
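Similarly, the Hadoop23Shims and HdfsUtils hunks replace the removed DistCpOptions setters with the Hadoop 3 DistCpOptions.Builder. A rough sketch of that builder usage follows; the source and target paths are made up, and the direct DistCp construction and execute() call are an assumption for the sketch, whereas the patch hands the options to Hive's own doAs and parameter handling instead.

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;

public class DistCpBuilderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical paths, only for illustration.
    List<Path> srcPaths = Collections.singletonList(new Path("/tmp/distcp-src"));
    Path dst = new Path("/tmp/distcp-dst");

    // Hadoop 3 removed the mutable setters (setSyncFolder, setSkipCRC, preserve) used before this
    // patch; the options object is now assembled through the immutable builder, as in runDistCp above.
    DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst)
        .withSyncFolder(true)
        .withCRC(true)                      // mirrors the old setSkipCRC(true), as in the hunks above
        .preserve(FileAttribute.BLOCKSIZE)
        .build();

    // Assumed driver call for the sketch only.
    DistCp distCp = new DistCp(conf, options);
    distCp.execute();
  }
}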