HUDI-2275: HoodieDeltaStreamerException when using OCC and a second concurrent writer

Description

I am trying to use Optimistic Concurrency Control (OCC) to allow two writers to update a single table simultaneously. The two writers are:

• Writer A: a Delta Streamer job consuming continuously from Kafka
• Writer B: a Spark datasource writer consuming Parquet files out of S3
• Table type: Copy on Write

After a few commits from each writer, the Delta Streamer fails with the following exception:

      org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
       "partitionToWriteStats" : {
       ...

       

What appears to be happening is a lack of commit isolation between the two writers: Writer B (the Spark datasource writer) lands commits that are eventually picked up by Writer A (the Delta Streamer). This is a problem because the Delta Streamer needs checkpoint information, which the Spark datasource writer of course does not include in its commits. My understanding was that OCC was built for this very purpose (among others).
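If it helps triage: the Delta Streamer stores its checkpoint in each commit's extra metadata under the key deltastreamer.checkpoint.key. A rough sketch of how I inspected the timeline (written against the 0.8.x Java APIs; the base path is a placeholder for our table) — completed commits written by the datasource writer come back with a null checkpoint:

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class CheckpointInspector {
  public static void main(String[] args) throws Exception {
    // Placeholder: point this at the table both writers target
    String basePath = "s3://my-bucket/tables/foo";

    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath(basePath)
        .build();

    HoodieTimeline commits = metaClient.getActiveTimeline()
        .getCommitsTimeline()
        .filterCompletedInstants();

    // Walk every completed commit and print whether it carries the
    // Delta Streamer checkpoint in its extra metadata; commits made
    // by the datasource writer print "checkpoint=null"
    List<HoodieInstant> instants = commits.getInstants().collect(Collectors.toList());
    for (HoodieInstant instant : instants) {
      HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(
          commits.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
      System.out.println(instant.getTimestamp()
          + " checkpoint=" + meta.getMetadata("deltastreamer.checkpoint.key"));
    }
  }
}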

      OCC config for Delta Streamer:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=<zk_host>
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=writer_lock
hoodie.write.lock.zookeeper.base_path=/hudi-write-locks
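For completeness, the Delta Streamer job is launched roughly like this (bundle version, paths, and source class are placeholders for our actual job; the OCC properties above are supplied through the --props file or repeated --hoodie-conf flags, of which only two are shown here):

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.12-0.8.0.jar \
  --table-type COPY_ON_WRITE \
  --target-base-path s3://my-bucket/tables/foo \
  --target-table foo \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --props s3://my-bucket/conf/kafka-source.properties \
  --hoodie-conf hoodie.write.concurrency.mode=optimistic_concurrency_control \
  --hoodie-conf hoodie.cleaner.policy.failed.writes=LAZY \
  --continuous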

       

OCC config for the Spark datasource writer:

// Multi-writer concurrency
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
.option(
  "hoodie.write.lock.provider",
  org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
)
.option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
.option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
.option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
.option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks")

      Steps to Reproduce:

• Start a Delta Streamer job against some table Foo
• In parallel, start writing to the same table Foo using the Spark datasource writer
• Note that after a few commits from each writer, the Delta Streamer is likely to fail with the above exception once the datasource writer creates non-isolated inflight commits

NOTE: I have not tested this with two writers of the same type (e.g., two Delta Streamer jobs)

NOTE 2: Another detail that may be relevant is that the two writers run on completely different Spark clusters, but I assumed this shouldn't be an issue since we're locking via ZooKeeper

People

Assignee: codope (Sagar Sumit)
Reporter: dave_hagman (Dave Hagman)