Qpid / QPID-3562

The 0-10 client path does not behave as expected with asynchronous consumers using a prefetch of 1 on transacted sessions

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.12
    • Fix Version/s: 0.13
    • Component/s: Java Client
    • Labels:
      None

      Description

      The 0-10 client path does not behave as expected with asynchronous consumers using a prefetch of 1 on transacted sessions: the client is able to hold a 2nd message while a slow onMessage handler is still processing the first. This happens because completions are sent to ensure credit does not dry up, allowing a larger-than-prefetch number of messages to be consumed in a given transaction. However, they are sent prior to delivery to the client application, which causes the broker to send the client a 2nd message.

      This should instead occur after delivery has completed (and only if it is still necessary: the client may have committed, which sends accepting completions anyway) to give the expected behaviour.
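The effect of the completion timing can be illustrated with a small self-contained simulation. This is only a toy model under stated assumptions, not the Qpid client code: `ToyBroker`, `pump` and `maxHeldByClient` are hypothetical names, credit is modelled as a plain counter, and a completion is assumed to restore exactly one unit of credit.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PrefetchCompletionSketch {

    /** Toy broker (hypothetical): releases one queued message per unit of credit. */
    static final class ToyBroker {
        private final Deque<String> queue = new ArrayDeque<>();
        private int credit;

        ToyBroker(int prefetch, int messages) {
            this.credit = prefetch;
            for (int i = 0; i < messages; i++) {
                queue.add("Msg" + i);
            }
        }

        /** A completion from the client restores one unit of credit (assumption). */
        void complete() {
            credit++;
        }

        /** Returns the next message if credit allows, otherwise null. */
        String poll() {
            if (credit > 0 && !queue.isEmpty()) {
                credit--;
                return queue.poll();
            }
            return null;
        }
    }

    /** Moves every message the broker is currently allowed to send into the client buffer. */
    static void pump(ToyBroker broker, Deque<String> clientBuffer) {
        String m;
        while ((m = broker.poll()) != null) {
            clientBuffer.add(m);
        }
    }

    /** Largest number of messages the client holds while a handler is running. */
    static int maxHeldByClient(boolean completeBeforeDelivery, int messages) {
        ToyBroker broker = new ToyBroker(1, messages); // prefetch of 1
        Deque<String> clientBuffer = new ArrayDeque<>();
        int maxHeld = 0;
        pump(broker, clientBuffer);
        while (!clientBuffer.isEmpty()) {
            clientBuffer.poll(); // message handed to the handler
            if (completeBeforeDelivery) {
                broker.complete(); // credit restored before the handler runs
                pump(broker, clientBuffer);
            }
            // The "slow onMessage" runs here: one message in the handler plus the buffer.
            maxHeld = Math.max(maxHeld, 1 + clientBuffer.size());
            if (!completeBeforeDelivery) {
                broker.complete(); // credit restored only after the handler returns
                pump(broker, clientBuffer);
            }
        }
        return maxHeld;
    }

    public static void main(String[] args) {
        System.out.println("before delivery: " + maxHeldByClient(true, 3));  // prints 2
        System.out.println("after delivery:  " + maxHeldByClient(false, 3)); // prints 1
    }
}
```

In this model, completing before delivery lets the client hold two messages at once despite the prefetch of 1, while completing after delivery keeps it at one.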

        Activity

        Rajith Attapattu added a comment -

        Thanks for the feedback. I was trying to get some feel for the test cases. I will fix the discrepancy in the Javadoc.

        Sorry, I should have mentioned that this was not really meant to cover this specific JIRA, but rather to serve as a starting point for a general test case for prefetch=0 and prefetch=1. In hindsight I should have opened a separate JIRA for prefetch=1 and prefetch=0 and put it there (I was a bit lazy and used this one).

        Also, as you correctly pointed out, we need tests for both synchronous and asynchronous consumers, and for the various ACK modes including transactions, as the behaviour does change for those different combinations. I am hoping to get a base test case and then see if I can use it with the different combinations, e.g. client with sync and client-ack with async, etc.

        And as you said, perhaps it's better to have separate tests for prefetch=0 and prefetch=1. I started with prefetch=0, but realized that I needed prefetch=1 to get it going.

        Thanks again for your feedback/comments, as always highly appreciated.

        Robbie Gemmell added a comment -

        I would probably have chosen to test prefetch=0 and prefetch=1 in isolation rather than in the same test, but it seems reasonable enough (I would consume 'Msg3' for completeness though). The Javadoc mentions 3 more messages but the test sends 5 more.

        I don't think it covers this JIRA at all though: it uses synchronous consumers, and it was the different handling of prefetch for onMessage delivery that prompted the JIRA to be raised.

        Rajith Attapattu added a comment -

        I have the following test case for prefetch=0 and prefetch=1.
        I don't believe it covers all cases. I'd appreciate some feedback on the test.

            /**
             * Test Goal : For prefetch 0, verify that the client does not get any messages unless receive() is called.
             *             For prefetch 1, verify that the client gets only 1 message and not more than that. 
             *             
             * Test Strategy: Create two consumers on the same queue, one with prefetch disabled (zero) and one with prefetch=1.
             *                Note : Created the one with prefetch disabled first.
             *                Send a single message to the queue and verify that the message only goes to the one with prefetch=1.
             *                
             *                Next, send 3 messages
             */
            public void testPrefetchZeroAndOneWithSyncReceive() throws Exception
            {
                setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "0");
                Connection con = getConnection();
                con.start();
                Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destWithZeroCapacity = ssn.createQueue("ADDR:my-queue;{create: always}");
                
                MessageProducer prod = ssn.createProducer(destWithZeroCapacity);
                prod.send(ssn.createTextMessage("Msg"));
                
                MessageConsumer consumerWithZeroPrefetch = ssn.createConsumer(destWithZeroCapacity);
                
                Destination destWithPrefetchEnabled = ssn.createQueue("ADDR:my-queue;{create: always, link:{capacity:1}}");
                MessageConsumer consumerWithPretch = ssn.createConsumer(destWithPrefetchEnabled);
                TextMessage msg = (TextMessage)consumerWithPretch.receive(1000);
                assertNotNull("The message should only go to the consumer with prefetch enabled",msg);
                        
                for (int i=0; i<5;i++) 
                {
                   prod.send(ssn.createTextMessage("Msg" + i));
                }
                
                msg = (TextMessage)consumerWithZeroPrefetch.receive(1000);
                assertNotNull("When receive is called a message should be fetched from the broker",msg);
                assertEquals("Consumer with zero prefetch should receive Msg1, " +
                        "as Msg0 is given to the consumer with prefetch=1", "Msg1", msg.getText());
                
                // Verify that the other 2 messages have not been given to the consumer with zero prefetch.
                for (int i=0; i<2; i++)
                {
                    int j = 2 * i; // yields Msg0 and Msg2, skipping Msg1.
                    msg = (TextMessage)consumerWithPretch.receive(1000);
                    assertNotNull("The consumer with prefetch enabled should receive the message",msg);
                    assertEquals("The consumer with prefetch enabled should receive Msg" + j, "Msg" + j, msg.getText());
                }
                
                // Verify that the prefetch=1 consumer has not accumulated more messages than it should due to errors.
                // Note Msg3 should be held by the consumer with prefetch=1 (in its prefetch buffer), but Msg4 should not be.
                msg = (TextMessage)consumerWithZeroPrefetch.receive(1000);
                assertNotNull("When receive is called a message should be fetched from the broker",msg);
                assertEquals("Consumer with zero prefetch should receive Msg4", "Msg4", msg.getText());
            }
        
        Robbie Gemmell added a comment -

        Completion sending has been moved to postDeliver and, in addition to the previous restrictions, now only occurs if the transaction wasn't committed (in the asynchronous delivery case).
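The commit check described in this comment might look roughly like the sketch below. It is an illustration only, not the actual change: `ToySession`, `deliver` and `committedDuringDelivery` are hypothetical names, the earlier restrictions mentioned above are left out, and a completion is modelled as a string recording which step sent it.

```java
import java.util.ArrayList;
import java.util.List;

public class PostDeliverSketch {

    /** Toy session (hypothetical): records which step sent a completion. */
    static final class ToySession {
        final List<String> completions = new ArrayList<>();
        private boolean committedDuringDelivery;

        /** Committing sends accepting completions anyway, so note that it happened. */
        void commit() {
            completions.add("commit");
            committedDuringDelivery = true;
        }

        /** Asynchronous delivery: run the handler first, then decide on a completion. */
        void deliver(Runnable onMessage) {
            committedDuringDelivery = false;
            onMessage.run();
            postDeliver();
        }

        /** Send a completion only if the handler did not commit during delivery. */
        private void postDeliver() {
            if (!committedDuringDelivery) {
                completions.add("postDeliver");
            }
        }
    }

    /** Delivers one message to a handler that optionally commits; returns the completions sent. */
    static List<String> run(boolean handlerCommits) {
        ToySession session = new ToySession();
        session.deliver(() -> {
            if (handlerCommits) {
                session.commit();
            }
        });
        return session.completions;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [commit]
        System.out.println(run(false)); // prints [postDeliver]
    }
}
```

Either way exactly one completion is sent per delivery, and never before the handler has run.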


          People

          • Assignee:
            Robbie Gemmell
            Reporter:
            Robbie Gemmell
          • Votes:
            0
            Watchers:
            0
