Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
0.9, 0.10.0
-
None
Description
Failing the production of a result partition marks the respective partition as failed with a ProducerFailedException.
The cause of this exception can be a user defined class, which can only be loaded by the user code class loader. The network stack fails the shuffle with a RemoteTransportException, which has the user exception as a cause. When the consuming task receives this exception, this leads to a class not found exception, because the network stack tries to load the class with the system class loader.
+----------+ | FAILING | | PRODUCER | +----------+ || \/ ProducerFailedException(CAUSE) via network || \/ +----------+ | RECEIVER | +----------+
CAUSE is only loadable by the user code class loader.
When trying to deserialize this, RECEIVER fails with a LocalTransportException, which is super confusing, because the error is not local, but remote.
Thanks to rmetzger for reporting and debugging the issue with the following stack trace:
Flat Map (26/120) 14:03:00,343 ERROR org.apache.flink.streaming.runtime.tasks.OneInputStreamTask - Flat Map (26/120) failed java.lang.RuntimeException: Could not read next record. at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:71) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:151) at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275) at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253) at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131) at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275) at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ... 1 more Caused by: io.netty.handler.codec.DecoderException: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) ... 12 more Caused by: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:278) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500) at java.lang.Throwable.readObject(Throwable.java:914) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.flink.runtime.io.network.netty.NettyMessage$ErrorResponse.readFrom(NettyMessage.java:338) at org.apache.flink.runtime.io.network.netty.NettyMessage$ .decode(NettyMessage.java:161) at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:123) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89) ... 13 more