Details
Bug
Status: Resolved
P2
Resolution: Information Provided
Not applicable
None
Google Cloud Dataflow
Description
I tried running a simple example that calculates a running average or sum using the `stats` package, but it does not seem to work.
Here's a reproducer:
package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
	"github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
	"github.com/apache/beam/sdks/go/pkg/beam/util/pubsubx"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
)

var (
	input = flag.String("input", "iot-data", "Pubsub input topic.")
)

type sensor struct {
	name  string
	value int
}

var (
	data = []sensor{
		{name: "temperature", value: 24},
		{name: "humidity", value: 10},
		{name: "temperature", value: 20},
		{name: "temperature", value: 22},
		{name: "humidity", value: 14},
		{name: "humidity", value: 18},
	}
)

func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	project := gcpopts.GetProject(ctx)

	log.Infof(ctx, "Publishing %v messages to: %v", len(data), *input)

	defer pubsubx.CleanupTopic(ctx, project, *input)
	sub, err := Publish(ctx, project, *input, data...)
	if err != nil {
		log.Fatal(ctx, err)
	}

	log.Infof(ctx, "Running streaming sensor data with subscription: %v", sub.ID())

	p := beam.NewPipeline()
	s := p.Root()

	// Reads sensor data from pubsub
	// Returns PCollection<[]byte>
	col := pubsubio.Read(s, project, *input, &pubsubio.ReadOptions{Subscription: sub.ID()})

	// Transforms incoming bytes from pubsub to a string,int key value
	// where the key is the sensor name and the value is the sensor reading
	// Accepts PCollection<[]byte>
	// Returns PCollection<KV<string,int>>
	data := beam.ParDo(s, extractSensorData, col)

	// Calculate running average per sensor
	//
	// Accepts PCollection<KV<string,int>>
	// Returns PCollection<KV<string,float64>>
	sum := stats.MeanPerKey(s, data)
	debug.Print(s, sum)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}

func extractSensorData(msg []byte) (string, int) {
	ctx := context.Background()
	data := &sensor{}
	if err := json.Unmarshal(msg, data); err != nil {
		log.Fatal(ctx, err)
	}
	return data.name, data.value
}

func Publish(ctx context.Context, project, topic string, messages ...sensor) (*pubsub.Subscription, error) {
	client, err := pubsub.NewClient(ctx, project)
	if err != nil {
		return nil, err
	}
	t, err := pubsubx.EnsureTopic(ctx, client, topic)
	if err != nil {
		return nil, err
	}
	sub, err := pubsubx.EnsureSubscription(ctx, client, topic, fmt.Sprintf("%v.sub.%v", topic, time.Now().Unix()))
	if err != nil {
		return nil, err
	}

	for _, msg := range messages {
		s := &sensor{}
		bytes, err := json.Marshal(s)
		if err != nil {
			return nil, fmt.Errorf("failed to unmarshal '%v': %v", msg, err)
		}
		m := &pubsub.Message{
			Data: ([]byte)(bytes),
			// Attributes: ??
		}

		id, err := t.Publish(ctx, m).Get(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to publish '%v': %v", msg, err)
		}
		log.Infof(ctx, "Published %v with id: %v", msg, id)
	}
	return sub, nil
}
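A side note on the reproducer itself, probably unrelated to the panic below: `Publish` marshals a fresh `&sensor{}` instead of `msg`, and the `sensor` fields are unexported, so `encoding/json` would skip them in any case. The following is a minimal sketch of that behaviour using only the standard library; `exportedSensor` and `mustJSON` are hypothetical names introduced here for illustration and are not part of the original code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unexportedSensor mirrors the reproducer's struct: its lowercase
// fields are invisible to encoding/json.
type unexportedSensor struct {
	name  string
	value int
}

// exportedSensor is a hypothetical variant with exported, tagged fields.
type exportedSensor struct {
	Name  string `json:"name"`
	Value int    `json:"value"`
}

// mustJSON marshals v and panics on error, which is acceptable for a sketch.
func mustJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// Unexported fields are dropped, so the payload is an empty object.
	fmt.Println(mustJSON(unexportedSensor{name: "temperature", value: 24})) // {}
	// Exported fields survive the round trip.
	fmt.Println(mustJSON(exportedSensor{Name: "temperature", Value: 24})) // {"name":"temperature","value":24}
}
```

With the struct as written in the reproducer, every published message would therefore carry `{}` and every decoded reading would have an empty name and a zero value.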
I ran this code as follows:
go run . --project="<my-project>" --runner dataflow --staging_location gs://<my-gs-bucket>/binaries/ --temp_location gs://<my-gs-bucket>/tmp/ --region "europe-west1" --worker_harness_container_image=alethenorio/beam-go:v2.11.0
The code publishes to Pub/Sub, then reads the messages back and calls `stats.MeanPerKey` to compute a running average per sensor.
When I deploy this on Cloud Dataflow, using a container I built myself from v2.11.0 (alethenorio/beam-go:v2.11.0), I get the following error every time:
Worker panic: Unexpected transform URN: beam:transform:combine_grouped_values:v1goroutine 1 [running]:
runtime/debug.Stack(0x50, 0x0, 0x0)
	/usr/local/go/src/runtime/debug/stack.go:24 +0x9d
runtime/debug.PrintStack()
	/usr/local/go/src/runtime/debug/stack.go:16 +0x22
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init.hook.func1()
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/init/init.go:77 +0xac
panic(0xd4b160, 0xc001172f70)
	/usr/local/go/src/runtime/panic.go:522 +0x1b5
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*builder).makeLink(0xc00078b980, 0xc0007aa2c0, 0x18, 0xc0007aa180, 0x17, 0x0, 0x400000000000040, 0xffffffffffffffff, 0x0, 0x0)
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:521 +0x308f
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*builder).makePCollection(0xc00078b980, 0xc0007aa2c0, 0x18, 0xc0011775a0, 0xf, 0x0, 0x0)
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:281 +0x5ff
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.UnmarshalPlan(0xc00078b2c0, 0xc00031a0f0, 0xd49820, 0xff5e10)
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:71 +0x393
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc000304300, 0x1026be0, 0xc00031a0f0, 0xc000774880, 0xc00031a0f0)
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:155 +0x1ae
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0x1026be0, 0xc00031a0f0, 0xc000774880)
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:114 +0x1cf
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main(0x1026be0, 0xc00031a0f0, 0x7ffe0437db7c, 0xf, 0x7ffe0437db9f, 0xf, 0x0, 0x0)
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:129 +0x786
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init.hook()
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/harness/init/init.go:86 +0xee
github.com/apache/beam/sdks/go/pkg/beam/core/runtime.Init()
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/core/runtime/init.go:42 +0x50
github.com/apache/beam/sdks/go/pkg/beam.Init(...)
	/home/localuser/go/pkg/mod/github.com/apache/beam@v2.11.0+incompatible/sdks/go/pkg/beam/forward.go:111
main.main()
	/home/localuser/reproducer/main.go:43 +0x8f
I realize the Go SDK is not stable yet, but I was unsure where to report this in case the devs are not aware of it, so I hope this is the right place.
I get the feeling that some gRPC request is sending the wrong URN, but I couldn't find where in the code `v1goroutine` gets set (I think it should be just `v1`).