Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6896

Creating a table from a POJO and use table sink to output fail

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 1.3.0
    • 1.3.1, 1.4.0
    • Table SQL / API
    • None

    Description

      Following example fails at sink, using debug mode to see the reason of ArrayIndexOutOfBoundException is cause by the input type is Pojo type not Row?

      Sample:

      TumblingWindow.java
      public class TumblingWindow {
      
          public static void main(String[] args) throws Exception {
              List<Content> data = new ArrayList<Content>();
              data.add(new Content(1L, "Hi"));
              data.add(new Content(2L, "Hallo"));
              data.add(new Content(3L, "Hello"));
              data.add(new Content(4L, "Hello"));
              data.add(new Content(7L, "Hello"));
              data.add(new Content(8L, "Hello world"));
              data.add(new Content(16L, "Hello world"));
      
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      
              DataStream<Content> stream = env.fromCollection(data);
      
              DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
                      new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
      
                          /**
                           * 
                           */
                          private static final long serialVersionUID = 410512296011057717L;
      
                          @Override
                          public long extractTimestamp(Content element) {
                              return element.getRecordTime();
                          }
      
                      });
      
              final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
      
              Table table = tableEnv.fromDataStream(stream2, "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
      
              Table windowTable = table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, urlKey")
                      .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");
      
              //table.printSchema();
      
              TableSink<Row> windowSink = new CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
                      WriteMode.OVERWRITE);
              windowTable.writeToSink(windowSink);
      
              // tableEnv.toDataStream(windowTable, Row.class).print();
              env.execute();
          }
      
          public static class Content implements Serializable {
      
              /**
               * 
               */
              private static final long serialVersionUID = 1429246948772430441L;
      
              private String urlKey;
      
              private long recordTime;
              // private String recordTimeStr;
      
              private long httpGetMessageCount;
              private long httpPostMessageCount;
              private long uplink;
              private long downlink;
              private long statusCode;
              private long statusCodeCount;
      
              public Content() {
                  super();
              }
      
              public Content(long recordTime, String urlKey) {
                  super();
                  this.recordTime = recordTime;
                  this.urlKey = urlKey;
              }
      
              public String getUrlKey() {
                  return urlKey;
              }
      
              public void setUrlKey(String urlKey) {
                  this.urlKey = urlKey;
              }
      
              public long getRecordTime() {
                  return recordTime;
              }
      
              public void setRecordTime(long recordTime) {
                  this.recordTime = recordTime;
              }
      
              public long getHttpGetMessageCount() {
                  return httpGetMessageCount;
              }
      
              public void setHttpGetMessageCount(long httpGetMessageCount) {
                  this.httpGetMessageCount = httpGetMessageCount;
              }
      
              public long getHttpPostMessageCount() {
                  return httpPostMessageCount;
              }
      
              public void setHttpPostMessageCount(long httpPostMessageCount) {
                  this.httpPostMessageCount = httpPostMessageCount;
              }
      
              public long getUplink() {
                  return uplink;
              }
      
              public void setUplink(long uplink) {
                  this.uplink = uplink;
              }
      
              public long getDownlink() {
                  return downlink;
              }
      
              public void setDownlink(long downlink) {
                  this.downlink = downlink;
              }
      
              public long getStatusCode() {
                  return statusCode;
              }
      
              public void setStatusCode(long statusCode) {
                  this.statusCode = statusCode;
              }
      
              public long getStatusCodeCount() {
                  return statusCodeCount;
              }
      
              public void setStatusCodeCount(long statusCodeCount) {
                  this.statusCodeCount = statusCodeCount;
              }
      
          }
      
          private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {
      
              /**
               * 
               */
              private static final long serialVersionUID = 1L;
      
              @Override
              public long extractTimestamp(Object[] element, long previousElementTimestamp) {
                  // TODO Auto-generated method stub
                  return (long) element[0];
              }
      
              @Override
              public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
                  return new Watermark(extractedTimestamp);
              }
      
          }
      }
      

      Exception trace

      Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 6
      	at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateFieldAccess(CodeGenerator.scala:1661)
      	at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateInputAccess(CodeGenerator.scala:1599)
      	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$26.apply(CodeGenerator.scala:875)
      	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$26.apply(CodeGenerator.scala:874)
      	at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:728)
      	at scala.collection.immutable.Range.foreach(Range.scala:166)
      	at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:727)
      	at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:874)
      	at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.generatedConversionFunction(DataStreamScan.scala:36)
      	at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:36)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:63)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:94)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:119)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:94)
      	at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:678)
      	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:637)
      	at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:214)
      	at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
      	at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
      	at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:66)
      
      

      Attachments

        1. debug.png
          389 kB
          Mark You

        Issue Links

          Activity

            People

              sunjincheng121 sunjincheng
              bonbonmark Mark You
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: