Description
Currently, the Parquet-Avro data format requires users to define a POJO class for marshalling and unmarshalling. This is cumbersome, especially when unmarshalling an existing Parquet file with a complicated data structure.
Avro provides GenericRecord for such cases, but it does not work with the current data format implementation, as the following reproduction shows:
$ cat example.java
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.slf4j:slf4j-simple:1.7.31
//DEPS org.apache.camel:camel-bom:4.0.0-M3@pom
//DEPS org.apache.camel:camel-core
//DEPS org.apache.camel:camel-main
//DEPS org.apache.camel:camel-parquet-avro:4.0.0-SNAPSHOT
//DEPS org.apache.hadoop:hadoop-client:3.3.6

import org.apache.avro.generic.GenericRecord;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.dataformat.parquet.avro.*;
import org.apache.camel.main.*;
import org.apache.camel.spi.*;

import static org.apache.camel.builder.PredicateBuilder.*;

class example {
    public static void main(String... args) throws Exception {
        System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");
        Main main = new Main();
        ParquetAvroDataFormat format = new ParquetAvroDataFormat();
        format.setUnmarshalType(GenericRecord.class);
        main.configure().addRoutesBuilder(new RouteBuilder() {
            public void configure() throws Exception {
                from("file:/tmp?fileName=example1.parquet&noop=true")
                    .unmarshal(format)
                    .marshal(format)
                    .log("${body}");
            }
        });
        main.run();
    }
}

$ jbang example.java
...
[Camel (camel-1) thread #1 - file:///tmp] ERROR org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery for (MessageId: 356E1287483C55C-0000000000000000 on ExchangeId: 356E1287483C55C-0000000000000000). Exhausted after delivery attempt: 1 caught: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord

Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source    ID                 Processor                                             Elapsed (ms)
          route1/route1      from[file:///tmp?fileName=example1.parquet&noop=tr    89591203
          ...
          route1/marshal1    marshal[org.apache.camel.model.DataFormatDefinitio    0

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:403)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:734)
    at org.apache.avro.specific.SpecificData.lambda$new$3(SpecificData.java:337)
    at org.apache.avro.util.internal.ClassValueCache$1.computeValue(ClassValueCache.java:35)
    at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
    at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
    at java.base/java.lang.ClassValue.get(ClassValue.java:116)
    at org.apache.avro.util.internal.ClassValueCache.apply(ClassValueCache.java:45)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346)
    at org.apache.camel.dataformat.parquet.avro.ParquetAvroDataFormat.marshal(ParquetAvroDataFormat.java:70)
    at org.apache.camel.support.processor.MarshalProcessor.process(MarshalProcessor.java:64)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:475)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:164)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:379)
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:491)
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:244)
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:205)
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202)
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
So I'd like to propose a new feature: unmarshalling Parquet data into Avro's GenericRecord (and marshalling GenericRecord back to Parquet) when no POJO class is specified as unmarshalType.
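As a rough illustration of the idea, and not the actual ParquetAvroDataFormat code, the sketch below shows one way the schema and Avro data model could be chosen so that GenericRecord bodies no longer go through reflection. The class GenericRecordSchemaSketch and the helpers schemaFor and modelFor are hypothetical names made up for this example; only the Avro calls (GenericRecord.getSchema, GenericData.get, ReflectData.get().getSchema) are existing library API. It assumes org.apache.avro:avro is on the classpath.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.reflect.ReflectData;

public class GenericRecordSchemaSketch {

    // Hypothetical helper: a GenericRecord already carries its own schema,
    // so use it directly; only POJO bodies need a reflection-derived schema.
    static Schema schemaFor(Object body, Class<?> unmarshalType) {
        if (body instanceof GenericRecord) {
            return ((GenericRecord) body).getSchema();
        }
        return ReflectData.get().getSchema(unmarshalType);
    }

    // Hypothetical helper: fall back to the generic data model when no POJO
    // class is configured (null) or when GenericRecord itself is requested.
    static GenericData modelFor(Class<?> unmarshalType) {
        if (unmarshalType == null || GenericRecord.class.isAssignableFrom(unmarshalType)) {
            return GenericData.get();
        }
        return ReflectData.get();
    }

    public static void main(String[] args) {
        // Build a small record in memory; the schema here is made up for the demo.
        Schema schema = SchemaBuilder.record("Example")
                .fields()
                .requiredString("name")
                .endRecord();
        GenericRecord record = new GenericRecordBuilder(schema)
                .set("name", "camel")
                .build();

        System.out.println(schemaFor(record, GenericRecord.class).getFullName());
        System.out.println(modelFor(GenericRecord.class).getClass().getSimpleName());
    }
}
```

With a dispatch like this, the marshal step in the reproduction above would take the schema from the record itself rather than calling getSchema on the GenericRecord interface, which is where the "Not a Specific class" exception is raised.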