Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Not A Problem
Affects Version: 1.4.0
None
Environment:
Using Elasticsearch 5.1.2 in a Docker environment
Flink is deployed in a different Docker container
Description
I have an Elasticsearch sink configured. When a job is submitted, it goes into a failed state within a few seconds.
The following exception is shown on the Job screen:
java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:80)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
The logs show the following stack trace:
2018-01-01 12:15:14,432 INFO org.elasticsearch.client.transport.TransportClientNodesService - failed to get node info for {#transport#-1}{8IZTMPcSRCyKRynhfyN2fA}{192.168.99.100}{192.168.99.100:9300}, disconnecting...
NodeDisconnectedException[[][192.168.99.100:9300][cluster:monitor/nodes/liveness] disconnected]
2018-01-01 12:15:19,433 ERROR org.elasticsearch.transport.netty3.Netty3Utils - fatal error on the network layer
    at org.elasticsearch.transport.netty3.Netty3Utils.maybeDie(Netty3Utils.java:195)
    at org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:82)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:291)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
    at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:292)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:391)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:315)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2018-01-01 12:15:19,448 WARN org.elasticsearch.transport.netty3.Netty3Transport - exception caught on transport layer [[id: 0xef889995, /172.17.0.4:48450 => /192.168.99.100:9300]], closing connection
ElasticsearchException[java.lang.NoClassDefFoundError: org/jboss/netty/channel/socket/nio/SocketSendBufferPool$GatheringSendBuffer]; nested: NoClassDefFoundError[org/jboss/netty/channel/socket/nio/SocketSendBufferPool$GatheringSendBuffer]; nested: ClassNotFoundException[org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer];
    at org.elasticsearch.transport.netty3.Netty3Transport.exceptionCaught(Netty3Transport.java:325)
    at org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:83)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:291)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
    at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:292)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:391)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:315)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/jboss/netty/channel/socket/nio/SocketSendBufferPool$GatheringSendBuffer
    at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:70)
    at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:193)
    ... 11 more
Caused by: java.lang.ClassNotFoundException: org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 14 more
It looks like a dependency issue with Netty.
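One way to narrow this down might be to ask the classloader directly whether the missing class can be resolved, and from which JAR. The following is only a sketch: `ClassProbe` and `locate` are hypothetical names, and using the thread context classloader is an assumption. The trace shows the lookup going through Flink's `ChildFirstClassLoader`, so the probe would have to run inside the job (for example from the sink's `open()` path) to see the same classpath.

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic helper; not part of Flink or Elasticsearch.
final class ClassProbe {

    /** Returns the location a class is loaded from, or empty if it cannot be loaded. */
    static Optional<String> locate(String className, ClassLoader loader) {
        try {
            // initialize=false: resolve the class without running its static initializers
            Class<?> c = Class.forName(className, false, loader);
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK classes from the bootstrap loader typically have no CodeSource
            if (src == null || src.getLocation() == null) {
                return Optional.of("<bootstrap/JDK>");
            }
            return Optional.of(src.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Default to the class named in the ClassNotFoundException above
        String name = args.length > 0
                ? args[0]
                : "org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer";
        // Assumption: the context classloader is the one that loads user code
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println(name + " -> " + locate(name, loader).orElse("NOT FOUND"));
    }
}
```

If the outer class `SocketSendBufferPool` resolves but the nested `GatheringSendBuffer` does not, that would suggest an incomplete or mismatched Netty 3 JAR on the classpath rather than a missing one.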
Relevant sections of the POM:
...
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <flink.version>1.4.0</flink.version>
  <slf4j.version>1.7.7</slf4j.version>
  <log4j.version>1.2.17</log4j.version>
  <scala.binary.version>2.11</scala.binary.version>
</properties>
...
<dependencies>
  ...
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
    <version>1.4.0</version>
  </dependency>
  ...
</dependencies>
<profiles>
  <profile>
    <!-- Profile for packaging correct JAR files -->
    <id>build-jar</id>
    ...
    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
        <version>1.4.0</version>
      </dependency>
    </dependencies>
    <build>
      <plugins>
        <!-- disable the exclusion rules -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>2.4.1</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <transformers>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
                <artifactSet>
                  <excludes combine.self="override"></excludes>
                </artifactSet>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
    ...
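Since the shade plugin builds the job JAR, it may help to check whether the packaged JAR actually contains the class that the classloader could not find. The sketch below uses only `java.util.jar.JarFile` from the JDK. `JarClassCheck` and `containsClass` are hypothetical names, and the JAR path is whatever the build produces. If the build relocates packages, the entry would live under a different path and this check would need the relocated name.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper for inspecting a packaged job JAR.
final class JarClassCheck {

    /** True if the JAR has a .class entry for the given binary class name. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        // Binary name "a.b.C$D" maps to JAR entry "a/b/C$D.class"
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entry) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: JarClassCheck <job-jar> [class-name]");
            return;
        }
        // Default to the class named in the ClassNotFoundException
        String cls = args.length > 1
                ? args[1]
                : "org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer";
        boolean found = containsClass(args[0], cls);
        System.out.println(cls + (found ? " is" : " is NOT") + " present in " + args[0]);
    }
}
```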
Elasticsearch itself is working: I am able to create indices and connect to it through Kibana (running in a separate Docker container).
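Note that Kibana talks to Elasticsearch over HTTP, while the log above shows the Flink connector using the transport port (192.168.99.100:9300), so a working Kibana does not by itself confirm that port is reachable from the Flink container. A minimal TCP probe, run from inside the Flink container, could rule the network in or out. `PortProbe` and `canConnect` are hypothetical names, and the default host and port are taken from the log.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical connectivity check; it only tests that a TCP connection can be opened.
final class PortProbe {

    /** True if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unreachable host, and timeouts
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults taken from the transport address in the log
        String host = args.length > 0 ? args[0] : "192.168.99.100";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9300;
        System.out.println(host + ":" + port
                + (canConnect(host, port, 3000) ? " is reachable" : " is NOT reachable"));
    }
}
```

The WARN entry in the log already shows an established connection (`/172.17.0.4:48450 => /192.168.99.100:9300`), so this probe would most likely succeed, which points back at the classpath rather than the network.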