Details
- Type: Improvement
- Status: To Do
- Priority: Minor
- Resolution: Unresolved
Description
Background
In many use cases, s2jobs batch-processes multiple sources at once, reading every source each time the job launches. But given that each source has a different update interval, this approach has the following disadvantages.
- Wasted resources: sources that have not been updated are still processed.
- Waiting for the schedule: a source that has been updated must still wait for the fixed schedule before it is processed.
- Vulnerability to errors: specifying time partitions such as
path/date_id=yyyy-mm-dd
as the path of each source has been the common way to construct data pipelines with s2jobs, leaving the pipeline open to breakage when such a path has not been created or updated yet.
Decoupling the job schedule from each source's update interval and processing each source incrementally would address these problems.
Idea
Assuming that each job's name is globally unique and that each of a job's sources can be identified by its name (and considering that each source can have multiple paths), we can form a unique unit of transaction in the following form.
job_name, source_name, path (or a Hive table)
In the implementation, this unit is named TxId (transaction id).
With a TxId, we can specify each path the job needs to read and record its offset in the form below.
job_name, source_name, path (or a Hive table), commit_id
In the implementation, this form is named TxLog, and the offset of each TxId is recorded in the commit_id field.
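For example (all names and values below are hypothetical), a TxLog row for a daily-partitioned source could look like:
friends_etl, friends_log, /data/friends_log/date_id=2019-03-01, 2019-03-01
Here commit_id records the offset of the last successfully processed state, whatever form that offset takes (a partition value, a modification timestamp, and so on).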
Scenario using these utilities
- Step 1) The program determines which paths should be read at the current interval by comparing each given path (or its subdirectories/partitions) with the corresponding TxLog (a sketch follows this list).
- Step 2) The program then rewrites each source so that only the updated sources/paths are read.
- Step 3) After the job finishes, the program writes a TxLog for each TxId.
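A minimal sketch of Step 1 in Scala, assuming (this is not specified in the issue) that paths live on HDFS and that commit_id stores the last processed modification time as a string; the function name and its parameters are illustrative only:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// Hypothetical helper illustrating Step 1: keep only the paths whose files were
// modified after the offset recorded in the corresponding TxLog.
def resolveUpdatedPaths(ss: SparkSession,
                        jobName: String,
                        appName: String,
                        sourceName: String,
                        paths: Seq[String],
                        txUtil: TransactionUtil): Seq[String] = {
  val fs = FileSystem.get(ss.sparkContext.hadoopConfiguration)
  paths.filter { path =>
    val txId = TxId(jobName, appName, sourceName, path)
    // No TxLog yet means the path has never been processed, so always read it.
    val lastCommitted = txUtil.readTx(txId).map(_.commitId.toLong).getOrElse(Long.MinValue)
    fs.getFileStatus(new Path(path)).getModificationTime > lastCommitted
  }
}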
Implementation: TransactionUtil
TxId and TxLog
case class TxId(jobName: String, appName: String, sourceName: String, path: String)

case class TxLog(id: Long,
                 txId: TxId,
                 commitType: CommitType,
                 commitId: String,
                 lastModifiedAt: Long,
                 enabled: Option[Boolean])
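Filled in with made-up values (CommitType's members are not listed in this issue, so the value below is a guess), an instance might look like:

// Every value here is hypothetical, for illustration only.
val txId = TxId(
  jobName = "friends_etl",
  appName = "s2jobs",
  sourceName = "friends_log",
  path = "/data/friends_log/date_id=2019-03-01"
)

val txLog = TxLog(
  id = 1L,
  txId = txId,
  commitType = CommitType.HDFS, // guessed member name; CommitType is not defined in this issue
  commitId = "2019-03-01",
  lastModifiedAt = System.currentTimeMillis(),
  enabled = Some(true)
)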
TransactionUtil Interface
trait TransactionUtil {
  val config: Config

  /**
    * Read the last committed transaction using the context provided from the Spark session.
    *
    * @param txId
    * @return
    */
  def readTx(txId: TxId): Option[TxLog]

  /**
    * Write the latest target as the last committed transaction.
    *
    * @param txLog
    */
  def writeTx(txLog: TxLog): Unit
}
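For illustration, a toy in-memory implementation of this trait could look as follows; the DBTransactionUtil used in the usage example below would instead persist TxLogs (for example, to a database) so that offsets survive across job runs.

import com.typesafe.config.Config
import scala.collection.concurrent.TrieMap

// Toy implementation keeping TxLogs in memory; useful only to show the contract.
class InMemoryTransactionUtil(override val config: Config) extends TransactionUtil {
  private val store = TrieMap.empty[TxId, TxLog]

  override def readTx(txId: TxId): Option[TxLog] = store.get(txId)

  override def writeTx(txLog: TxLog): Unit = store.put(txLog.txId, txLog)
}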
Usage
org.apache.s2graph.s2jobs.JobLauncher.scala
def main(args: Array[String]): Unit = {
  val options = parseArguments(args)
  logger.info(s"Job Options : ${options}")

  val jobDescription = JobDescription(getConfig(options))

  val ss = SparkSession
    .builder()
    .appName(s"${jobDescription.name}")
    .config("spark.driver.maxResultSize", "20g")
    .enableHiveSupport()
    .getOrCreate()

  ...

  if (options.incremental) {
    val config = ConfigFactory.load()
    val txUtil = DBTransactionUtil(config)
    TransactionUtil.withTx(ss, options.name, jobDescription, txUtil)(_.run())
  } else {
    val job = new Job(ss, jobDescription)
    job.run()
  }
}
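The body of TransactionUtil.withTx is not shown in this issue. A sketch of its possible shape, inferred only from the call site above (everything below is an assumption):

object TransactionUtil {
  // Hypothetical shape of withTx, mirroring the three scenario steps.
  def withTx(ss: SparkSession,
             jobName: String,
             jobDescription: JobDescription,
             txUtil: TransactionUtil)(runJob: Job => Unit): Unit = {
    // Steps 1-2) compare each source path with its TxLog and rewrite the job
    // description so only the updated paths are read (elided in this sketch).
    runJob(new Job(ss, jobDescription))
    // Step 3) write a TxLog for each TxId via txUtil.writeTx once the job has
    // finished (elided in this sketch).
  }
}

Committing the TxLogs only after the job completes means a failed run leaves the previous offsets untouched, so the next launch simply re-reads the same updated paths.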