Details
Bug
Status: Resolved
P2
Resolution: Not A Bug
2.28.0
None
Description
I am trying to use the Group API to apply multiple aggregations to a Row. When I add a CombineFn whose output is a List<T>, the job fails at runtime with the following exception:
Couldn't find field type for T
java.lang.RuntimeException: Couldn't find field type for T
    at org.apache.beam.sdk.schemas.FieldTypeDescriptors.fieldTypeForJavaType(FieldTypeDescriptors.java:98)
    at org.apache.beam.sdk.schemas.FieldTypeDescriptors.getArrayFieldType(FieldTypeDescriptors.java:118)
    at org.apache.beam.sdk.schemas.FieldTypeDescriptors.fieldTypeForJavaType(FieldTypeDescriptors.java:86)
    at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.aggregateFields(SchemaAggregateFn.java:234)
    at org.apache.beam.sdk.schemas.transforms.Group$ByFields.aggregateField(Group.java:648)
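It looks like a type erasure issue: the element type of the output list is still the unresolved type variable T when Beam tries to infer the schema field type. Is there a workaround? I am happy to tell Beam manually what the inner type is.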
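To illustrate the suspected cause, here is a minimal sketch in plain Java that uses only JDK reflection and no Beam classes. `ListCombiner` and `IntListCombiner` are hypothetical stand-ins, not Beam types. The sketch shows that a generic method returning `List<T>` exposes only the type variable `T` to reflection, while a subclass that binds `T` in its class signature keeps the concrete type available. Whether Beam's schema inference picks up a type bound this way is an assumption and is not confirmed in this report.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    // Hypothetical stand-in for a combiner whose output type is List<T>.
    static class ListCombiner<T> {
        public List<T> extractOutput() {
            return new ArrayList<>();
        }
    }

    // Hypothetical subclass that pins T to Integer in its class signature.
    static class IntListCombiner extends ListCombiner<Integer> {}

    // Element type of extractOutput() as declared on the generic class.
    // This is the type variable T, not a concrete class.
    static Type declaredElementType() throws NoSuchMethodException {
        Method m = ListCombiner.class.getMethod("extractOutput");
        ParameterizedType ret = (ParameterizedType) m.getGenericReturnType();
        return ret.getActualTypeArguments()[0];
    }

    // Type argument recorded in the generic superclass of a concrete subclass.
    static Type boundElementType(Class<?> cls) {
        ParameterizedType sup = (ParameterizedType) cls.getGenericSuperclass();
        return sup.getActualTypeArguments()[0];
    }

    public static void main(String[] args) throws Exception {
        Type declared = declaredElementType();
        System.out.println("declared element type: " + declared.getTypeName());
        System.out.println("is a type variable: " + (declared instanceof TypeVariable));
        System.out.println("bound in subclass: "
            + boundElementType(IntListCombiner.class).getTypeName());
    }
}
```

If the same mechanism applies here, a CombineFn subclass that fixes the element type in its declaration might give schema inference enough information, but that should be verified against the Beam version in use.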
Example test case (in Scala):
    "data set " should "be aggregatable with group by" in {
      val schema = Schema.of(
        Schema.Field.nullable("key", Schema.FieldType.STRING),
        Schema.Field.nullable("value1", Schema.FieldType.INT32),
        Schema.Field.nullable("value2", Schema.FieldType.INT32)
      )
      val keys = Vector("foo", "bar", "baz")
      val inputRaw = 1.to(1000).map(_ => {
        Row.withSchema(schema)
          .addValues(
            keys(Random.nextInt(keys.length)),
            Random.nextInt(10000),
            (Random.nextInt(100) + 256),
          ).build()
      })
      testPipeline { p =>
        val quantileFn = ApproximateQuantilesCombineFn.create[java.lang.Integer](20)
        val outputType = quantileFn.getOutputType
        println(outputType)
        val input = p
          .apply(Create.of(inputRaw.asJava))
          .setRowSchema(schema)
        val res = input
          .apply(Group.byFieldNames("key")
            .aggregateField("value1", quantileFn, "quantiles"))
          .apply(MapDoFn.of(x => {
            println(x.toString)
            x.toString
          })).setCoder(StringUtf8Coder.of())
        PAssert.that(res).containsInAnyOrder()
      }
    }
    }