Details
Bug
Status: Resolved
Major
Resolution: Incomplete
2.0.0
None
Description
I'm running Flume and Spark on two EC2 instances (one for Flume, one for Spark).
Flume config:
agent.channels = ch1
agent.channels.ch1.type = SPILLABLEMEMORY
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity=10

agent.sources = seq
agent.sources.seq.type = seq
agent.sources.seq.channels = ch1

agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = 0.0.0.0
agent.sinks.spark.port = 5005
agent.sinks.spark.channel = ch1
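As a sanity check on a config like the one above, a small script can confirm that every source and sink refers to a channel that is actually declared. This is only a sketch: `parse_properties` and `undeclared_channels` are hypothetical helpers written for this illustration, not part of Flume or Spark, and the parser assumes simple `key = value` lines with no line continuations. It checks the wiring only and says nothing about the transaction error reported further down.

```python
# Hypothetical helpers for illustration; not part of Flume or Spark.

CONFIG = """
agent.channels = ch1
agent.channels.ch1.type = SPILLABLEMEMORY
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity=10
agent.sources = seq
agent.sources.seq.type = seq
agent.sources.seq.channels = ch1
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = 0.0.0.0
agent.sinks.spark.port = 5005
agent.sinks.spark.channel = ch1
"""


def parse_properties(text):
    """Parse 'key = value' lines into a dict, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def undeclared_channels(props, agent="agent"):
    """Return (property, channel) pairs that name a channel missing from <agent>.channels."""
    declared = set(props.get(agent + ".channels", "").split())
    problems = []
    for key, value in sorted(props.items()):
        parts = key.split(".")
        if len(parts) != 4 or parts[0] != agent:
            continue
        # Sources list their channels under 'channels'; sinks name one under 'channel'.
        is_source_ref = parts[1] == "sources" and parts[3] == "channels"
        is_sink_ref = parts[1] == "sinks" and parts[3] == "channel"
        if is_source_ref or is_sink_ref:
            for channel in value.split():
                if channel not in declared:
                    problems.append((key, channel))
    return problems


if __name__ == "__main__":
    # An empty list means every source and sink points at a declared channel.
    print(undeclared_channels(parse_properties(CONFIG)))
```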
I run Flume with:
./bin/flume-ng agent -Xmx512m --name agent -c conf/ -f rabbit.conf -Dflume.root.logger=INFO,console -C plugins.d/flume-ng-rabbitmq-master/target/flume-rabbitmq-channel-1.0-SNAPSHOT.jar:plugins.d/spark-streaming-flume-sink_2.11-2.0.0.jar
Then, from the other machine, I try to consume from the Spark Sink:
spark-2.0.0-bin-hadoop2.7$ bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.0.0 flume_wordcount.py 52.210.48.242 5005
using a variation of flume_wordcount.py (with the polling stream):
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

if __name__ == "__main__":
    # Expect exactly two arguments: the Flume sink host and port.
    if len(sys.argv) != 3:
        print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    hostname, port = sys.argv[1:]
    sc = SparkContext(appName="PythonStreamingFlumeWordCount")
    ssc = StreamingContext(sc, 10)  # 10-second batches

    # Pull events from the Spark Sink running inside the Flume agent.
    kvs = FlumeUtils.createPollingStream(ssc, [[hostname, int(port)]], maxBatchSize=10, parallelism=1)
    lines = kvs.map(lambda x: x[1])  # keep the event body, drop the headers
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
Whatever I do, I always get this on Flume:
2016-08-19 14:04:48,679 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7d257a9f, /78.10.176.10:57551 => /10.28.230.185:5005] BOUND: /10.28.230.185:5005
2016-08-19 14:04:48,679 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7d257a9f, /78.10.176.10:57551 => /10.28.230.185:5005] CONNECTED: /78.10.176.10:57551
2016-08-19 14:04:49,401 (New I/O worker #1) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Received an error batch - no events were received from channel!
2016-08-19 14:04:50,154 (Spark Sink Processor Thread - 1) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:80)] Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
    at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.begin(SpillableMemoryChannel.java:305)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2016-08-19 14:04:50,156 (Spark Sink Processor Thread - 1) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark was unable to successfully process the events. Transaction is being rolled back.
2016-08-19 14:04:50,156 (Spark Sink Processor Thread - 1) [ERROR - org.apache.spark.streaming.flume.sink.Logging$class.logError(Logging.scala:84)] Error rolling back transaction. Rollback may have failed!
I'm using the latest tgz releases of Spark and Flume. The additional packages for Flume were downloaded from Maven Central as jars.
Can anyone help with this?