Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Not A Bug
- Affects Version: 1.10.0
- Fix Version: None
This is my code:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

class TestQueries extends Serializable {
  def testQuery(): Unit = {
    // Enable settings
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Consume the Kafka topic
    // ... topic_consumer
    val stream: DataStream[String] = env.addSource(topic_consumer)

    // Convert the stream to DataStream[Row]
    // desJson deserializes the JSON payload of the topic into a Row
    // rowType is a RowTypeInfo built from (fieldTypes, fieldNames); the field types
    // are Strings and the field names are ("user", "name", "lastName")
    val result: DataStream[Row] = stream.map(str => desJson(str))(rowType)

    // Register the table
    tableEnv.createTemporaryView("table", result)

    // Queries
    val first_query = tableEnv.sqlQuery("SELECT * FROM table WHERE name = 'Sansa'")
    val second_query = tableEnv.sqlQuery("SELECT * FROM table WHERE lastName = 'Stark'")

    // The exception occurs in the following two lines
    val first_row: DataStream[Row] = tableEnv.toAppendStream[Row](first_query)
    val second_row: DataStream[Row] = tableEnv.toAppendStream[Row](second_query)

    // Sending data to Elasticsearch
    // ...

    env.execute("Test Queries")
  }
}
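For reference, the desJson helper and rowType are not shown above; here is a minimal sketch of what they might look like, based only on the comments (the Jackson-based parsing and the three String fields are assumptions, not the reporter's actual code):

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

// Hypothetical rowType: three String fields named as in the comments above.
val fieldTypes = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING)
val fieldNames = Array("user", "name", "lastName")
val rowType: RowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames)

// Hypothetical desJson: parse the JSON string and copy the fields into a Row.
def desJson(str: String): Row = {
  val node = new ObjectMapper().readTree(str)
  Row.of(node.get("user").asText(), node.get("name").asText(), node.get("lastName").asText())
}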
Description
I am using the latest Flink version (1.10.0) and sbt (1.3.7). I get this exception when uploading a job with a streaming SQL query:
Caused by: java.lang.ClassCastException: class org.codehaus.janino.CompilerFactory cannot be cast to class org.codehaus.commons.compiler.ICompilerFactory (org.codehaus.janino.CompilerFactory is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3270d194; org.codehaus.commons.compiler.ICompilerFactory is in unnamed module of loader 'app')
at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:432)
When I run the main class with sbt run, it works perfectly.
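For anyone hitting the same error: this ClassCastException typically means two copies of the Janino/commons-compiler classes end up on different class loaders, usually because the Blink planner was bundled into the job's fat jar while the Flink distribution already ships it (with sbt run everything lives on a single classpath, so the problem never shows up). A minimal build.sbt sketch of the usual remedy, marking the Flink modules as provided; the exact module list here is an assumption for this particular job:

// build.sbt (sketch): keep Flink and the Blink planner out of the fat jar
// so that only the cluster's copies are loaded.
val flinkVersion = "1.10.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                  % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala"        % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-planner-blink"    % flinkVersion % "provided"
)

Alternatively, the affected packages can be forced onto the parent class loader via classloader.parent-first-patterns.additional in flink-conf.yaml, but keeping the planner out of the fat jar is the simpler fix.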