  1. Spark
  2. SPARK-22014

Sample windows in Spark SQL

Details

    • Type: Wish
    • Status: Closed
    • Priority: Minor
    • Resolution: Auto Closed
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: DStreams, SQL
    • Labels: None

    Description

      Hello,
      I am using Spark to process measurement data. In Spark Streaming it is possible to create sample windows, where the duration of the window is smaller than the slide. But when I try to do the same with Spark SQL (the measurement data has a timestamp column), I get an AnalysisException:

      Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp, 60000000, 180000000, 0)' due to data type mismatch: The slide duration (180000000) must be less than or equal to the windowDuration (60000000)
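
      The two numbers in the message appear to be the window and slide durations in microseconds, which matches the "1 minutes" and "3 minutes" arguments in the example below. A minimal sketch of that arithmetic and of the condition the message names (the class and method names are hypothetical, not Spark code):

```java
import java.util.concurrent.TimeUnit;

public class WindowUnits {
    // Hypothetical helper: converts whole minutes to microseconds.
    static long minutesToMicros(long minutes) {
        return TimeUnit.MINUTES.toMicros(minutes);
    }

    // The condition named in the error message: slide <= window duration.
    static boolean slideAllowed(long windowMicros, long slideMicros) {
        return slideMicros <= windowMicros;
    }

    public static void main(String[] args) {
        long windowDuration = minutesToMicros(1); // 60000000
        long slideDuration = minutesToMicros(3);  // 180000000
        System.out.println(windowDuration + " " + slideDuration);
        // Prints false: a 3 minute slide is longer than a 1 minute window.
        System.out.println(slideAllowed(windowDuration, slideDuration));
    }
}
```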
      

      Here is an example:

      import java.sql.Timestamp;
      import java.text.SimpleDateFormat;
      import java.util.ArrayList;
      import java.util.Date;
      import java.util.List;
      
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Encoders;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.RowFactory;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.functions;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      
      public class App {
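      	// Parses a "yyyy-MM-dd HH:mm:ss" string into a java.sql.Timestamp.
      	public static Timestamp createTimestamp(String in) throws Exception {
      		// HH is the 24-hour field (0-23); hh would be the 12-hour field (1-12).
      		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      		Date parsedDate = dateFormat.parse(in);
      		return new Timestamp(parsedDate.getTime());
      	}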
      	
      	public static void main(String[] args) {
      		SparkSession spark = SparkSession.builder().appName("Window Sampling Example").getOrCreate();
      		
      		List<String> sensorData = new ArrayList<String>();
      		sensorData.add("2017-08-04 00:00:00, 22.75");
      		sensorData.add("2017-08-04 00:01:00, 23.82");
      		sensorData.add("2017-08-04 00:02:00, 24.15");
      		sensorData.add("2017-08-04 00:03:00, 23.16");
      		sensorData.add("2017-08-04 00:04:00, 22.62");
      		sensorData.add("2017-08-04 00:05:00, 22.89");
      		sensorData.add("2017-08-04 00:06:00, 23.21");
      		sensorData.add("2017-08-04 00:07:00, 24.59");
      		sensorData.add("2017-08-04 00:08:00, 24.44");
      		
      		Dataset<String> in = spark.createDataset(sensorData, Encoders.STRING());
      		
      		StructType sensorSchema = DataTypes.createStructType(new StructField[] { 
      				DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
      				DataTypes.createStructField("value", DataTypes.DoubleType, false),
      		});
      		
      		Dataset<Row> data = spark.createDataFrame(in.toJavaRDD().map(new Function<String, Row>() {
      			public Row call(String line) throws Exception {
      				return RowFactory.create(createTimestamp(line.split(",")[0]), Double.parseDouble(line.split(",")[1]));
      			}
      		}), sensorSchema);
      		
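      		// Window duration (1 minute) is shorter than the slide (3 minutes): this call throws the AnalysisException quoted above.
      		data.groupBy(functions.window(data.col("timestamp"), "1 minutes", "3 minutes")).avg("value").orderBy("window").show(false);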
      	}
      }
      

      I think the window duration and slide should be handled the same way by a "Spark Streaming window" and by the "Spark SQL window" function, so a slide longer than the window duration should be allowed in both.
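
      To make the requested behaviour concrete, here is a plain-Java sketch of sample windows (no Spark involved). It assumes windows are aligned to multiples of the slide, include their start and exclude their end, and that the duration is not longer than the slide; the class and method names are hypothetical. Timestamps are seconds since the first measurement in the example above.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

public class SampleWindowSketch {

    // Returns the start of the sample window containing ts, or empty if ts
    // falls into the gap between two windows. Assumes duration <= slide.
    static OptionalLong windowStart(long ts, long duration, long slide) {
        if (duration <= 0 || slide <= 0 || duration > slide) {
            throw new IllegalArgumentException("need 0 < duration <= slide");
        }
        long start = ts - Math.floorMod(ts, slide);
        return (ts - start < duration) ? OptionalLong.of(start) : OptionalLong.empty();
    }

    // Averages the values per sample window; values in a gap are dropped.
    static Map<Long, Double> averagePerWindow(long[] ts, double[] values,
                                              long duration, long slide) {
        Map<Long, double[]> acc = new TreeMap<>(); // window start -> {sum, count}
        for (int i = 0; i < ts.length; i++) {
            OptionalLong start = windowStart(ts[i], duration, slide);
            if (start.isPresent()) {
                double[] a = acc.computeIfAbsent(start.getAsLong(), k -> new double[2]);
                a[0] += values[i];
                a[1] += 1;
            }
        }
        Map<Long, Double> result = new TreeMap<>();
        acc.forEach((k, a) -> result.put(k, a[0] / a[1]));
        return result;
    }

    public static void main(String[] args) {
        long[] ts = new long[9];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = i * 60L; // one measurement per minute
        }
        double[] values = {22.75, 23.82, 24.15, 23.16, 22.62, 22.89, 23.21, 24.59, 24.44};
        // 1 minute window every 3 minutes
        System.out.println(averagePerWindow(ts, values, 60, 180));
        // prints {0=22.75, 180=23.16, 360=23.21}
    }
}
```

      With a 1 minute window and a 3 minute slide, only the measurements at 00:00, 00:03 and 00:06 are kept, which is the result I would expect from the Spark SQL call as well.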

          People

            Assignee: Unassigned
            Reporter: Simon Schiff (simon3038)