Details

      Description

      I've found a possible cause for the row batch corruption, though I'm not 100% sure it is the actual cause. It is a fairly complicated race in the DataStreamSender that only happens under very unusual conditions, which may explain why we see it so rarely.

      A DataStreamSender::Channel has two OutboundProtoRowBatch objects (proto_batch_[1] and proto_batch_[2]). We populate one before sending the first RPC, and while that RPC is being sent asynchronously (i.e. sitting in a send queue in KRPC), we can serialize into and populate the second OutboundProtoRowBatch. We always wait for the first RPC to complete (i.e. for its callback to be invoked) before sending the second RPC with the second OutboundProtoRowBatch.
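      To make the double-buffering concrete, here is a minimal single-threaded C++ sketch. It is not Impala's code: `ProtoBatch` is a hypothetical stand-in for OutboundProtoRowBatch, `Send()` only records that an RPC is in flight, and `OnRpcComplete()` plays the role of the RPC callback. It shows that while one buffer is referenced by a pending RPC, the next batch is serialized into the other one.

```cpp
#include <array>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for OutboundProtoRowBatch: just the serialized bytes.
struct ProtoBatch {
  std::string payload;
};

// Simplified, single-threaded model of a channel that double-buffers its
// outbound batches. Not Impala's actual DataStreamSender::Channel.
class DoubleBufferedChannel {
 public:
  // Serializes `rows` into the buffer the next Send() will use. While an RPC
  // is in flight this is always the other buffer, so serialization can
  // overlap with the pending send.
  int Serialize(const std::string& rows) {
    int idx = next_;
    batches_[idx].payload = rows;
    return idx;
  }

  // Starts an "asynchronous" send of batches_[idx]. The caller must have
  // waited for the previous RPC's callback first.
  void Send(int idx) {
    assert(!rpc_in_flight_ && "previous RPC must complete before sending");
    rpc_in_flight_ = true;
    in_flight_idx_ = idx;
    next_ = 1 - idx;  // the next batch goes into the other buffer
  }

  // Stands in for the RPC completion callback: the buffer is read here.
  void OnRpcComplete() {
    delivered_.push_back(batches_[in_flight_idx_].payload);
    rpc_in_flight_ = false;
  }

  bool rpc_in_flight() const { return rpc_in_flight_; }
  const std::vector<std::string>& delivered() const { return delivered_; }

 private:
  std::array<ProtoBatch, 2> batches_;
  int next_ = 0;
  int in_flight_idx_ = -1;
  bool rpc_in_flight_ = false;
  std::vector<std::string> delivered_;
};
```

      In the normal flow, `Serialize("batch1")`, `Send()`, then `Serialize("batch2")` fills both buffers, and the second `Send()` happens only after `OnRpcComplete()`, so each buffer is untouched while its RPC is pending.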

      The timeline is like this:

      DataStreamSender::Channel thread         | Asynchronous KRPC thread
      -----------------------------------------+------------------------------------------
      RPC1:                                    |
        Serialize proto_batch_[1]              |
        Send proto_batch_[1] (asynchronously)  |
                                               | OutboundTransfer enqueued with sidecar pointing to proto_batch_[1]
      RPC2:                                    |
        Serialize proto_batch_[2]              |
        WaitForClearChannel()                  |

      WaitForClearChannel() waits on a condition variable, 'rpc_done_cv_', which is notified when RPC1 completes, i.e. from RPC1's callback.

      WaitForClearChannel():
      https://github.com/michaelhkw/incubator-impala/blob/krpc/be/src/runtime/data-stream-sender.cc#L280

      CV notification in callback:
      https://github.com/michaelhkw/incubator-impala/blob/krpc/be/src/runtime/data-stream-sender.cc#L236
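      A minimal sketch of that wait/notify pairing, assuming a mutex-protected `rpc_in_progress_` flag (a hypothetical name; the real members may differ). `OnRpcDone()` stands in for the RPC callback running on a KRPC thread, and `WaitForClearChannel()` blocks on `rpc_done_cv_` until the flag clears. `SimulateAsyncCompletion()` is a purely illustrative driver.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical model of the channel's RPC-completion signalling. The names
// WaitForClearChannel and rpc_done_cv_ follow the issue; the rest is assumed.
class ChannelState {
 public:
  // Sender thread: called just before handing an RPC to the async layer.
  void MarkRpcInProgress() {
    std::lock_guard<std::mutex> l(lock_);
    rpc_in_progress_ = true;
  }

  // Stands in for the RPC completion callback, which runs on a KRPC thread.
  void OnRpcDone() {
    {
      std::lock_guard<std::mutex> l(lock_);
      rpc_in_progress_ = false;
    }
    rpc_done_cv_.notify_one();
  }

  // Sender thread: blocks until the in-flight RPC (if any) has completed.
  // The predicate overload guards against spurious and missed wake-ups.
  void WaitForClearChannel() {
    std::unique_lock<std::mutex> l(lock_);
    rpc_done_cv_.wait(l, [this] { return !rpc_in_progress_; });
  }

  bool rpc_in_progress() {
    std::lock_guard<std::mutex> l(lock_);
    return rpc_in_progress_;
  }

 private:
  std::mutex lock_;
  std::condition_variable rpc_done_cv_;
  bool rpc_in_progress_ = false;
};

// Illustrative only: completes the RPC from a second thread, as KRPC would,
// and reports whether the waiter saw a clear channel afterwards.
bool SimulateAsyncCompletion() {
  ChannelState state;
  state.MarkRpcInProgress();
  std::thread krpc_thread([&state] { state.OnRpcDone(); });
  state.WaitForClearChannel();
  bool clear = !state.rpc_in_progress();
  krpc_thread.join();
  return clear;
}
```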

      Also, only one thread ever uses a given DataStreamSender. So if RPC1 is still enqueued inside KRPC, the DSS::Channel thread is waiting in WaitForClearChannel().

      This means that in the general case, no RPC3 can overwrite proto_batch_[1] before RPC1 is sent, so the sidecar stays intact.

      However, the wait on 'rpc_done_cv_' also wakes up periodically to check conditions such as whether the query has been cancelled or Close() has been called on the DataStreamSender, so that it doesn't hang forever if the receiver never responds to RPC1:
      https://github.com/michaelhkw/incubator-impala/blob/krpc/be/src/runtime/data-stream-sender.cc#L276

      This is where the bug shows up. If the query is cancelled, the sender fragment instance's RuntimeState has its 'cancelled' flag set (possibly by another thread). WaitForClearChannel() notices this during one of its periodic wake-ups and returns without sending RPC2:
      https://github.com/michaelhkw/incubator-impala/blob/krpc/be/src/runtime/data-stream-sender.cc#L192-L193
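      The periodic wake-up and early return can be sketched with `std::condition_variable::wait_for`. This is an assumption-laden model, not the real implementation: `cancelled` stands in for the RuntimeState cancellation flag, `poll_period` is an arbitrary illustrative interval, and the Close() check is omitted. The key detail is that the cancelled path returns while the RPC may still be queued and still referencing its buffer.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

enum class WaitResult { kClear, kCancelled };

// Hypothetical model of a channel wait that wakes up periodically to check a
// cancellation flag. Not Impala's implementation.
class ChannelWaiter {
 public:
  void MarkRpcInProgress() {
    std::lock_guard<std::mutex> l(lock_);
    rpc_in_progress_ = true;
  }

  // Stands in for the RPC completion callback.
  void OnRpcDone() {
    {
      std::lock_guard<std::mutex> l(lock_);
      rpc_in_progress_ = false;
    }
    rpc_done_cv_.notify_one();
  }

  // `cancelled` stands in for the RuntimeState cancellation flag, which
  // another thread may set without notifying rpc_done_cv_.
  WaitResult WaitForClearChannel(const std::atomic<bool>& cancelled,
                                 std::chrono::milliseconds poll_period) {
    std::unique_lock<std::mutex> l(lock_);
    while (rpc_in_progress_) {
      if (cancelled.load()) {
        // Returns while the RPC may still be queued and still referencing its
        // buffer: the caller is now free to reuse that buffer.
        return WaitResult::kCancelled;
      }
      // Wakes up on notification, on timeout, or spuriously; the loop re-checks.
      rpc_done_cv_.wait_for(l, poll_period);
    }
    return WaitResult::kClear;
  }

  bool rpc_in_progress() {
    std::lock_guard<std::mutex> l(lock_);
    return rpc_in_progress_;
  }

 private:
  std::mutex lock_;
  std::condition_variable rpc_done_cv_;
  bool rpc_in_progress_ = false;
};
```

      Cancelling the wait does not cancel the RPC itself: after a `kCancelled` return, `rpc_in_progress()` is still true.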

      Now, if the partition type is 'HASH_PARTITIONED', we hash-partition the RowBatch's rows across channels, adding rows individually to the appropriate channel:
      https://github.com/michaelhkw/incubator-impala/blob/krpc/be/src/runtime/data-stream-sender.cc#L472

      and only send an RPC once that channel's RowBatch reaches a capacity we consider large enough to send:
      https://github.com/michaelhkw/incubator-impala/blob/krpc/be/src/runtime/data-stream-sender.cc#L290-L292

      Note that we never check for cancellation anywhere in this loop:
      https://github.com/michaelhkw/incubator-impala/blob/krpc/be/src/runtime/data-stream-sender.cc#L451-L473
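      The hash-partitioned path can be modeled as a loop that routes each row to a channel and flushes that channel once it reaches capacity. In this hypothetical sketch, `std::hash<int>` and a fixed row-count capacity are assumed in place of Impala's own hashing and whatever threshold it actually uses, and incrementing `rpcs_sent` stands in for serializing and sending an RPC. As in the linked code, nothing inside the loop checks for cancellation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

struct Row {
  int key;
  std::string value;
};

// Hypothetical per-channel buffer of rows waiting to be sent.
struct PartitionChannel {
  std::vector<Row> pending;
  int rpcs_sent = 0;
};

class HashPartitionSender {
 public:
  HashPartitionSender(std::size_t num_channels, std::size_t capacity)
      : channels_(num_channels), capacity_(capacity) {
    assert(num_channels > 0 && capacity > 0);
  }

  // Routes each row to a channel by hash and "sends" a channel's rows as soon
  // as its buffer reaches capacity. No cancellation check happens anywhere in
  // this loop.
  void Send(const std::vector<Row>& batch) {
    for (const Row& row : batch) {
      std::size_t idx = std::hash<int>{}(row.key) % channels_.size();
      PartitionChannel& ch = channels_[idx];
      ch.pending.push_back(row);
      if (ch.pending.size() >= capacity_) {
        ++ch.rpcs_sent;  // stands in for serializing and sending an RPC
        ch.pending.clear();
      }
    }
  }

  const PartitionChannel& channel(std::size_t i) const { return channels_[i]; }
  std::size_t num_channels() const { return channels_.size(); }

 private:
  std::vector<PartitionChannel> channels_;
  std::size_t capacity_;
};
```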

      Also, RPC1 is never notified that the query is being cancelled, since KRPC does not support RPC cancellation yet, so RPC1 stays in the KRPC queue waiting to be sent.

      So RPC3 can reach capacity and be sent on the same channel as RPC1 before RPC1 has actually been sent. RPC3 then starts serializing into (and overwriting) proto_batch_[1] while, in parallel, KRPC sends RPC1. The sidecar that RPC1 points to (proto_batch_[1]) is therefore corrupt: RPC3 is writing to it while RPC1 is being sent from it.
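      The aliasing can be replayed deterministically in a few lines. This sketch assumes, as described above, that the queued transfer holds a non-owning pointer to the serialized buffer and reads it only when it is actually sent; `QueuedTransfer` and `ReplayCancelledTimeline()` are hypothetical names. Because the steps run sequentially, it only shows the fully overwritten case; in the real race the writes and the send interleave, so the receiver could see a torn mix of both batches.

```cpp
#include <cassert>
#include <string>

// Hypothetical model of a queued outbound transfer: it keeps a non-owning
// pointer to the serialized batch (the sidecar) and reads the bytes only
// when the transfer is actually written out.
struct QueuedTransfer {
  const std::string* sidecar;
  std::string SendBuffer() const {
    assert(sidecar != nullptr);
    return *sidecar;  // reads the buffer's contents at send time
  }
};

struct RaceOutcome {
  std::string serialized_for_rpc1;  // what RPC1 was supposed to carry
  std::string bytes_sent_for_rpc1;  // what RPC1 actually carried
};

// Replays the cancelled timeline from this issue sequentially.
RaceOutcome ReplayCancelledTimeline() {
  std::string proto_batch_1;
  std::string proto_batch_2;
  RaceOutcome out;

  // RPC1: serialize into proto_batch_1 and enqueue it; the sidecar points at it.
  proto_batch_1 = "rows-for-RPC1";
  out.serialized_for_rpc1 = proto_batch_1;
  QueuedTransfer rpc1{&proto_batch_1};

  // RPC2: serialize into proto_batch_2. WaitForClearChannel() then returns
  // early because the query was cancelled, so RPC2 is never sent while RPC1
  // is still sitting in the send queue.
  proto_batch_2 = "rows-for-RPC2";

  // RPC3: AddRow() reaches capacity and the channel reuses proto_batch_1,
  // even though RPC1's sidecar still points at it.
  proto_batch_1 = "rows-for-RPC3";

  // The async layer finally sends RPC1 and reads whatever the buffer now holds.
  out.bytes_sent_for_rpc1 = rpc1.SendBuffer();
  return out;
}
```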

      The receiver can still end up processing the corrupt RPC1 payload even though the query was cancelled, because it may learn about the cancellation later than the sender does (I verified this with the core file, which we have for the receiver node).

      So the final timeline looks like this:

      DataStreamSender::Channel thread         | Asynchronous KRPC thread
      -----------------------------------------+------------------------------------------
      RPC1:                                    |
        Serialize proto_batch_[1]              |
        Send proto_batch_[1] (asynchronously)  |
                                               | OutboundTransfer enqueued with sidecar pointing to proto_batch_[1]
      RPC2:                                    |
        Serialize proto_batch_[2]              |
        WaitForClearChannel()                  |
        QUERY CANCEL                           |
        Return without sending RPC2            |
        AddRow() reaches capacity              |
      RPC3:                                    |
        Serialize proto_batch_[1] (RACE)       | OutboundTransfer::SendBuffer() for RPC1 with sidecar proto_batch_[1] (RACE)

            People

            • Assignee: Sailesh Mukil
            • Reporter: Sailesh Mukil