Uploaded image for project: 'Bahir'
  1. Bahir
  2. BAHIR-232

Support Flink Table API & SQL for flink-connector-activemq

    XMLWordPrintableJSON

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: Flink-1.0
    • Fix Version/s: None
    • Labels:
      None

      Description

      flink-connector-activemq does not support Flink Table API & SQL, based on the the existing code, it is not very difficult to support this feature, we just need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory and AMQTableSinkFactory. Then we can connect activemq by the following way:

      String TABLE_CREATE_SQL = "CREATE TABLE books (" +
              " id int, " +
              " title varchar, " +
              " author varchar, " +
              " price double, " +
              " qty int " +
              ") with (" +
              " 'connector.type' = 'activemq', " +
              " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
              " 'connector.destination-type' = 'QUEUE', " +
              " 'connector.destination-name' = 'source_queue' " +
              ")";
      
      String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
              "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
              "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
              "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
              "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
              "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
              "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
              "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
              "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
              "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
              "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";
      
      String QUERY_TABLE_SQL = "SELECT * FROM books";
      
      // create activemq source table
      tEnv.sqlUpdate(TABLE_CREATE_SQL);
      
      // produce event to activemq
      tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);
      
      // consume from activemq
      Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
      

       

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              Leo Zhou zl
            • Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

              • Created:
                Updated: