diff --git common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java
index 26528f9..0a0e5d0 100644
--- common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java
+++ common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java
@@ -18,10 +18,23 @@
 package org.apache.hadoop.hive.common;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 public class ValidTxnListImpl implements ValidTxnList {
 
+  private static final Log LOG = LogFactory.getLog(ValidTxnListImpl.class.getName());
+  private static final int MAX_UNCOMPRESSED_LENGTH = 256;
+  private static final char COMPRESSION_MARKER = 'C';
+  private static final String STRING_ENCODING = "ISO-8859-1";
+
   private long[] exceptions;
   private long highWatermark;
 
@@ -95,7 +108,26 @@ public String writeToString() {
         buf.append(except);
       }
     }
-    return buf.toString();
+    if (buf.length() > MAX_UNCOMPRESSED_LENGTH) {
+      try {
+        ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+        GZIPOutputStream gzip = new GZIPOutputStream(byteBuf);
+        gzip.write(buf.toString().getBytes(STRING_ENCODING));
+        gzip.close();
+        // Format: COMPRESSION_MARKER + uncompressed length + ':' + gzip bytes
+        StringBuilder buf2 = new StringBuilder();
+        buf2.append(COMPRESSION_MARKER);
+        buf2.append(buf.length());
+        buf2.append(':');
+        buf2.append(byteBuf.toString(STRING_ENCODING));
+        return buf2.toString();
+      } catch (IOException e) {
+        LOG.error("Unable to compress transaction list, " + e.getMessage());
+        throw new RuntimeException(e);
+      }
+    } else {
+      return buf.toString();
+    }
   }
 
   @Override
@@ -104,11 +136,38 @@ public void readFromString(String src) {
       highWatermark = Long.MAX_VALUE;
       exceptions = new long[0];
     } else {
-      String[] values = src.split(":");
+      String[] values;
+      if (src.charAt(0) == COMPRESSION_MARKER) {
+        try {
+          int colon = src.indexOf(':');
+          int len = Integer.parseInt(src.substring(1, colon));
+          ByteArrayInputStream byteBuf =
+              new ByteArrayInputStream(src.substring(colon + 1).getBytes(STRING_ENCODING));
+          GZIPInputStream gzip = new GZIPInputStream(byteBuf);
+          byte[] buf = new byte[len];
+          int bytesRead = 0;
+          int offset = 0;
+          int maxReadLen = len;
+          do {
+            bytesRead = gzip.read(buf, offset, maxReadLen);
+            if (bytesRead < 0) {
+              throw new IOException("Unexpected end of compressed transaction list");
+            }
+            offset += bytesRead;
+            maxReadLen -= bytesRead;
+          } while (maxReadLen > 0);
+          values = new String(buf, STRING_ENCODING).split(":");
+        } catch (IOException e) {
+          LOG.error("Unable to decode compressed transaction list, " + e.getMessage());
+          throw new RuntimeException(e);
+        }
+      } else {
+        values = src.split(":");
+      }
       highWatermark = Long.parseLong(values[0]);
       exceptions = new long[values.length - 1];
-      for(int i = 1; i < values.length; ++i) {
-        exceptions[i-1] = Long.parseLong(values[i]);
+      for (int i = 1; i < values.length; ++i) {
+        exceptions[i - 1] = Long.parseLong(values[i]);
       }
     }
   }
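The hunks above introduce a compact wire format for long transaction lists: the plain `highWatermark:exception:...` string is gzipped and re-encoded as `C<uncompressed length>:<gzip bytes mapped through ISO-8859-1>`. For reviewers, here is a standalone sketch of that scheme; the class and method names are illustrative and not part of the patch, and unlike the patch it always compresses rather than only past the 256-character threshold:

    // TxnStringCodecSketch.java -- editor's standalone sketch, NOT part of the patch.
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    public class TxnStringCodecSketch {

      // Encode as: 'C' + <uncompressed length> + ':' + <gzip bytes as ISO-8859-1 chars>
      static String encode(String plain) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bytes);
        gzip.write(plain.getBytes("ISO-8859-1"));
        gzip.close();
        return "C" + plain.length() + ":" + bytes.toString("ISO-8859-1");
      }

      static String decode(String encoded) throws IOException {
        int colon = encoded.indexOf(':');
        int len = Integer.parseInt(encoded.substring(1, colon));
        GZIPInputStream gzip = new GZIPInputStream(
            new ByteArrayInputStream(encoded.substring(colon + 1).getBytes("ISO-8859-1")));
        byte[] buf = new byte[len];
        int offset = 0;
        while (offset < len) {
          int n = gzip.read(buf, offset, len - offset);
          if (n < 0) {
            throw new IOException("Truncated compressed transaction list");
          }
          offset += n;
        }
        return new String(buf, "ISO-8859-1");
      }

      public static void main(String[] args) throws IOException {
        String txns = "2000:100:101:102";               // highWatermark:exceptions...
        String wire = encode(txns);
        System.out.println(wire.charAt(0) == 'C');      // true
        System.out.println(decode(wire).equals(txns));  // true
      }
    }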
system limits)"), + HIVESCRIPT_ENV_BLACKLIST("hive.script.operator.env.blacklist", + "hive.txn.valid.txns,hive.script.operator.env.blacklist", + "Comma separated list of keys from the configuration file not to convert to environment " + + "variables when envoking the script operator"), HIVEMAPREDMODE("hive.mapred.mode", "nonstrict", "The mode in which the Hive operations are being performed. \n" + "In strict mode, some risky queries are not allowed to run. They include:\n" + diff --git common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java new file mode 100644 index 0000000..7a4539d --- /dev/null +++ common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; + +/** + * Tests for {@link org.apache.hadoop.hive.common.ValidTxnListImpl} + */ +public class TestValidTxnImpl { + + @Test + public void noExceptions() throws Exception { + ValidTxnList txnList = new ValidTxnListImpl(new long[0], 1); + String str = txnList.writeToString(); + Assert.assertEquals("1:", str); + ValidTxnList newList = new ValidTxnListImpl(); + newList.readFromString(str); + Assert.assertTrue(newList.isTxnCommitted(1)); + Assert.assertFalse(newList.isTxnCommitted(2)); + } + + @Test + public void exceptions() throws Exception { + ValidTxnList txnList = new ValidTxnListImpl(new long[]{2L,4L}, 5); + String str = txnList.writeToString(); + Assert.assertEquals("5:2:4", str); + ValidTxnList newList = new ValidTxnListImpl(); + newList.readFromString(str); + Assert.assertTrue(newList.isTxnCommitted(1)); + Assert.assertFalse(newList.isTxnCommitted(2)); + Assert.assertTrue(newList.isTxnCommitted(3)); + Assert.assertFalse(newList.isTxnCommitted(4)); + Assert.assertTrue(newList.isTxnCommitted(5)); + Assert.assertFalse(newList.isTxnCommitted(6)); + } + + @Test + public void longEnoughToCompress() throws Exception { + long[] exceptions = new long[1000]; + for (int i = 0; i < 1000; i++) exceptions[i] = i + 100; + ValidTxnList txnList = new ValidTxnListImpl(exceptions, 2000); + String str = txnList.writeToString(); + Assert.assertEquals('C', str.charAt(0)); + ValidTxnList newList = new ValidTxnListImpl(); + newList.readFromString(str); + for (int i = 0; i < 100; i++) Assert.assertTrue(newList.isTxnCommitted(i)); + for (int i = 100; i < 1100; i++) Assert.assertFalse(newList.isTxnCommitted(i)); + for (int i = 1100; i < 2001; i++) 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
index 8228e09..632ec6a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
@@ -27,8 +27,10 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
@@ -82,6 +84,8 @@
   transient Deserializer scriptOutputDeserializer;
   transient volatile Throwable scriptError = null;
   transient RecordWriter scriptOutWriter = null;
+  // Set of conf entries not to turn into env vars
+  transient Set<String> blackListedConfEntries = null;
 
   static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
   static final String IO_EXCEPTION_STREAM_CLOSED = "Stream closed";
@@ -119,7 +123,8 @@ String safeEnvVarName(String name) {
 
   /**
    * Most UNIX implementations impose some limit on the total size of environment variables and
-   * size of strings. To fit in this limit we need sometimes to truncate strings.
+   * size of strings. To fit in this limit we sometimes need to truncate strings. Also,
+   * some values tend to be long and are meaningless to scripts, so they are filtered out.
    * @param value environment variable value to check
    * @param name name of variable (used only for logging purposes)
    * @param truncate truncate value or not
@@ -138,6 +143,22 @@ String safeEnvVarValue(String value, String name, boolean truncate) {
     return value;
   }
 
+  boolean blackListed(String name) {
+    if (blackListedConfEntries == null) {
+      blackListedConfEntries = new HashSet<String>();
+      String bl = hconf.get(HiveConf.ConfVars.HIVESCRIPT_ENV_BLACKLIST.toString());
+      if (bl != null && bl.length() > 0) {
+        String[] bls = bl.split(",");
+        for (String b : bls) {
+          // Store raw conf keys; names are checked against the blacklist
+          // before safeEnvVarName() sanitizes them into env var form.
+          blackListedConfEntries.add(b);
+        }
+      }
+    }
+    return blackListedConfEntries.contains(name);
+  }
+
   /**
    * addJobConfToEnvironment is mostly shamelessly copied from hadoop streaming. Added additional
    * check on environment variable length
@@ -147,13 +168,16 @@ void addJobConfToEnvironment(Configuration conf, Map env) {
     while (it.hasNext()) {
       Map.Entry en = it.next();
       String name = en.getKey();
-      // String value = (String)en.getValue(); // does not apply variable
-      // expansion
-      String value = conf.get(name); // does variable expansion
-      name = safeEnvVarName(name);
-      boolean truncate = conf.getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false);
-      value = safeEnvVarValue(value, name, truncate);
-      env.put(name, value);
+      if (!blackListed(name)) {
+        // String value = (String)en.getValue(); // does not apply variable
+        // expansion
+        String value = conf.get(name); // does variable expansion
+        name = safeEnvVarName(name);
+        boolean truncate = conf
+            .getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false);
+        value = safeEnvVarValue(value, name, truncate);
+        env.put(name, value);
+      }
     }
   }
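Note the order of operations above: the blacklist is consulted against the raw dotted conf key, and only afterwards does `safeEnvVarName` rewrite the name for export. A simplified standalone sketch of that interplay (the class is illustrative, and the sanitization regex only approximates Hive's rule of rewriting non-alphanumerics to underscores):

    // EnvBlacklistSketch.java -- editor's simplified sketch, NOT Hive code.
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class EnvBlacklistSketch {
      // Approximation of ScriptOperator.safeEnvVarName: env var names may not
      // contain '.', so non-alphanumerics are rewritten to '_'.
      static String safeEnvVarName(String name) {
        return name.replaceAll("[^A-Za-z0-9_]", "_");
      }

      public static void main(String[] args) {
        Set<String> blacklist = new HashSet<String>(Arrays.asList(
            "hive.txn.valid.txns,hive.script.operator.env.blacklist".split(",")));
        Map<String, String> conf = new HashMap<String, String>();
        conf.put("hive.txn.valid.txns", "C1234:...");  // huge, meaningless to scripts
        conf.put("hive.exec.script.wrapper", "");      // ordinary entry
        Map<String, String> env = new HashMap<String, String>();
        for (Map.Entry<String, String> en : conf.entrySet()) {
          // Check the raw dotted key; renaming happens only for exported vars.
          if (!blacklist.contains(en.getKey())) {
            env.put(safeEnvVarName(en.getKey()), en.getValue());
          }
        }
        System.out.println(env.containsKey("hive_txn_valid_txns"));      // false
        System.out.println(env.containsKey("hive_exec_script_wrapper")); // true
      }
    }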
diff --git ql/src/test/queries/clientpositive/transform_acid.q ql/src/test/queries/clientpositive/transform_acid.q
new file mode 100644
index 0000000..4cb9e38
--- /dev/null
+++ ql/src/test/queries/clientpositive/transform_acid.q
@@ -0,0 +1,13 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.enforce.bucketing=true;
+
+-- EXCLUDE_OS_WINDOWS
+
+create table transform_acid(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+insert into table transform_acid select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 1;
+
+
+ADD FILE ../../ql/src/test/scripts/transform_acid_grep.sh;
+
+SELECT transform(*) USING 'transform_acid_grep.sh' AS (col string) FROM transform_acid;
diff --git ql/src/test/results/clientpositive/transform_acid.q.out ql/src/test/results/clientpositive/transform_acid.q.out
new file mode 100644
index 0000000..704a261
--- /dev/null
+++ ql/src/test/results/clientpositive/transform_acid.q.out
@@ -0,0 +1,31 @@
+PREHOOK: query: -- EXCLUDE_OS_WINDOWS
+
+create table transform_acid(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@transform_acid
+POSTHOOK: query: -- EXCLUDE_OS_WINDOWS
+
+create table transform_acid(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@transform_acid
+PREHOOK: query: insert into table transform_acid select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@transform_acid
+POSTHOOK: query: insert into table transform_acid select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@transform_acid
+POSTHOOK: Lineage: transform_acid.a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: transform_acid.b EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+PREHOOK: query: SELECT transform(*) USING 'transform_acid_grep.sh' AS (col string) FROM transform_acid
+PREHOOK: type: QUERY
+PREHOOK: Input: default@transform_acid
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT transform(*) USING 'transform_acid_grep.sh' AS (col string) FROM transform_acid
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@transform_acid
+#### A masked pattern was here ####
+a
diff --git ql/src/test/scripts/transform_acid_grep.sh ql/src/test/scripts/transform_acid_grep.sh
new file mode 100644
index 0000000..2b2001a
--- /dev/null
+++ ql/src/test/scripts/transform_acid_grep.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The variable, when exported, is named hive_txn_valid_txns (dots become
+# underscores), but grep's '.' wildcard still matches that name. Since the
+# blacklist keeps it out of the environment, grep prints nothing.
+env | grep hive.txn.valid.txns
+echo a
+exit 0
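Assuming the standard Hive qtest workflow of this era (module layout may differ by branch), the new query test can be run from the itests tree:

    cd itests/qtest
    mvn test -Dtest=TestCliDriver -Dqfile=transform_acid.q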