Spark / SPARK-1478

Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Streaming
    • Labels:
      None
    • Target Version/s:

      Description

      FLUME-1915 added support for compression over the wire from the Avro sink to the Avro source. I would like to add this functionality to the FlumeReceiver.
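
As a rough illustration of what compression over the wire involves, the sketch below compresses a payload on the sending side and restores it on the receiving side. It assumes a deflate-style codec and uses only the JDK's `java.util.zip` classes; it is not the Flume, Avro, or Spark implementation, and the class and method names (`WireCompressionSketch`, `compress`, `decompress`) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical illustration only: not Flume, Avro, or Spark code.
final class WireCompressionSketch {

    // Sender side: deflate the payload before it is written to the wire.
    static byte[] compress(byte[] payload) {
        Deflater deflater = new Deflater();
        deflater.setInput(payload);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int written = deflater.deflate(buffer);
            out.write(buffer, 0, written);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Receiver side: inflate the bytes read from the wire back into the payload.
    static byte[] decompress(byte[] wireBytes) {
        Inflater inflater = new Inflater();
        inflater.setInput(wireBytes);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int read = inflater.inflate(buffer);
                // No progress and nothing left to consume: the stream is unusable.
                if (read == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported deflate stream");
                }
                out.write(buffer, 0, read);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("not a valid deflate stream", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Build a repetitive payload, which deflate compresses well.
        StringBuilder body = new StringBuilder();
        for (int i = 0; i < 200; i++) {
            body.append("flume event body ");
        }
        byte[] original = body.toString().getBytes(StandardCharsets.UTF_8);
        byte[] onTheWire = compress(original);
        byte[] received = decompress(onTheWire);
        System.out.println("original bytes:   " + original.length);
        System.out.println("compressed bytes: " + onTheWire.length);
        System.out.println("round trip ok:    " + Arrays.equals(original, received));
    }
}
```

The point of the sketch is that both ends must agree on the codec: a receiver that does not decompress cannot read what a compressing sender emits, which is why the receiver side needs matching support.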

          Activity

          Tathagata Das added a comment -

          After jumping through many hoops, this is the PR that merged this feature.

          https://github.com/apache/spark/pull/1347

          Ted Malaska added a comment -

          OK, I have made the requested changes, but I had to do it in a different pull request. Here is the new pull request link:

          https://github.com/apache/spark/pull/1168

          Ted Malaska added a comment -

          OK, I submitted the pull request: https://github.com/apache/spark/pull/566

          Also, I was able to reproduce the race condition about once in every 4-5 runs of the tests. It seemed to happen when my computer was busy doing other things.

          I reviewed the Flume test code and didn't see anything substantially different from the Spark version. I did add an additional wait of 1000, and I was unable to get the exception after that. I know that isn't a solution, but at least it stopped the problem for now.
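
One common alternative to a fixed wait in a timing-sensitive test is to poll for the expected condition until a deadline passes. The sketch below shows that idea with JDK classes only. It is not taken from the Spark or Flume test code; `PollingWaitSketch` and `waitUntil` are hypothetical names, and the timeout and poll interval are arbitrary illustrative values.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical illustration only: not Spark or Flume test code.
final class PollingWaitSketch {

    // Re-checks the condition every pollMillis until it holds or timeoutMillis elapses.
    // Returns true if the condition held before the deadline, false otherwise.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger received = new AtomicInteger(0);

        // Stand-in for an asynchronous receiver that delivers one event after a delay.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                return;
            }
            received.incrementAndGet();
        });
        producer.start();

        // Wait up to 5 seconds, checking every 50 ms, instead of sleeping a fixed amount.
        boolean ok = waitUntil(() -> received.get() == 1, 5000, 50);
        System.out.println("received expected event: " + ok);
    }
}
```

Compared with a fixed sleep, this returns as soon as the condition holds on a fast machine and tolerates a slower one up to the timeout, though it does not remove an underlying race if one exists in the code under test.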

          Tathagata Das added a comment -

          Haha, yeah, lazy vals are super useful in difficult situations but can lead to difficult situations themselves if you are not careful.

          I am not sure where the flakiness is coming from, but that really needs to be solved. Flakiness can be a major headache in our automated tests in Jenkins, etc. I am suffering from flakiness myself in two PRs.

          Let me know how I can help with this.

          Ted Malaska added a comment -

          No worries, that error was caused by me. I am still learning Scala. It was the difference between using a lazy val and a var.

          I have all three test cases working now, and I will do one last review before submitting it tomorrow.

          There is also one more odd thing going on that I haven't figured out yet. Sometimes (seemingly at random) my tests fail with the following exception:

          [info] - flume input stream *** FAILED *** (10 seconds, 332 milliseconds)
          [info] 0 did not equal 1 (FlumeStreamSuite.scala:104)
          [info] org.scalatest.exceptions.TestFailedException:

          Then I rerun the tests with no code changes and they succeed. It feels very much like a race condition. Note that I found this so odd that I did a fresh git clone and tested the latest branch, and I was able to get the exception there as well.

          I will look into this tomorrow. I would assume at this point that something is odd in my environment until I find evidence of it being anything else.

          Thank you again for the help.

          Tathagata Das added a comment -

          That's definitely new code. It came in with PR #300. What was the situation that led to that exception? I find it alarming; that should never happen normally!

          Ted Malaska added a comment -

          Never mind, it is working now.

          Ted Malaska added a comment -

          I re-cloned the code and ran a test. There is a bug in the current GitHub branch. I'm looking into it now.

          Ted Malaska added a comment -

          I'm having some issues.

          An "Executor has not been attached to this receiver" error is being thrown. It looks like new code was added to Spark. I'm going to see how to populate this now.

          Ted Malaska added a comment -

          SPARK-1584 is done and so is PR #300, so we are finally ready for this JIRA. I will start development today.

          Ted Malaska added a comment -

          I chatted with tdas. He reviewed the code, but he has a patch coming with overlapping changes.

          I will wait until he submits that patch, and then I will resync my changes and resubmit.

          Thanks again.

          Ted Malaska added a comment -

          I sent the initial patch. Here is the link.

          https://github.com/apache/spark/pull/405/files

          Let me know what I need to do next. This is my first time.

          Ted Malaska added a comment -

          After reviewing the code, I found that Spark Streaming is built against Flume 1.2.0 and Avro-ipc 1.6.3.

          Cloudera, Hortonworks, and even MapR are using Flume 1.4.0 in their current releases, and that version of Flume uses Avro-ipc 1.7.3, which lets us apply our own ChannelPipeline and therefore use compression and encryption.

          I don't think I can complete this JIRA until I get permission to raise the Flume version in the pom file from 1.2.0 to at least 1.3.0. Flume 1.3 is more than 2 years old, so that should be OK for Spark users, and Flume 1.3.0 uses Avro-ipc 1.7.3, which is what we need.

          For now, I will continue as if I had the permission, and I will open a pull request that changes Flume from 1.2.0 to 1.3.0.

          Thanks


            People

            • Assignee:
              Ted Malaska
              Reporter:
              Ted Malaska
            • Votes:
              0
              Watchers:
              4
