Currently, on the Spark branch, each thread is bound to a thread-local IOContext, which is initialized when we generate an input HadoopRDD and is later used in MapOperator, FilterOperator, etc.
Given the introduction of HIVE-8118, multiple downstream RDDs may share the same input HadoopRDD, and we would like the HadoopRDD to be cached to avoid scanning the same table multiple times. A typical case would be like the following:
Here, MT_11 and MT_12 are MapTrans from a split MapWork, and RT_1 and RT_2 are two ReduceTrans. Note that this example is simplified, as we may also have a ShuffleTran between a MapTran and a ReduceTran.
When multiple Spark threads are running, MT_11 may be executed first. It will ask for an iterator from the HadoopRDD, which triggers the creation of the iterator, which in turn triggers the initialization of the IOContext associated with that particular thread.
Now, the problem is: before MT_12 starts executing, it will also ask for an iterator from the HadoopRDD, and since the RDD is already cached, instead of creating a new iterator, it will just fetch the cached result. However, this skips the initialization of the IOContext associated with this particular thread. When MT_12 then starts executing, it will try to initialize the MapOperator, but since the IOContext is not initialized, this will fail.
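The failure mode can be reproduced in miniature without Spark or Hive. The following is only a sketch of the mechanism, not the actual Hive code: FakeIOContext, CACHE, iterator(), runMapTran(), and the path "/warehouse/t" are all hypothetical stand-ins. It assumes the context is initialized as a side effect of computing the cached value, so any thread that only reads the cache never gets its own context set up.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IOContextCacheSketch {
    // Hypothetical stand-in for the thread-local IOContext.
    static final class FakeIOContext {
        String inputPath; // stays null until this thread initializes it
    }

    static final ThreadLocal<FakeIOContext> CONTEXT =
        ThreadLocal.withInitial(FakeIOContext::new);

    // Hypothetical stand-in for the cached HadoopRDD result.
    static final ConcurrentHashMap<String, List<String>> CACHE =
        new ConcurrentHashMap<>();

    // Returns the rows for a path. The context is initialized only on the
    // thread that actually computes the rows; a cache hit skips that step.
    static List<String> iterator(String path) {
        return CACHE.computeIfAbsent(path, p -> {
            CONTEXT.get().inputPath = p;
            return List.of("row1", "row2");
        });
    }

    // Stand-in for a MapTran: fetch the input, then check whether the
    // calling thread's context was initialized.
    static boolean runMapTran(String path) {
        iterator(path);
        return CONTEXT.get().inputPath != null;
    }

    // Runs two "MapTrans" one after the other on two distinct threads.
    static boolean[] demo() throws Exception {
        CACHE.clear();
        Callable<Boolean> task = () -> runMapTran("/warehouse/t");
        ExecutorService t1 = Executors.newSingleThreadExecutor();
        ExecutorService t2 = Executors.newSingleThreadExecutor();
        try {
            boolean first = t1.submit(task).get();  // computes and caches
            boolean second = t2.submit(task).get(); // cache hit
            return new boolean[] {first, second};
        } finally {
            t1.shutdown();
            t2.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = demo();
        System.out.println("MT_11 context initialized: " + r[0]);
        System.out.println("MT_12 context initialized: " + r[1]);
    }
}
```

In this sketch the first task reports an initialized context and the second does not, which mirrors MT_12 finding an uninitialized IOContext after a cache hit.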