diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java index 2b001f8..4bdb7c3 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java @@ -82,6 +82,8 @@ public class HCatLoader extends HCatBaseLoader { @Override public void setLocation(String location, Job job) throws IOException { + HCatContext.setupHCatContext(job.getConfiguration()).getConf().get() + .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true); UDFContext udfContext = UDFContext.getUDFContext(); Properties udfProps = udfContext.getUDFProperties(this.getClass(), @@ -185,9 +187,8 @@ public class HCatLoader extends HCatBaseLoader { @Override public ResourceSchema getSchema(String location, Job job) throws IOException { - HCatContext.getInstance().mergeConf(job.getConfiguration()); - HCatContext.getInstance().getConf().setBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true); + HCatContext.setupHCatContext(job.getConfiguration()).getConf().get() + .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true); Table table = phutil.getTable(location, hcatServerUri != null ? hcatServerUri : PigHCatUtil.getHCatServerUri(job), diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java index 65e799a..848b2fb 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java @@ -77,9 +77,8 @@ public class HCatStorer extends HCatBaseStorer { @Override public void setStoreLocation(String location, Job job) throws IOException { - HCatContext.getInstance().mergeConf(job.getConfiguration()); - HCatContext.getInstance().getConf().setBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false); + HCatContext.setupHCatContext(job.getConfiguration()).getConf().get() + .setBoolean(HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false); Configuration config = job.getConfiguration(); config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign); diff --git src/java/org/apache/hcatalog/common/HCatContext.java src/java/org/apache/hcatalog/common/HCatContext.java index f19489c..1c40b5e 100644 --- src/java/org/apache/hcatalog/common/HCatContext.java +++ src/java/org/apache/hcatalog/common/HCatContext.java @@ -18,37 +18,54 @@ package org.apache.hcatalog.common; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import java.util.Map; /** - * HCatContext provides global access to configuration data. + * HCatContext provides global access to configuration data. It uses a reference to the + * job configuration so that settings are automatically passed to the backend by the + * MR framework. */ public class HCatContext { private static final HCatContext hCatContext = new HCatContext(); - private final Configuration conf; + private Configuration conf = null; private HCatContext() { - conf = new Configuration(); } - public static HCatContext getInstance() { + /** + * Setup the HCatContext as a reference to the given configuration. Keys + * exclusive to an existing config are set in the new conf. + */ + public static synchronized HCatContext setupHCatContext(Configuration newConf) { + Preconditions.checkNotNull(newConf, "HCatContext must not have a null conf."); + + if (hCatContext.conf == null) { + hCatContext.conf = newConf; + return hCatContext; + } + + if (hCatContext.conf != newConf) { + for (Map.Entry entry : hCatContext.conf) { + if (newConf.get(entry.getKey()) == null) { + newConf.set(entry.getKey(), entry.getValue()); + } + } + hCatContext.conf = newConf; + } return hCatContext; } - public Configuration getConf() { - return conf; + public static HCatContext getInstance() { + return hCatContext; } - /** - * Merge the given configuration into the HCatContext conf, overwriting any existing keys. - */ - public void mergeConf(Configuration conf) { - for (Map.Entry entry : conf) { - this.conf.set(entry.getKey(), entry.getValue()); - } + public Optional getConf() { + return Optional.fromNullable(conf); } } diff --git src/java/org/apache/hcatalog/data/HCatRecordSerDe.java src/java/org/apache/hcatalog/data/HCatRecordSerDe.java index ca204c3..468a1f5 100644 --- src/java/org/apache/hcatalog/data/HCatRecordSerDe.java +++ src/java/org/apache/hcatalog/data/HCatRecordSerDe.java @@ -188,24 +188,7 @@ public class HCatRecordSerDe implements SerDe { Object res; if (fieldObjectInspector.getCategory() == Category.PRIMITIVE) { - if (field != null && field instanceof Boolean && - HCatContext.getInstance().getConf().getBoolean( - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT)) { - res = ((Boolean) field) ? 1 : 0; - } else if (field != null && field instanceof Short && - HCatContext.getInstance().getConf().getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { - res = new Integer((Short) field); - } else if (field != null && field instanceof Byte && - HCatContext.getInstance().getConf().getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { - res = new Integer((Byte) field); - } else { - res = ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveJavaObject(field); - } + res = serializePrimitiveField(field, fieldObjectInspector); } else if (fieldObjectInspector.getCategory() == Category.STRUCT) { res = serializeStruct(field, (StructObjectInspector) fieldObjectInspector); } else if (fieldObjectInspector.getCategory() == Category.LIST) { @@ -279,6 +262,32 @@ public class HCatRecordSerDe implements SerDe { } } + private static Object serializePrimitiveField(Object field, + ObjectInspector fieldObjectInspector) { + + if (field != null && HCatContext.getInstance().getConf().isPresent()) { + Configuration conf = HCatContext.getInstance().getConf().get(); + + if (field instanceof Boolean && + conf.getBoolean( + HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, + HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT)) { + return ((Boolean) field) ? 1 : 0; + } else if (field instanceof Short && + conf.getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { + return new Integer((Short) field); + } else if (field instanceof Byte && + conf.getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { + return new Integer((Byte) field); + } + } + + return ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveJavaObject(field); + } /** * Return an object inspector that can read through the object diff --git src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java index 38a5fa9..b4060e9 100644 --- src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java +++ src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java @@ -139,14 +139,17 @@ public class HCatSchemaUtils { private static Type getPrimitiveHType(TypeInfo basePrimitiveTypeInfo) { switch (((PrimitiveTypeInfo) basePrimitiveTypeInfo).getPrimitiveCategory()) { case BOOLEAN: - return HCatContext.getInstance().getConf().getBoolean( - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT) ? + return (HCatContext.getInstance().getConf().isPresent() && + HCatContext.getInstance().getConf().get().getBoolean( + HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, + HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT)) ? Type.INT : Type.BOOLEAN; case BYTE: - return HCatContext.getInstance().getConf().getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT) ? Type.INT : Type.TINYINT; + return (HCatContext.getInstance().getConf().isPresent() && + HCatContext.getInstance().getConf().get().getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) ? + Type.INT : Type.TINYINT; case DOUBLE: return Type.DOUBLE; case FLOAT: @@ -156,9 +159,10 @@ public class HCatSchemaUtils { case LONG: return Type.BIGINT; case SHORT: - return HCatContext.getInstance().getConf().getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT) ? + return (HCatContext.getInstance().getConf().isPresent() && + HCatContext.getInstance().getConf().get().getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) ? Type.INT : Type.SMALLINT; case STRING: return Type.STRING;