Beam / BEAM-12255

Group.aggregateField does not work with CombineFns that return List<T>

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Not A Bug
    • Affects Version/s: 2.28.0
    • Fix Version/s: Missing
    • Component/s: beam-model
    • Labels: None

    Description

      I am trying to use the Group API to apply multiple aggregations to a Row. When I add a CombineFn whose output type is List<T>, the test fails with the following exception:

      java.lang.RuntimeException: Couldn't find field type for T
          at org.apache.beam.sdk.schemas.FieldTypeDescriptors.fieldTypeForJavaType(FieldTypeDescriptors.java:98)
          at org.apache.beam.sdk.schemas.FieldTypeDescriptors.getArrayFieldType(FieldTypeDescriptors.java:118)
          at org.apache.beam.sdk.schemas.FieldTypeDescriptors.fieldTypeForJavaType(FieldTypeDescriptors.java:86)
          at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.aggregateFields(SchemaAggregateFn.java:234)
          at org.apache.beam.sdk.schemas.transforms.Group$ByFields.aggregateField(Group.java:648)

      It looks like a type-erasure issue. Is there any workaround? I am happy to tell Beam the element type manually.
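
      The type-erasure suspicion can be illustrated without Beam. The sketch below uses hypothetical stand-in classes (`SimpleCombineFn`, `QuantilesFn`, `IntQuantilesFn`), not Beam's real `CombineFn` hierarchy. Beam resolves output types through its own `TypeDescriptor` machinery, so this only approximates what happens there. It assumes the Scala compiler emits Java generic signatures, which is its default. When the output type is `List[T]` and `T` is the class's own type parameter, the runtime class records only `List<T>`. The `Integer` passed at construction exists only at compile time. A subclass that binds the type keeps it in the class file.

```scala
import java.lang.reflect.{ParameterizedType, Type, TypeVariable}

// Hypothetical, simplified stand-in for a CombineFn hierarchy (not a Beam class).
abstract class SimpleCombineFn[InputT, OutputT]

// Like a quantiles fn: the output is a List of the class's own type parameter T.
class QuantilesFn[T] extends SimpleCombineFn[T, java.util.List[T]]

// Binding the type in a subclass records it in the class file, so it survives.
class IntQuantilesFn extends SimpleCombineFn[Integer, java.util.List[Integer]]

object ErasureDemo {
  // Reads OutputT from the generic signature recorded for the object's runtime class.
  def outputTypeOf(fn: AnyRef): Type =
    fn.getClass.getGenericSuperclass
      .asInstanceOf[ParameterizedType]
      .getActualTypeArguments()(1)

  // True when OutputT is List<X> and X is still an unbound type variable.
  def elementIsTypeVariable(t: Type): Boolean = t match {
    case p: ParameterizedType => p.getActualTypeArguments()(0).isInstanceOf[TypeVariable[_]]
    case _                    => false
  }

  def main(args: Array[String]): Unit = {
    // The Integer argument is not recorded anywhere on this object's class.
    val generic = outputTypeOf(new QuantilesFn[Integer])
    println(s"$generic -> element is type variable: ${elementIsTypeVariable(generic)}")

    // Here the element type was fixed when the class was declared.
    val bound = outputTypeOf(new IntQuantilesFn)
    println(s"$bound -> element is type variable: ${elementIsTypeVariable(bound)}")
  }
}
```

      If Beam hits the same situation, one way around it is to supply the output type explicitly rather than letting it be inferred. `Group.ByFields` appears to offer an `aggregateField` overload that takes a `Schema.Field` instead of an output field name, for example `Schema.Field.of("quantiles", Schema.FieldType.array(Schema.FieldType.INT32))`. Confirm that overload against the Javadoc for the Beam version in use.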


      Example test case (in Scala):

      
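      // Excerpt from a ScalaTest FlatSpec. `testPipeline` and `MapDoFn` are
      // project-specific helpers that are not included in this report.
      import org.apache.beam.sdk.coders.StringUtf8Coder
      import org.apache.beam.sdk.schemas.Schema
      import org.apache.beam.sdk.schemas.transforms.Group
      import org.apache.beam.sdk.testing.PAssert
      import org.apache.beam.sdk.transforms.ApproximateQuantiles.ApproximateQuantilesCombineFn
      import org.apache.beam.sdk.transforms.Create
      import org.apache.beam.sdk.values.Row

      import scala.collection.JavaConverters._
      import scala.util.Random

        "data set" should "be aggregatable with group by" in {
          val schema = Schema.of(
            Schema.Field.nullable("key", Schema.FieldType.STRING),
            Schema.Field.nullable("value1", Schema.FieldType.INT32),
            Schema.Field.nullable("value2", Schema.FieldType.INT32)
          )

          val keys = Vector("foo", "bar", "baz")

          // 1000 random rows spread over three keys.
          val inputRaw = (1 to 1000).map { _ =>
            Row.withSchema(schema)
              .addValues(
                keys(Random.nextInt(keys.length)),
                Random.nextInt(10000),
                Random.nextInt(100) + 256
              ).build()
          }

          testPipeline { p =>
            // The fn's declared output is List<T>; the Integer chosen here is not
            // visible at runtime because T is a type parameter of the fn's class.
            val quantileFn = ApproximateQuantilesCombineFn.create[java.lang.Integer](20)
            val outputType = quantileFn.getOutputType
            println(outputType)

            val input = p
              .apply(Create.of(inputRaw.asJava))
              .setRowSchema(schema)

            val res = input
              // Throws "Couldn't find field type for T" while this transform is built.
              .apply(Group.byFieldNames("key")
                .aggregateField("value1", quantileFn, "quantiles"))
              .apply(MapDoFn.of(x => {
                println(x.toString)
                x.toString
              })).setCoder(StringUtf8Coder.of())

            // With no arguments, this asserts that `res` is empty.
            PAssert.that(res).containsInAnyOrder()
          }
        }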

    People

      Assignee: Unassigned
      Reporter: luke de feo (ldefeo1)