Description
This check https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/datamgr.go#L269
in it's current position prevents the "normal teardown" that the reader expects. This means that readers for instructions that terminate early such as due to splitting stay resident in memory and never close.
In practice this is benign as the buffer would already be closed, but with streaming this memory leak would become noticable.
The fix is to move the check to after the sentinel check, and additionally check there for early termination to avoid closing the buffer twice.