Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 2.2.0, 2.2.1
Fix Version/s: None
Description
I have the following streaming query:
    baseDataSet
        .groupBy(window(col(UTC_DATE_TIME),
                applicationProperties.getProperty("current_active_users_window_length") + " minutes",
                "5 seconds"))
        .agg(approx_count_distinct(col(INTERNAL_USER_ID),
                applicationProperties.getDoubleProperty("approximate_distinct_count_error_percentage"))
            .as("value"))
        .filter(col("window.end").leq(current_timestamp()))
        .select(unix_timestamp(col("window.end")).as("timestamp"), col("value"))
        .writeStream()
        .trigger(Trigger.ProcessingTime(
                applicationProperties.getIntegerProperty("current_active_users_trigger_interval"),
                TimeUnit.SECONDS))
        .format(ActiveUsersSinkProvider.class.getCanonicalName())
        .outputMode(OutputMode.Update())
        .option("checkpointLocation", SystemProperties.APP_CHECKPOINT_DIR + "/current_active_users")
        .start();
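The query uses a sliding window with a 5-second slide, so each event lands in many overlapping windows. The plain-Java sketch below (no Spark needed) lists the windows that contain a given timestamp. It assumes Spark's window() semantics: half-open intervals [start, start + length), with window starts aligned to multiples of the slide (default startTime of 0). The 10-minute length is only a hypothetical stand-in for `current_active_users_window_length`.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindows {
    /** Returns the start (epoch millis) of every window [start, start + windowMs) that contains tMs. */
    public static List<Long> windowStartsContaining(long tMs, long windowMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before t, aligned to a multiple of the slide.
        long latest = Math.floorDiv(tMs, slideMs) * slideMs;
        // Walk back one slide at a time while the window still covers t (start > t - length).
        for (long s = latest; s > tMs - windowMs; s -= slideMs) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        long windowMs = 10 * 60 * 1000L; // hypothetical 10-minute window length
        long slideMs = 5 * 1000L;        // 5-second slide, as in the query
        List<Long> starts = windowStartsContaining(1_000_000L, windowMs, slideMs);
        System.out.println(starts.size()); // prints 120
    }
}
```

Under these assumptions, a 10-minute window with a 5-second slide puts each event into 120 windows. The `window.end <= current_timestamp()` filter in the query then keeps only windows that have already closed.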
In the sink I'm trying to map the dataset to a Java bean with the following code:
data.as(Encoders.bean(LongTimeBased.class)).collectAsList()
where LongTimeBased is:
    public class LongTimeBased {
        private long timestamp;
        private long value;

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        public long getValue() { return value; }
        public void setValue(long value) { this.value = value; }
    }
Whatever data is aggregated, the timestamp field of each bean is correct, but the value field is always 1. When I read the value column directly from each Row, the value is correct:
    for (Row row : data.collectAsList()) {
        Long value = row.getAs("value"); // correct value
    }
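Reading each Row by column name gives the correct value, so one possible workaround is to skip `Encoders.bean` and fill the bean by hand. This is a sketch and does not fix the underlying encoder behaviour. So that it compiles without Spark on the classpath, a `Function<String, Object>` (hypothetical, not a Spark API) stands in for `Row.getAs(String)`. In the sink you would call `toBean(name -> row.getAs(name))` for each row of `data.collectAsList()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RowToBean {
    // Same shape as the LongTimeBased bean in the report.
    public static class LongTimeBased {
        private long timestamp;
        private long value;

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        public long getValue() { return value; }
        public void setValue(long value) { this.value = value; }
    }

    /**
     * Fills a LongTimeBased by looking up each column by name.
     * The Function is a hypothetical stand-in for Row.getAs(String).
     */
    public static LongTimeBased toBean(Function<String, Object> field) {
        LongTimeBased bean = new LongTimeBased();
        // Go through Number so both Long and Integer column values are accepted.
        bean.setTimestamp(((Number) field.apply("timestamp")).longValue());
        bean.setValue(((Number) field.apply("value")).longValue());
        return bean;
    }

    public static void main(String[] args) {
        // A plain Map plays the role of one collected Row in this demo.
        Map<String, Object> fakeRow = new HashMap<>();
        fakeRow.put("timestamp", 1514764800L);
        fakeRow.put("value", 42L);
        LongTimeBased bean = toBean(fakeRow::get);
        System.out.println(bean.getTimestamp() + " " + bean.getValue()); // prints "1514764800 42"
    }
}
```

Because the columns are looked up by name, this path avoids the bean encoder entirely. The cost is one hand-written line per field.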