diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 1ef6cc5..fbde81d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -105,6 +105,8 @@ import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.comparator.TezBytesComparator; +import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig; import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; @@ -309,14 +311,21 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration switch (edgeType) { case BROADCAST_EDGE: UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig - .newBuilder(keyClass, valClass).setFromConfiguration(conf).build(); + .newBuilder(keyClass, valClass) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); return et1Conf.createDefaultBroadcastEdgeProperty(); case CUSTOM_EDGE: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); EdgeManagerPluginDescriptor edgeDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); CustomEdgeConfiguration edgeConf = @@ -331,7 +340,10 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration partitionerConf = createPartitionerConf(partitionerClassName, conf); UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); return et3Conf.createDefaultEdgeProperty(); case SIMPLE_EDGE: default: @@ -339,7 +351,11 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration partitionerConf = createPartitionerConf(partitionerClassName, conf); OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf).build(); + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), + TezBytesComparator.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); return et4Conf.createDefaultEdgeProperty(); } }