
AVRO-513: Java mapreduce API should pass an iterator of matching objects to reduce

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: 1.4.0
    • Component/s: java
    • Labels: None

      Description

      The Java mapreduce API added in AVRO-493 requires reducer implementations to explicitly detect sequences of matching data.

      Rather, the reduce method might better look something like:

      void reduce(Iterator<IN>, Collector<OUT>);

      Where all equal values are passed in a single call.
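
      For illustration only, a hedged sketch of what such a base class might look like; the class and interface names below are hypothetical and not part of Avro's shipped API:

      import java.io.IOException;
      import java.util.Iterator;

      // Hypothetical sketch of the proposed API: one reduce() call per run of
      // equal values, so implementations never detect group boundaries themselves.
      public abstract class GroupingReducer<IN, OUT> {

        /** Minimal output collector, analogous to Hadoop's OutputCollector. */
        public interface Collector<T> {
          void collect(T datum) throws IOException;
        }

        /** Called once per group; the iterator yields every value that compares
         *  equal under Avro's ordering. */
        public abstract void reduce(Iterator<IN> values, Collector<OUT> collector)
            throws IOException;
      }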

      Attachments

      1. AVRO-513.patch (8 kB) - Doug Cutting
      2. AVRO-513.patch (7 kB) - Doug Cutting
      3. AVRO-513.patch (7 kB) - Doug Cutting
      4. AVRO-513.patch (7 kB) - Doug Cutting

        Issue Links

          Activity

          Doug Cutting added a comment -

          This was addressed by AVRO-581.

          Doug Cutting added a comment -

          AVRO-581 may provide a better way of implementing this.

          Tom White added a comment -

          +1

          Doug Cutting added a comment -

          > We need to do something about the close() method - at least document that if overridden in subclasses they must call super.close() as the first line in the overridden method.

          I've added some javadoc about that, and also updated the class's javadoc.

          Tom White added a comment -

          > So all objects in the queue must be unique.

          You're right - because we are buffering in the queue, we need to make sure that all instances in the queue are distinct copies. So the pooling idea you mentioned above might be a good optimization for later.

          > The risk that user code swallows the InterruptedException can be fixed by setting 'done=true' before calling interrupt().

          I think this is correct.

          We need to do something about the close() method - at least document that if overridden in subclasses they must call super.close() as the first line in the overridden method.

          Doug Cutting added a comment -

          > I was thinking that you only need to copy at the beginning of the group, since you can compare subsequent values to the copy, until they differ, at which point you make a new copy.

          But they might differ in fields that are not compared, e.g., count. So all objects in the queue must be unique.

          > I think it's possible that the interrupt occurs between the check on "done" and the call to take(), so the call to take() would go ahead and cause a deadlock.

          Calling interrupt() sets the thread's interrupt flag, and it remains set until a method that throws InterruptedException is called. So if it's called before or after take(), that's fine, since take() will throw it either way when the queue is empty.

          The risk that user code swallows the InterruptedException can be fixed by setting 'done=true' before calling interrupt(). Then if user code swallows the interrupt and the queue is empty, we'd never call queue.take(), since, by definition, the thread wasn't between checking 'done' and calling 'take()' when it got the interrupt. Does that sound right?
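
          A minimal sketch of that ordering, using hypothetical names rather than the patch's actual code: because 'done' is set before interrupt(), the consumer always re-checks the flag and cannot block on an empty queue after shutdown.

          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;

          // Hypothetical sketch of the shutdown ordering described above.
          class QueueShutdownSketch<T> {
            private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(16);
            private volatile boolean done;

            /** Hadoop side: called after the last value has been queued. */
            void shutdown(Thread consumer) throws InterruptedException {
              done = true;            // set the flag first...
              consumer.interrupt();   // ...then wake a consumer blocked in take()
              consumer.join();
            }

            /** Consumer side: returns null once 'done' is set and the queue is drained.
             *  Even if user code swallowed an earlier interrupt, 'done' is already
             *  visible here, so we never block on an empty queue after shutdown. */
            T nextOrNull() {
              while (!done || !queue.isEmpty()) {
                try {
                  return queue.take();
                } catch (InterruptedException e) {
                  // re-check 'done'; the interrupted status is cleared by the throw,
                  // so a remaining queued value can still be taken on the next pass
                }
              }
              return null;
            }
          }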

          Scott Carey added a comment -

          > Not sure about an interrupt signal being intercepted, or not being delivered. But I think it's possible that the interrupt occurs between the check on "done" and the call to take(), so the call to take() would go ahead and cause a deadlock.

          I think all Java blocking I/O and queue operations check the interrupted status on the thread before sleeping/waiting. Or, more precisely, the JVM checks it before the thread yields, and will throw InterruptedException if the flag is set.
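
          A small, self-contained demonstration of that behaviour (not from the patch): if the interrupt flag is already set when take() is entered, it throws InterruptedException immediately rather than blocking.

          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;

          public class InterruptFlagDemo {
            public static void main(String[] args) {
              BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
              Thread.currentThread().interrupt();   // set the flag before blocking
              try {
                queue.take();                       // empty queue: would normally block
              } catch (InterruptedException e) {
                // thrown immediately because the interrupted status was set on entry
                System.out.println("take() observed the pending interrupt");
              }
            }
          }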

          Tom White added a comment -

          > I suppose once a value's been consumed from the queue it could be returned to a pool used by the deserializer. We could limit the size of the pool to be the same size as the queue. Is that what you had in mind?

          I was thinking that you only need to copy at the beginning of the group, since you can compare subsequent values to the copy, until they differ, at which point you make a new copy.

          > I worry a bit that something else could interrupt the thread or intercept the InterruptedException, e.g., in the user's reducer. Is that a well-founded worry?

          The interrupt could be used to signal to check a shared variable indicating that the reduce is done, rather than the runner interpreting an interrupt as a done signal. This would take care of the problem of something accidentally interrupting.

          Not sure about an interrupt signal being intercepted, or not being delivered. But I think it's possible that the interrupt occurs between the check on "done" and the call to take(), so the call to take() would go ahead and cause a deadlock.

          > A better approach might be to put in a sentinel value. Unfortunately this has to be of type T, and we don't know how to construct a T.

          Could we use AvroWrapper here? Perhaps a null wrapper means end of reduce.
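
          A sketch of that alternative, under the stated assumption: the Wrapper type below is a hypothetical stand-in for AvroWrapper, and a wrapper holding a null datum is enqueued as the end-of-reduce marker, so no interrupt is needed.

          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;

          // Hypothetical sketch: terminate the consumer with a sentinel rather than an interrupt.
          class SentinelQueueSketch<T> {
            /** Stand-in for AvroWrapper; a null datum marks the end of the reduce. */
            static final class Wrapper<D> {
              final D datum;
              Wrapper(D datum) { this.datum = datum; }
            }

            private final BlockingQueue<Wrapper<T>> queue =
                new ArrayBlockingQueue<Wrapper<T>>(16);

            /** Hadoop side: enqueue one value. */
            void put(T value) throws InterruptedException {
              queue.put(new Wrapper<T>(value));
            }

            /** Hadoop side, from close(): enqueue the end-of-reduce marker. */
            void finish() throws InterruptedException {
              queue.put(new Wrapper<T>(null));
            }

            /** Consumer side: returns null once the sentinel is seen. */
            T next() throws InterruptedException {
              return queue.take().datum;
            }
          }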

          Doug Cutting added a comment -

          > This could be improved to be a copy per reduce group, although it's more work.

          I suppose once a value's been consumed from the queue it could be returned to a pool used by the deserializer. We could limit the size of the pool to be the same size as the queue. Is that what you had in mind?

          > The next() method should check to see if there is a next and throw NoSuchElementException if not.

          Fixed.

          > Rather than polling the queue, you could use the blocking take() method and interrupt the thread from close() to signal that there are no more values.

          Here's a version that does this. I worry a bit that something else could interrupt the thread or intercept the InterruptedException, e.g., in the user's reducer. Is that a well-founded worry? A better approach might be to put in a sentinel value. Unfortunately this has to be of type T, and we don't know how to construct a T.

          > Starting a thread from within a subclass constructor is unsafe.

          Fixed.

          Tom White added a comment -

          Generally looks good.

          • Does the change to AvroKeySerialization mean that a copy is made per reduce value? This could be improved to be a copy per reduce group, although it's more work.
          • If a subclass wants to override close(), then it has to call super.close() or the reducer may not complete. This could be the source of very subtle bugs, so it might be better to mark close() as final, and have another close method for subclasses, closeReducer() perhaps? However, this gets in the way of users who may want to use the non-grouping reduce() method. I don't see a good answer here.
          • If I call next() without calling hasNext(), I don't get a NoSuchElementException, which doesn't conform to the Iterator contract. The next() method should check to see if there is a next and throw NoSuchElementException if not (see the sketch after this list).
          • Rather than polling the queue, you could use the blocking take() method and interrupt the thread from close() to signal that there are no more values.
          • Starting a thread from within a subclass constructor is unsafe (because the object may not be fully constructed, see http://www.ibm.com/developerworks/java/library/j-jtp0618.html#4). Better to call runner.start() just after constructing the runner.
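
          On the Iterator-contract point above, a minimal sketch (hypothetical names, not the patch itself) of the usual lookahead pattern, where next() checks availability itself instead of relying on the caller having called hasNext():

          import java.util.Iterator;
          import java.util.NoSuchElementException;

          // Hypothetical sketch of the lookahead pattern: hasNext() buffers the next
          // value, and next() throws NoSuchElementException when nothing is buffered
          // and nothing more can be fetched, as the Iterator contract requires.
          abstract class LookaheadIterator<T> implements Iterator<T> {
            private T buffered;   // value fetched by hasNext(), not yet returned

            /** Fetch the next value, or return null when the input is exhausted. */
            protected abstract T fetch();

            @Override public boolean hasNext() {
              if (buffered == null) {
                buffered = fetch();
              }
              return buffered != null;
            }

            @Override public T next() {
              if (!hasNext()) {                       // works even if the caller never
                throw new NoSuchElementException();   // called hasNext() itself
              }
              T result = buffered;
              buffered = null;
              return result;
            }

            @Override public void remove() {
              throw new UnsupportedOperationException();
            }
          }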
          Doug Cutting added a comment -

          Here's an implementation of this that passes tests.

          Doug Cutting added a comment -

          Do we want to include this in 1.4.0?

          Doug Cutting added a comment -

          Oops. This is harder than I thought. With Avro data in the key, nulls in the value, and a grouping comparator that always returns equal, Hadoop will call reduce once with the first key and an iterator over all of the null values. But we need to see each of the keys. Sigh.

          Perhaps Avro's reduce could be run in a separate thread that reads from a queue fed by Hadoop's reduce?
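
          A hedged sketch of that arrangement, with hypothetical names rather than the eventual patch: Hadoop's reduce() pushes each deserialized value onto a bounded queue, and a separate runner thread drains the queue and drives the user's Avro reduce code.

          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;

          // Hypothetical sketch: Hadoop's reduce() feeds a queue; a separate runner
          // thread consumes it and invokes the user's Avro reduce logic.
          abstract class ReduceRunnerSketch<T> {
            private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(128);
            private volatile boolean done;
            private final Thread runner = new Thread(new Runnable() {
              public void run() {
                try {
                  while (!done) {
                    consume(queue.take());      // blocks until Hadoop feeds a value
                  }
                } catch (InterruptedException e) {
                  // close() interrupted us after setting 'done'; fall through to drain
                }
                for (T rest; (rest = queue.poll()) != null; ) {
                  consume(rest);                // consume anything still buffered
                }
              }
            });

            /** The user's reduce logic, run on the runner thread. */
            protected abstract void consume(T value);

            /** Started after construction, not from within a constructor. */
            void start() { runner.start(); }

            /** Called from Hadoop's reduce(): hand one value to the runner thread. */
            void feed(T value) throws InterruptedException { queue.put(value); }

            /** Called from Hadoop's close(): signal completion and wait for the runner. */
            void close() throws InterruptedException {
              done = true;
              runner.interrupt();
              runner.join();
            }
          }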

          Doug Cutting added a comment -

          To implement this, we can specify a Hadoop grouping comparator that always returns true, so Hadoop's reduce method is called with a sorted iterator over all items in the partition. Then we can wrap this in an iterator that keeps a copy of the previous item and whose #hasNext() implementation returns false when the previous and current items differ according to Avro's comparator, and pass that down to Avro's reduce method. To keep a copy of the previous item we will need to implement a #copy(Object, Schema) method for GenericData and SpecificData.
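
          A hedged sketch of the wrapping iterator described here (class and method names are illustrative; copy() and compare() stand in for the GenericData #copy helper and Avro's comparator, and the Hadoop job would be configured with a grouping comparator that treats all keys as equal):

          import java.util.Iterator;
          import java.util.NoSuchElementException;

          // Illustrative sketch: wraps Hadoop's sorted iterator over the whole partition
          // and presents one group at a time, where a group is a maximal run of items
          // that compare equal under Avro's comparator.
          abstract class GroupIterator<T> implements Iterator<T> {
            private final Iterator<T> sorted;   // all items in the partition, sorted
            private T previous;                 // copy of the last item returned
            private T pending;                  // next item read but not yet returned
            private boolean endOfGroup;

            GroupIterator(Iterator<T> sorted) { this.sorted = sorted; }

            /** Stand-ins for GenericData#copy(Object, Schema) and Avro's comparator. */
            protected abstract T copy(T datum);
            protected abstract int compare(T a, T b);

            @Override public boolean hasNext() {
              if (pending == null && !endOfGroup && sorted.hasNext()) {
                pending = sorted.next();
                if (previous != null && compare(previous, pending) != 0) {
                  endOfGroup = true;            // the next item starts a new group
                }
              }
              return pending != null && !endOfGroup;
            }

            @Override public T next() {
              if (!hasNext()) throw new NoSuchElementException();
              previous = copy(pending);         // keep a copy; Hadoop may reuse the object
              T result = pending;
              pending = null;
              return result;
            }

            @Override public void remove() { throw new UnsupportedOperationException(); }

            /** Called between groups to start iterating the next run of equal items. */
            void nextGroup() {
              previous = null;
              endOfGroup = false;
            }
          }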


            People

            • Assignee: Doug Cutting
            • Reporter: Doug Cutting
            • Votes: 0
            • Watchers: 2
