Details
Type: New Feature
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.16.0
Fix Version/s: None
Component/s: None
Description
When streaming POJO types, the codec is registered automatically. However, when streaming a tuple that contains a UDT, the Cassandra driver cannot serialize the type.
Motivating Example: If we have a table whose rows contain a collection of UDTs, then the only way to append to that collection (rather than overwrite it) is through a tuple stream.
create type link (
title text,
url text
);
create table users (
id int primary key,
links set<frozen<link>>
);
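For reference, the Link class used below would map to the UDT via the DataStax driver's object-mapping annotations. A minimal sketch, assuming driver 3.x (the version the Flink Cassandra connector is built against):

import com.datastax.driver.mapping.annotations.Field;
import com.datastax.driver.mapping.annotations.UDT;

// Maps the "link" UDT defined above to a Java class.
@UDT(name = "link")
public class Link {
    @Field(name = "title")
    private String title;

    @Field(name = "url")
    private String url;

    // no-arg constructor required by the mapper
    public Link() {}

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
}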
If we were to use a POJO stream, the field containing the collection would be overwritten with a new collection on each upsert. If we instead set the query on a tuple stream:
DataStream<Tuple2<Set<Link>, Integer>> linkStream = ...
CassandraSink.addSink(linkStream)
    .setQuery("update users set links = links + ? where id = ?")
    ...
    .build();
We will get a CodecNotFoundException. Using the DataStax Java driver outside of the Flink framework, it is easy to register a codec:
Session session = cluster.connect();
new MappingManager(session).udtCodec(Link.class);
However, this requires access to a session, which ClusterBuilder does not expose in any way.
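For illustration, the only customization hook a user currently has is ClusterBuilder#buildCluster, which never sees a Session (a minimal sketch; the contact point is a placeholder):

ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Only the Cluster can be configured here; the Session is created
        // internally by the connector, so MappingManager#udtCodec cannot be called.
        return builder.addContactPoint("127.0.0.1").build();
    }
};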
Potential solutions: expose the Session or the MappingManager in some way through the ClusterBuilder class, or add a method such as registerUDT to CassandraSinkBuilder (sketched below).
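A hypothetical shape for the second option (registerUDT is an illustrative name, not an existing method on CassandraSinkBuilder) could be:

CassandraSink.addSink(linkStream)
    .setQuery("update users set links = links + ? where id = ?")
    .setClusterBuilder(clusterBuilder)
    // hypothetical: the builder would call MappingManager#udtCodec(Link.class)
    // on the session it creates internally
    .registerUDT(Link.class)
    .build();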