Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.10.0, 1.10.1
-
None
-
None
-
linux
-
Important
Description
Here is my code:
@Override public Sink.Status process() throws EventDeliveryException { Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); logger.debug("open transaction for: {}", channel.getName()); transaction.begin(); List<Event> eventList = new ArrayList<>(); try { while (eventList.size() < maxBatchSize) { Event event = channel.take(); if (event == null) { if (eventList.isEmpty()) { transaction.commit(); logger.debug("No data in channel!"); return Sink.Status.BACKOFF; } else { break; } } logger.trace(EventHelper.eventBodyToString(event)); eventList.add(event); } handleEvent(eventList); logger.debug("commit for: {}", channel.getName()); transaction.commit(); sinkCounter.addToEventDrainSuccessCount(eventList.size()); } catch (Exception ex) { logger.debug("rollback for: {}", channel.getName()); transaction.rollback(); sinkCounter.incrementChannelReadFail(); throw new EventDeliveryException("Failed to log event: " + ex.getMessage(), ex); } finally { logger.debug("close for: {}", channel.getName()); transaction.close(); } return Sink.Status.READY; }
I found that the program will goto the close without commit or rollback while the running time of handleEvent() is too long(or maybe orther reason). It seens that the run() function has been forced to stop by any other things. This problem cause the sink process fail because the transaction is not commit/rollback correctly.