Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-3275

Socket receiver can not recover when the socket server restarted

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.2
    • Fix Version/s: 1.1.0
    • Component/s: DStreams
    • Labels:

      Description

      To reproduce this issue:
      1. create a application with a socket dstream
      2. start the socket server and start the application
      3. restart the socket server
      4. the socket dstream will fail to reconnect (it will close the connection after a successful connect)

      The main issue should be the status in SocketReceiver and ReceiverSupervisor is incorrect after the reconnect:
      In SocketReceiver ::receive() the while loop will never be entered after reconnect since the isStopped will returns true:
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext)

      { store(iterator.next) }

      logInfo("Stopped receiving")
      restart("Retrying connecting to " + host + ":" + port)

      That is caused by the status flag "receiverState" in ReceiverSupervisor will be set to Stopped when the connection losses, but it is reset after the call of Receiver start method:

      def startReceiver(): Unit = synchronized {
      try

      { logInfo("Starting receiver") receiver.onStart() logInfo("Called receiver onStart") onReceiverStart() receiverState = Started }

      catch

      { case t: Throwable => stop("Error starting receiver " + streamId, Some(t)) }

      }

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              jhu Jack Hu
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: