[CAMEL-19586] camel-parquet-avro - Allow users to unmarshal Parquet files into Avro's GenericRecords


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 4.0-RC2, 4.0.0
    • Component/s: None
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      Currently, the Parquet-Avro data format requires users to define a POJO class for marshalling and unmarshalling. This is inconvenient, especially when unmarshalling an existing Parquet file with a complicated data structure.

      Avro provides GenericRecord for such cases, but it doesn't work with the current data format. In the following example, the unmarshalled body cannot be marshalled back, and the route fails at the marshal step:

      $ cat example.java
      ///usr/bin/env jbang "$0" "$@" ; exit $?
      //DEPS org.slf4j:slf4j-simple:1.7.31
      //DEPS org.apache.camel:camel-bom:4.0.0-M3@pom
      //DEPS org.apache.camel:camel-core
      //DEPS org.apache.camel:camel-main
      //DEPS org.apache.camel:camel-parquet-avro:4.0.0-SNAPSHOT
      //DEPS org.apache.hadoop:hadoop-client:3.3.6
      
      import org.apache.avro.generic.GenericRecord;
      import org.apache.camel.*;
      import org.apache.camel.builder.*;
      import org.apache.camel.dataformat.parquet.avro.*;
      import org.apache.camel.main.*;
      import org.apache.camel.spi.*;
      import static org.apache.camel.builder.PredicateBuilder.*;
      
      class example {
      
          public static void main(String... args) throws Exception {
              System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");
      
              Main main = new Main();
              ParquetAvroDataFormat format = new ParquetAvroDataFormat();
              format.setUnmarshalType(GenericRecord.class);
              main.configure().addRoutesBuilder(new RouteBuilder() {
                  public void configure() throws Exception {
                      from("file:/tmp?fileName=example1.parquet&noop=true")
                          .unmarshal(format)
                          .marshal(format)
                          .log("${body}");
                  }
              });
              main.run();
          }
      }
      $ jbang example.java
      
      ...
      
      [Camel (camel-1) thread #1 - file:///tmp] ERROR org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery for (MessageId: 356E1287483C55C-0000000000000000 on ExchangeId: 356E1287483C55C-0000000000000000). Exhausted after delivery attempt: 1 caught: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
      
      Message History (source location and message history is disabled)
      ---------------------------------------------------------------------------------------------------------------------------------------
      Source                                   ID                             Processor                                          Elapsed (ms)
                                               route1/route1                  from[file:///tmp?fileName=example1.parquet&noop=tr     89591203
      	...
                                               route1/marshal1                marshal[org.apache.camel.model.DataFormatDefinitio            0
      
      Stacktrace
      ---------------------------------------------------------------------------------------------------------------------------------------
      org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
      	at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:403)
      	at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:734)
      	at org.apache.avro.specific.SpecificData.lambda$new$3(SpecificData.java:337)
      	at org.apache.avro.util.internal.ClassValueCache$1.computeValue(ClassValueCache.java:35)
      	at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
      	at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
      	at java.base/java.lang.ClassValue.get(ClassValue.java:116)
      	at org.apache.avro.util.internal.ClassValueCache.apply(ClassValueCache.java:45)
      	at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346)
      	at org.apache.camel.dataformat.parquet.avro.ParquetAvroDataFormat.marshal(ParquetAvroDataFormat.java:70)
      	at org.apache.camel.support.processor.MarshalProcessor.process(MarshalProcessor.java:64)
      	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:475)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:164)
      	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:379)
      	at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:491)
      	at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:244)
      	at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:205)
      	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202)
      	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
      	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
      	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      	at java.base/java.lang.Thread.run(Thread.java:833)
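
Judging from the stack trace, the exception is raised while `ParquetAvroDataFormat.marshal` asks Avro's `ReflectData` for a schema of the body's class. `GenericRecord` is an interface that carries no schema of its own, so Avro cannot derive one from the class. The jbang sketch below reproduces the same exception with Avro alone, outside Camel. The class name `GenericRecordSchemaCheck` is hypothetical. The Avro version (1.11.1) is an assumption and should match whatever camel-parquet-avro pulls in.

```java
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.apache.avro:avro:1.11.1

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;

// Hypothetical helper class, only for reproducing the error outside Camel
class GenericRecordSchemaCheck {

    // Asks ReflectData for a schema of the given class. Returns null when a
    // schema can be derived, otherwise the message of the AvroRuntimeException.
    static String schemaError(Class<?> type) {
        try {
            ReflectData.get().getSchema(type);
            return null;
        } catch (AvroRuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String... args) {
        // String maps to an Avro "string" schema, so no error is expected here
        System.out.println(schemaError(String.class));
        // GenericRecord is an interface with no schema of its own
        System.out.println(schemaError(GenericRecord.class));
    }
}
```

Running it with `jbang GenericRecordSchemaCheck.java` should print the same `Not a Specific class` message as the trace above, without involving Camel at all.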
      

      So I'd like to propose a new feature: if no POJO class is specified as unmarshalType, unmarshal Parquet data into Avro GenericRecords and, conversely, marshal GenericRecords into Parquet.
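
The sketch below shows the proposed feature using parquet-avro directly, not Camel's data format. Rows are read as GenericRecords with Avro's `GenericData` model, so the schema comes from the file rather than from a POJO. They are then written back using the schema carried by the records themselves. The class `GenericParquetRoundTrip`, its helper methods, the demo schema and the parquet-avro version (1.13.1) are assumptions for illustration only. The hadoop-client version is taken from the reproducer above.

```java
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.apache.parquet:parquet-avro:1.13.1
//DEPS org.apache.hadoop:hadoop-client:3.3.6

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

// Hypothetical class: a plain parquet-avro sketch, not Camel's implementation
class GenericParquetRoundTrip {

    // Reads all rows of a Parquet file as GenericRecords. The Avro schema is
    // taken from the file itself, so no POJO class is required.
    static List<GenericRecord> read(Path file, Configuration conf) throws IOException {
        List<GenericRecord> records = new ArrayList<>();
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(HadoopInputFile.fromPath(file, conf))
                .withDataModel(GenericData.get())
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                records.add(record);
            }
        }
        return records;
    }

    // Writes GenericRecords to a new Parquet file, using the schema carried by
    // the first record instead of deriving one from a Java class.
    static void write(List<GenericRecord> records, Path file, Configuration conf) throws IOException {
        Schema schema = records.get(0).getSchema();
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(HadoopOutputFile.fromPath(file, conf))
                .withSchema(schema)
                .withDataModel(GenericData.get())
                .build()) {
            for (GenericRecord record : records) {
                writer.write(record);
            }
        }
    }

    // Writes two records with a hypothetical demo schema to a temporary file
    // and reads them back without any POJO class.
    static List<GenericRecord> demo() {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"int\"},"
                        + "{\"name\":\"name\",\"type\":\"string\"}]}");
        List<GenericRecord> rows = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            GenericRecord row = new GenericData.Record(schema);
            row.put("id", i);
            row.put("name", "row-" + i);
            rows.add(row);
        }
        try {
            java.nio.file.Path dir = Files.createTempDirectory("parquet-demo");
            Path file = new Path(dir.resolve("example.parquet").toUri());
            Configuration conf = new Configuration();
            write(rows, file, conf);
            return read(file, conf);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String... args) {
        for (GenericRecord record : demo()) {
            System.out.println(record);
        }
    }
}
```

A data format could apply the same idea when no unmarshalType is configured. It would select the generic data model and take the schema from the data, instead of deriving it from a Java class.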

            People

              Assignee: Kengo Seki (sekikn)
              Reporter: Kengo Seki (sekikn)
              Votes: 0
              Watchers: 1