Cassandra / CASSANDRA-17466

Shut repair task executor down without interruption to avoid compromising shared channel proxies


Details

    Description

      If a RepairJob gets past validation, it builds a list of SyncTask items and submits them. If any one of them fails, we grab the relevant exception and propagate it from RepairJob up to RepairSession.

      ERROR 2022-03-09T23:53:36,721 [Stream-Deserializer-/10.246.3.102:7000-d97958c4] org.apache.cassandra.streaming.StreamSession:1110 - [Stream #07c55da0-a047-11ec-8122-ab911c7a993f] Remote peer /10.246.3.102:7000 failed stream session.
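      The fan-out and fail-fast propagation described above can be sketched as follows. This is a minimal sketch, assuming plain CompletableFuture rather than the Guava futures that appear in the repair stack traces: every sync task is started on the executor, and the combined future fails as soon as any one task fails, carrying that task's own exception up to the caller (the role RepairSession plays here). SyncTaskFanOut, runAll(), and the Supplier<String> tasks are hypothetical stand-ins, not Cassandra APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical sketch: not Cassandra's RepairJob, just the same fan-out/fail-fast shape.
class SyncTaskFanOut {
    /**
     * Starts every task on the executor and returns a future that completes with all
     * results in submission order, or fails as soon as any single task fails.
     */
    static CompletableFuture<List<String>> runAll(List<Supplier<String>> tasks, Executor executor) {
        CompletableFuture<List<String>> combined = new CompletableFuture<>();
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Supplier<String> task : tasks) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(task, executor);
            future.whenComplete((result, error) -> {
                if (error != null) {
                    // supplyAsync wraps failures in CompletionException; propagate the task's own exception.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                                      ? error.getCause() : error;
                    combined.completeExceptionally(cause); // first failure wins; later calls are no-ops
                }
            });
            futures.add(future);
        }
        // Only reached with results if every task succeeded; if any failed, thenRun is skipped.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                         .thenRun(() -> combined.complete(futures.stream()
                                                                .map(CompletableFuture::join)
                                                                .collect(Collectors.toList())));
        return combined;
    }
}
```

      Calling join() on the returned future either yields every task's result or throws a CompletionException whose cause is the exception of the task that failed.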
      

      RepairSession then marks itself as terminated and clears its internal maps of active validations and sync tasks, but immediately before doing so, it calls shutdownNow() on the executor that runs those tasks. shutdownNow() interrupts every worker thread that is still executing a task, so when one stream session fails, any other stream tasks that are still running have their threads' interrupt flags set. This can have unintended negative consequences: if an interrupted thread is in the middle of a blocking operation on a ChannelProxy (or starts one), the underlying channel is closed and the operation throws a ClosedByInterruptException. (Keep in mind that ChannelProxy instances are shared, except in a few specific cases like those introduced in CASSANDRA-15666, so closing one affects every other reader of that file.)
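      The difference between the two shutdown modes, which the title's fix ("shut repair task executor down without interruption") relies on, can be seen in isolation. This is a minimal sketch, assuming a plain JDK ExecutorService rather than Cassandra's own executor classes; ShutdownInterruptDemo and the Thread.sleep() standing in for a blocking read are illustrative assumptions. shutdown() stops accepting new tasks but lets a running task finish undisturbed, while shutdownNow() interrupts the worker thread running it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Cassandra.
class ShutdownInterruptDemo {
    /**
     * Starts one long-running task, shuts the executor down while it is running,
     * and reports whether the task observed an interrupt.
     */
    static boolean taskSawInterrupt(boolean useShutdownNow) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        executor.submit(() -> {
            started.countDown();
            try {
                // Stand-in for a blocking operation; sleep() throws if the thread is interrupted.
                Thread.sleep(200);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });

        started.await(); // ensure the task is running before shutting down
        if (useShutdownNow)
            executor.shutdownNow(); // interrupts the worker thread running the task
        else
            executor.shutdown();    // rejects new tasks, lets the running one finish undisturbed
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shutdown():    interrupted = " + taskSawInterrupt(false));
        System.out.println("shutdownNow(): interrupted = " + taskSawInterrupt(true));
    }
}
```

      Running main() should report interrupted = false for shutdown() and interrupted = true for shutdownNow(). A thread interrupted this way inside channel I/O would not just see an exception; it would also close the channel it was using.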

      We've seen this problem manifest in production in a couple of ways, both while reading from the peers_v2 system table:

      Exception in thread Thread[RepairJobTask:23,5,main]"
      exception="FSReadError in .../data/system/peers_v2-c4325fbb8e5e3bafbd070f9250ed818e/system-peers_v2-nb-99-big-Data.db
              at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:143)
              at org.apache.cassandra.io.util.CompressedChunkReader$Standard.readChunk(CompressedChunkReader.java:115)
              at org.apache.cassandra.io.util.BufferManagingRebufferer.rebuffer(BufferManagingRebufferer.java:79)
              at org.apache.cassandra.io.util.RandomAccessReader.reBufferAt(RandomAccessReader.java:68)
              at org.apache.cassandra.io.util.RandomAccessReader.seek(RandomAccessReader.java:210)
              at org.apache.cassandra.io.util.FileHandle.createReader(FileHandle.java:151)
              at org.apache.cassandra.io.sstable.format.SSTableReader.getFileDataInput(SSTableReader.java:1628)
              at org.apache.cassandra.db.columniterator.AbstractSSTableIterator.<init>(AbstractSSTableIterator.java:96)
              at org.apache.cassandra.db.columniterator.SSTableIterator.<init>(SSTableIterator.java:48)
              at org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:75)
              at org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:67)
              at org.apache.cassandra.db.StorageHook$1.makeRowIterator(StorageHook.java:87)
              at org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndSSTablesInTimestampOrder(SinglePartitionReadCommand.java:897)
              at org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDiskInternal(SinglePartitionReadCommand.java:605)
              at org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDisk(SinglePartitionReadCommand.java:578)
              at org.apache.cassandra.db.SinglePartitionReadCommand.queryStorage(SinglePartitionReadCommand.java:412)
              at org.apache.cassandra.db.ReadCommand.executeLocally(ReadCommand.java:414)
              at org.apache.cassandra.db.SinglePartitionReadQuery$Group.executeLocally(SinglePartitionReadQuery.java:242)
              at org.apache.cassandra.db.SinglePartitionReadQuery$Group.executeInternal(SinglePartitionReadQuery.java:216)
              at org.apache.cassandra.cql3.statements.SelectStatement.executeInternal(SelectStatement.java:458)
              at org.apache.cassandra.cql3.statements.SelectStatement.executeLocally(SelectStatement.java:442)
              at org.apache.cassandra.cql3.statements.SelectStatement.executeLocally(SelectStatement.java:96)
              at org.apache.cassandra.cql3.QueryProcessor.executeInternal(QueryProcessor.java:334)
              at org.apache.cassandra.db.SystemKeyspace.getPreferredIP(SystemKeyspace.java:973)
              at org.apache.cassandra.net.OutboundConnectionSettings.connectTo(OutboundConnectionSettings.java:455)
              at org.apache.cassandra.net.OutboundConnectionSettings.withDefaults(OutboundConnectionSettings.java:484)
              at org.apache.cassandra.streaming.DefaultConnectionFactory.createConnection(DefaultConnectionFactory.java:49)
              at org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createChannel(NettyStreamingMessageSender.java:199)
              at org.apache.cassandra.streaming.async.NettyStreamingMessageSender.setupControlMessageChannel(NettyStreamingMessageSender.java:180)
              at org.apache.cassandra.streaming.async.NettyStreamingMessageSender.sendMessage(NettyStreamingMessageSender.java:245)
              at org.apache.cassandra.streaming.async.NettyStreamingMessageSender.initialize(NettyStreamingMessageSender.java:149)
              at org.apache.cassandra.streaming.StreamSession.start(StreamSession.java:372)
              at org.apache.cassandra.streaming.StreamCoordinator.startSession(StreamCoordinator.java:262)
              at org.apache.cassandra.streaming.StreamCoordinator.access$700(StreamCoordinator.java:36)
              at org.apache.cassandra.streaming.StreamCoordinator$HostStreamingData.connectAllStreamSessions(StreamCoordinator.java:308)
              at org.apache.cassandra.streaming.StreamCoordinator.connectAllStreamSessions(StreamCoordinator.java:107)
              at org.apache.cassandra.streaming.StreamCoordinator.connect(StreamCoordinator.java:101)
              at org.apache.cassandra.streaming.StreamResultFuture.createInitiator(StreamResultFuture.java:98)
              at org.apache.cassandra.streaming.StreamPlan.execute(StreamPlan.java:179)
              at org.apache.cassandra.repair.LocalSyncTask.startSync(LocalSyncTask.java:113)
              at org.apache.cassandra.repair.SyncTask.run(SyncTask.java:89)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
              at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
              at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
              at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: java.nio.channels.ClosedByInterruptException
              at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:199)
              at java.base/sun.nio.ch.FileChannelImpl.endBlocking(FileChannelImpl.java:162)
              at java.base/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:816)
              at java.base/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796)
              at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:139)
              ... 48 more
      

      In this case, the SyncTask is trying to read the preferred IP of the peer it wants to communicate with (SystemKeyspace.getPreferredIP() queries the peers_v2 table). Because its thread has already been interrupted, the read closes the shared channel and fails with a ClosedByInterruptException, which ChannelProxy#read() wraps in an FSReadError. That triggers the disk failure policy, which kills the node.

      message="Exception in thread Thread[CompactionExecutor:1690,1,main]"
      exception="FSReadError in …/data/system/peers_v2-c4325fbb8e5e3bafbd070f9250ed818e/system-peers_v2-nb-99-big-Data.db
              at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:143)
              at org.apache.cassandra.io.util.CompressedChunkReader$Standard.readChunk(CompressedChunkReader.java:115)
              at org.apache.cassandra.io.util.BufferManagingRebufferer.rebuffer(BufferManagingRebufferer.java:79)
              at org.apache.cassandra.io.util.RandomAccessReader.reBufferAt(RandomAccessReader.java:68)
              at org.apache.cassandra.io.util.RandomAccessReader.seek(RandomAccessReader.java:210)
              at org.apache.cassandra.io.sstable.format.big.BigTableScanner.seekToCurrentRangeStart(BigTableScanner.java:196)
              at org.apache.cassandra.io.sstable.format.big.BigTableScanner.access$400(BigTableScanner.java:52)
              at org.apache.cassandra.io.sstable.format.big.BigTableScanner$KeyScanningIterator.computeNext(BigTableScanner.java:305)
              at org.apache.cassandra.io.sstable.format.big.BigTableScanner$KeyScanningIterator.computeNext(BigTableScanner.java:282)
              at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:46)
              at org.apache.cassandra.io.sstable.format.big.BigTableScanner.hasNext(BigTableScanner.java:261)
              at org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:376)
              at org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:188)
              at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:157)
              at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:46)
              at org.apache.cassandra.db.partitions.UnfilteredPartitionIterators$2.hasNext(UnfilteredPartitionIterators.java:169)
              at org.apache.cassandra.db.transform.BasePartitions.hasNext(BasePartitions.java:93)
              at org.apache.cassandra.db.compaction.CompactionIterator.hasNext(CompactionIterator.java:254)
              at org.apache.cassandra.db.compaction.CompactionTask.runMayThrow(CompactionTask.java:202)
              at org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:28)
              at org.apache.cassandra.db.compaction.CompactionTask.executeInternal(CompactionTask.java:78)
              at org.apache.cassandra.db.compaction.AbstractCompactionTask.execute(AbstractCompactionTask.java:100)
              at org.apache.cassandra.db.compaction.CompactionManager$BackgroundCompactionCandidate.run(CompactionManager.java:363)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
              at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: java.nio.channels.ClosedChannelException
              at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
              at java.base/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:790)
              at org.apache.cassandra.io.util.ChannelProxy.read(ChannelProxy.java:139)
              ... 28 more
      

      In this case, the repair itself was lucky enough to terminate without any issues, but when another thread (here, a compaction) later reads through the same ChannelProxy, it gets a ClosedChannelException. That is also wrapped in an FSReadError, which triggers the disk failure policy and kills the node. A lot of violence here. Note that, although the channel is closed, we don't see a ClosedByInterruptException in this case, because it was the repair job task thread, not the compaction thread, that was interrupted.
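      Both failure modes can be reproduced outside Cassandra with a single java.nio FileChannel shared by two threads, standing in for a shared ChannelProxy. This is a minimal sketch under that assumption; SharedChannelInterruptDemo, tryRead(), and the temporary file are hypothetical and no Cassandra classes are involved. The first thread interrupts itself before reading, which closes the channel and raises ClosedByInterruptException; the second, never-interrupted thread then gets ClosedChannelException, mirroring the two traces above.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; not part of Cassandra.
class SharedChannelInterruptDemo {
    /** Attempts a positional read and reports the outcome by exception class name. */
    static String tryRead(FileChannel channel) {
        try {
            channel.read(ByteBuffer.allocate(4), 0);
            return "read ok";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    static String[] run() throws IOException, InterruptedException {
        Path file = Files.createTempFile("shared-channel", ".db");
        Files.write(file, new byte[] {1, 2, 3, 4});
        String[] outcomes = new String[2];
        // One channel shared by two threads, standing in for a shared ChannelProxy.
        try (FileChannel shared = FileChannel.open(file, StandardOpenOption.READ)) {
            // Thread A: a repair task whose interrupt flag was set (e.g. by shutdownNow()).
            Thread repairTask = new Thread(() -> {
                Thread.currentThread().interrupt();
                outcomes[0] = tryRead(shared); // this interrupted read closes the shared channel
            });
            repairTask.start();
            repairTask.join();

            // Thread B: an unrelated reader (e.g. compaction) that was never interrupted.
            Thread compaction = new Thread(() -> outcomes[1] = tryRead(shared));
            compaction.start();
            compaction.join();
        } finally {
            Files.deleteIfExists(file);
        }
        return outcomes;
    }

    public static void main(String[] args) throws Exception {
        String[] outcomes = run();
        System.out.println("interrupted thread: " + outcomes[0]);
        System.out.println("other thread:       " + outcomes[1]);
    }
}
```

      Running main() should print ClosedByInterruptException for the interrupted thread and ClosedChannelException for the other one: the interrupt is charged to one thread, but the damage lands on every user of the shared channel.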

      (Note: This reproduces most easily with disk_access_mode: mmap_index_only.)

            People

              maedhroz Caleb Rackliffe
              maedhroz Caleb Rackliffe
              Caleb Rackliffe
              David Capwell
              Votes: 0
              Watchers: 4

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 2h