diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 6219ae4..794780e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -102,6 +102,8 @@ import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.comparator.TezBytesComparator; +import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer; import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer; import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer; @@ -305,14 +307,21 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration switch (edgeType) { case BROADCAST_EDGE: UnorderedUnpartitionedKVEdgeConfigurer et1Conf = UnorderedUnpartitionedKVEdgeConfigurer - .newBuilder(keyClass, valClass).setFromConfiguration(conf).build(); + .newBuilder(keyClass, valClass) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName()) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName()) + .build(); return et1Conf.createDefaultBroadcastEdgeProperty(); case CUSTOM_EDGE: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); UnorderedPartitionedKVEdgeConfigurer et2Conf = UnorderedPartitionedKVEdgeConfigurer .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName()) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName()) + .build(); EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(CustomPartitionEdge.class.getName()); CustomEdgeConfiguration edgeConf = @@ -327,7 +336,10 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration partitionerConf = createPartitionerConf(partitionerClassName, conf); UnorderedPartitionedKVEdgeConfigurer et3Conf = UnorderedPartitionedKVEdgeConfigurer .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName()) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName()) + .build(); return et3Conf.createDefaultEdgeProperty(); case SIMPLE_EDGE: default: @@ -335,7 +347,11 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration partitionerConf = createPartitionerConf(partitionerClassName, conf); OrderedPartitionedKVEdgeConfigurer et4Conf = OrderedPartitionedKVEdgeConfigurer .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), + TezBytesComparator.class.getName()) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName()) + .build(); return et4Conf.createDefaultEdgeProperty(); } }