Details
-
Sub-task
-
Status: Closed
-
Major
-
Resolution: Fixed
-
None
-
None
Description
In org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter.ToGroupKeyValueFunction:
@Override
public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
    try {
        ....
        List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
        for (int j = 0; j < bags.length; j ++) {
            Seq<Tuple> bag = bags[j];
            Iterator<Tuple> iterator = JavaConversions
                    .asJavaCollection(bag).iterator();
            final int index = i;
            tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
                    iterator) {
                @Override
                protected Tuple transform(Tuple next) {
                    try {
                        Tuple tuple = tf.newTuple(3);
                        tuple.set(0, index);
                        // We record duplicate key info here, for every record.
                        // We will use out.set(0, key) later, so maybe the key
                        // info stored here can be removed.
                        tuple.set(1, key);
                        tuple.set(2, next);
                        return tuple;
                    } catch (ExecException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            ++ i;
        }
        Tuple out = tf.newTuple(2);
        out.set(0, key);
        out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
        if (LOG.isDebugEnabled()) {
            LOG.debug("ToGroupKeyValueFunction out " + out);
        }
        return out;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}