Flink / FLINK-2685

TaskManager deadlock on NetworkBufferPool


Details

    Description

      This deadlock occurs intermittently. My job has a join, followed by a chained join and filter (chain<join,filter>), followed by a reduceGroup. Below are the stack traces and local variables from one thread of each of the two joins.

      The joins are waiting for a buffer to become available (networkBufferPool.availableMemorySegments.count=0). Both LocalBufferPools have been given extra capacity (currentPoolSize=60 > numberOfRequiredMemorySegments=32). The first join is at full capacity (currentPoolSize=numberOfRequestedMemorySegments=60), yet the second join has not acquired any segments at all (numberOfRequestedMemorySegments=0).
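      To make the accounting concrete, the sketch below recomputes it from the debugger values in this report. The class and method names are hypothetical, and it assumes numTotalRequiredBuffers is the sum of every pool's required segment count. Under that assumption 7014 segments were handed out as extra capacity (28 per pool here), the global pool is empty, and the second join holds fewer than its guaranteed 32 segments, so its minimum can no longer be satisfied.

```java
// Recomputes the buffer accounting from the debugger values in this report.
// Field names in comments come from the dumps; the helper methods are hypothetical.
class BufferAccounting {
    // Segments the global pool can hand out beyond all pools' guaranteed minimums
    // (assumes numTotalRequiredBuffers is the sum of every pool's required count).
    static int excessSegments(int totalSegments, int totalRequired) {
        return totalSegments - totalRequired;
    }

    // Extra capacity granted to a local pool above its guaranteed minimum.
    static int extraCapacity(int currentPoolSize, int required) {
        return currentPoolSize - required;
    }

    // A pool is starved if it holds fewer than its guaranteed segments
    // while the global pool has nothing left to give.
    static boolean starved(int requested, int required, int globalFree) {
        return requested < required && globalFree == 0;
    }

    public static void main(String[] args) {
        // totalNumberOfMemorySegments=10240, numTotalRequiredBuffers=3226
        System.out.println("excess = " + excessSegments(10240, 3226));      // 7014
        // currentPoolSize=60, numberOfRequiredMemorySegments=32
        System.out.println("extra per pool = " + extraCapacity(60, 32));    // 28
        // availableMemorySegments.count=0 in the global pool
        System.out.println("first join starved = " + starved(60, 32, 0));  // false
        System.out.println("second join starved = " + starved(0, 32, 0));  // true
    }
}
```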

      LocalBufferPool.returnExcessMemorySegments only recycles MemorySegments from its availableMemorySegments, so any requested Buffers are released only when they are explicitly recycled.
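      The effect can be reproduced with a toy model. ToyGlobalPool, ToyLocalPool and tryRequest are hypothetical simplifications, not Flink's NetworkBufferPool or LocalBufferPool. The only behaviour taken from this report is that returning excess segments considers idle segments alone. tryRequest is a non-blocking stand-in for requestBuffer that returns null where the real pool would wait.

```java
import java.util.ArrayDeque;

// Toy model of the accounting described above. Hypothetical simplification,
// not Flink's NetworkBufferPool/LocalBufferPool.
class ToyGlobalPool {
    private final ArrayDeque<Object> free = new ArrayDeque<>();

    ToyGlobalPool(int segments) {
        for (int i = 0; i < segments; i++) free.add(new Object());
    }

    Object take() { return free.poll(); }               // null when exhausted
    void recycle(Object segment) { free.add(segment); }
    int freeCount() { return free.size(); }
}

class ToyLocalPool {
    private final ToyGlobalPool global;
    private final ArrayDeque<Object> available = new ArrayDeque<>(); // idle, owned segments
    private int requested; // segments taken from the global pool (idle + in use)
    private int poolSize;  // current capacity target, may shrink on redistribution

    ToyLocalPool(ToyGlobalPool global, int poolSize) {
        this.global = global;
        this.poolSize = poolSize;
    }

    // Non-blocking stand-in for requestBuffer(): returns null where the real pool would wait.
    Object tryRequest() {
        Object segment = available.poll();
        if (segment != null) return segment;
        if (requested < poolSize) {
            segment = global.take();
            if (segment != null) {
                requested++;
                return segment;
            }
        }
        return null;
    }

    // Called when a consumer explicitly recycles a buffer.
    void recycle(Object segment) {
        available.add(segment);
        returnExcess();
    }

    // Redistribution shrinks the pool's target size.
    void setPoolSize(int newSize) {
        poolSize = newSize;
        returnExcess();
    }

    // Mirrors the reported behaviour: only idle segments in `available` are handed
    // back; segments currently in use stay counted against this pool.
    private void returnExcess() {
        while (requested > poolSize) {
            Object segment = available.poll();
            if (segment == null) return; // everything is in use: nothing to give back
            global.recycle(segment);
            requested--;
        }
    }

    int requested() { return requested; }
}

class ToyDeadlockDemo {
    public static void main(String[] args) {
        ToyGlobalPool global = new ToyGlobalPool(4);
        ToyLocalPool first = new ToyLocalPool(global, 4);
        Object[] inFlight = new Object[4];
        for (int i = 0; i < 4; i++) inFlight[i] = first.tryRequest(); // first pool takes everything

        ToyLocalPool second = new ToyLocalPool(global, 2);
        first.setPoolSize(2); // shrinking frees nothing: all 4 segments are in use
        System.out.println("first requested = " + first.requested());                // 4
        System.out.println("second got buffer = " + (second.tryRequest() != null));  // false

        first.recycle(inFlight[0]); // an explicit recycle finally returns one segment
        System.out.println("second got buffer = " + (second.tryRequest() != null));  // true
    }
}
```

In the model, lowering a pool's target size does not reclaim segments that are in flight. Only an explicit recycle does. If the first join never recycles because it is itself blocked, the second pool stays starved.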

      First join stack trace and variable values from LocalBufferPool.requestBuffer:

      owns: SpanningRecordSerializer<T>  (id=723)	
      waiting for: ArrayDeque<E>  (id=724)	
      Object.wait(long) line: not available [native method]	
      LocalBufferPool.requestBuffer(boolean) line: 163	
      LocalBufferPool.requestBufferBlocking() line: 133	
      RecordWriter<T>.emit(T) line: 92	
      OutputCollector<T>.collect(T) line: 65	
      JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088	
      ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137	
      JoinDriver<IT1,IT2,OT>.run() line: 208	
      RegularPactTask<S,OT>.run() line: 489	
      RegularPactTask<S,OT>.invoke() line: 354	
      Task.run() line: 581	
      Thread.run() line: 745	
      
      this	LocalBufferPool  (id=403)	
      	availableMemorySegments	ArrayDeque<E>  (id=398)	
      		elements	Object[16]  (id=422)	
      		head	14	
      		tail	14	
      	currentPoolSize	60	
      	isDestroyed	false	
      	networkBufferPool	NetworkBufferPool  (id=354)	
      		allBufferPools	HashSet<E>  (id=424)	
      		availableMemorySegments	ArrayBlockingQueue<E>  (id=427)	
      			count	0	
      			items	Object[10240]  (id=674)	
      			itrs	null	
      			lock	ReentrantLock  (id=675)	
      			notEmpty	AbstractQueuedSynchronizer$ConditionObject  (id=678)	
      			notFull	AbstractQueuedSynchronizer$ConditionObject  (id=679)	
      			putIndex	6954	
      			takeIndex	6954	
      		factoryLock	Object  (id=430)	
      		isDestroyed	false	
      		managedBufferPools	HashSet<E>  (id=431)	
      		memorySegmentSize	32768	
      		numTotalRequiredBuffers	3226	
      		totalNumberOfMemorySegments	10240	
      	numberOfRequestedMemorySegments	60	
      	numberOfRequiredMemorySegments	32	
      	owner	null	
      	registeredListeners	ArrayDeque<E>  (id=421)	
      		elements	Object[16]  (id=685)	
      		head	0	
      		tail	0	
      askToRecycle	false	
      isBlocking	true	
      

      Second join stack trace and variable values from SingleInputGate.getNextBufferOrEvent:

      Unsafe.park(boolean, long) line: not available [native method]	
      LockSupport.parkNanos(Object, long) line: 215	
      AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078	
      LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467	
      SingleInputGate.getNextBufferOrEvent() line: 414	
      MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79	
      MutableRecordReader<T>.next(T) line: 34	
      ReaderIterator<T>.next(T) line: 59	
      MutableHashTable$ProbeIterator<PT>.next() line: 1581	
      MutableHashTable<BT,PT>.processProbeIter() line: 457	
      MutableHashTable<BT,PT>.nextRecord() line: 555	
      ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110	
      JoinDriver<IT1,IT2,OT>.run() line: 208	
      RegularPactTask<S,OT>.run() line: 489	
      RegularPactTask<S,OT>.invoke() line: 354	
      Task.run() line: 581	
      Thread.run() line: 745	
      
      this	SingleInputGate  (id=693)	
      	bufferPool	LocalBufferPool  (id=706)	
      		availableMemorySegments	ArrayDeque<E>  (id=716)	
      			elements	Object[16]  (id=717)	
      			head	0	
      			tail	0	
      		currentPoolSize	60	
      		isDestroyed	false	
      		networkBufferPool	NetworkBufferPool  (id=354)	
      			allBufferPools	HashSet<E>  (id=424)	
      			availableMemorySegments	ArrayBlockingQueue<E>  (id=427)	
      				count	0	
      				items	Object[10240]  (id=674)	
      				itrs	null	
      				lock	ReentrantLock  (id=675)	
      				notEmpty	AbstractQueuedSynchronizer$ConditionObject  (id=678)	
      				notFull	AbstractQueuedSynchronizer$ConditionObject  (id=679)	
      				putIndex	6954	
      				takeIndex	6954	
      			factoryLock	Object  (id=430)	
      			isDestroyed	false	
      			managedBufferPools	HashSet<E>  (id=431)	
      			memorySegmentSize	32768	
      			numTotalRequiredBuffers	3226	
      			totalNumberOfMemorySegments	10240	
      		numberOfRequestedMemorySegments	0	
      		numberOfRequiredMemorySegments	32	
      		owner	null	
      		registeredListeners	ArrayDeque<E>  (id=718)	
      	channelsWithEndOfPartitionEvents	BitSet  (id=707)	
      	consumedResultId	IntermediateDataSetID  (id=708)	
      	consumedSubpartitionIndex	24	
      	executionId	ExecutionAttemptID  (id=709)	
      	hasReceivedAllEndOfPartitionEvents	false	
      	inputChannels	HashMap<K,V>  (id=710)	
      	inputChannelsWithData	LinkedBlockingQueue<E>  (id=692)	
      		capacity	2147483647	
      		count	AtomicInteger  (id=698)	
      			value	0	
      		head	LinkedBlockingQueue$Node<E>  (id=701)	
      		last	LinkedBlockingQueue$Node<E>  (id=701)	
      		notEmpty	AbstractQueuedSynchronizer$ConditionObject  (id=691)	
      		notFull	AbstractQueuedSynchronizer$ConditionObject  (id=703)	
      		putLock	ReentrantLock  (id=704)	
      		takeLock	ReentrantLock  (id=705)	
      	isReleased	false	
      	jobId	JobID  (id=711)	
      	numberOfInputChannels	32	
      	numberOfUninitializedChannels	0	
      	owningTaskName	"Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)	
      	partitionStateChecker	NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)	
      	pendingEvents	ArrayList<E>  (id=713)	
      	registeredListeners	CopyOnWriteArrayList<E>  (id=714)	
      	requestedPartitionsFlag	true	
      	requestLock	Object  (id=715)	
      	retriggerLocalRequestTimer	null	
      currentChannel	null	
      

      Attachments

        1. job_manager_19_feb_15_30_running
          40 kB
          Amit Jain
        2. task_manager_19_feb_15_30_running
          45 kB
          Amit Jain


          People

            Ufuk Celebi (uce)
            Greg Hogan (greghogan)
            Votes: 1
            Watchers: 8
