withTable("t") {
  // NOTE(review): `path` is never used in this body — presumably a leftover
  // from an earlier file-based version of the test; verify against the suite.
  withTempPath { path =>
    // Counts successful executions whose analyzed-with-cache plan is a
    // SaveIntoDataSourceCommand reading from an InMemoryRelation, i.e. the
    // JDBC write actually consumed the cached data instead of recomputing it.
    var numTotalCachedHit = 0
    val listener = new QueryExecutionListener {
      override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
        qe.withCachedData match {
          case c: SaveIntoDataSourceCommand if c.query.isInstanceOf[InMemoryRelation] =>
            numTotalCachedHit += 1
          case _ => // not the write command under test; ignore
        }
      }
    }
    spark.listenerManager.register(listener)
    try {
      val udf1 = udf((x: Int, y: Int) => x + y)
      val df = spark.range(0, 3).toDF("a")
        .withColumn("b", udf1(col("a"), lit(10)))
      df.cache()
      df.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties)
      // NOTE(review): QueryExecutionListener callbacks may be delivered
      // asynchronously via the listener bus; if this assertion turns flaky,
      // drain the bus (e.g. sparkContext.listenerBus.waitUntilEmpty()) before
      // asserting — TODO confirm against this Spark version.
      assert(numTotalCachedHit == 1, "expected to be cached in jdbc")
    } finally {
      // Unregister so the listener does not leak into subsequent tests
      // sharing the same SparkSession.
      spark.listenerManager.unregister(listener)
    }
  }
}