Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-3275

Socket receiver can not recover when the socket server restarted

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 1.0.2
    • 1.1.0
    • DStreams

    Description

      To reproduce this issue:
      1. create a application with a socket dstream
      2. start the socket server and start the application
      3. restart the socket server
      4. the socket dstream will fail to reconnect (it will close the connection after a successful connect)

      The main issue should be the status in SocketReceiver and ReceiverSupervisor is incorrect after the reconnect:
      In SocketReceiver ::receive() the while loop will never be entered after reconnect since the isStopped will returns true:
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext)

      { store(iterator.next) }

      logInfo("Stopped receiving")
      restart("Retrying connecting to " + host + ":" + port)

      That is caused by the status flag "receiverState" in ReceiverSupervisor will be set to Stopped when the connection losses, but it is reset after the call of Receiver start method:

      def startReceiver(): Unit = synchronized {
      try

      { logInfo("Starting receiver") receiver.onStart() logInfo("Called receiver onStart") onReceiverStart() receiverState = Started }

      catch

      { case t: Throwable => stop("Error starting receiver " + streamId, Some(t)) }

      }

      Attachments

        Activity

          People

            Unassigned Unassigned
            jhu Jack Hu
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: