When mapping a Scrunch PCollection to a pair of values, the keyvalue implicit in CanParallelTransform "upgrades" the result to a PTable[K, V]. This is often the desired behaviour, but because Scrunch's PTable does not extend Scrunch's PCollection, there are cases where it is not what is wanted.
Concrete example from music land: I am trying to count the number of plays for each track in each country. I want to do this:
trackPlayedMessage.map(tpm => (tpm.track, tpm.country)).count()
However, because the keyvalue implicit CanParallelTransform is the one that gets substituted, I cannot call .count(): what I get back is a PTable and not a PCollection.
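To make the mechanism concrete, here is a minimal, self-contained sketch of the same implicit-priority pattern in plain Scala 2. It does not use Scrunch at all: Coll, Table, Transform, LowPriorityTransforms and Play are hypothetical stand-ins that I am assuming mirror the shape of PCollection, PTable, CanParallelTransform, LowPriorityParallelTransforms and my message type, and count() is simplified to return a Map.

```scala
// Toy model only: Coll, Table and Transform are hypothetical stand-ins,
// not the real Scrunch PCollection, PTable and CanParallelTransform.
object ImplicitUpgradeSketch {
  trait Transform[El, To] { def apply(items: List[El]): To }

  final case class Coll[A](items: List[A]) {
    // The result type To is fixed by whichever implicit Transform is selected.
    def map[B, To](f: A => B)(implicit t: Transform[B, To]): To = t(items.map(f))

    // Simplified count: number of occurrences of each distinct element.
    def count(): Map[A, Long] =
      items.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
  }

  // Deliberately NOT a subtype of Coll[(K, V)], and it has no count().
  final case class Table[K, V](items: List[(K, V)])

  trait LowPriorityTransforms {
    // Fallback: any element type yields a plain collection.
    implicit def single[B]: Transform[B, Coll[B]] = new Transform[B, Coll[B]] {
      def apply(items: List[B]): Coll[B] = Coll(items)
    }
  }

  object Transform extends LowPriorityTransforms {
    // Preferred over single whenever the element type is a pair.
    implicit def keyvalue[K, V]: Transform[(K, V), Table[K, V]] =
      new Transform[(K, V), Table[K, V]] {
        def apply(items: List[(K, V)]): Table[K, V] = Table(items)
      }
  }

  final case class Play(track: String, country: String)
  val plays = Coll(List(Play("a", "SE"), Play("a", "SE"), Play("b", "US")))

  // Non-pair element: only single applies, so this is a Coll[String].
  val tracks = plays.map(_.track)

  // Pair element: keyvalue wins, so this is a Table[String, String].
  val pairs = plays.map(p => (p.track, p.country))

  // pairs.count()  // does not compile: Table has no count()

  def main(args: Array[String]): Unit = {
    println(tracks.count())
    println(pairs)
  }
}
```

In this sketch the pair-valued map never yields a Coll unless the caller intervenes, which is the behaviour I am running into.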
There are a number of possible remedies that I'm happy to have a go at, but I'd like some input as to which would be best:
- Make PTable[K, V] a real extension of PCollection[(K, V)] (analogous to how it works in Crunch)
- Add an "asPCollection" method to PTable which "downgrades" the PTable[K, V] to a PCollection[(K, V)].
- Make mapToTable and flatMapToTable distinct from map and flatMap to make the choice explicit (warning: breaks existing API).
- Expose an equivalent to LowPriorityParallelTransforms.single that can be passed explicitly to get a collection instead of a table, e.g. .map(fn)(implicitly, single)
- Something else
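
As a rough illustration of the second and fourth options, here is how they might look in a toy model written in plain Scala 2. Nothing here is Scrunch code: Coll, Table, Transform, Play and asColl are hypothetical names standing in for PCollection, PTable, CanParallelTransform, my message type and the proposed asPCollection. The toy map takes a single implicit parameter, whereas the .map(fn)(implicitly, single) form above passes two.

```scala
// Toy model only: hypothetical stand-ins, not the real Scrunch API.
object RemedySketch {
  trait Transform[El, To] { def apply(items: List[El]): To }

  final case class Coll[A](items: List[A]) {
    def map[B, To](f: A => B)(implicit t: Transform[B, To]): To = t(items.map(f))

    // Simplified count: number of occurrences of each distinct element.
    def count(): Map[A, Long] =
      items.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
  }

  final case class Table[K, V](items: List[(K, V)]) {
    // Option 2 analogue: "downgrade" the table to a collection of pairs.
    def asColl: Coll[(K, V)] = Coll(items)
  }

  trait LowPriorityTransforms {
    implicit def single[B]: Transform[B, Coll[B]] = new Transform[B, Coll[B]] {
      def apply(items: List[B]): Coll[B] = Coll(items)
    }
  }

  object Transform extends LowPriorityTransforms {
    implicit def keyvalue[K, V]: Transform[(K, V), Table[K, V]] =
      new Transform[(K, V), Table[K, V]] {
        def apply(items: List[(K, V)]): Table[K, V] = Table(items)
      }
  }

  final case class Play(track: String, country: String)
  val plays = Coll(List(Play("a", "SE"), Play("a", "SE"), Play("b", "US")))

  // Option 2: let the implicit build a Table, then convert back before counting.
  val viaDowngrade = plays.map(p => (p.track, p.country)).asColl.count()

  // Option 4: pass the low-priority transform explicitly so map yields a Coll.
  val viaExplicit =
    plays.map(p => (p.track, p.country))(Transform.single[(String, String)]).count()

  def main(args: Array[String]): Unit = {
    println(viaDowngrade)
    println(viaExplicit)
  }
}
```

Both routes give the same per-(track, country) counts in the toy model. The downgrade reads more naturally at the call site, while the explicit transform avoids building the intermediate table.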