Description
With hash partitions, I use the KuduContext like so:
kuduContext.createTable(modifiedTable, predictions.schema, Seq("movieid", "userid"),
  new CreateTableOptions().addHashPartitions(ImmutableList.of("movieid"), 3).setNumReplicas(1))
There isn't a clean way to use KuduContext with range partitions, however. I have it working below, but it forces me to define the schema twice in different formats: the range bounds passed to CreateTableOptions are PartialRows created from a Kudu Schema (a list of ColumnSchema), while KuduContext.createTable takes a Spark StructType.
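One way to avoid keeping both definitions in sync by hand is to derive the Kudu Schema from the StructType. The sketch below is a hypothetical helper (`SchemaBridge.toKuduSchema` is not part of the KuduContext API), assumes the `org.apache.kudu` client package names, and maps only a handful of Spark types. It moves the key columns to the front in key order, because Kudu requires primary-key columns to come first in a schema.

```scala
import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._

// Hypothetical helper: builds a Kudu Schema from a Spark StructType.
object SchemaBridge {
  // Maps a subset of Spark SQL types to Kudu column types (assumption: others unsupported).
  private def kuduType(dt: DataType): Type = dt match {
    case StringType  => Type.STRING
    case LongType    => Type.INT64
    case IntegerType => Type.INT32
    case ShortType   => Type.INT16
    case ByteType    => Type.INT8
    case DoubleType  => Type.DOUBLE
    case FloatType   => Type.FLOAT
    case BooleanType => Type.BOOL
    case BinaryType  => Type.BINARY
    case other       => throw new IllegalArgumentException(s"Unsupported Spark type: $other")
  }

  /** Key columns are placed first, in `keys` order, and are non-nullable;
    * value columns keep the nullability declared in the StructType. */
  def toKuduSchema(struct: StructType, keys: Seq[String]): Schema = {
    val byName = struct.fields.map(f => f.name -> f).toMap
    val keyFields = keys.map { k =>
      byName.getOrElse(k, throw new IllegalArgumentException(s"Unknown key column: $k"))
    }
    val valueFields = struct.fields.filterNot(f => keys.contains(f.name))
    val keyCols = keyFields.map { f =>
      new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType)).key(true).build()
    }
    val valueCols = valueFields.map { f =>
      new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType))
        .key(false)
        .nullable(f.nullable)
        .build()
    }
    new Schema((keyCols ++ valueCols).asJava)
  }
}
```

With a helper like this, `fixSchema` could be computed as `SchemaBridge.toKuduSchema(schema, Seq("clordid", "transacttime"))`, so the column list lives only in the StructType and is still available for `newPartialRow()`.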
val fixSchema: Schema = {
  val columns = ImmutableList.of(
    new ColumnSchemaBuilder("clordid", Type.STRING).key(true).build(),
    new ColumnSchemaBuilder("transacttime", Type.INT64).key(true).build(),
    ....
    new ColumnSchemaBuilder("lastupdated", Type.INT64).key(false).build())
  new Schema(columns)
}
val schema =
  StructType(
    StructField("clordid", StringType, false) ::
    StructField("transacttime", LongType, false) ::
    ....
    StructField("lastupdated", LongType, true) :: Nil)
val kuduContext = new KuduContext(kuduMaster)
val options = new CreateTableOptions()
  .setRangePartitionColumns(ImmutableList.of("transacttime"))
  .addHashPartitions(ImmutableList.of("clordid"), 3)
  .setNumReplicas(1)
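val today = new DateTime().withTimeAtStartOfDay()
for (i <- 0 until numberOfDays) {
  // Range bounds are PartialRows, so they must come from the Kudu Schema.
  val lbMillis = today.plusDays(i).getMillis
  // addRangePartition treats the upper bound as exclusive, so use the next
  // day's midnight: no millisecond is left between adjacent partitions, and
  // 23- or 25-hour DST days are handled by Joda's plusDays.
  val ubMillis = today.plusDays(i + 1).getMillis
  val lowerBound = fixSchema.newPartialRow()
  lowerBound.addLong("transacttime", lbMillis)
  val upperBound = fixSchema.newPartialRow()
  upperBound.addLong("transacttime", ubMillis)
  options.addRangePartition(lowerBound, upperBound)
}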
kuduContext.createTable(tableName, schema, Seq("clordid","transacttime"),options)
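The per-day bounds in the loop can also be computed separately from any Kudu objects, which makes them easy to check. The dependency-free sketch below swaps Joda-Time for `java.time` and uses a hypothetical name, `DailyRanges.dailyBounds`. It returns (lower inclusive, upper exclusive) epoch-millisecond pairs, matching the default semantics of `CreateTableOptions.addRangePartition(lower, upper)`.

```scala
import java.time.{LocalDate, ZoneId}

// Hypothetical helper: day-aligned range-partition bounds in epoch milliseconds.
object DailyRanges {
  /** One (lowerInclusive, upperExclusive) pair per day, starting at `startDay`.
    * Each upper bound is the next day's start in `zone`, so adjacent ranges
    * share an edge and DST days of 23 or 25 hours are covered exactly. */
  def dailyBounds(startDay: LocalDate, numberOfDays: Int, zone: ZoneId): Seq[(Long, Long)] =
    (0 until numberOfDays).map { i =>
      val lower = startDay.plusDays(i.toLong).atStartOfDay(zone).toInstant.toEpochMilli
      val upper = startDay.plusDays(i.toLong + 1).atStartOfDay(zone).toInstant.toEpochMilli
      (lower, upper)
    }
}
```

Each pair maps onto two PartialRows from the Kudu schema via `addLong("transacttime", ...)`, one for the lower and one for the upper bound, before calling `addRangePartition`.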
Issue Links
- duplicates KUDU-1676 Spark DDL needs elegant way to specify range partitioning (Resolved)