Spark / SPARK-22947

SPIP: as-of join in Spark SQL



    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.1
    • Fix Version/s: None
    • Component/s: SQL
    • Labels:


      Background and Motivation

      Time series analysis is one of the most common kinds of analysis performed on financial data, and as-of join is one of its most common operations. Supporting as-of join in Spark SQL would enable many time series use cases in Spark SQL.

      As-of join is “join on time” with inexact time matching criteria. Several libraries implement as-of join or similar functionality:
      Kdb: https://code.kx.com/wiki/Reference/aj
      Pandas: http://pandas.pydata.org/pandas-docs/version/0.19.0/merging.html#merging-merge-asof
      R: This functionality is called “Last Observation Carried Forward”
      JuliaDB: http://juliadb.org/latest/api/joins.html#IndexedTables.asofjoin
      Flint: https://github.com/twosigma/flint#temporal-join-functions

      This proposal advocates introducing a new API in Spark SQL to support as-of join.

      Target Personas

      Data scientists, data engineers


      Goals

      • A new API in Spark SQL that supports as-of join
      • As-of joins across multiple tables (>2) should be performant, because users commonly need to join multiple data sources together for further analysis.
      • Define Distribution, Partitioning and shuffle strategy for ordered time series data


      Non-Goals

      These are out of scope for this SPIP and should be considered in future SPIPs as improvements to Spark’s time series analysis capabilities:

      • Utilize partition information from the data source, i.e., the begin/end of each partition, to reduce sorting/shuffling
      • Define an API for users to implement the as-of join time spec in a business calendar (e.g., look back one business day; this is very common in financial data analysis because of market calendars)
      • Support broadcast join

      Proposed API Changes


      TimeContext is an object that defines the time scope of the analysis. It has a begin time (inclusive) and an end time (exclusive). Users should be able to change the time scope of the analysis (e.g., from one month to five years) by changing only the TimeContext.

      To the Spark engine, TimeContext is a hint that:
      • can be used to repartition data for the join
      • serves as a predicate that can be pushed down to the storage layer

      A time context is similar to filtering on time by begin/end. The main difference is that a time context can be expanded based on the operation performed (see the as-of join example).

      Time context example:

      TimeContext timeContext = TimeContext("20160101", "20170101")
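
      The expansion behaviour can be sketched in plain Python. This is only an illustration under stated assumptions: the class below is a hypothetical stand-in, not the proposed Spark API, and the `contains` and `expand` method names are invented for this sketch.

```python
from dataclasses import dataclass
from datetime import date, timedelta


@dataclass(frozen=True)
class TimeContext:
    """Hypothetical stand-in for the proposed TimeContext: the range [begin, end)."""
    begin: date  # inclusive
    end: date    # exclusive

    def contains(self, t: date) -> bool:
        # A plain begin/end filter: begin is inclusive, end is exclusive.
        return self.begin <= t < self.end

    def expand(self, lookback: timedelta) -> "TimeContext":
        # Push the begin bound back so that rows inside the lookback
        # window of the first in-scope row are still read.
        return TimeContext(self.begin - lookback, self.end)


ctx = TimeContext(date(2016, 1, 1), date(2017, 1, 1))
# Context applied to the right side of a join with a one-day lookback.
right_ctx = ctx.expand(timedelta(days=1))
```

      Here `ctx` excludes 20151231, while `right_ctx` includes it, which is the difference from a fixed begin/end filter.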


      Use Case A (join without key)

      Join two DataFrames on time, with one day lookback:

      TimeContext timeContext = TimeContext("20160101", "20170101")
      dfA = ...
      dfB = ...
      JoinSpec joinSpec = JoinSpec(timeContext).on("time").tolerance("-1day")
      result = dfA.asofJoin(dfB, joinSpec)

      Example input/output:

      dfA:
      time, quantity
      20160101, 100
      20160102, 50
      20160104, -50
      20160105, 100

      dfB:
      time, price
      20151231, 100.0
      20160104, 105.0
      20160105, 102.0

      result:
      time, quantity, price
      20160101, 100, 100.0
      20160102, 50, null
      20160104, -50, 105.0
      20160105, 100, 102.0

      Note that row (20160101, 100) of dfA is joined with (20151231, 100.0) of dfB. This illustrates an important property of the time context: it can expand the range read from dfB back to 20151231 because of the one-day lookback.
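
      The matching rule in this example can be sketched in plain Python (not Spark). The function name `asof_join` and the tuple row layout are assumptions made for illustration, not part of the proposed API. The sketch assumes both inputs are sorted by time and that the tolerance bound is inclusive, which is what the match between 20160101 and 20151231 implies.

```python
from bisect import bisect_right
from datetime import date, datetime, timedelta


def parse(day: str) -> date:
    """Parse a yyyymmdd string such as "20160101"."""
    return datetime.strptime(day, "%Y%m%d").date()


def asof_join(left, right, lookback):
    """Attach to each left row (time, value) the most recent right value
    whose time lies in [left_time - lookback, left_time], or None.
    Both inputs are lists of (time, value) sorted by time."""
    right_times = [t for t, _ in right]
    out = []
    for t, value in left:
        i = bisect_right(right_times, t) - 1  # last right row at or before t
        matched = i >= 0 and t - right_times[i] <= lookback
        out.append((t, value, right[i][1] if matched else None))
    return out


df_a = [(parse("20160101"), 100), (parse("20160102"), 50),
        (parse("20160104"), -50), (parse("20160105"), 100)]
df_b = [(parse("20151231"), 100.0), (parse("20160104"), 105.0),
        (parse("20160105"), 102.0)]

result = asof_join(df_a, df_b, timedelta(days=1))
prices = [price for _, _, price in result]
```

      With this data, `prices` is `[100.0, None, 105.0, 102.0]`, matching the result table: the 20160102 row gets no price because the nearest earlier dfB row is two days back.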

      Use Case B (join with key)

      To join on time and another key (for instance, id), we use “by” to specify the key.

      TimeContext timeContext = TimeContext("20160101", "20170101")
      dfA = ...
      dfB = ...
      JoinSpec joinSpec = JoinSpec(timeContext).on("time").by("id").tolerance("-1day")
      result = dfA.asofJoin(dfB, joinSpec)

      Example input/output:

      dfA:
      time, id, quantity
      20160101, 1, 100
      20160101, 2, 50
      20160102, 1, -50
      20160102, 2, 50

      dfB:
      time, id, price
      20151231, 1, 100.0
      20160102, 1, 105.0
      20160102, 2, 195.0

      result:
      time, id, quantity, price
      20160101, 1, 100, 100.0
      20160101, 2, 50, null
      20160102, 1, -50, 105.0
      20160102, 2, 50, 195.0
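
      A keyed as-of join can be sketched the same way in plain Python (not Spark). The function name `asof_join_by` and the tuple row layout are assumptions for illustration only. The last two dfB rows are taken as 20160102, since that is the only reading under which the expected output follows from a one-day lookback.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import date, datetime, timedelta


def parse(day: str) -> date:
    """Parse a yyyymmdd string such as "20160101"."""
    return datetime.strptime(day, "%Y%m%d").date()


def asof_join_by(left, right, lookback):
    """As-of join on time within each key.
    Rows on both sides are (time, key, value) tuples."""
    # Group the right side by key, keeping each group sorted by time.
    by_key = defaultdict(list)
    for t, key, value in sorted(right, key=lambda row: row[0]):
        by_key[key].append((t, value))

    out = []
    for t, key, value in left:
        rows = by_key.get(key, [])
        times = [rt for rt, _ in rows]
        i = bisect_right(times, t) - 1  # last same-key row at or before t
        matched = i >= 0 and t - times[i] <= lookback
        out.append((t, key, value, rows[i][1] if matched else None))
    return out


df_a = [(parse("20160101"), 1, 100), (parse("20160101"), 2, 50),
        (parse("20160102"), 1, -50), (parse("20160102"), 2, 50)]
df_b = [(parse("20151231"), 1, 100.0), (parse("20160102"), 1, 105.0),
        (parse("20160102"), 2, 195.0)]

result = asof_join_by(df_a, df_b, timedelta(days=1))
prices = [price for _, _, _, price in result]
```

      With this data, `prices` is `[100.0, None, 105.0, 195.0]`: id 2 has no dfB row at or before 20160101, so its first row stays null.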

      Optional Design Sketch

      Implementation A

      (This is only an initial thought on how to implement this.)

      (1) Using the begin/end of the TimeContext, we first partition the left DataFrame into non-overlapping partitions. For the purpose of demonstration, assume we partition it into one-day partitions:

      [20160101, 20160102) [20160102, 20160103) ... [20161231, 20170101)

      (2) Then we partition the right DataFrame into overlapping partitions, taking the tolerance into account, e.g. a one-day lookback:

      [20151231, 20160102) [20160101, 20160103) ... [20161230, 20170101)

      (3) Pair left and right partitions

      (4) For each pair of partitions, because all data for the join is in the partition pair, we can now join the partition pair locally.

      (5) Use the partitioning from (1) as the output distribution so that it can be reused for subsequent as-of joins
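
      The partition ranges in steps (1) to (3) can be sketched in plain Python. This only computes the date ranges; it does not touch Spark, and the helper names `left_partitions` and `right_partitions` are hypothetical.

```python
from datetime import date, timedelta


def left_partitions(begin, end, width):
    """Step (1): non-overlapping [start, stop) ranges covering [begin, end)."""
    parts = []
    start = begin
    while start < end:
        stop = min(start + width, end)
        parts.append((start, stop))
        start = stop
    return parts


def right_partitions(left_parts, lookback):
    """Step (2): overlapping ranges, each left range with its start
    pushed back by the lookback tolerance."""
    return [(start - lookback, stop) for start, stop in left_parts]


one_day = timedelta(days=1)
left = left_partitions(date(2016, 1, 1), date(2017, 1, 1), one_day)
right = right_partitions(left, one_day)

# Step (3): pair partitions positionally; every right row a left
# partition can match falls inside its paired right range.
pairs = list(zip(left, right))
```

      Each pair can then be joined locally as in step (4), since a left row at time t only needs right rows in [t - lookback, t].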

      Optional Rejected Sketch

      Rejected Implementation A

      Another implementation is to sample the data to figure out its time range instead of using a time context. This approach is implemented in https://github.com/twosigma/flint

      This approach suffers in performance if sampling data is expensive. For instance, when the data to be sampled is the output of an expensive computation, sampling the data would cause the expensive computation to be done twice.




            • Assignee:
              icexelloss Li Jin