PIG-5205 (sub-task of PIG-4059 Pig on Spark, project Pig)

Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction

Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: spark-branch
    • Component/s: spark
    • Labels: None

    Description

      In org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter.ToGroupKeyValueFunction, the group key is stored in every record as well as on the output tuple:

              @Override
              public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
                  try {
                 ....
                      List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
                      for (int j = 0; j < bags.length; j ++) {
                          Seq<Tuple> bag = bags[j];
                          Iterator<Tuple> iterator = JavaConversions
                                  .asJavaCollection(bag).iterator();
                          final int index = i;
                          tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
                                  iterator) {
                              @Override
                              protected Tuple transform(Tuple next) {
                                  try {
                                      Tuple tuple = tf.newTuple(3);
                                      tuple.set(0, index);
                                      // The group key is copied into every record here, which looks redundant:
                                      // out.set(0, key) below already stores it once per group, so this
                                      // per-record copy may be removable.
                                      tuple.set(1, key);
                                      tuple.set(2, next);
                                      return tuple;
                                  } catch (ExecException e) {
                                      throw new RuntimeException(e);
                                  }
                              }
                          });
                          ++ i;
                      }
      
                      Tuple out = tf.newTuple(2);
                      out.set(0, key);
                      out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
                      if (LOG.isDebugEnabled()) {
                          LOG.debug("ToGroupKeyValueFunction out " + out);
                      }
      
                      return out;
                  } catch (Exception e) {
                      throw new RuntimeException(e);
                  }
              }
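
      To make the duplication concrete, here is a minimal standalone sketch. It is not the Pig code and not the attached patch: List<Object> stands in for Pig's Tuple, and the class and method names (KeyDuplicationSketch, groupWithKeyPerRecord, groupWithKeyOnce) are hypothetical. It assumes that whatever consumes the output can read the key from position 0 of the outer tuple instead of from each record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in sketch: List<Object> plays the role of Pig's Tuple.
// None of these names come from the Pig code base.
class KeyDuplicationSketch {

    // Shape built by the snippet above: every record repeats the group key.
    static List<Object> groupWithKeyPerRecord(Object key, List<List<Object>> bags) {
        List<Object> records = new ArrayList<>();
        for (int index = 0; index < bags.size(); index++) {
            for (Object value : bags.get(index)) {
                // (index, key, value): the key is copied once per record
                records.add(Arrays.asList(index, key, value));
            }
        }
        // The key is stored again at position 0 of the output
        return Arrays.asList(key, records);
    }

    // Hypothetical slimmer shape: the key lives only at position 0 of the output.
    static List<Object> groupWithKeyOnce(Object key, List<List<Object>> bags) {
        List<Object> records = new ArrayList<>();
        for (int index = 0; index < bags.size(); index++) {
            for (Object value : bags.get(index)) {
                // (index, value): no per-record key
                records.add(Arrays.asList(index, value));
            }
        }
        return Arrays.asList(key, records);
    }

    public static void main(String[] args) {
        List<List<Object>> bags = Arrays.asList(
                Arrays.<Object>asList("a", "b"),
                Arrays.<Object>asList("c"));
        // prints [k1, [[0, k1, a], [0, k1, b], [1, k1, c]]]
        System.out.println(groupWithKeyPerRecord("k1", bags));
        // prints [k1, [[0, a], [0, b], [1, c]]]
        System.out.println(groupWithKeyOnce("k1", bags));
    }
}
```

      With N records in a group, the first shape holds N + 1 references to the key and the second holds one. Whether the per-record key can really be dropped depends on the operators that read these tuples downstream, which this sketch does not model.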
      
      

      Attachments

        1. PIG-5205.patch (2 kB, liyunzhang)

          People

            Assignee: kellyzly liyunzhang
            Reporter: kellyzly liyunzhang