Details
Type: Improvement
Status: Resolved
Priority: P3
Resolution: Fixed
None
Description
Defining a composite transform today requires writing a full named subclass of PTransform (as the programming guide documents), but there are cases where users may want to define a fairly trivial composite transform with a less verbose Java 8 lambda expression.
Consider an example where the user has defined MyDeserializationTransform, which attempts to deserialize byte arrays into some object and returns a PCollectionTuple with tags for successfully deserialized records (mainTag) and for errors (errorTag).
If we introduce a static PTransform::compose method that takes a SerializableFunction, the user can handle errors in a small lambda expression:
    byteArrays
        .apply("attempt to deserialize messages", new MyDeserializationTransform())
        .apply("write deserialization errors", PTransform.compose((PCollectionTuple input) -> {
          // Send failed records to a dedicated error output...
          input.get(errorTag).apply(new MyErrorOutputTransform());
          // ...and pass only successfully deserialized records downstream.
          return input.get(mainTag);
        }))
        .apply("more processing on the deserialized messages", new MyOtherTransform());
This style allows a more concise and fluent pipeline definition than is currently possible.
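The idea can be illustrated without a Beam dependency. The sketch below is a toy model, not Beam's API: ToyTransform, SerializableFn, ParseResult, ParseInts and ComposeDemo are hypothetical names standing in for PTransform, SerializableFunction, PCollectionTuple, MyDeserializationTransform and a small pipeline, and plain lists stand in for PCollections. It shows one way a static compose factory could adapt a lambda into an anonymous subclass, so that the lambda-built step is an ordinary transform just like the named one.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Beam's SerializableFunction: a Function that is also Serializable.
interface SerializableFn<InputT, OutputT> extends Function<InputT, OutputT>, Serializable {}

// Toy model of PTransform (NOT Beam's class): only expand() and a compose() factory are modeled.
abstract class ToyTransform<InputT, OutputT> implements Serializable {
  abstract OutputT expand(InputT input);

  // Wraps a lambda in an anonymous subclass, so a trivial composite needs no named subclass.
  static <InputT, OutputT> ToyTransform<InputT, OutputT> compose(SerializableFn<InputT, OutputT> fn) {
    return new ToyTransform<InputT, OutputT>() {
      @Override
      OutputT expand(InputT input) {
        return fn.apply(input);
      }
    };
  }
}

// Toy stand-in for a PCollectionTuple: a main output plus an error output.
final class ParseResult {
  final List<Integer> main = new ArrayList<>();
  final List<String> errors = new ArrayList<>();
}

// The verbose style: a named subclass, analogous to MyDeserializationTransform.
final class ParseInts extends ToyTransform<List<String>, ParseResult> {
  @Override
  ParseResult expand(List<String> input) {
    ParseResult result = new ParseResult();
    for (String s : input) {
      try {
        result.main.add(Integer.parseInt(s));
      } catch (NumberFormatException e) {
        result.errors.add(s); // unparseable records go to the error output
      }
    }
    return result;
  }
}

class ComposeDemo {
  // Parses, then handles errors inline; errorSink stands in for MyErrorOutputTransform.
  static List<Integer> run(List<String> input, List<String> errorSink) {
    // The concise style: a lambda instead of a named subclass.
    ToyTransform<ParseResult, List<Integer>> writeErrors =
        ToyTransform.compose((ParseResult r) -> {
          errorSink.addAll(r.errors); // error records are written to the sink
          return r.main;              // only successful records continue
        });
    return writeErrors.expand(new ParseInts().expand(input));
  }

  public static void main(String[] args) {
    List<String> errors = new ArrayList<>();
    List<Integer> parsed = run(Arrays.asList("1", "x", "3"), errors);
    System.out.println(parsed + " errors=" + errors); // prints "[1, 3] errors=[x]"
  }
}
```

Because compose returns an ordinary transform, the lambda-built step can be chained and reused like any named one. In real Beam code the lambda would instead receive a PCollectionTuple and return a PCollection, calling apply inside its body as in the snippet above.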