SPARK-17152: Spark Flume sink fails with begin() called when transaction is OPEN


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: DStreams

    Description

      I'm running Flume and Spark on two EC2 instances (one for Flume, one for Spark).

      Flume config:

      agent.channels = ch1
      agent.channels.ch1.type = SPILLABLEMEMORY
      agent.channels.ch1.capacity = 10000
      agent.channels.ch1.transactionCapacity = 10
      
      agent.sources = seq
      agent.sources.seq.type = seq
      agent.sources.seq.channels = ch1
      
      agent.sinks = spark
      agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
      agent.sinks.spark.hostname = 0.0.0.0
      agent.sinks.spark.port = 5005
      agent.sinks.spark.channel = ch1
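
      One relationship I double-checked, since I wasn't sure of it: as far as I understand the Spark + Flume integration docs, the SparkSink pulls events from the channel in transactions of up to the receiver's maxBatchSize, so the channel's transactionCapacity should be at least that large. Both are 10 here, so that constraint looks satisfied; a quick sanity check of my reading:

      # Sanity check (my reading of the Spark + Flume integration docs, not a
      # documented API): the channel's transactionCapacity must be >= the
      # receiver's maxBatchSize, since the SparkSink takes up to maxBatchSize
      # events per channel transaction.
      transaction_capacity = 10  # agent.channels.ch1.transactionCapacity above
      max_batch_size = 10        # maxBatchSize in createPollingStream below
      assert transaction_capacity >= max_batch_size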
      

      I run flume with:

      ./bin/flume-ng agent -Xmx512m --name agent -c conf/ -f rabbit.conf -Dflume.root.logger=INFO,console -C plugins.d/flume-ng-rabbitmq-master/target/flume-rabbitmq-channel-1.0-SNAPSHOT.jar:plugins.d/spark-streaming-flume-sink_2.11-2.0.0.jar

      Then, from the other machine, I try to consume from the Spark sink:

      spark-2.0.0-bin-hadoop2.7$ bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.0.0 flume_wordcount.py 52.210.48.242 5005

      with this variation of flume_wordcount.py (using the polling stream):

      from __future__ import print_function
      
      import sys
      
      from pyspark import SparkContext
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.flume import FlumeUtils
      
      if __name__ == "__main__":
          if len(sys.argv) != 3:
              print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
              exit(-1)
      
          hostname, port = sys.argv[1:]
      
          sc = SparkContext(appName="PythonStreamingFlumeWordCount")
          ssc = StreamingContext(sc, 10)
      
          # poll the SparkSink for events; addresses are (hostname, port) pairs
          kvs = FlumeUtils.createPollingStream(ssc, [(hostname, int(port))], maxBatchSize=10, parallelism=1)
          lines = kvs.map(lambda x: x[1])
          counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a+b)
          counts.pprint()
      
          ssc.start()
          ssc.awaitTermination()
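
      For completeness: the push-based integration from the Spark docs (FlumeUtils.createStream, with Flume's built-in avro sink pointing at the receiver) is the other wiring I could try as a cross-check. A minimal sketch, assuming Flume were reconfigured accordingly; the host and port are illustrative:

      # Minimal sketch of the push-based alternative (FlumeUtils.createStream)
      # as a cross-check; assumes Flume is reconfigured with a built-in avro
      # sink pointed at this receiver. Host and port below are illustrative.
      from pyspark import SparkContext
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.flume import FlumeUtils
      
      sc = SparkContext(appName="FlumePushCheck")
      ssc = StreamingContext(sc, 10)
      # The receiver listens on this address and Flume pushes events to it.
      stream = FlumeUtils.createStream(ssc, "0.0.0.0", 5006)
      stream.count().pprint()
      
      ssc.start()
      ssc.awaitTermination()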
      

      Whatever I do, I always get this in the Flume logs:

      2016-08-19 14:04:48,679 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7d257a9f, /78.10.176.10:57551 => /10.28.230.185:5005] BOUND: /10.28.230.185:5005
      2016-08-19 14:04:48,679 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7d257a9f, /78.10.176.10:57551 => /10.28.230.185:5005] CONNECTED: /78.10.176.10:57551
      2016-08-19 14:04:49,401 (New I/O  worker #1) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Received an error batch - no events were received from channel! 
      
      2016-08-19 14:04:50,154 (Spark Sink Processor Thread - 1) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:80)] Error while processing transaction.
      java.lang.IllegalStateException: begin() called when transaction is OPEN!
              at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
              at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
              at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.begin(SpillableMemoryChannel.java:305)
              at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
              at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
              at scala.Option.foreach(Option.scala:236)
              at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
              at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
              at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      2016-08-19 14:04:50,156 (Spark Sink Processor Thread - 1) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark was unable to successfully process the events. Transaction is being rolled back.
      2016-08-19 14:04:50,156 (Spark Sink Processor Thread - 1) [ERROR - org.apache.spark.streaming.flume.sink.Logging$class.logError(Logging.scala:84)] Error rolling back transaction. Rollback may have failed!
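
      If I read the stack trace correctly, Flume's BasicTransactionSemantics only permits begin() on a freshly obtained transaction, so the sink's worker thread somehow got hold of a channel transaction that was still OPEN. A toy model of that state check as I understand it (simplified, not Flume's actual code):

      # Toy model (simplified, not Flume's actual code) of the state check
      # behind "begin() called when transaction is OPEN!": begin() is only
      # legal while the transaction is still NEW.
      class ToyTransaction(object):
          NEW, OPEN, COMPLETED = "NEW", "OPEN", "COMPLETED"
      
          def __init__(self):
              self.state = self.NEW
      
          def begin(self):
              # Mirrors the Preconditions.checkState(...) frame in the trace.
              if self.state != self.NEW:
                  raise RuntimeError(
                      "begin() called when transaction is %s!" % self.state)
              self.state = self.OPEN
      
      tx = ToyTransaction()
      tx.begin()      # NEW -> OPEN: fine
      try:
          tx.begin()  # a second begin() on the same transaction
      except RuntimeError as e:
          print(e)    # begin() called when transaction is OPEN!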
      

      I'm using the latest tgz releases downloaded from the Spark and Flume sites. The additional Flume plugin jars were downloaded from Maven Central.

      Can anyone help with this?

People

    • Assignee: Unassigned
    • Reporter: Wojciech Sznapka (wojciech.sznapka@cherrygroup.com)
    • Votes: 1
    • Watchers: 2
