Uploaded image for project: 'Phoenix'
  1. Phoenix
  2. PHOENIX-7001

Change Data Capture leveraging Max Lookback and Uncovered Indexes

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Reopened
    • Major
    • Resolution: Unresolved
    • None
    • None
    • None
    • None

    Description

      The use cases for a Change Data Capture (CDC) feature are centered around capturing changes to a given table (or updatable view) as these changes happen in near real-time. A CDC application can retrieve changes in real-time or with some delay, or even retrieves the same set of changes multiple times. This means the CDC use case can be generalized as time range queries where the time range is typically short such as last x minutes or hours or expressed as a specific time range in the last n days where n is typically less than 7.

      A change is an update in a row. That is, a change is either updating one or more columns of a table for a given row or deleting a row. It is desirable to provide these changes in the order of their arrival. One can visualize the delivery of these changes through a stream from a Phoenix table to the application that is initiated by the application similar to the delivery of any other Phoenix query results. The difference is that a regular query result includes at most one result row for each row satisfying the query and the deleted rows are not visible to the query result while the CDC stream/result can include multiple result rows for each row and the result includes deleted rows. Some use cases need to also get the pre and/or post image of the row along with a change on the row. 

      The design proposed here leverages Phoenix Max Lookback and Uncovered Global Indexes. The max lookback feature retains recent changes to a table, that is, the changes that have been done in the last x days typically. This means that the max lookback feature already captures the changes to a given table. Currently, the max lookback age is configurable at the cluster level. We need to extend this capability to be able to configure the max lookback age at the table level so that each table can have a different max lookback age based on its CDC application requirements.

      To deliver the changes in the order of their arrival, we need a time based index. This index should be uncovered as the changes are already retained in the table by the max lookback feature. The arrival time will be defined as the mutation timestamp generated by the server. An uncovered index would allow us to efficiently and orderly access to the changes. Changes to an index table are also preserved by the max lookback feature.

      A CDC feature can be composed of the following components:

      • CDCUncoveredIndexRegionScanner: This is a server side scanner on an uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. It goes through index table rows using a raw scan to identify data table rows and retrieves these rows using a raw scan. Using the time range, it forms a JSON blob to represent changes to the row including pre and/or post row images.
      • CDC Query Compiler: This is a client side component. It prepares the scan object based on the given CDC query statement. 
      • CDC DDL Compiler: This is a client side component. It creates the time based uncovered global index based on the given CDC DDL statement and a virtual table of CDC type. CDC will be a new table type. 

      A CDC DDL syntax to create CDC on a (data) table can be as follows: 

      Create CDC <CDC Table Name> on <Data Table Name> INCLUDE (pre | post)  SALT_BUCKETS=<salt bucket count>

      The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC table PK columns start with the timestamp and continue with the data table PK columns. The CDC table includes one non-PK column which is a JSON column. The change is expressed in this JSON column in multiple ways based on the CDC DDL or query statement. The change can be expressed as just the mutation for the change, the pre image of the row (the image before the change), the post image, or any combination of these. The CDC table is not a physical table on disk. It is just a virtual table to be used in a CDC query. Phoenix stores just the metadata for this virtual table. 

      A CDC query can be as follow:

      Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) AND PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)

      This query would return the rows of the CDC table which is constructed on the server side by CDCUncoveredIndexRegionScanner by joining the uncovered index row versions with the corresponding data table row version (using raw scans). The above select query can be hinted at by using a new CDC hint to return just the actual change, pre, or post image of the row, or a combination of them to overwrite the default JSON column format defined by the CDC DDL statement. 

      The CDC application will run the above query in a loop. When the difference between the current time of the application and the upper limit of the time range of the query becomes less than s milliseconds, say x milliseconds, then the application thread needs to sleep s - x milliseconds. The value for s can be small such as 1000 milliseconds. This is to make sure that time skew among the server wall clocks does not lead to data loss.  

      A global time based index may create hot spotting during writes. This is because the same region of the global index will keep getting updated. Since the global index would be uncovered, the size of the updates will be usually smaller than the data table updates. If we assume that index mutations are n times smaller than data table mutations, then a single index region will be able to sustain writes from n data table regions if the data table does not have any other indexes. When the data table has other indexes, the data table write can slow down by 3 times or so. This allows a single index region to match with 3n data table regions. If the number of active data table regions is more than a single index region can sustain then we need to distribute the load to multiple index regions using salting. 

      A local time based index does not have the hot spotting issue but can result in slower CDC queries for tables with a large number of regions. That is why this proposal suggests using global indexes.

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              kadir Kadir Ozdemir
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated: