Details
-
Bug
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
1.4.0
-
None
-
package com.vnl.stocks;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class StocksProcessing {
{ tokens = value.split(","); return new StockPrice(tokens[0], Double.parseDouble(tokens[1])); }
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//Read from a socket stream at map it to StockPrice objects
DataStream<StockPrice> socketStockStream = env
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, StockPrice>() {
private String[] tokens;
@Override
public StockPrice map(String value) throws Exception});
socketStockStream.print();
//Generate other stock streams
DataStream<StockPrice> SPX_stream = env.addSource(new StockSource("SPX", 10));
// DataStream<StockPrice> FTSE_stream = env.addSource(new StockSource("FTSE", 20));
// DataStream<StockPrice> DJI_stream = env.addSource(new StockSource("DJI", 30));
// DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40));
//Merge all stock streams together
DataStream<StockPrice> stockStream = socketStockStream.union(SPX_stream/, FTSE_stream, DJI_stream, BUX_stream/);
// stockStream.print();
Thread.sleep(100);
AllWindowedStream<StockPrice, GlobalWindow> windowedStream = stockStream
.countWindowAll(10, 5);
//.keyBy("symbol")
//.timeWindowAll(Time.of(10, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));
//stockStream.keyBy("symbol");
//Compute some simple statistics on a rolling window
DataStream<StockPrice> lowest = windowedStream.maxBy("price");
//DataStream<StockPrice> highest = windowedStream.;
/*DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol")
.maxBy("price").flatten();
DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol")
.mapWindow(new WindowMean()).flatten();*/
lowest.print();
Thread.sleep(100);
/*
AllWindowedStream<StockPrice, GlobalWindow> windowedStream1 = lowest
.countWindowAll(5,2);
//windowedStream1.print();
DataStream<StockPrice> highest = windowedStream1.minBy("price");*/
//highest.print();
env.execute("Stock stream");
}
}package com.vnl.stocks; import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class StocksProcessing { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Read from a socket stream at map it to StockPrice objects DataStream<StockPrice> socketStockStream = env .socketTextStream("localhost", 9999) .map(new MapFunction<String, StockPrice>() { private String[] tokens; @Override public StockPrice map(String value) throws Exception { tokens = value.split(","); return new StockPrice(tokens[0], Double.parseDouble(tokens[1])); } }); socketStockStream.print(); //Generate other stock streams DataStream<StockPrice> SPX_stream = env.addSource(new StockSource("SPX", 10)); // DataStream<StockPrice> FTSE_stream = env.addSource(new StockSource("FTSE", 20)); // DataStream<StockPrice> DJI_stream = env.addSource(new StockSource("DJI", 30)); // DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40)); //Merge all stock streams together DataStream<StockPrice> stockStream = socketStockStream.union(SPX_stream/ , FTSE_stream, DJI_stream, BUX_stream /); // stockStream.print(); Thread.sleep(100); AllWindowedStream<StockPrice, GlobalWindow> windowedStream = stockStream .countWindowAll(10, 5); //.keyBy("symbol") //.timeWindowAll(Time.of(10, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)); //stockStream.keyBy("symbol"); //Compute some simple statistics on a rolling window DataStream<StockPrice> lowest = windowedStream.maxBy("price"); //DataStream<StockPrice> highest = windowedStream.; /*DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol") .maxBy("price").flatten(); DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol") .mapWindow(new WindowMean()).flatten();*/ lowest.print(); Thread.sleep(100); /* AllWindowedStream<StockPrice, GlobalWindow> windowedStream1 = lowest .countWindowAll(5,2); //windowedStream1.print(); DataStream<StockPrice> highest = windowedStream1.minBy("price");*/ //highest.print(); env.execute("Stock stream"); } }
-
Important
Description
I used AllWindowedStream<?,GlobalWindow> to process a stream and generate maximum of my window using countWindowAll functions. In this output the size and slide of window works incorrectly.
Refer below example for the bug
Initial stream : 1,2,3,4,5,6.........
Output 1: (Find min for window 10,5) : 1,6,11.....(This is correct)
However if i calculate maximum, I get output as:
Output 2: (Find max for window 10,5) : 5,10,15.... (which is wrong)
Expected: 10,15,20....
Please resolve this error.