Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.11.0, 1.12.0, 1.13.0, 1.14.0
Description
We hit a thread-safety problem when using StreamExecutionEnvironment#initializeContextEnvironment (or code that relies on it, such as PackagedProgramUtils#getPipelineFromProgram) from multiple threads.
The reason is that the initializeContextEnvironment method is not thread-safe:

    protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
    }
contextEnvironmentFactory is a static variable. When initializeContextEnvironment runs on several threads at once, another thread can overwrite contextEnvironmentFactory between the assignment and the call to threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory). The calling thread then stores the other thread's factory in its own thread-local.
The solution is to pass the local variable ctx to the thread-local instead of re-reading the static variable contextEnvironmentFactory:

    protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(ctx);
    }
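The interleaving can be reproduced outside Flink with a minimal sketch. Everything here is a stand-in and not Flink code: plain strings replace the factories, the class ContextFactoryRace and its methods are hypothetical, and the betweenSteps hook is an assumption used only to force another thread to write the static field at the critical point so the outcome is deterministic.

```java
final class ContextFactoryRace {
    // Stand-ins for contextEnvironmentFactory and threadLocalContextEnvironmentFactory.
    private static volatile String contextFactory;
    private static final ThreadLocal<String> threadLocalFactory = new ThreadLocal<>();

    // Mirrors the current method: the thread-local is filled by re-reading the static field.
    static void initializeUnsafe(String ctx, Runnable betweenSteps) {
        contextFactory = ctx;
        betweenSteps.run(); // hypothetical hook: another thread gets scheduled here
        threadLocalFactory.set(contextFactory);
    }

    // Mirrors the proposed fix: the thread-local is filled from the local parameter.
    static void initializeSafe(String ctx, Runnable betweenSteps) {
        contextFactory = ctx;
        betweenSteps.run();
        threadLocalFactory.set(ctx);
    }

    // Initializes with "factory-A" while a second thread writes "factory-B"
    // into the static field, then returns what the calling thread ended up with.
    static String observed(boolean safe) {
        Runnable interferer = () -> {
            Thread other = new Thread(() -> contextFactory = "factory-B");
            other.start();
            try {
                other.join(); // wait so the overwrite is guaranteed to have happened
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        if (safe) {
            initializeSafe("factory-A", interferer);
        } else {
            initializeUnsafe("factory-A", interferer);
        }
        String result = threadLocalFactory.get();
        threadLocalFactory.remove();
        return result;
    }
}
```

Calling ContextFactoryRace.observed(false) returns the other thread's value, while ContextFactoryRace.observed(true) returns the value the caller passed in. In real code the overwrite depends on scheduling, so the failure would be intermittent rather than guaranteed.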
I also wonder whether contextEnvironmentFactory is really needed. Currently it is never set or reset on its own; it is always changed together with threadLocalContextEnvironmentFactory.