S2Graph / S2GRAPH-255

Enable incremental processing of s2jobs


Details

    • Type: Improvement
    • Status: To Do
    • Priority: Minor
    • Resolution: Unresolved
    • Component/s: s2jobs

    Description

      Background

      In many use cases, s2jobs batch-processes multiple sources, reading every source each time a job launches. Considering that each source has a different update interval, such processing has the following disadvantages.

      • Wasted resources: a source that has not been updated still gets processed.
      • Waiting between intervals: an updated source must wait for the fixed schedule before it gets processed.
      • Error-prone: specifying time partitions such as
        path/date_id=yyyy-mm-dd

        as the path of each source has been the common way to construct data pipelines using s2jobs, leaving the possibility of a broken pipeline when such a path has not been created or updated yet.

      Decoupling the job schedule from each source's update interval and processing each source incrementally can be a solution to these problems.

      Idea

      Assuming that each job's name is globally unique and that each of a job's sources can be identified by its name (and considering that each source can have multiple paths), we can form a unique unit of transaction of the following form.

       job_name, source_name, path (or a Hive table) 

      This unit in implementation is named TxId(transaction id).

      With TxId, we can specify the paths a job needs to read and record each path's offset in the form below.

       job_name, source_name, path (or a Hive table), commit_id 

      This form in the implementation is named TxLog, and we record the offset of each TxId in the commit_id field.

      Scenario with these utilities

      • Step 1) The program determines which paths should be read in the current interval by comparing each given path (or its subdirectories/partitions) with the corresponding TxLog, as sketched after this list.
      • Step 2) The program then rewrites each source so that only the updated sources/paths are read.
      • Step 3) After the job finishes, the program writes a TxLog for each TxId.
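
      The sketch below illustrates Step 1, using the TxId and TransactionUtil types introduced in the Implementation section. It assumes a path's HDFS modification time serves as its commit id; the helper name filterUpdatedPaths and that commit id scheme are illustrative, not part of this proposal.

      import org.apache.hadoop.fs.{FileSystem, Path}

      // Step 1 sketch (hypothetical helper): keep only the paths that are new or
      // have changed since the last committed TxLog. Assumes commit_id is the
      // path's latest modification time; the real commit id scheme may differ.
      def filterUpdatedPaths(txUtil: TransactionUtil,
                             fs: FileSystem,
                             jobName: String,
                             appName: String,
                             sourceName: String,
                             paths: Seq[String]): Seq[(TxId, String)] =
        paths.flatMap { path =>
          val txId = TxId(jobName, appName, sourceName, path)
          val current = fs.getFileStatus(new Path(path)).getModificationTime.toString
          txUtil.readTx(txId) match {
            case Some(tx) if tx.commitId == current => None                  // unchanged: skip
            case _                                  => Some(txId -> current) // new or updated: read
          }
        }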

      Implementation: TransactionUtil

      TxId and TxLog

      // A unit of transaction: one (job, source, path) combination.
      case class TxId(jobName: String,
                      appName: String,
                      sourceName: String,
                      path: String)

      // The committed offset for a TxId; the offset itself lives in commitId.
      case class TxLog(id: Long,
                       txId: TxId,
                       commitType: CommitType,
                       commitId: String,
                       lastModifiedAt: Long,
                       enabled: Option[Boolean])
      
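      Note that CommitType is referenced above but not defined in this issue. A minimal sketch, assuming it only distinguishes how commit_id should be interpreted, might be:

      // Hypothetical: the issue does not spell out CommitType. One plausible
      // shape distinguishes how commit_id is interpreted.
      sealed trait CommitType
      object CommitType {
        case object Path  extends CommitType // commit_id derives from a path (e.g. its mtime)
        case object Table extends CommitType // commit_id derives from a Hive table version
      }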

      TransactionUtil Interface

      trait TransactionUtil {

        val config: Config

        /**
          * read the last committed transaction using context provided from the spark session.
          *
          * @param txId identifies the (job, source, path) to look up
          * @return the last committed TxLog for txId, if any
          */
        def readTx(txId: TxId): Option[TxLog]

        /**
          * write the latest target as the last committed transaction
          *
          * @param txLog the TxLog to record, carrying the new commitId
          */
        def writeTx(txLog: TxLog): Unit
      }
      
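      As an illustration of this contract, a minimal in-memory implementation might look as follows. This class is hypothetical (useful mainly for tests); the DBTransactionUtil referenced under Usage would instead persist TxLog rows to a database.

      import java.util.concurrent.ConcurrentHashMap
      import com.typesafe.config.Config

      // Hypothetical in-memory implementation of TransactionUtil.
      class InMemoryTransactionUtil(override val config: Config) extends TransactionUtil {
        private val store = new ConcurrentHashMap[TxId, TxLog]()

        // Returns the last committed TxLog for txId, if one was recorded.
        override def readTx(txId: TxId): Option[TxLog] = Option(store.get(txId))

        // Records txLog as the last committed transaction for its TxId.
        override def writeTx(txLog: TxLog): Unit = store.put(txLog.txId, txLog)
      }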

      Usage

      org.apache.s2graph.s2jobs.JobLauncher.scala

        def main(args: Array[String]): Unit = {
      
          val options = parseArguments(args)
          logger.info(s"Job Options : ${options}")
      
          val jobDescription = JobDescription(getConfig(options))
      
          val ss = SparkSession
            .builder()
            .appName(s"${jobDescription.name}")
            .config("spark.driver.maxResultSize", "20g")
            .enableHiveSupport()
            .getOrCreate()
      
          ...
      
          if (options.incremental) {
            val config = ConfigFactory.load()
            val txUtil = DBTransactionUtil(config)
      
            TransactionUtil.withTx(ss, options.name, jobDescription, txUtil)(_.run())
          } else {
            val job = new Job(ss, jobDescription)
      
            job.run()
          }
        }
      
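      TransactionUtil.withTx itself is not spelled out in this issue; one plausible shape, tying Steps 1–3 together, is sketched below. resolveUpdatedPaths and restrictToPaths are assumed helpers, since rewriting sources to read only the updated paths depends on the s2jobs source DSL.

      import org.apache.spark.sql.SparkSession
      import org.apache.s2graph.s2jobs.{Job, JobDescription}

      object TransactionUtil {
        // Assumed helpers, not defined in this issue: decide which (TxId, commitId)
        // pairs are new or updated, and limit a job description to those paths.
        def resolveUpdatedPaths(desc: JobDescription, txUtil: TransactionUtil): Seq[(TxId, String)] = ???
        def restrictToPaths(desc: JobDescription, paths: Seq[String]): JobDescription = ???

        // Hypothetical shape of withTx: run the job only over updated paths and
        // commit a TxLog per TxId once the run succeeds.
        def withTx(ss: SparkSession,
                   jobName: String,
                   jobDescription: JobDescription,
                   txUtil: TransactionUtil)(run: Job => Unit): Unit = {
          val updated = resolveUpdatedPaths(jobDescription, txUtil)            // Step 1
          if (updated.nonEmpty) {
            val desc = restrictToPaths(jobDescription, updated.map(_._1.path)) // Step 2
            run(new Job(ss, desc))
            updated.foreach { case (txId, commitId) =>                         // Step 3
              txUtil.writeTx(TxLog(0L, txId, CommitType.Path, commitId,        // id assigned by the store
                System.currentTimeMillis(), enabled = Some(true)))
            }
          }
        }
      }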

    People

      Assignee: Unassigned
      Reporter: Chanwoo Yoon
