Description
When using
org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf
with dataflow-runner I got the following issue:
[WARNING] java.lang.RuntimeException: java.io.IOException: Could not obtain a Coder for the accumulator at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:78) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239) at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213) at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468) at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60) at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996) at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203) at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322) at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308) at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115) at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke (Method.java:566) at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282) at java.lang.Thread.run (Thread.java:829) Caused by: java.io.IOException: Could not obtain a Coder for the accumulator at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:207) at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179) at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438) at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248) at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175) at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239) at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213) at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468) at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60) at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996) at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203) at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322) at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308) at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115) at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke (Method.java:566) at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282) at java.lang.Thread.run (Thread.java:829) Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Cannot infer coder for type parameter AccumT at org.apache.beam.sdk.coders.CoderRegistry.getCoder (CoderRegistry.java:328) at org.apache.beam.sdk.transforms.CombineFnBase$AbstractGlobalCombineFn.getAccumulatorCoder (CombineFnBase.java:119) at org.apache.beam.sdk.transforms.Combine$CombineFn.getAccumulatorCoder (Combine.java:391) at org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter$WrappedCombinerBase.getAccumulatorCoder (AggregationCombineFnAdapter.java:75) at org.apache.beam.sdk.transforms.CombineFns$ComposedCombineFn.getAccumulatorCoder (CombineFns.java:430) at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.getAccumulatorCoder (SchemaAggregateFn.java:335) at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:204) at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179) at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438) at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248) at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175) at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579) at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239) at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213) at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468) at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60) at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996) at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203) at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322) at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308) at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115) at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke (Method.java:566) at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282) at java.lang.Thread.run (Thread.java:829) [INFO] ------------------------------------------------------------------------ [INFO] BUILD FAILURE [INFO] ------------------------------------------------------------------------ [INFO] Total time: 57.006 s [INFO] Finished at: 2021-10-25T13:54:23Z [INFO] ------------------------------------------------------------------------ [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project connected-stories-analytics-logger-store: An exception occured while executing the Java class. java.io.IOException: Could not obtain a Coder for the accumulator: Cannot infer coder for type parameter AccumT -> [Help 1]
So I found out that
CountIfFn.Accum
on Beam seems not to have an associated Coder.
FYI - iemejia
Attachments
Issue Links
- is related to
-
BEAM-11743 Implement COUNTIF Aggregation function for Zetasql
- Resolved
- links to