Pig / PIG-4059 Pig on Spark / PIG-5205

Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: spark-branch
    • Component/s: spark
    • Labels: None

      Description

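      In org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter.ToGroupKeyValueFunction, the group key is stored in every transformed record (tuple.set(1, key)) and again in the output tuple (out.set(0, key)), so the per-record copy may be redundant: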

              @Override
              public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
                  try {
                      ....
                      List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
                      for (int j = 0; j < bags.length; j ++) {
                          Seq<Tuple> bag = bags[j];
                          Iterator<Tuple> iterator = JavaConversions
                                  .asJavaCollection(bag).iterator();
                          final int index = i;
                          tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
                                  iterator) {
                              @Override
                              protected Tuple transform(Tuple next) {
                                  try {
                                      Tuple tuple = tf.newTuple(3);
                                      tuple.set(0, index);
                                      // The key is recorded here for every record, which duplicates it:
                                      // out.set(0, key) below already stores the key once per group,
                                      // so this per-record copy can probably be removed.
                                      tuple.set(1, key);
                                      tuple.set(2, next);
                                      return tuple;
                                  } catch (ExecException e) {
                                      throw new RuntimeException(e);
                                  }
                              }
                          });
                          ++ i;
                      }
      
                      Tuple out = tf.newTuple(2);
                      out.set(0, key);
                      out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
                      if (LOG.isDebugEnabled()) {
                          LOG.debug("ToGroupKeyValueFunction out " + out);
                      }
      
                      return out;
                  } catch (Exception e) {
                      throw new RuntimeException(e);
                  }
              }
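
      The redundancy described above can be illustrated with a minimal, self-contained sketch. It does not use Pig or Spark: plain java.util.List objects stand in for Pig's Tuple, and the class and method names (GroupKeySketch, record, group) are hypothetical. It assumes one possible way to drop the duplicate, namely keeping the three-slot record layout but leaving slot 1 unset, so the key lives only in the group-level output. Whether the attached patch takes this approach is not shown in the issue text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class GroupKeySketch {

    // Build one per-record entry: [bagIndex, <unset>, value].
    // Slot 1 is deliberately left null instead of holding a copy of the key
    // (an assumption for illustration, not the confirmed fix).
    static List<Object> record(int bagIndex, Object value) {
        List<Object> tuple = new ArrayList<Object>(Collections.<Object>nCopies(3, null));
        tuple.set(0, bagIndex);
        tuple.set(2, value);
        return tuple;
    }

    // Build the group-level output: [key, records]. The key is stored once, here.
    static List<Object> group(Object key, List<List<Object>> bags) {
        List<List<Object>> records = new ArrayList<List<Object>>();
        for (int i = 0; i < bags.size(); i++) {
            for (Object value : bags.get(i)) {
                records.add(record(i, value));
            }
        }
        List<Object> out = new ArrayList<Object>();
        out.add(key);
        out.add(records);
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> bags = Arrays.asList(
                Arrays.<Object>asList("a", "b"),
                Arrays.<Object>asList("c"));
        // Prints the key once, followed by the records without a key copy.
        System.out.println(group("k1", bags));
    }
}
```

      A consumer that needs the key for a given record reads it from slot 0 of the group-level output rather than from the record itself, which avoids carrying one extra reference per record.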
      
      

        Attachments

        1. PIG-5205.patch (2 kB, liyunzhang)

            People

            • Assignee: kellyzly liyunzhang
            • Reporter: kellyzly liyunzhang