Pseudo code of memory blowup problem in Apache Ignite CacheContinuousQueryHandler
=================================================================================

CacheContinuousQueryHandler.CacheContinuousQueryListener.onEntryUpdated() {
  if (The update is on the primary node) {
    o CacheContinuousQueryHandler.onEntryUpdate() {
      if (The listener is on this node) {
        // Case #1: Update on a primary node, sent to a local listener
        o CacheContinuousQueryHandler.handleEvent() {
          o Collect update eventEntries to notify by calling
            CacheContinuousQueryPartitionRecovery.collectEntries() {
            if (entry.updateCounter() > lastFiredEvt) { // This event has not been fired yet
              o Put the entry update event in pendingEvts;
            }
            if (pendingEvts.size() >= MAX_BUFF_SIZE, default 10000) { // Too many pending events
              o Return and flush 10% of the update events in pendingEvts. In this case,
                Fundy needs to tolerate some dropped events;
            } else {
              for (each event e in pendingEvts) {
                if (e == lastFiredEvt + 1) { // e is right next to lastFiredEvt
                  o Move e from pendingEvts to eventEntries, and increment lastFiredEvt;
                } else {
                  o Return eventEntries;
                }
              }
            }
          }
        }
      } else {
        // Case #2: Update on a primary node, sent to a remote listener
        o CacheContinuousQueryHandler.handleEntry() {
          o Collect update eventEntries to notify by calling
            CacheContinuousQueryEventBuffer.processEntry() {
            o Process update events one batch at a time:
              CacheContinuousQueryEventBuffer.Batch.entries[BUF_SIZE], default BUF_SIZE is 1000;
              the batch is bounded by batch.startCntr and batch.endCntr;
            if (the current event entry e.updateCounter() <= batch.endCntr) { // Inside the current batch
              o Call CacheContinuousQueryEventBuffer.Batch.processEntry0() to process this entry {
                o Put e into the batch at position pos;
                if (e's pos == batch.lastProc + 1) { // e is next to the last processed event, batch.lastProc
                  for (i = pos; i < Batch.entries.length; i++) {
                    if (entries[i] != null) { // This event has arrived
                      o Add entries[i] into eventEntries, and set pos = i;
                    } else { // This event has NOT arrived yet
                      o break;
                    }
                  }
                  o batch.lastProc = pos;
                  if (pos == entries.length - 1) { // The end of batch.entries[] has been reached and processed
                    o Start a new batch;
                  }
                } else {
                  // If we never see (next == pos) and (pos == entries.length - 1), we may never create a new batch,
                  // which may blow up the ConcurrentSkipListMap CacheContinuousQueryEventBuffer.pending,
                  // because new entries are all added there. We should add some exception handling to prevent this,
                  // e.g. adding a limit on CacheContinuousQueryEventBuffer.pending like MAX_BUFF_SIZE above
                  // to flush 10% of the update events there.
                  o Return eventEntries;
                }
              }
            } else { // The current event entry e is outside the current batch
              o Put e into CacheContinuousQueryEventBuffer.pending;
              if (The current batch is a new batch started in CacheContinuousQueryEventBuffer.Batch.processEntry0) {
                o Call CacheContinuousQueryEventBuffer.processPending() to process the pending entries
                  in CacheContinuousQueryEventBuffer.pending with the new batch;
              }
            }
          }
        }
      }
    }
  } else {
    // Case #3: Update on a backup node
    o CacheContinuousQueryHandler.handleBackupEntry() {
      o Collect update eventEntries to notify by calling
        CacheContinuousQueryEventBuffer.processEntry(backup = true) {
        o Similar to Case #2 above
      }
    }
  }
}
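The Case #1 recovery logic above (buffer out-of-order events in pendingEvts, fire the contiguous prefix past lastFiredEvt, and flush 10% on overflow) can be sketched as follows. This is a minimal illustration, not Ignite's actual code: the class name, field visibility, and the MAX_BUFF_SIZE value are assumptions for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Sketch of the Case #1 CacheContinuousQueryPartitionRecovery logic.
 *  Names and the small MAX_BUFF_SIZE are illustrative, not Ignite's API. */
class PartitionRecoverySketch {
    static final int MAX_BUFF_SIZE = 10;  // Ignite's default is 10000

    long lastFiredEvt;                    // highest counter delivered in order
    final TreeMap<Long, Object> pendingEvts = new TreeMap<>();

    /** Returns the events that became deliverable; may drop ordering on overflow. */
    List<Object> collectEntries(long cntr, Object evt) {
        List<Object> fired = new ArrayList<>();
        if (cntr > lastFiredEvt)          // not fired yet: buffer it
            pendingEvts.put(cntr, evt);
        if (pendingEvts.size() >= MAX_BUFF_SIZE) {
            // Overflow: flush the oldest 10% even though that skips gaps;
            // the listener must tolerate dropped or missed events.
            int toFlush = MAX_BUFF_SIZE / 10;
            for (int i = 0; i < toFlush && !pendingEvts.isEmpty(); i++) {
                var e = pendingEvts.pollFirstEntry();
                fired.add(e.getValue());
                lastFiredEvt = e.getKey();
            }
        }
        // Fire every buffered event contiguous with lastFiredEvt.
        while (!pendingEvts.isEmpty() && pendingEvts.firstKey() == lastFiredEvt + 1) {
            var e = pendingEvts.pollFirstEntry();
            fired.add(e.getValue());
            lastFiredEvt = e.getKey();
        }
        return fired;
    }
}
```

Note that a single flushed event can unblock the whole buffer: once lastFiredEvt jumps past the gap, the contiguous-prefix loop drains everything behind it.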
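The Case #2 batch scheme, and the blowup risk in the pending map, can be sketched similarly. This is a simplified illustration under assumed names and sizes (small BUF_SIZE, a hypothetical MAX_PENDING cap implementing the proposed flush-10% mitigation); it is not Ignite's actual CacheContinuousQueryEventBuffer implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

/** Sketch of the Case #2 batch + pending-map scheme. Without the MAX_PENDING
 *  cap (our proposed mitigation, not present in Ignite), a permanent gap in the
 *  current batch would make `pending` grow without bound. */
class EventBufferSketch {
    static final int BUF_SIZE = 8;      // Ignite's default is 1000
    static final int MAX_PENDING = 16;  // hypothetical cap, like MAX_BUFF_SIZE in Case #1

    long batchStart = 1;                // batch covers counters [batchStart, batchStart + BUF_SIZE)
    Object[] entries = new Object[BUF_SIZE];
    int lastProc = -1;                  // index of the last in-order processed slot
    final ConcurrentSkipListMap<Long, Object> pending = new ConcurrentSkipListMap<>();

    /** Returns the events that became deliverable in counter order. */
    List<Object> processEntry(long cntr, Object evt) {
        List<Object> out = new ArrayList<>();
        if (cntr < batchStart)
            return out;                              // stale counter: already processed
        if (cntr >= batchStart + BUF_SIZE) {         // outside the current batch
            pending.put(cntr, evt);
            if (pending.size() >= MAX_PENDING)       // proposed mitigation: flush the
                flushOldestTenPercent(out);          // oldest 10% instead of growing forever
            return out;
        }
        int pos = (int) (cntr - batchStart);
        entries[pos] = evt;                          // like Batch.processEntry0()
        if (pos != lastProc + 1)
            return out;                              // a gap precedes this event: wait
        for (int i = pos; i < entries.length && entries[i] != null; i++) {
            out.add(entries[i]);                     // deliver the contiguous run
            lastProc = i;
        }
        if (lastProc == entries.length - 1) {        // batch complete: start a new one
            batchStart += BUF_SIZE;
            entries = new Object[BUF_SIZE];
            lastProc = -1;
            drainPending(out);                       // like processPending()
        }
        return out;
    }

    /** Re-feed pending entries that now fall inside the (new) current batch. */
    private void drainPending(List<Object> out) {
        var e = pending.pollFirstEntry();
        while (e != null && e.getKey() < batchStart + BUF_SIZE) {
            out.addAll(processEntry(e.getKey(), e.getValue()));
            e = pending.pollFirstEntry();
        }
        if (e != null)
            pending.put(e.getKey(), e.getValue());   // still out of range: put it back
    }

    /** Emergency valve: deliver the oldest 10% of pending out of order. */
    private void flushOldestTenPercent(List<Object> out) {
        int toFlush = Math.max(1, pending.size() / 10);
        for (int i = 0; i < toFlush; i++) {
            var e = pending.pollFirstEntry();
            if (e == null)
                break;
            out.add(e.getValue());                   // caller must tolerate reordering/drops
        }
    }
}
```

The sketch makes the failure mode concrete: if the event at counter batchStart never arrives, `lastProc` never advances, no new batch is started, and every later event lands in `pending`; the cap bounds that growth at the cost of occasionally delivering out of order.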