After running enough flow throw ShellBolt in some cases after tens of minutes ShellBolt stopped reporting task ids. After this error condition no new task ids where reported back. When acking of the tuples processed by the bolt where set in callback related to arrival of the task ids all tuple trees going through the bolt would fail after reporting stopped. ShellBolt will continue to operate new tuples and respond to heartbeats.
After running some tests and making some changes to the code. I have following hypothesis for the reason:
org.apache.storm.utils.ShellBoltMessageQueue has two queues one being for taskIds and the other for bolt messages.
taskIds queue is implemented by LinkedList and bolt msg queue LinkedBlockingQueue. Both of the queues are operated similarly.
One major difference between the structures is that LinkedList is not synchronized.
In the code:
ShellBoltMessageQueue.java:58 add method is used without holding the lock. Where as ShellBoltMessageQueue.java:110 uses the poll method with the lock.
As in ShellBolt BoltReaderRunnable and BoltWriterRunnable are run concurrently this can lead to race condition.
If I move the ShellBoltMessageQueue.java:58 inside the lock and run the test in similar fashion it seems to solve the issue.