FLINK-3871: Add Kafka TableSource with Avro serialization

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      Add a Kafka TableSource which supports Avro serialized data.

      The KafkaAvroTableSource should support two modes:

      1. SpecificRecord Mode: In this mode, the user specifies a class that was code-generated by Avro from a schema. Flink treats these classes as regular POJOs, so they are natively supported by the Table API and SQL. Classes generated by Avro contain their schema in a static field, which should be used to automatically derive field names and types. Hence, no information other than the class name is required.
      2. GenericRecord Mode: In this mode, the user specifies an Avro schema. The schema is used to deserialize the data into a GenericRecord, which must be translated into a (possibly nested) Row based on the schema information. Again, the Avro schema is used to automatically derive the field names and types. This mode is less efficient than the SpecificRecord mode because each GenericRecord needs to be converted into a Row. A sketch of both modes follows below.

      This feature depends on FLINK-5280, i.e., support for nested data in TableSource.
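
      For illustration, here is a minimal Java sketch of both modes. This is a hedged sketch, not the proposed API: the helpers `fieldNamesOf` and `convertToRow` are hypothetical names, and `Row` refers to the Table API row type used elsewhere in this issue.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.table.Row;

import java.util.List;

public final class AvroModeSketch {

    // SpecificRecord mode: Avro-generated classes carry their schema in the
    // static SCHEMA$ field, so field names and types can be derived from the
    // class alone.
    public static String[] fieldNamesOf(Class<? extends SpecificRecordBase> clazz) throws Exception {
        Schema schema = (Schema) clazz.getDeclaredField("SCHEMA$").get(null);
        List<Schema.Field> fields = schema.getFields();
        String[] names = new String[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            names[i] = fields.get(i).name();
        }
        return names;
    }

    // GenericRecord mode: each deserialized record is copied into a
    // (possibly nested) Row; this copy is the extra cost mentioned above.
    public static Row convertToRow(GenericRecord record) {
        List<Schema.Field> fields = record.getSchema().getFields();
        Row row = new Row(fields.size());
        for (int i = 0; i < fields.size(); i++) {
            Object value = record.get(i);
            row.setField(i, value instanceof GenericRecord
                ? convertToRow((GenericRecord) value)
                : value);
        }
        return row;
    }
}
```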


          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2762

          @fhueske I've split the PR according to your suggestion. Now this PR only contains serializer, deserializer and table source.
          Since the sink part depends on this PR, I can't publish it yet; I'll publish it once this PR is merged.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2762

          Thanks a lot for the update @mushketyk. I'm quite busy at the moment but will try to have a look at your PR soon. Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2762

          @fhueske No worries. Take your time.

          Best regards,
          Ivan.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r88461322

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,95 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericDatumReader;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.Row;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +
          +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.createRowAvroSchema;
          +
          +/**
          + * Deserialization schema from Avro to {@link Row}.
          + *
          + * <p>Deserializes the <code>byte[]</code> messages in Avro format and reads
          + * the specified fields.
          + *
          + * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /** Field names in a row */
          + private final String[] fieldNames;
          + /** Types to parse fields as. Indices match fieldNames indices. */
          + private final TypeInformation[] fieldTypes;
          + /** Avro deserialization schema */
          + private final Schema schema;
          + /** Reader that deserializes byte array into a record */
          + private final DatumReader<GenericRecord> datumReader;
          + /** Record to deserialize byte array to */
          + private final GenericRecord record;
          +
          + /**
          + * Creates a Avro deserializtion schema for the given type classes.
          — End diff –

          deserializtion + "a"

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r88451582

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java —
          @@ -0,0 +1,84 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.sources.StreamTableSource;
          +import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
          +
          +import java.util.Properties;
          +
          +/**
          + * A version-agnostic Kafka Avro {@link StreamTableSource}.
          + *
          + * <p>The version-specific Kafka consumers need to extend this class and
          + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
          + *
          + * <p>The field names are used to parse the Avro file and so are the types.
          + */
          +public abstract class KafkaAvroTableSource extends KafkaTableSource {
          +
          + /**
          + * Creates a generic Kafka Avro {@link StreamTableSource}.
          + *
          + * @param topic Kafka topic to consume.
          + * @param properties Properties for the Kafka consumer.
          + * @param fieldNames Row field names.
          + * @param fieldTypes Row field types.
          + */
          + KafkaAvroTableSource(
          + String topic,
          + Properties properties,
          + String[] fieldNames,
          + Class<?>[] fieldTypes) {
          +
          + super(topic, properties, createDeserializationSchema(fieldTypes), fieldNames, fieldTypes);
          + }
          +
          + /**
          + * Creates a generic Kafka Avro {@link StreamTableSource}.
          + *
          + * @param topic Kafka topic to consume.
          + * @param properties Properties for the Kafka consumer.
          + * @param fieldNames Row field names.
          + * @param fieldTypes Row field types.
          + */
          + KafkaAvroTableSource(
          + String topic,
          + Properties properties,
          + String[] fieldNames,
          + TypeInformation<?>[] fieldTypes) {
          +
          + super(topic, properties, createDeserializationSchema(fieldTypes), fieldNames, fieldTypes);
          + }

          +
          + private static AvroRowDeserializationSchema createDeserializationSchema(
          + TypeInformation<?>[] fieldTypes) {
          +
          + return new AvroRowDeserializationSchema(new String[]{"f1", "f2", "f3"}, fieldTypes);
          + }
          +
          + private static AvroRowDeserializationSchema createDeserializationSchema(
          + Class<?>[] fieldTypes) {
          +
          + return new AvroRowDeserializationSchema(new String[]{"f1", "f2", "f3"}, TypeUtil.toTypeInfo(fieldTypes));
          — End diff –

          why are the field names set to `new String[]{"f1", "f2", "f3"}`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r88451554

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java —
          @@ -0,0 +1,84 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.sources.StreamTableSource;
          +import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
          +
          +import java.util.Properties;
          +
          +/**
          + * A version-agnostic Kafka Avro {@link StreamTableSource}.
          + *
          + * <p>The version-specific Kafka consumers need to extend this class and
          + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
          + *
          + * <p>The field names are used to parse the Avro file and so are the types.
          + */
          +public abstract class KafkaAvroTableSource extends KafkaTableSource {
          +
          + /**
          + * Creates a generic Kafka Avro {@link StreamTableSource}.
          + *
          + * @param topic Kafka topic to consume.
          + * @param properties Properties for the Kafka consumer.
          + * @param fieldNames Row field names.
          + * @param fieldTypes Row field types.
          + */
          + KafkaAvroTableSource(
          + String topic,
          + Properties properties,
          + String[] fieldNames,
          + Class<?>[] fieldTypes) {
          +
          + super(topic, properties, createDeserializationSchema(fieldTypes), fieldNames, fieldTypes);
          + }
          +
          + /**
          + * Creates a generic Kafka Avro {@link StreamTableSource}.
          + *
          + * @param topic Kafka topic to consume.
          + * @param properties Properties for the Kafka consumer.
          + * @param fieldNames Row field names.
          + * @param fieldTypes Row field types.
          + */
          + KafkaAvroTableSource(
          + String topic,
          + Properties properties,
          + String[] fieldNames,
          + TypeInformation<?>[] fieldTypes) {
          +
          + super(topic, properties, createDeserializationSchema(fieldTypes), fieldNames, fieldTypes);
          + }

          +
          + private static AvroRowDeserializationSchema createDeserializationSchema(
          + TypeInformation<?>[] fieldTypes) {
          +
          + return new AvroRowDeserializationSchema(new String[]{"f1", "f2", "f3"}, fieldTypes);
          — End diff –

          why are the field names set to `new String[]{"f1", "f2", "f3"}`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r88461076

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,83 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericDatumWriter;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumWriter;
          +import org.apache.avro.io.Encoder;
          +import org.apache.avro.io.EncoderFactory;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.Row;
          +
          +import java.io.ByteArrayOutputStream;
          +import java.io.IOException;
          +
          +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.createRowAvroSchema;
          +
          +/**
          + * Serialization schema that serializes an object into a Avro bytes.
          + * <p>
          + */
          +public class AvroRowSerializationSchema implements SerializationSchema<Row> {
          +
          + /** Field names in a Row */
          + private final String[] fieldNames;
          + /** Avro serialization schema */
          + private final Schema schema;
          + /** Writer to serialize Avro GeneralRecord into a byte array */
          + private final DatumWriter<GenericRecord> datumWriter;
          + /** Output stream to serialize records into byte array */
          + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
          + /** Low level class for serialization of Avro values */
          + private final Encoder encoder = EncoderFactory.get().directBinaryEncoder(arrayOutputStream, null);
          — End diff –

          use `binaryEncoder` instead of `directBinaryEncoder` to get a buffering encoder
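
          A hedged sketch of that suggestion, using Avro's standard `EncoderFactory` API and the `arrayOutputStream` and `datumWriter` fields from the diff above; `convertRowToRecord` is a hypothetical helper standing in for the Row-to-GenericRecord code in this PR:

```java
// binaryEncoder() returns a buffering BinaryEncoder that can be reused
// across calls; unlike the direct encoder, it must be flushed before the
// output stream's bytes are read.
private BinaryEncoder encoder;  // no longer final; reused per call

public byte[] serialize(Row row) {
    try {
        arrayOutputStream.reset();
        encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, encoder);
        GenericRecord record = convertRowToRecord(row);  // hypothetical helper
        datumWriter.write(record, encoder);
        encoder.flush();  // required with the buffering encoder
        return arrayOutputStream.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException("Failed to serialize row.", e);
    }
}
```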

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r88464767

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroDeserializationSchemaTest.java —
          @@ -0,0 +1,97 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.Row;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +
          +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
          +import static org.junit.Assert.assertEquals;
          +
          +public class AvroDeserializationSchemaTest {
          +
          + private static final String[] FIELD_NAMES = new String[]{"f1", "f2", "f3"};
          + private static final TypeInformation[] FIELD_TYPES = toTypeInfo(new Class[]{Integer.class, Boolean.class, String.class});
          +
          + private AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(
          + FIELD_NAMES, FIELD_TYPES
          + );
          + private AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(
          + FIELD_NAMES, FIELD_TYPES
          + );
          +
          + @Test
          + public void serializeAndDeserializeRow() throws IOException {
          + Row row = createRow();
          +
          + byte[] bytes = serializationSchema.serialize(row);
          + Row resultRow = deserializationSchema.deserialize(bytes);
          +
          + assertEqualsRows(row, resultRow);
          + }

          +
          + @Test
          + public void serializeRowSeveralTimes() throws IOException {
          + Row row = createRow();
          +
          + serializationSchema.serialize(row);
          + serializationSchema.serialize(row);
          + byte[] bytes = serializationSchema.serialize(row);
          + Row resultRow = deserializationSchema.deserialize(bytes);
          +
          + assertEqualsRows(row, resultRow);
          + }

          +
          + @Test
          + public void deserializeRowSeveralTimes() throws IOException {
          + Row row = createRow();
          +
          + byte[] bytes = serializationSchema.serialize(row);
          + deserializationSchema.deserialize(bytes);
          + deserializationSchema.deserialize(bytes);
          + Row resultRow = deserializationSchema.deserialize(bytes);
          +
          + assertEqualsRows(row, resultRow);
          + }

          +
          + private Row createRow() {
          + Row row = new Row(3);
          — End diff –

          A bit more test data would be good:

          • rows with `null` values
          • more complex types like `DateTime`, `BigInteger`, `BigDecimal`
          • custom POJOs
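
          As one illustration of the first point, a null-value round trip could be added along these lines (a hedged sketch reusing the test class's fixtures; it assumes the generated row schema declares fields as nullable unions):

```java
@Test
public void serializeAndDeserializeRowWithNull() throws IOException {
    // A row whose middle field is null; this only round-trips if the
    // Avro schema declares the fields as ["null", <type>] unions.
    Row row = new Row(3);
    row.setField(0, 42);
    row.setField(1, null);
    row.setField(2, "flink");

    byte[] bytes = serializationSchema.serialize(row);
    Row resultRow = deserializationSchema.deserialize(bytes);

    assertEqualsRows(row, resultRow);
}
```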
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r88463790

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,95 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericDatumReader;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.Row;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +
          +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.createRowAvroSchema;
          +
          +/**
          + * Deserialization schema from Avro to {@link Row}.
          + *
          + * <p>Deserializes the <code>byte[]</code> messages in Avro format and reads
          + * the specified fields.
          + *
          + * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /** Field names in a row */
          + private final String[] fieldNames;
          + /** Types to parse fields as. Indices match fieldNames indices. */
          + private final TypeInformation[] fieldTypes;
          + /** Avro deserialization schema */
          + private final Schema schema;
          + /** Reader that deserializes byte array into a record */
          + private final DatumReader<GenericRecord> datumReader;
          + /** Record to deserialize byte array to */
          + private final GenericRecord record;
          +
          + /**
          + * Creates a Avro deserializtion schema for the given type classes.
          + *
          + * @param fieldNames
          + * @param fieldTypes Type classes to parse Avro fields as.
          + */
          + public AvroRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
          + this.schema = createRowAvroSchema(fieldNames, fieldTypes);
          + this.fieldNames = fieldNames;
          + this.fieldTypes = fieldTypes;
          + this.datumReader = new GenericDatumReader<>(schema);
          + this.record = new GenericData.Record(schema);
          + }

          +
          + @Override
          + public Row deserialize(byte[] message) throws IOException {
          + readRecord(message);
          + return convertRecordToRow();
          + }

          +
          + private void readRecord(byte[] message) throws IOException {
          + ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(message);
          — End diff –

          creating a new `ByteArrayInputStream` and `Decoder` for each record is quite expensive. Can we reuse them as you did in the serializer?
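
          A hedged sketch of that reuse pattern, against Avro's standard `DecoderFactory` API and the `datumReader` and `record` fields from the diff above:

```java
// DecoderFactory.binaryDecoder(byte[], BinaryDecoder) re-initializes an
// existing decoder over the new message, and DatumReader.read() can reuse
// the target record, avoiding per-record allocations.
private BinaryDecoder decoder;  // reused across deserialize() calls

private GenericRecord readRecord(byte[] message) throws IOException {
    decoder = DecoderFactory.get().binaryDecoder(message, decoder);
    // read() populates and returns the reused record where possible
    return datumReader.read(record, decoder);
}
```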

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2762

          Hi @mushketyk, do you plan to continue with this PR?

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2762

          Hi @fhueske, sorry, I've been a bit busy lately. I'll update the PR tonight.

          Best regards,
          Ivan

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2762

          No worries and no need to hurry!
          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r90504858

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java —
          @@ -0,0 +1,84 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.sources.StreamTableSource;
          +import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
          +
          +import java.util.Properties;
          +
          +/**
          + * A version-agnostic Kafka Avro {@link StreamTableSource}.
          + *
          + * <p>The version-specific Kafka consumers need to extend this class and
          + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
          + *
          + * <p>The field names are used to parse the Avro file and so are the types.
          + */
          +public abstract class KafkaAvroTableSource extends KafkaTableSource {
          +
          + /**
          + * Creates a generic Kafka Avro {@link StreamTableSource}.
          + *
          + * @param topic Kafka topic to consume.
          + * @param properties Properties for the Kafka consumer.
          + * @param fieldNames Row field names.
          + * @param fieldTypes Row field types.
          + */
          + KafkaAvroTableSource(
          + String topic,
          + Properties properties,
          + String[] fieldNames,
          + Class<?>[] fieldTypes) {
          +
          + super(topic, properties, createDeserializationSchema(fieldTypes), fieldNames, fieldTypes);
          + }
          +
          + /**
          + * Creates a generic Kafka Avro {@link StreamTableSource}.
          + *
          + * @param topic Kafka topic to consume.
          + * @param properties Properties for the Kafka consumer.
          + * @param fieldNames Row field names.
          + * @param fieldTypes Row field types.
          + */
          + KafkaAvroTableSource(
          + String topic,
          + Properties properties,
          + String[] fieldNames,
          + TypeInformation<?>[] fieldTypes) {
          +
          + super(topic, properties, createDeserializationSchema(fieldTypes), fieldNames, fieldTypes);
          + }

          +
          + private static AvroRowDeserializationSchema createDeserializationSchema(
          + TypeInformation<?>[] fieldTypes) {
          +
          + return new AvroRowDeserializationSchema(new String[]{"f1", "f2", "f3"}, fieldTypes);
          — End diff –

          Sorry, accidentally typed it.
          Will fix.
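
          The implied fix would presumably forward the constructor's `fieldNames` instead of the hardcoded placeholder, roughly:

```java
// Presumed fix (sketch): pass fieldNames through rather than the
// accidental {"f1", "f2", "f3"} placeholder.
private static AvroRowDeserializationSchema createDeserializationSchema(
        String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    return new AvroRowDeserializationSchema(fieldNames, fieldTypes);
}
```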

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r90504878

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java —
          @@ -0,0 +1,84 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.sources.StreamTableSource;
          +import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
          +
          +import java.util.Properties;
          +
          +/**
          + * A version-agnostic Kafka Avro {@link StreamTableSource}.
          + *
          + * <p>The version-specific Kafka consumers need to extend this class and
          + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
          + *
          + * <p>The field names are used to parse the Avro file and so are the types.
          + */
          +public abstract class KafkaAvroTableSource extends KafkaTableSource {
          +
          + /**
          + * Creates a generic Kafka Avro {@link StreamTableSource}.
          + *
          + * @param topic Kafka topic to consume.
          + * @param properties Properties for the Kafka consumer.
          + * @param fieldNames Row field names.
          + * @param fieldTypes Row field types.
          + */
          + KafkaAvroTableSource(
          + String topic,
          + Properties properties,
          + String[] fieldNames,
+ Class<?>[] fieldTypes) {
+
+ super(topic, properties, createDeserializationSchema(fieldTypes), fieldNames, fieldTypes);
+ }
          +
          + /**
          + * Creates a generic Kafka Avro {@link StreamTableSource}.
          + *
          + * @param topic Kafka topic to consume.
          + * @param properties Properties for the Kafka consumer.
          + * @param fieldNames Row field names.
          + * @param fieldTypes Row field types.
          + */
          + KafkaAvroTableSource(
          + String topic,
          + Properties properties,
          + String[] fieldNames,
+ TypeInformation<?>[] fieldTypes) {
+
+ super(topic, properties, createDeserializationSchema(fieldTypes), fieldNames, fieldTypes);
+ }

          +
          + private static AvroRowDeserializationSchema createDeserializationSchema(
          + TypeInformation<?>[] fieldTypes) {
          +
+ return new AvroRowDeserializationSchema(new String[] {"f1", "f2", "f3"}, fieldTypes);
          + }
          +
          + private static AvroRowDeserializationSchema createDeserializationSchema(
          + Class<?>[] fieldTypes) {
          +
          + return new AvroRowDeserializationSchema(new String[]{"f1", "f2", "f3"}

          , TypeUtil.toTypeInfo(fieldTypes));
          — End diff –

          Ditto.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r90504916

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,83 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericDatumWriter;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumWriter;
          +import org.apache.avro.io.Encoder;
          +import org.apache.avro.io.EncoderFactory;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.Row;
          +
          +import java.io.ByteArrayOutputStream;
          +import java.io.IOException;
          +
          +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.createRowAvroSchema;
          +
          +/**
+ * Serialization schema that serializes an object into Avro bytes.
+ */
          +public class AvroRowSerializationSchema implements SerializationSchema<Row> {
          +
          + /** Field names in a Row */
          + private final String[] fieldNames;
          + /** Avro serialization schema */
          + private final Schema schema;
          + /** Writer to serialize Avro GeneralRecord into a byte array */
          + private final DatumWriter<GenericRecord> datumWriter;
          + /** Output stream to serialize records into byte array */
          + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
          + /** Low level class for serialization of Avro values */
          + private final Encoder encoder = EncoderFactory.get().directBinaryEncoder(arrayOutputStream, null);
          — End diff –

          Ok, thank you for suggestion.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r90504943

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,95 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericDatumReader;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.Row;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +
          +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.createRowAvroSchema;
          +
          +/**
+ * Deserialization schema from Avro to {@link Row}.
          + *
          + * <p>Deserializes the <code>byte[]</code> messages in Avro format and reads
          + * the specified fields.
          + *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /** Field names in a row */
          + private final String[] fieldNames;
          + /** Types to parse fields as. Indices match fieldNames indices. */
          + private final TypeInformation[] fieldTypes;
          + /** Avro deserialization schema */
          + private final Schema schema;
          + /** Reader that deserializes byte array into a record */
          + private final DatumReader<GenericRecord> datumReader;
          + /** Record to deserialize byte array to */
          + private final GenericRecord record;
          +
          + /**
          + * Creates a Avro deserializtion schema for the given type classes.
          — End diff –

          Good catch. Thank you.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2762#discussion_r90505208

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroDeserializationSchemaTest.java —
          @@ -0,0 +1,97 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.table.Row;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +
          +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
          +import static org.junit.Assert.assertEquals;
          +
          +public class AvroDeserializationSchemaTest {
          +
+ private static final String[] FIELD_NAMES = new String[] {"f1", "f2", "f3"};
+ private static final TypeInformation[] FIELD_TYPES = toTypeInfo(new Class[] {Integer.class, Boolean.class, String.class});
          +
          + private AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(
          + FIELD_NAMES, FIELD_TYPES
          + );
          + private AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(
          + FIELD_NAMES, FIELD_TYPES
          + );
          +
          + @Test
+ public void serializeAndDeserializeRow() throws IOException {
+ Row row = createRow();
+
+ byte[] bytes = serializationSchema.serialize(row);
+ Row resultRow = deserializationSchema.deserialize(bytes);
+
+ assertEqualsRows(row, resultRow);
+ }

          +
          + @Test
+ public void serializeRowSeveralTimes() throws IOException {
+ Row row = createRow();
+
+ serializationSchema.serialize(row);
+ serializationSchema.serialize(row);
+ byte[] bytes = serializationSchema.serialize(row);
+ Row resultRow = deserializationSchema.deserialize(bytes);
+
+ assertEqualsRows(row, resultRow);
+ }

          +
          + @Test
+ public void deserializeRowSeveralTimes() throws IOException {
+ Row row = createRow();
+
+ byte[] bytes = serializationSchema.serialize(row);
+ deserializationSchema.deserialize(bytes);
+ deserializationSchema.deserialize(bytes);
+ Row resultRow = deserializationSchema.deserialize(bytes);
+
+ assertEqualsRows(row, resultRow);
+ }

          +
          + private Row createRow() {
          + Row row = new Row(3);
          — End diff –

          Makes sense. I'll add these tests.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2762

          @fhueske Just a small question about your comment. You wrote:
          "users should be able to provide a Schema and a String ->String to convert aGenericRecordinto a Row"

          What is String -> String for?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2762

          The `String -> String` mapping would be a name mapping of fields from the `GenericRecord` to `Row` attributes. This would allow users to freely choose the attribute names of a table which is generated from an Avro schema.
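For illustration, such a mapping could be carried as a plain `Map` whose keys are Avro record field names and whose values are the desired table attribute names (a hypothetical sketch; the field and attribute names are invented, not taken from the PR):

```
import java.util.HashMap;
import java.util.Map;

public class FieldMappingSketch {
	public static void main(String[] args) {
		// Keys: Avro GenericRecord field names.
		// Values: the Row / table attribute names the user wants.
		Map<String, String> mapping = new HashMap<>();
		mapping.put("user_name", "name");
		mapping.put("event_ts", "rowtime");
		System.out.println(mapping);
	}
}
```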

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2762

          Are there any other cases when a user needs a custom conversion from a `GenericRecord` to a `Row` instance?
`String -> String` will definitely work, but I can also think of a generic converter `GenericRecord => Row`, though this may be overkill. What do you think about this?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2762

It would be good if the `TableSource` could be used by providing parameters, without extending it.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2762

          Hi @fhueske
Ok, I'll give it a try.

          I've pushed an updated PR where Schema is now external. I'll work on the transformation part tomorrow.

I had a few issues while trying to accommodate your comments.
1. When I try to use DateTime as a value of one of the fields of a GenericRecord, I receive the following exception:

          ```
          0 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime is not a valid POJO type

          java.lang.RuntimeException: java.lang.InstantiationException

          at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:316)
          at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:332)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
          at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
          at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
          at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
          at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
          at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.readRecord(AvroRowDeserializationSchema.java:77)
          at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:70)
          at org.apache.flink.streaming.connectors.kafka.AvroDeserializationSchemaTest.deserializeRowWithComplexTypes(AvroDeserializationSchemaTest.java:117)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:497)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
          at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
          at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
          at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:497)
          at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
          Caused by: java.lang.InstantiationException
          at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
          at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
          at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:314)
          ... 43 more
          ```

I am not sure how to serialize DateTime using Avro. Do you have any suggestions?

2. You suggested that "creating a new ByteArrayInputStream and Decoder for each record is quite expensive". I don't see a way to avoid this: to create a Decoder I need to pass an instance of an InputStream, but once a ByteArrayInputStream is created its content cannot be changed. Therefore a new instance of ByteArrayInputStream would have to be created for each message.

I think I can create a different implementation of InputStream that allows setting a buffer (let's call it `MutableByteArrayInputStream`). Then we could create one instance of `MutableByteArrayInputStream` and just call `inputStream.setBuffer(message)` for every message that should be deserialized, as sketched below.
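A minimal sketch of that idea (hypothetical, not code from the PR as posted): `ByteArrayInputStream` keeps its buffer in protected fields, so a subclass can swap the buffer in place and let a single `Decoder` be reused across messages.

```
import java.io.ByteArrayInputStream;

public class MutableByteArrayInputStream extends ByteArrayInputStream {

	public MutableByteArrayInputStream() {
		super(new byte[0]);
	}

	/** Replaces the underlying buffer and resets the read position. */
	public void setBuffer(byte[] buffer) {
		this.buf = buffer;
		this.pos = 0;
		this.count = buffer.length;
	}
}
```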

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2762

          Hi @fhueske

I've updated the PR according to your review.

          Best regards,
          Ivan.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2762

Hi @mushketyk, while reviewing your PR I noticed a significant shortcoming of the `TableSource` interface: `TableSource` does not support nested schemas. IMO, this needs to be fixed before we can have a proper integration of Avro with the Table API / SQL.

I created FLINK-5280 (https://issues.apache.org/jira/browse/FLINK-5280) and also updated the description of this issue (FLINK-3871, https://issues.apache.org/jira/browse/FLINK-3871). The `KafkaJsonTableSource` needs to be reworked to support nested data as well (FLINK-5281, https://issues.apache.org/jira/browse/FLINK-5281).

          I think we have to pause this PR until FLINK-5280 is resolved.
          What do you think?

          Best, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2762

Hi @fhueske,

Sorry, I am still relatively new to Flink so I don't have the full context. Could you elaborate on why this is an issue and how this shortcoming affects this PR?

Do you have an idea of how difficult it will be to implement this change? If it's not very involved, I could help with it to move this PR forward.

          Best regards,
          Ivan.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2762

          Hi @mushketyk, sorry for the concise description before. The problem is the following:

A `TableSource` provides schema information for the Table it produces. However, the methods `TableSource.getFieldNames()` and `TableSource.getFieldTypes()` return flat arrays which are interpreted as a flat schema (the second field in the array represents the second table attribute) without any nesting.

Avro and many other storage formats (JSON, Parquet, ...) support nested data structures. With the current limitation of the `TableSource` interface, we would need to convert the nested data into a flat schema. However, the Table API and SQL support processing of nested data, and it would be a much better integration to pass Avro objects in their original structure into the Table API / SQL query (see my updated proposal for FLINK-3871, https://issues.apache.org/jira/browse/FLINK-3871).

          In order to be able to create a Table of nested Avro data, we need to improve the `TableSource` interface first. Once this is done, we can continue with this PR.
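For illustration, once nesting is supported, a nested schema could be expressed with nested `RowTypeInfo` instances (a hypothetical sketch; the field names are invented, and it assumes the `RowTypeInfo(TypeInformation[], String[])` constructor available in recent Flink versions):

```
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class NestedSchemaSketch {
	public static void main(String[] args) {
		// Row type for a nested record like {user: {name, age}, ts}.
		TypeInformation<?> userType = new RowTypeInfo(
			new TypeInformation<?>[] {BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO},
			new String[] {"name", "age"});

		RowTypeInfo tableType = new RowTypeInfo(
			new TypeInformation<?>[] {userType, BasicTypeInfo.LONG_TYPE_INFO},
			new String[] {"user", "ts"});

		System.out.println(tableType);
	}
}
```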

I'm very sorry that I did not think about this earlier, given the effort you have already put into this issue.
          Please let me know if you have any questions.
          Best, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2762

          @mushketyk if you don't mind, I would continue working on this issue.

          githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/3663

          FLINK-3871 [table] Add Kafka TableSource with Avro serialization

          This PR adds KafkaAvroTableSource. It serializes/deserializes (nested) Avro records to (nested) Flink rows. Avro Utf8 strings are converted to regular Java strings.
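For context, a hypothetical usage sketch of the SpecificRecord mode (the topic, the `UserRecord` schema class, and the version-specific subclass `Kafka010AvroTableSource` are assumptions for illustration, not taken from this PR's diff):

```
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka010AvroTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class AvroTableSourceSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "avro-table-demo");

		// UserRecord is assumed to be an Avro code-generated SpecificRecord class.
		tEnv.registerTableSource("Users",
			new Kafka010AvroTableSource("user-topic", props, UserRecord.class));

		Table result = tEnv.sql("SELECT name, age FROM Users");
		// convert 'result' back to a DataStream and call env.execute() as usual
	}
}
```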

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-3871

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3663.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3663


          commit 589e45c5c50c328783f71d219c6606e972f42f34
          Author: twalthr <twalthr@apache.org>
          Date: 2017-04-03T12:44:46Z

          FLINK-3871 [table] Add Kafka TableSource with Avro serialization


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113236267

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,122 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayOutputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumWriter;
          +import org.apache.avro.io.Encoder;
          +import org.apache.avro.io.EncoderFactory;
          +import org.apache.avro.reflect.ReflectDatumWriter;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into Avro bytes.
          + */
          +public class AvroRowSerializationSchema implements SerializationSchema<Row> {
          +
          + /**
          + * Avro serialization schema.
          + */
          + private final Schema schema;
          +
          + /**
          + * Writer to serialize Avro record into a byte array.
          + */
          + private final DatumWriter<GenericRecord> datumWriter;
          +
          + /**
          + * Output stream to serialize records into byte array.
          + */
          + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
          +
          + /**
          + * Low-level class for serialization of Avro values.
          + */
          + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
          +
          + /**
+ * Creates an Avro serialization schema for the given schema.
          + *
          + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
          + */
          + @SuppressWarnings("unchecked")
+ public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+ Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumWriter = new ReflectDatumWriter<>(schema);
+ }

          +
          + @Override
          + @SuppressWarnings("unchecked")
          + public byte[] serialize(Row row) {
          + // convert to record
          + final Object record = convertToRecord(schema, row);
          +
          + // write
+ try {
+ arrayOutputStream.reset();
+ datumWriter.write((GenericRecord) record, encoder);
+ encoder.flush();
+ return arrayOutputStream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize Row.", e);
+ }
+ }
          +
          + /**
+ * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+ * Strings are converted into Avro's {@link Utf8} fields.
          + */
          + private static Object convertToRecord(Schema schema, Object rowObj) {
          + if (rowObj instanceof Row) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
+ if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+ schema = types.get(1);
+ } else {
+ throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+ }
+ } else if (schema.getType() != Schema.Type.RECORD) {
+ throw new RuntimeException("Record type for row type expected. But is: " + schema);
+ }

          + final List<Schema.Field> fields = schema.getFields();
          + final GenericRecord record = new GenericData.Record(schema);
          — End diff –

          Can we reuse the `GenericRecord`?
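For reference, a minimal sketch of the only nested union form the `convertToRecord` logic above accepts, UNION[null, RECORD], i.e. a nullable record field (the schema itself is invented for illustration):

```
import org.apache.avro.Schema;

public class UnionSchemaSketch {
	public static void main(String[] args) {
		// The "address" field is a UNION[null, RECORD]; the conversion
		// above unwraps such a union to the nested RECORD schema.
		String json =
			"{\"type\":\"record\",\"name\":\"User\",\"fields\":["
			+ "{\"name\":\"name\",\"type\":\"string\"},"
			+ "{\"name\":\"address\",\"type\":[\"null\","
			+ "{\"type\":\"record\",\"name\":\"Address\",\"fields\":["
			+ "{\"name\":\"city\",\"type\":\"string\"}]}]}]}";
		Schema schema = new Schema.Parser().parse(json);
		System.out.println(schema.getField("address").schema().getTypes());
	}
}
```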

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113179472

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import java.util.List;
          +import java.util.Properties;
          +import org.apache.avro.Schema;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.specific.SpecificRecordBase;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.AvroTypeInfo;
          +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
          +import org.apache.flink.table.sources.StreamTableSource;
          +
          +/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+ /**
+ * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param record Avro specific record.
+ */
+ KafkaAvroTableSource(
+ String topic,
+ Properties properties,
+ Class<? extends SpecificRecordBase> record) {
+
+ super(
+ topic,
+ properties,
+ createDeserializationSchema(record),
+ createFieldNames(record),
+ createFieldTypes(record));
+ }
+
+ private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+ return new AvroRowDeserializationSchema(record);
+ }
+
+ /**
+ * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+ * Replaces generic Utf8 with basic String type information.
+ */
+ private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
+ if (schema.getType() == Schema.Type.RECORD) {
+ final List<Schema.Field> fields = schema.getFields();
+ final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
+
+ final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
+ final String[] names = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
+ types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
+ names[i] = field.name();
+ }
+ return new RowTypeInfo(types, names);
+ } else if (extracted instanceof GenericTypeInfo<?>) {
+ final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
+ if (genericTypeInfo.getTypeClass() == Utf8.class) {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ }
+ return extracted;
+ }
+
+ private static <T extends SpecificRecordBase> TypeInformation<?>[] createFieldTypes(Class<T> record) {
+ final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(record);
+ // determine schema to retrieve deterministic field order
+ final Schema schema = SpecificData.get().getSchema(record);
+ final RowTypeInfo row = (RowTypeInfo) convertToRowTypeInformation(avroTypeInfo, schema);
+ final TypeInformation<?>[] types = new TypeInformation<?>[row.getArity()];
+ for (int i = 0; i < row.getArity(); i++) {
+ types[i] = row.getTypeAt(i);
+ }
+ return types;
+ }
          +
          + private static String[] createFieldNames(Class<? extends SpecificRecord> record) {
          — End diff –

          `record` -> `avroClass`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113161806

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import java.util.List;
          +import java.util.Properties;
          +import org.apache.avro.Schema;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.specific.SpecificRecordBase;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.AvroTypeInfo;
          +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
          +import org.apache.flink.table.sources.StreamTableSource;
          +
          +/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+ /**
+ * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param record Avro specific record.
+ */
+ KafkaAvroTableSource(
+ String topic,
+ Properties properties,
+ Class<? extends SpecificRecordBase> record) {
+
+ super(
+ topic,
+ properties,
+ createDeserializationSchema(record),
+ createFieldNames(record),
+ createFieldTypes(record));
+ }
+
+ private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+ return new AvroRowDeserializationSchema(record);
+ }

          +
          + /**
          + * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
          + * Replaces generic Utf8 with basic String type information.
          + */
          + private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
          — End diff –

          Change this to
          ```
          private static TypeInformation<Row> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema)
          ```

          and factor out the recursive logic to a method
          ```
convertToTypeInformation(TypeInformation<?> extracted, Schema schema)
          ```
          ?
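
A sketch of how that split could look (the helper name follows the comment above; the cast and comments are assumptions, and the helper body simply moves over from the original method, relying on the imports already shown in the diff):

```
@SuppressWarnings("unchecked")
private static TypeInformation<Row> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
	// the top-level Avro schema is always a RECORD, so the helper returns a RowTypeInfo here
	return (TypeInformation<Row>) convertToTypeInformation(extracted, schema);
}

private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
	// recursive RECORD traversal and Utf8 -> String replacement, unchanged from the PR
	if (schema.getType() == Schema.Type.RECORD) {
		final List<Schema.Field> fields = schema.getFields();
		final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
		final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
		final String[] names = new String[fields.size()];
		for (int i = 0; i < fields.size(); i++) {
			final Schema.Field field = fields.get(i);
			types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
			names[i] = field.name();
		}
		return new RowTypeInfo(types, names);
	} else if (extracted instanceof GenericTypeInfo<?>) {
		final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
		if (genericTypeInfo.getTypeClass() == Utf8.class) {
			return BasicTypeInfo.STRING_TYPE_INFO;
		}
	}
	return extracted;
}
```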

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113207820

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import java.util.List;
          +import java.util.Properties;
          +import org.apache.avro.Schema;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.specific.SpecificRecordBase;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.AvroTypeInfo;
          +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
          +import org.apache.flink.table.sources.StreamTableSource;
          +
          +/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+ /**
+ * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param record Avro specific record.
+ */
+ KafkaAvroTableSource(
+ String topic,
+ Properties properties,
+ Class<? extends SpecificRecordBase> record) {
+
+ super(
+ topic,
+ properties,
+ createDeserializationSchema(record),
+ createFieldNames(record),
+ createFieldTypes(record));
+ }
+
+ private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+ return new AvroRowDeserializationSchema(record);
+ }
+
+ /**
+ * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+ * Replaces generic Utf8 with basic String type information.
+ */
+ private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
+ if (schema.getType() == Schema.Type.RECORD) {
+ final List<Schema.Field> fields = schema.getFields();
+ final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
+
+ final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
+ final String[] names = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
+ types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
+ names[i] = field.name();
+ }
+ return new RowTypeInfo(types, names);
+ } else if (extracted instanceof GenericTypeInfo<?>) {
+ final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
+ if (genericTypeInfo.getTypeClass() == Utf8.class) {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ }
+ return extracted;
+ }
          +
          + private static <T extends SpecificRecordBase> TypeInformation<?>[] createFieldTypes(Class<T> record) {
          — End diff –

          This method can be removed as well, if we refactor `KafkaTableSource` as described below.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113241294

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,157 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.avro.reflect.ReflectDatumReader;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ *
+ * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /**
          + * Schema for deterministic field order.
          + */
          + private final Schema schema;
          +
          + /**
          + * Reader that deserializes byte array into a record.
          + */
          + private final DatumReader<GenericRecord> datumReader;
          +
          + /**
          + * Input stream to read message from.
          + */
          + private final MutableByteArrayInputStream inputStream;
          +
          + /**
          + * Avro decoder that decodes binary data
          + */
          + private final Decoder decoder;
          +
          + /**
          + * Record to deserialize byte array to.
          + */
          + private GenericRecord record;
          — End diff –

          `GenericRecord` -> `SpecificRecord`
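
Applied to the field above, the suggestion would read (a sketch, not the merged code):

```
/**
 * Record to deserialize byte array to.
 */
private SpecificRecord record;
```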

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113179453

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import java.util.List;
          +import java.util.Properties;
          +import org.apache.avro.Schema;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.specific.SpecificRecordBase;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.AvroTypeInfo;
          +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
          +import org.apache.flink.table.sources.StreamTableSource;
          +
          +/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+ /**
+ * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param record Avro specific record.
+ */
+ KafkaAvroTableSource(
+ String topic,
+ Properties properties,
+ Class<? extends SpecificRecordBase> record) {
+
+ super(
+ topic,
+ properties,
+ createDeserializationSchema(record),
+ createFieldNames(record),
+ createFieldTypes(record));
+ }
+
+ private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+ return new AvroRowDeserializationSchema(record);
+ }
+
+ /**
+ * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+ * Replaces generic Utf8 with basic String type information.
+ */
+ private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
+ if (schema.getType() == Schema.Type.RECORD) {
+ final List<Schema.Field> fields = schema.getFields();
+ final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
+
+ final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
+ final String[] names = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
+ types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
+ names[i] = field.name();
+ }
+ return new RowTypeInfo(types, names);
+ } else if (extracted instanceof GenericTypeInfo<?>) {
+ final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
+ if (genericTypeInfo.getTypeClass() == Utf8.class) {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ }
+ return extracted;
+ }
          +
          + private static <T extends SpecificRecordBase> TypeInformation<?>[] createFieldTypes(Class<T> record) {
          — End diff –

          `record` -> `avroClass`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113237296

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,122 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayOutputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumWriter;
          +import org.apache.avro.io.Encoder;
          +import org.apache.avro.io.EncoderFactory;
          +import org.apache.avro.reflect.ReflectDatumWriter;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.
          + */
          +public class AvroRowSerializationSchema implements SerializationSchema<Row> {
          +
          + /**
          + * Avro serialization schema.
          + */
          + private final Schema schema;
          +
          + /**
          + * Writer to serialize Avro record into a byte array.
          + */
          + private final DatumWriter<GenericRecord> datumWriter;
          +
          + /**
          + * Output stream to serialize records into byte array.
          + */
          + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
          +
          + /**
          + * Low-level class for serialization of Avro values.
          + */
          + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
          +
          + /**
          + * Creates a Avro serialization schema for the given schema.
          + *
          + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
          + */
          + @SuppressWarnings("unchecked")
+ public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+ Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumWriter = new ReflectDatumWriter<>(schema);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public byte[] serialize(Row row) {
+ // convert to record
+ final Object record = convertToRecord(schema, row);
+
+ // write
+ try {
+ arrayOutputStream.reset();
+ datumWriter.write((GenericRecord) record, encoder);
+ encoder.flush();
+ return arrayOutputStream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize Row.", e);
+ }
+ }
          +
          + /**
+ * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+ * Strings are converted into Avro's {@link Utf8} fields.
          + */
          + private static Object convertToRecord(Schema schema, Object rowObj) {
          + if (rowObj instanceof Row) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
          + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
          — End diff –

          This limitation exists because the Table API cannot handle UNION types either, right?
          Isn't this the same as having a nullable record field?
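
For reference, that is exactly how Avro represents a nullable record field: a two-type union of NULL and the record schema. A minimal illustration (an assumed example schema and class name, not code from the PR):

```
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NullableRecordExample {
	public static void main(String[] args) {
		// record schema for the nested field
		Schema address = SchemaBuilder.record("Address").fields()
			.requiredString("city")
			.endRecord();

		// nullable variant: UNION[null, RECORD], the shape accepted by the check above
		Schema nullableAddress = Schema.createUnion(
			Arrays.asList(Schema.create(Schema.Type.NULL), address));

		System.out.println(nullableAddress); // ["null",{"type":"record","name":"Address",...}]
	}
}
```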

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113241221

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,157 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.avro.reflect.ReflectDatumReader;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ *
+ * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /**
          + * Schema for deterministic field order.
          + */
          + private final Schema schema;
          +
          + /**
          + * Reader that deserializes byte array into a record.
          + */
          + private final DatumReader<GenericRecord> datumReader;
          — End diff –

          `GenericRecord` -> `SpecificRecord`
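
Applied to the reader declaration, the suggestion would read (a sketch, assuming the reader is still built as a ReflectDatumReader from the schema):

```
/**
 * Reader that deserializes byte array into a record.
 */
private final DatumReader<SpecificRecord> datumReader;

// constructed as before, e.g.:
// this.datumReader = new ReflectDatumReader<>(schema);
```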

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113241182

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,157 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.avro.reflect.ReflectDatumReader;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ *
+ * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /**
          + * Schema for deterministic field order.
          + */
          + private final Schema schema;
          +
          + /**
          + * Reader that deserializes byte array into a record.
          + */
          + private final DatumReader<GenericRecord> datumReader;
          +
          + /**
          + * Input stream to read message from.
          + */
          + private final MutableByteArrayInputStream inputStream;
          +
          + /**
          + * Avro decoder that decodes binary data
          + */
          + private final Decoder decoder;
          +
          + /**
          + * Record to deserialize byte array to.
          + */
          + private GenericRecord record;
          +
          + /**
+ * Creates an Avro deserialization schema for the given record.
          + *
          + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
          + */
          + @SuppressWarnings("unchecked")
          + public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
          + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
          + this.schema = SpecificData.get().getSchema(recordClazz);
          + this.datumReader = new ReflectDatumReader<>(schema);
          + this.record = new GenericData.Record(schema);
          — End diff –

          We can use a specific record here. We have the class for it.
          ```
          this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
          ```
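Applied to the constructor, the suggestion might look roughly like this sketch (class and method names are hypothetical):

```
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;

// Sketch: instantiate the generated record class directly instead of a
// GenericData.Record, so no generic record needs to be carried around.
public final class SpecificRecordFactorySketch {

    public static SpecificRecord newReuseRecord(Class<? extends SpecificRecord> recordClazz) {
        final Schema schema = SpecificData.get().getSchema(recordClazz);
        return (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
    }
}
```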

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113236676

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,122 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayOutputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumWriter;
          +import org.apache.avro.io.Encoder;
          +import org.apache.avro.io.EncoderFactory;
          +import org.apache.avro.reflect.ReflectDatumWriter;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into Avro bytes.
          + */
          +public class AvroRowSerializationSchema implements SerializationSchema<Row> {
          +
          + /**
          + * Avro serialization schema.
          + */
          + private final Schema schema;
          +
          + /**
          + * Writer to serialize Avro record into a byte array.
          + */
          + private final DatumWriter<GenericRecord> datumWriter;
          +
          + /**
          + * Output stream to serialize records into byte array.
          + */
          + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
          +
          + /**
          + * Low-level class for serialization of Avro values.
          + */
          + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
          +
          + /**
+ * Creates an Avro serialization schema for the given schema.
          + *
          + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
          + */
          + @SuppressWarnings("unchecked")
+ public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+ Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumWriter = new ReflectDatumWriter<>(schema);
+ }

          +
          + @Override
          + @SuppressWarnings("unchecked")
          + public byte[] serialize(Row row) {
          + // convert to record
          + final Object record = convertToRecord(schema, row);
          +
          + // write
+ try {
+ arrayOutputStream.reset();
+ datumWriter.write((GenericRecord) record, encoder);
+ encoder.flush();
+ return arrayOutputStream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize Row.", e);
+ }

          + }
          +
          + /**
+ * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+ * Strings are converted into Avro's {@link Utf8} fields.
          + */
          + private static Object convertToRecord(Schema schema, Object rowObj) {
          + if (rowObj instanceof Row) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
          + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
          — End diff –

          are union types always ordered? Could it happen that type `0` is `RECORD` and `1` is `NULL`?
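For context: Avro keeps union members in their declared order, so `["record", "null"]` is just as legal as `["null", "record"]`. A defensive unwrap could accept either order, roughly like this sketch (helper and class names are hypothetical):

```
import java.util.List;

import org.apache.avro.Schema;

// Sketch: unwrap a nullable-record union regardless of member order.
public final class UnionSchemaSketch {

    static Schema unwrapNullableRecord(Schema schema) {
        if (schema.getType() != Schema.Type.UNION) {
            return schema;
        }
        final List<Schema> types = schema.getTypes();
        if (types.size() == 2) {
            final Schema first = types.get(0);
            final Schema second = types.get(1);
            if (first.getType() == Schema.Type.NULL && second.getType() == Schema.Type.RECORD) {
                return second;
            }
            if (first.getType() == Schema.Type.RECORD && second.getType() == Schema.Type.NULL) {
                return first;
            }
        }
        throw new RuntimeException("Expected a UNION of NULL and RECORD. Given: " + schema);
    }
}
```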

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113243381

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,157 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.avro.reflect.ReflectDatumReader;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
          + *
          + * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
          + *
          + * {@link Utf8} is converted to regular Java Strings.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /**
          + * Schema for deterministic field order.
          + */
          + private final Schema schema;
          +
          + /**
          + * Reader that deserializes byte array into a record.
          + */
          + private final DatumReader<GenericRecord> datumReader;
          +
          + /**
          + * Input stream to read message from.
          + */
          + private final MutableByteArrayInputStream inputStream;
          +
          + /**
          + * Avro decoder that decodes binary data
          + */
          + private final Decoder decoder;
          +
          + /**
          + * Record to deserialize byte array to.
          + */
          + private GenericRecord record;
          +
          + /**
+ * Creates an Avro deserialization schema for the given record.
          + *
          + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
          + */
          + @SuppressWarnings("unchecked")
+ public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+ Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumReader = new ReflectDatumReader<>(schema);
+ this.record = new GenericData.Record(schema);
+ this.inputStream = new MutableByteArrayInputStream();
+ this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ }
          +
          + @Override
          + public Row deserialize(byte[] message) throws IOException {
          + // read record
+ try {
+ inputStream.setBuffer(message);
+ this.record = datumReader.read(record, decoder);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to deserialize Row.", e);
+ }
          +
          + // convert to row
          + final Object row = convertToRow(schema, record);
          + return (Row) row;
          + }
          +
          + /**
+ * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
+ * Avro's {@link Utf8} fields are converted into regular Java strings.
          + */
          + private static Object convertToRow(Schema schema, Object recordObj) {
          + if (recordObj instanceof GenericRecord) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
+ if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+ schema = types.get(1);
+ } else {
+ throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+ }
+ } else if (schema.getType() != Schema.Type.RECORD) {
+ throw new RuntimeException("Record type for row type expected. But is: " + schema);
+ }

          + final List<Schema.Field> fields = schema.getFields();
          + final Row row = new Row(fields.size());
          + for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
          + final GenericRecord record = (GenericRecord) recordObj;
          — End diff –

          Move cast out of the loop
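A sketch of the field loop with the cast hoisted, as suggested; the recursive conversion of nested fields is omitted here for brevity, and the class name is hypothetical:

```
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.types.Row;

// Sketch: recordObj is cast to GenericRecord once, outside the field loop.
public final class RowConversionSketch {

    static Row convertRecordFields(Schema schema, Object recordObj) {
        final GenericRecord record = (GenericRecord) recordObj; // cast once
        final List<Schema.Field> fields = schema.getFields();
        final Row row = new Row(fields.size());
        for (int i = 0; i < fields.size(); i++) {
            final Schema.Field field = fields.get(i);
            row.setField(i, record.get(field.pos()));
        }
        return row;
    }
}
```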

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113207375

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import java.util.List;
          +import java.util.Properties;
          +import org.apache.avro.Schema;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.specific.SpecificRecordBase;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.AvroTypeInfo;
          +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
          +import org.apache.flink.table.sources.StreamTableSource;
          +
          +/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
          + *
          + * <p>The version-specific Kafka consumers need to extend this class and
          + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
          + */
          +public abstract class KafkaAvroTableSource extends KafkaTableSource {
          +
          + /**
+ * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
          + *
          + * @param topic Kafka topic to consume.
          + * @param properties Properties for the Kafka consumer.
          + * @param record Avro specific record.
          + */
          + KafkaAvroTableSource(
          + String topic,
          + Properties properties,
+ Class<? extends SpecificRecordBase> record) {
+
+ super(
+ topic,
+ properties,
+ createDeserializationSchema(record),
+ createFieldNames(record),
+ createFieldTypes(record));
+ }
+
+ private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+ return new AvroRowDeserializationSchema(record);
+ }

          +
          + /**
          + * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
          + * Replaces generic Utf8 with basic String type information.
          + */
          + private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
          + if (schema.getType() == Schema.Type.RECORD) {
          + final List<Schema.Field> fields = schema.getFields();
          + final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
          +
          + final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
          + final String[] names = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
+ types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
+ names[i] = field.name();
+ }

          + return new RowTypeInfo(types, names);
          + } else if (extracted instanceof GenericTypeInfo<?>) {
          + final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
+ if (genericTypeInfo.getTypeClass() == Utf8.class) {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }

          + }
          + return extracted;
          + }
          +
          + private static <T extends SpecificRecordBase> TypeInformation<?>[] createFieldTypes(Class<T> record) {
          + final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(record);
          + // determine schema to retrieve deterministic field order
          + final Schema schema = SpecificData.get().getSchema(record);
          + final RowTypeInfo row = (RowTypeInfo) convertToRowTypeInformation(avroTypeInfo, schema);
          + final TypeInformation<?>[] types = new TypeInformation<?>[row.getArity()];
+ for (int i = 0; i < row.getArity(); i++) {
+ types[i] = row.getTypeAt(i);
+ }

          + return types;
          + }
          +
          + private static String[] createFieldNames(Class<? extends SpecificRecord> record) {
          — End diff –

          We could refactor `KafkaTableSource` to not require a `String[] fieldNames` and a `TypeInformation[] fieldTypes` as constructor parameters but just a `TypeInformation<Row> rowType`.

          The field names are a leftover from the time when `TableSource` did not publish the field names by the `TypeInformation`.

          When we do that, we can remove this method.
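A sketch of the proposed constructor shape (class name is hypothetical and the class is reduced to the relevant parameters):

```
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

// Sketch: a single TypeInformation<Row> replaces the parallel
// fieldNames/fieldTypes arrays, since RowTypeInfo carries both.
public abstract class RowTypedKafkaTableSourceSketch {

    private final String topic;
    private final Properties properties;
    private final TypeInformation<Row> rowType;

    protected RowTypedKafkaTableSourceSketch(
            String topic,
            Properties properties,
            TypeInformation<Row> rowType) {
        this.topic = topic;
        this.properties = properties;
        this.rowType = rowType;
    }

    public TypeInformation<Row> getRowType() {
        return rowType;
    }
}
```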

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113241700

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,157 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          — End diff –

          Change all `GenericRecord` to `SpecificRecord`

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113243664

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,157 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.avro.reflect.ReflectDatumReader;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
          + *
          + * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
          + *
          + * {@link Utf8} is converted to regular Java Strings.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /**
          + * Schema for deterministic field order.
          + */
          + private final Schema schema;
          +
          + /**
          + * Reader that deserializes byte array into a record.
          + */
          + private final DatumReader<GenericRecord> datumReader;
          +
          + /**
          + * Input stream to read message from.
          + */
          + private final MutableByteArrayInputStream inputStream;
          +
          + /**
          + * Avro decoder that decodes binary data
          + */
          + private final Decoder decoder;
          +
          + /**
          + * Record to deserialize byte array to.
          + */
          + private GenericRecord record;
          +
          + /**
+ * Creates an Avro deserialization schema for the given record.
          + *
          + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
          + */
          + @SuppressWarnings("unchecked")
+ public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+ Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumReader = new ReflectDatumReader<>(schema);
+ this.record = new GenericData.Record(schema);
+ this.inputStream = new MutableByteArrayInputStream();
+ this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ }
          +
          + @Override
          + public Row deserialize(byte[] message) throws IOException {
          + // read record
+ try {
+ inputStream.setBuffer(message);
+ this.record = datumReader.read(record, decoder);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to deserialize Row.", e);
+ }
          +
          + // convert to row
          + final Object row = convertToRow(schema, record);
          + return (Row) row;
          + }
          +
          + /**
+ * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
+ * Avro's {@link Utf8} fields are converted into regular Java strings.
          + */
          + private static Object convertToRow(Schema schema, Object recordObj) {
          + if (recordObj instanceof GenericRecord) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
+ if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+ schema = types.get(1);
+ } else {
+ throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+ }
+ } else if (schema.getType() != Schema.Type.RECORD) {
+ throw new RuntimeException("Record type for row type expected. But is: " + schema);
+ }

          + final List<Schema.Field> fields = schema.getFields();
          + final Row row = new Row(fields.size());
          — End diff –

          Can we create a `Row` once and reuse it here?
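One way to do that, sketched under the assumption that the top-level schema (and hence the arity) is fixed at construction time; nested rows would still be allocated per record, and the class name is hypothetical:

```
import org.apache.flink.types.Row;

// Sketch: allocate the top-level Row once and overwrite its fields on
// every call, instead of allocating a new Row per deserialized message.
public final class ReusableRowSketch {

    private final Row reuseRow;

    ReusableRowSketch(int arity) {
        this.reuseRow = new Row(arity);
    }

    Row fill(Object[] fieldValues) {
        for (int i = 0; i < fieldValues.length; i++) {
            reuseRow.setField(i, fieldValues[i]);
        }
        return reuseRow;
    }
}
```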

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113244628

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,122 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayOutputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumWriter;
          +import org.apache.avro.io.Encoder;
          +import org.apache.avro.io.EncoderFactory;
          +import org.apache.avro.reflect.ReflectDatumWriter;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into Avro bytes.
          + */
          +public class AvroRowSerializationSchema implements SerializationSchema<Row> {
          +
          + /**
          + * Avro serialization schema.
          + */
          + private final Schema schema;
          +
          + /**
          + * Writer to serialize Avro record into a byte array.
          + */
          + private final DatumWriter<GenericRecord> datumWriter;
          +
          + /**
          + * Output stream to serialize records into byte array.
          + */
          + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
          +
          + /**
          + * Low-level class for serialization of Avro values.
          + */
          + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
          +
          + /**
+ * Creates an Avro serialization schema for the given schema.
          + *
          + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
          + */
          + @SuppressWarnings("unchecked")
+ public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+ Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumWriter = new ReflectDatumWriter<>(schema);
+ }

          +
          + @Override
          + @SuppressWarnings("unchecked")
          + public byte[] serialize(Row row) {
          + // convert to record
          + final Object record = convertToRecord(schema, row);
          +
          + // write
+ try {
+ arrayOutputStream.reset();
+ datumWriter.write((GenericRecord) record, encoder);
+ encoder.flush();
+ return arrayOutputStream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize Row.", e);
+ }

          + }
          +
+ /**
+ * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+ * Strings are converted into Avro's {@link Utf8} fields.
+ */
          + private static Object convertToRecord(Schema schema, Object rowObj) {
          + if (rowObj instanceof Row) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
          + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
          — End diff –

          See comment on UNION in deserializer

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113244339

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,157 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.avro.reflect.ReflectDatumReader;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
          + *
          + * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
          + *
          + * {@link Utf8} is converted to regular Java Strings.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /**
          + * Schema for deterministic field order.
          + */
          + private final Schema schema;
          +
          + /**
          + * Reader that deserializes byte array into a record.
          + */
          + private final DatumReader<GenericRecord> datumReader;
          +
          + /**
          + * Input stream to read message from.
          + */
          + private final MutableByteArrayInputStream inputStream;
          +
          + /**
          + * Avro decoder that decodes binary data
          + */
          + private final Decoder decoder;
          +
          + /**
          + * Record to deserialize byte array to.
          + */
          + private GenericRecord record;
          +
          + /**
          + * Creates a Avro deserialization schema for the given record.
          + *
          + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
          + */
          + @SuppressWarnings("unchecked")
+ public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ }
          +
          + @Override
          + public Row deserialize(byte[] message) throws IOException {
          + // read record
+ try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+ } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize Row.", e);
+ }
          +
          + // convert to row
          + final Object row = convertToRow(schema, record);
          + return (Row) row;
          + }
          +
          + /**
+ * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
+ * Avro's {@link Utf8} fields are converted into regular Java strings.
          + */
          + private static Object convertToRow(Schema schema, Object recordObj) {
          + if (recordObj instanceof GenericRecord) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          — End diff –

          Not sure if we should support `UNION` at all.
If you have a UNION[NULL, RECORD] field in Avro, you'd expect it to also be represented as a UNION field in a Table.
We change it here to a nullable Record field. Not sure if that's expected.

Should we just not accept it (it's a corner case anyway) and add support once the Table API / SQL supports union types?
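
For context, a minimal sketch of the kind of field under discussion: an Avro UNION[NULL, RECORD], i.e. a nullable nested record. The schema and the names Outer/inner/Inner are invented for illustration and are not taken from the PR.

import org.apache.avro.Schema;

public class NullableRecordSchemaSketch {
    public static void main(String[] args) {
        // "Outer" has a single field "inner" whose type is the union ["null", Inner]
        final String json =
            "{\"type\": \"record\", \"name\": \"Outer\", \"fields\": ["
            + "{\"name\": \"inner\", \"type\": [\"null\", "
            + "{\"type\": \"record\", \"name\": \"Inner\", \"fields\": ["
            + "{\"name\": \"s\", \"type\": \"string\"}]}]}]}";
        final Schema outer = new Schema.Parser().parse(json);
        final Schema innerType = outer.getField("inner").schema();
        // prints the UNION type and its two branches [NULL, RECORD]
        System.out.println(innerType.getType() + " " + innerType.getTypes());
    }
}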

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113415863

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,122 @@
          + private static Object convertToRecord(Schema schema, Object rowObj) {
          + if (rowObj instanceof Row) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
          + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
          — End diff –

Thanks, I added the case for the reverse order. This code is needed for nullable records in order to access the schema.
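
For illustration, a minimal sketch of what accepting both branch orders could look like (a hypothetical helper, not the exact code added to the PR):

import java.util.List;
import org.apache.avro.Schema;

final class UnionSketch {
    // Returns the RECORD branch of a two-type nullable union, in either order.
    static Schema unwrapNullableRecord(Schema schema) {
        if (schema.getType() != Schema.Type.UNION) {
            return schema;
        }
        final List<Schema> types = schema.getTypes();
        if (types.size() == 2) {
            if (types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
                return types.get(1); // UNION[null, RECORD]
            }
            if (types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
                return types.get(0); // UNION[RECORD, null]
            }
        }
        throw new RuntimeException("Unsupported union schema: " + schema);
    }
}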

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113416754

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,122 @@
          + private static Object convertToRecord(Schema schema, Object rowObj) {
          + if (rowObj instanceof Row) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
          + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
          — End diff –

Ah, OK, I see. Then let's keep it.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113416949

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,122 @@
          + private static Object convertToRecord(Schema schema, Object rowObj) {
          + if (rowObj instanceof Row) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
+ if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+   schema = types.get(1);
+ } else {
+   throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+ }
+ } else if (schema.getType() != Schema.Type.RECORD) {
+   throw new RuntimeException("Record type for row type expected. But is: " + schema);
+ }
          + final List<Schema.Field> fields = schema.getFields();
          + final GenericRecord record = new GenericData.Record(schema);
          — End diff –

          I don't think so, because of the recursive execution for nested records.
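
The recursive shape referred to here, reduced to a simplified sketch (union unwrapping and primitive conversions omitted; assumes the Row's field order matches the Avro schema): every nesting level allocates its own GenericData.Record inside the recursive call, which is why a single preallocated record cannot simply be reused.

import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.types.Row;

final class ConvertToRecordSketch {
    static Object convertToRecord(Schema schema, Object rowObj) {
        if (rowObj instanceof Row) {
            final Row row = (Row) rowObj;
            final List<Schema.Field> fields = schema.getFields();
            // a fresh record per nesting level
            final GenericRecord record = new GenericData.Record(schema);
            for (int i = 0; i < fields.size(); i++) {
                // recursion: a nested Row becomes a nested GenericRecord
                record.put(i, convertToRecord(fields.get(i).schema(), row.getField(i)));
            }
            return record;
        }
        return rowObj; // primitives pass through in this simplified sketch
    }
}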

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113417751

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java —
          @@ -0,0 +1,122 @@
          + private static Object convertToRecord(Schema schema, Object rowObj) {
          + if (rowObj instanceof Row) {
          + // records can be wrapped in a union
          + if (schema.getType() == Schema.Type.UNION) {
          + final List<Schema> types = schema.getTypes();
+ if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+   schema = types.get(1);
+ } else {
+   throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+ }
+ } else if (schema.getType() != Schema.Type.RECORD) {
+   throw new RuntimeException("Record type for row type expected. But is: " + schema);
+ }
          + final List<Schema.Field> fields = schema.getFields();
          + final GenericRecord record = new GenericData.Record(schema);
          — End diff –

          Those could be reused as well (and recreated if `null` at some point).
          I think it would be possible, but a bit fiddly.
          So let's keep it as it is for now.
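
A rough sketch of the reuse idea mentioned above (hypothetical, not part of the PR): the top-level record could be kept across serialize() calls and recreated lazily, while nested levels would still need fresh instances because of the recursion.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

final class RecordReuseSketch {
    private final Schema schema;
    private GenericRecord reused;

    RecordReuseSketch(Schema schema) {
        this.schema = schema;
    }

    // recreate only when absent ("recreated if null at some point")
    GenericRecord topLevelRecord() {
        if (reused == null) {
            reused = new GenericData.Record(schema);
        }
        return reused;
    }
}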

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113437414

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,157 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import java.io.ByteArrayInputStream;
          +import java.io.IOException;
          +import java.util.List;
          +import org.apache.avro.Schema;
          +import org.apache.avro.generic.GenericData;
          +import org.apache.avro.generic.GenericRecord;
          +import org.apache.avro.io.DatumReader;
          +import org.apache.avro.io.Decoder;
          +import org.apache.avro.io.DecoderFactory;
          +import org.apache.avro.reflect.ReflectDatumReader;
          +import org.apache.avro.specific.SpecificData;
          +import org.apache.avro.specific.SpecificRecord;
          +import org.apache.avro.util.Utf8;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
          + * Deserialization schema from Avro bytes over

          {@link SpecificRecord} to {@link Row}.
          + *
          + * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
          + *
          + * {@link Utf8} is converted to regular Java Strings.
          + */
          +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
          +
          + /**
          + * Schema for deterministic field order.
          + */
          + private final Schema schema;
          +
          + /**
          + * Reader that deserializes byte array into a record.
          + */
          + private final DatumReader<GenericRecord> datumReader;
          +
          + /**
          + * Input stream to read message from.
          + */
          + private final MutableByteArrayInputStream inputStream;
          +
          + /**
          + * Avro decoder that decodes binary data
          + */
          + private final Decoder decoder;
          +
          + /**
          + * Record to deserialize byte array to.
          + */
          + private GenericRecord record;
          +
          + /**
          + * Creates a Avro deserialization schema for the given record.
          + *
          + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
          + */
          + @SuppressWarnings("unchecked")
          + public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
          +     Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
          +     this.schema = SpecificData.get().getSchema(recordClazz);
          +     this.datumReader = new ReflectDatumReader<>(schema);
          +     this.record = new GenericData.Record(schema);
          +     this.inputStream = new MutableByteArrayInputStream();
          +     this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
          + }
          +
          + @Override
          + public Row deserialize(byte[] message) throws IOException {
          + // read record
          + try {
          +     inputStream.setBuffer(message);
          +     this.record = datumReader.read(record, decoder);
          + } catch (IOException e) {
          +     throw new RuntimeException("Failed to deserialize Row.", e);
          + }
          +
          + // convert to row
          + final Object row = convertToRow(schema, record);
          + return (Row) row;
          + }
          +
          + /**
          + * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
          + * Avro's {@link Utf8} fields are converted into regular Java strings.
          + */
          + private static Object convertToRow(Schema schema, Object recordObj) {
          +     if (recordObj instanceof GenericRecord) {
          +         // records can be wrapped in a union
          +         if (schema.getType() == Schema.Type.UNION) {
          +             final List<Schema> types = schema.getTypes();
          +             if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
          +                 schema = types.get(1);
          +             }
          +             else {
          +                 throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
          +             }
          +         } else if (schema.getType() != Schema.Type.RECORD) {
          +             throw new RuntimeException("Record type for row type expected. But is: " + schema);
          +         }
          +         final List<Schema.Field> fields = schema.getFields();
          +         final Row row = new Row(fields.size());
          — End diff –

          No, because `Row` can again be nested.
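
          For context, the conversion presumably continues by filling the freshly created row field by field through a recursive call, roughly like this (an assumed continuation; the loop itself is cut off in the quoted diff):

          // assumed shape of the loop that follows, not part of the quoted diff
          for (int i = 0; i < fields.size(); i++) {
              // a record-typed field yields another Row here, hence the nesting
              row.setField(i, convertToRow(fields.get(i).schema(), ((GenericRecord) recordObj).get(i)));
          }
          return row;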

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3663#discussion_r113439992

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java —
          @@ -0,0 +1,157 @@
          [... identical to the diff quoted in the previous comment ...]
          + final List<Schema.Field> fields = schema.getFields();
          + final Row row = new Row(fields.size());
          — End diff –

          Sure, but the reusable row could also hold nested rows
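
          A rough sketch of such a recursive reuse (a hypothetical variant derived from this exchange, not code from the PR; the union unwrapping is omitted for brevity): the previously produced object is threaded back in as a reuse candidate, so nested rows are recycled as well.

          // hypothetical reuse-aware variant of convertToRow, not the merged code
          private static Object convertToRow(Schema schema, Object recordObj, Object reuse) {
              if (recordObj instanceof GenericRecord) {
                  final GenericRecord record = (GenericRecord) recordObj;
                  final List<Schema.Field> fields = schema.getFields();
                  // reuse the Row from the previous message if present, otherwise create it once
                  final Row row = reuse instanceof Row ? (Row) reuse : new Row(fields.size());
                  for (int i = 0; i < fields.size(); i++) {
                      row.setField(i, convertToRow(fields.get(i).schema(), record.get(i), row.getField(i)));
                  }
                  return row;
              }
              return recordObj;
          }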

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3663

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: bbc5e29c8df71950c6216cf490817ef002c140c5
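
          For reference, a minimal usage sketch of the merged deserialization schema (the Avro-generated User class, the topic name, and the Kafka properties are assumptions for illustration):

          import java.util.Properties;

          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
          import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
          import org.apache.flink.types.Row;

          public class AvroRowsFromKafka {

              public static void main(String[] args) throws Exception {
                  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                  final Properties props = new Properties();
                  props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
                  props.setProperty("group.id", "avro-row-demo");           // assumed consumer group

                  // User is a hypothetical class generated by the Avro compiler (a SpecificRecord)
                  final AvroRowDeserializationSchema schema = new AvroRowDeserializationSchema(User.class);

                  final DataStream<Row> rows = env.addSource(
                      new FlinkKafkaConsumer010<>("users", schema, props));

                  rows.print();
                  env.execute("Avro rows from Kafka");
              }
          }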

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2762


            People

            • Assignee:
              twalthr Timo Walther
            • Reporter:
              fhueske Fabian Hueske
            • Votes:
              0
            • Watchers:
              5