Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-7414

RabbitMqMessage can't be serialized due to LongString in headers

Details

    Description

      When trying to read messages from RabbitMq, I get systematic

      java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@234080e1}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(org.apache.beam.sdk.coders.SerializableCoder@206641ef)'. org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300) org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291) org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564) org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400) org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125) org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64) org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) java.util.HashMap.internalWriteEntries(HashMap.java:1785) java.util.HashMap.writeObject(HashMap.java:1362) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:498) java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:183) org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53) org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105) org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:99) org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81) org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297) org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291) org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564) org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400) org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125) org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64) org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)

      When using Dataflow runner, due to the fact that RabbitMqMessage headers may contain LongString values, which don't implement the Serializable java interface

      Attachments

        Issue Links

          Activity

            People

              riduidel Nicolas Delsaux
              riduidel Nicolas Delsaux
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 3h 10m
                  3h 10m