Index: KafkaETLContext.java =================================================================== --- KafkaETLContext.java (revision 1164349) +++ KafkaETLContext.java (working copy) @@ -49,7 +49,7 @@ final static int DEFAULT_TIMEOUT = 60000; // one minute final static KafkaETLKey DUMMY_KEY = new KafkaETLKey(); - + protected int _index; /*index of context*/ protected String _input = null; /*input string*/ protected KafkaETLRequest _request = null; @@ -61,7 +61,7 @@ protected MultiFetchResponse _response = null; /*fetch response*/ protected Iterator _messageIt = null; /*message iterator*/ - + protected Iterator _respIterator = null; protected int _retry = 0; protected long _requestTime = 0; /*accumulative request time*/ protected long _startTime = -1; @@ -125,7 +125,7 @@ public boolean hasMore () { return _messageIt != null && _messageIt.hasNext() - || _response != null && _response.iterator().hasNext() + || _response != null && _respIterator.hasNext() || _offset < _offsetRange[1]; } @@ -135,9 +135,9 @@ boolean gotNext = get(key, value); if(_response != null) { - Iterator iter = _response.iterator(); - while ( !gotNext && iter.hasNext()) { - ByteBufferMessageSet msgSet = iter.next(); + + while ( !gotNext && _respIterator.hasNext()) { + ByteBufferMessageSet msgSet = _respIterator.next(); if ( hasError(msgSet)) return false; _messageIt = (Iterator) msgSet.iterator(); gotNext = get(key, value); @@ -156,6 +156,8 @@ long tempTime = System.currentTimeMillis(); _response = _consumer.multifetch(array); + if(_response != null) + _respIterator = _response.iterator(); _requestTime += (System.currentTimeMillis() - tempTime); return true; @@ -198,7 +200,7 @@ key.set(_index, _offset, msgAndOffset.message().checksum()); - _offset += msgAndOffset.offset(); //increase offset + _offset = msgAndOffset.offset(); //increase offset _count ++; //increase count return true;