Kafka / KAFKA-1196

java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

    Details

    • Type: Bug
    • Status: Patch Available
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.9.0
    • Component/s: consumer
    • Labels: None
    • Environment: running Java 1.7 on Linux, Kafka compiled against Scala 2.9.2

      Description

      I have 6 topics, each with 8 partitions, spread over 4 Kafka servers.
      The servers have 24 cores and 72 GB of RAM each.

      While consuming from the topics I get an IllegalArgumentException and all consumption stops; the error keeps being thrown.

      I've tracked it down to FetchResponse.scala line 33.

      The error happens when the FetchResponsePartitionData object's readFrom method calls:
      messageSetBuffer.limit(messageSetSize)

      I put in some debug code: the messageSetSize is 671758648, while buffer.capacity() gives 155733313, so for some reason the buffer is smaller than the required message size.

      I don't know the consumer code well enough to debug this. It doesn't matter whether compression is used or not.
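
      Below is a minimal, illustrative sketch (not taken from the Kafka code base; the buffer size is a stand-in) of why java.nio.Buffer.limit throws here: a buffer's limit may never exceed its capacity, so a messageSetSize larger than the allocated buffer fails immediately with IllegalArgumentException.

      import java.nio.ByteBuffer

      object BufferLimitRepro {
        def main(args: Array[String]): Unit = {
          val messageSetSize = 671758648          // size reported in this issue
          val buffer = ByteBuffer.allocate(1024)  // stand-in for a buffer smaller than messageSetSize
          // Buffer.limit(n) throws IllegalArgumentException when n > capacity(),
          // which is what FetchResponsePartitionData.readFrom hits at FetchResponse.scala line 33.
          buffer.limit(messageSetSize)
        }
      }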

      Attachments

      1. KAFKA-1196.patch (13 kB) - Ewen Cheslack-Postava

        Activity

        Ewen Cheslack-Postava added a comment -

        This is a wip patch to fix this issue, which previous discussion suggests was due to the FetchResponse exceeding 2GB. My approach to triggering the issue, however, doesn't exhibit exactly the same issue but does cause an unrecoverable error that causes the consumer connection to terminate. (For reference, it causes the server to fail when FetchResponseSend.writeTo calls expectIncomplete and sendSize is negative due to overflow. This confuses the server since it looks like the message is already done sending and the server forcibly closes the consumer's connection.)
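
        As a hedged illustration of that overflow (stand-in values, not the actual FetchResponseSend fields): once the serialized response size approaches Int.MaxValue, adding the 4-byte size header wraps the Int negative, so a remaining-bytes style check believes the send already completed.

        object SendSizeOverflow extends App {
          val fetchResponseSizeInBytes: Int = Int.MaxValue - 2 // pretend the response is ~2GB
          val sendSize: Int = 4 + fetchResponseSizeInBytes     // header + payload overflows Int
          println(sendSize)                                    // prints a negative number
          // A check along the lines of `bytesRemaining > 0` would then conclude the send
          // is finished, matching the "looks like the message is already done sending" behavior.
        }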

        The patch addresses the core issue by ensuring the returned message doesn't exceed 2GB by dropping parts of it in a way that otherwise shouldn't affect the consumer. But there are a lot of points that still need to be addressed:

        • I started by building an integration test to trigger the issue, included in PrimitiveApiTest. However, since we necessarily need to have > 2GB data to trigger the issue, it's probably too expensive to include in this way. Offline discussion suggests maybe a system test would be a better place to include this. It's still included here for completeness.
        • The implementation filters to a subset of the data in FetchResponse. The main reason for this is that this process needs to know the exact (or at least conservative estimate) size of serialized data, which only FetchResponse knows. But it's also a bit weird compared to other message classes, which are case classes and don't modify those inputs.
        • Algorithm for choosing subset to return: the initial approach is to remove random elements until we get below the limit (a rough sketch of this idea appears after this list). This is simple to understand and avoids starvation of specific TopicAndPartitions. Any concerns with this basic approach?
        • I'm pretty sure I've managed to keep the < 2GB case to effectively the same computational cost (computing the serialized size, grouped data, etc. exactly once as before). However, for the > 2GB case I've only ensured correctness. In particular, the progressive removal and reevaluation of serialized size could potentially be very bad for very large data sets (e.g. starting a mirror maker against a large data set with large # of partitions from scratch).
        • Note that the algorithm never deals with the actual message data, only metadata about what messages are available. This is relevant since this is what suggested the approach in the patch could still be performant – ReplicaManager.readMessageSets processes the entire FetchRequest and filters it down because the metadata involved is relatively small.
        • Based on the previous two points, this really needs some more realistic large scale system tests to make sure this approach is not only correct, but provides reasonable performance (or indicates we need to revise the algorithm for selecting a subset of the data).
        • Testing isn't really complete – I triggered the issue with 4 topics * 600 MB/topic, which is > 2GB. Another obvious case to check is when some partitions contain > 2GB on their own.
        • I'd like someone to help sanity check the exact maximum FetchResponse serialized size we limit messages to. It's not Int.MaxValue because the FetchResponseSend class adds 4 + FetchResponse.sizeInBytes for its own size. I'd like a sanity check that the extra 4 bytes is enough – is there any additional wrapping we might need to account for? Getting a test to hit exactly that narrow range could be tricky.
        • The tests include both immediate-response and purgatory paths, but the purgatory version requires a timeout in the test, which could end up being flaky + wasting time, but it doesn't look like there's a great way to mock that right now. Maybe this doesn't matter if it moves to a system test?
        • One case this doesn't handle yet is when the data reaches > 2GB after it's in the purgatory. The result is correct, but the response is not sent as soon as that condition is satisfied. This is because it looks like evaluating this exactly would require calling readMessageSets and evaluating the size of the message for every DelayedFetch.isSatisfied call. This sounds like it could end up being pretty expensive. Maybe there's a better way, perhaps an approximate scheme?
        • The test requires some extra bytes in the fetchSize for each partition, presumably for overhead in encoding. I haven't tracked down exactly how big that should be, but I'm guessing it could end up affecting the results of more comprehensive tests.
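
        As referenced in the "Algorithm for choosing subset to return" bullet, here is a rough sketch of the subset-selection idea, using hypothetical stand-in types and a placeholder size function rather than the real FetchResponse internals:

        import scala.util.Random

        case class TopicAndPartition(topic: String, partition: Int)   // stand-in
        case class PartitionData(sizeInBytes: Int)                    // stand-in

        // Remove random entries until the estimated serialized size fits under the limit.
        // Simple to reason about, and no TopicAndPartition is systematically starved.
        def limitResponseSize(data: Map[TopicAndPartition, PartitionData],
                              maxBytes: Long): Map[TopicAndPartition, PartitionData] = {
          def estimatedSize(d: Map[TopicAndPartition, PartitionData]): Long =
            d.values.map(_.sizeInBytes.toLong).sum                    // placeholder for FetchResponse.sizeInBytes

          var remaining = data
          while (estimatedSize(remaining) > maxBytes && remaining.nonEmpty) {
            val victim = remaining.keys.toSeq(Random.nextInt(remaining.size))
            remaining -= victim
          }
          remaining
        }
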
        Ewen Cheslack-Postava added a comment -

        Created reviewboard https://reviews.apache.org/r/26811/diff/
        against branch origin/trunk

        DashengJu added a comment -

        Thanks for your reply.
        I will try to increase num.replica.fetchers in our production environment.
        Do you have any plan to fix this issue?

        Jun Rao added a comment -

        You can do one or more of the following to get around this.

        1. Increase num.replica.fetchers (http://kafka.apache.org/documentation.html#brokerconfigs)
        2. Reduce replica.fetch.max.bytes, but make sure it's still >= message.max.bytes.
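
        As a concrete illustration of these two workarounds (example values only, not recommendations; tune for your cluster), in the broker's server.properties:

        num.replica.fetchers=4
        # keep replica.fetch.max.bytes >= message.max.bytes
        replica.fetch.max.bytes=10485760
        message.max.bytes=10485760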

        DashengJu added a comment -

        I have encountered this problem.

        I have many topics and partitions, and every topic has 2 replicas, so one broker needs to consume data from another broker. The broker ends up fetching many replicas in a single request, and the sync fails because of the error above. As a result, the ISR list always contains just the "preferred replica".

        I think this is a big problem.

        ============= below is the error log ====================================
        [2014-06-28 10:00:03,434] ERROR [ReplicaFetcherThread-0-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 710; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 5; MaxWait: 3000 ms; MinBytes: 1 bytes; RequestInfo: [org.wasp_card_log,12] -> PartitionFetchInfo(0,67108864),[org.mobile_push_netevent,1] -> PartitionFetchInfo(492788,67108864),[org.mobile,8] -> PartitionFetchInfo(59767,67108864),[app.newbuyer,7] -> PartitionFetchInfo(42,67108864),[binlog.wwwdeal,6] -> PartitionFetchInfo(1718,67108864),[org.b,6] -> PartitionFetchInfo(150548,67108864),[org.filestorage,14] -> PartitionFetchInfo(11,67108864),[org.feedbackchange,6] -> PartitionFetchInfo(277,67108864),[org.eventlog,6] -> PartitionFetchInfo(101049,67108864),[org.bizappapi,10] -> PartitionFetchInfo(0,67108864),[org.data_resys_routerres,5] -> PartitionFetchInfo(606925,67108864),[log.mobile,5] -> PartitionFetchInfo(0,67108864),[org.mtrace,0] -> PartitionFetchInfo(75581,67108864),[org.mobile,3] -> PartitionFetchInfo(2062782,67108864),[org.data_resys_geo,10] -> PartitionFetchInfo(678,67108864),[org.data_asyncquerier_tablecolumn,12] -> PartitionFetchInfo(1,67108864),[org.mobile_push_applist,10] -> PartitionFetchInfo(2438334,67108864),[org.mobile_hotellog,12] -> PartitionFetchInfo(623,67108864),[org.mobile,15] -> PartitionFetchInfo(2376788,67108864),[org.stormlog,0] -> PartitionFetchInfo(0,67108864),[org.mtcrm,14] -> PartitionFetchInfo(250,67108864),[binlog.coupon,4] -> PartitionFetchInfo(149681,67108864),[org.mtrace,1] -> PartitionFetchInfo(1025277,67108864),[org.cos_sso,15] -> PartitionFetchInfo(3301,67108864),[org.data_resys_router,6] -> PartitionFetchInfo(1031,67108864),[log.mobile,3] -> PartitionFetchInfo(17453,67108864),[log.loginlog,7] -> PartitionFetchInfo(5932,67108864),[org.data_search_queryserver,13] -> PartitionFetchInfo(891,67108864),[org.mobile_push_applist,5] -> PartitionFetchInfo(180204,67108864),[org.data_resys_hotquery,4] -> PartitionFetchInfo(1153,67108864),[org.wasp_device,0] -> PartitionFetchInfo(10,67108864),[org.mobilerpc,9] -> PartitionFetchInfo(42137,67108864),[org.mtrace,9] -> PartitionFetchInfo(1783580,67108864),[org.mobile_push_applist,8] -> PartitionFetchInfo(2471375,67108864),[org.mobile_eventlog,1] -> PartitionFetchInfo(311414,67108864),[org.extendapply,15] -> PartitionFetchInfo(0,67108864),[app.newbuyer,14] -> PartitionFetchInfo(0,67108864),[org.mobile_push_record,13] -> PartitionFetchInfo(11,67108864),[org.mtrace,3] -> PartitionFetchInfo(1578588,67108864),[log.orderlog,8] -> PartitionFetchInfo(0,67108864),[org.mtrace,4] -> PartitionFetchInfo(122482,67108864),[org.tmplog,13] -> PartitionFetchInfo(0,67108864),[org.ecomlogin,0] -> PartitionFetchInfo(2,67108864),[org.payrequest,6] -> PartitionFetchInfo(6207,67108864),[org.dealrank,14] -> PartitionFetchInfo(3699,67108864),[org.wm_submitorder,15] -> PartitionFetchInfo(0,67108864),[org.web_seckill_ticketlog,2] -> PartitionFetchInfo(0,67108864),[org.nginx,4] -> PartitionFetchInfo(3122673,67108864),[org.mobile_group_push,11] -> PartitionFetchInfo(0,67108864),[org.data_resys_routerreq,11] -> PartitionFetchInfo(162226,67108864),[binlog.coupon,10] -> PartitionFetchInfo(32224,67108864),[log.eventlog,10] -> PartitionFetchInfo(31378,67108864),[log.intranet_nginx,2] -> PartitionFetchInfo(0,67108864),[org.extendapply,13] -> PartitionFetchInfo(1,67108864),[log.accesslog,0] -> PartitionFetchInfo(149156,67108864),[org.mobile,4] -> PartitionFetchInfo(51119,67108864),[org.ecomlogin,14] -> PartitionFetchInfo(655,67108864),[org.mobileapp,4] -> 
PartitionFetchInfo(150715,67108864),[org.mobile_push_netevent,14] -> PartitionFetchInfo(12863099,67108864),[org.wasp_card_log,4] -> PartitionFetchInfo(11,67108864),[org.data_resys_router,4] -> PartitionFetchInfo(35192,67108864),[org.nginx,15] -> PartitionFetchInfo(92880,67108864),[org.mobile_groupapi_search,6] -> PartitionFetchInfo(408,67108864),[org.wasp_page_visit_record,0] -> PartitionFetchInfo(0,67108864),
        [log.couponverify,4] -> PartitionFetchInfo(0,67108864),[org.nginx,9] -> PartitionFetchInfo(100270,67108864),[org.mtcrm,7] -> PartitionFetchInfo(5078,67108864),[org.data_resys_hotquery,6] -> PartitionFetchInfo(83107,67108864),[org.mobile_nginx,6] -> PartitionFetchInfo(157869,67108864),[org.mobile,9] -> PartitionFetchInfo(1831700,67108864),[org.intranet_nginx,9] -> PartitionFetchInfo(132779,67108864),[org.mobile_push_applist,4] -> PartitionFetchInfo(2279570,67108864),[org.dealrank,0] -> PartitionFetchInfo(103,67108864),[org.mobile_networklog,3] -> PartitionFetchInfo(1723358,67108864),[org.mobile_movie_statistics,5] -> PartitionFetchInfo(6309,67108864),[org.mtrace,2] -> PartitionFetchInfo(94002,67108864),[org.data_asyncquerier_sql,7] -> PartitionFetchInfo(1,67108864),[org.spamlogin,12] -> PartitionFetchInfo(2480,67108864),[org.search_dealrank,8] -> PartitionFetchInfo(24118,67108864),[log.orderlog,1] -> PartitionFetchInfo(7231,67108864),[log.b,12] -> PartitionFetchInfo(0,67108864),[org.mobile_push_netevent,11] -> PartitionFetchInfo(659425,67108864),[org.mobile_networklog,9] -> PartitionFetchInfo(1251021,67108864),[org.data_resys_combinerec,11] -> PartitionFetchInfo(21870,67108864),[org.paytobiznotify,9] -> PartitionFetchInfo(0,67108864),[org.mobile_push_applist,12] -> PartitionFetchInfo(3099746,67108864),[org.mtrace,7] -> PartitionFetchInfo(1190072,67108864),[org.mobile_nginx,0] -> PartitionFetchInfo(174905,67108864),[org.web_user_message_op,8] -> PartitionFetchInfo(4529,67108864),[binlog.coupon,12] -> PartitionFetchInfo(23996,67108864),[org.tmplog,15] -> PartitionFetchInfo(0,67108864),[org.paylog,1] -> PartitionFetchInfo(489,67108864),[org.data_resys_routerres,7] -> PartitionFetchInfo(10502,67108864),[binlog.coupon,0] -> PartitionFetchInfo(25215,67108864),[org.nginxerrorlog,9] -> PartitionFetchInfo(29483,67108864),[org.mobile_push_applist,11] -> PartitionFetchInfo(103347,67108864),[org.basync,7] -> PartitionFetchInfo(299174,67108864),[org.mobile_push_netevent,0] -> PartitionFetchInfo(12720799,67108864),[org.basync,5] -> PartitionFetchInfo(4184,67108864),[org.mobile_nginx,12] -> PartitionFetchInfo(181939,67108864),[org.cos_errorlog,0] -> PartitionFetchInfo(189,67108864),[org.nginxerrorlog,15] -> PartitionFetchInfo(27555,67108864),[org.captcha,4] -> PartitionFetchInfo(14233,67108864),[org.data_search_smartbox,11] -> PartitionFetchInfo(0,67108864),[org.mobile_push_applist,2] -> PartitionFetchInfo(2371719,67108864),[org.mobile,5] -> PartitionFetchInfo(1906756,67108864),[org.mobile_movielog,3] -> PartitionFetchInfo(126780,67108864),[org.mobile_push_netevent,8] -> PartitionFetchInfo(10731244,67108864),[org.nginxerrorlog,14] -> PartitionFetchInfo(1525,67108864),[org.bdecomaction,2] -> PartitionFetchInfo(0,67108864),[org.cos_errorlog,12] -> PartitionFetchInfo(123,67108864),[org.mobile_networklog,2] -> PartitionFetchInfo(62332,67108864),[org.mobile_xmlog,8] -> PartitionFetchInfo(478,67108864),[org.mobile_eventlog,15] -> PartitionFetchInfo(8429,67108864),[org.cos_errorlog,10] -> PartitionFetchInfo(2569,67108864),[org.eventlog,4] -> PartitionFetchInfo(935,67108864),[org.data_asyncquerier_sql,14] -> PartitionFetchInfo(0,67108864),[org.dealrank,12] -> PartitionFetchInfo(93,67108864),[org.search_queryserver,1] -> PartitionFetchInfo(0,67108864),[binlog.user,5] -> PartitionFetchInfo(5950,67108864),[org.mobile_group_push,9] -> PartitionFetchInfo(25002,67108864),[org.mobile,13] -> PartitionFetchInfo(2117661,67108864),[org.nginx,11] -> PartitionFetchInfo(140093,67108864),[org.mobile_push_applist,7] -> 
PartitionFetchInfo(171384,67108864),[org.mobile_push_netevent,13] -> PartitionFetchInfo(531815,67108864),[org.mobile_eventlog,3] -> PartitionFetchInfo(14430,67108864),[org.captcha,6] -> PartitionFetchInfo(88,67108864),[org.mobile_push_netevent,5] -> PartitionFetchInfo(460960,67108864),[org.mobile_order_nomsg,4] -> PartitionFetchInfo(3,67108864),[org.data_xmltableview_access,2] -> PartitionFetchInfo(7,67108864),[org.payrequest,13] -> PartitionFetchInfo(101,67108864),[org.mtrace,8] -> Partiti
        ...
        4),[org.mobile_mars_report,10] -> PartitionFetchInfo(2926,67108864),[org.mobile,12] -> PartitionFetchInfo(55550,67108864),[org.mobile_movielog,2] -> PartitionFetchInfo(1519,67108864),[org.commonsubscribe,3] -> PartitionFetchInfo(0,67108864),[org.apacheerrorlog,14] -> PartitionFetchInfo(1366,67108864),[org.wm_submitorder,13] -> PartitionFetchInfo(3,67108864),[org.data_resys_routerreq,9] -> PartitionFetchInfo(1582,67108864),[org.mobile_push_applist,1] -> PartitionFetchInfo(228829,67108864),[org.mtrace,11] -> PartitionFetchInfo(1232471,67108864),[org.404log,12] -> PartitionFetchInfo(0,67108864),[org.mtsg,4] -> PartitionFetchInfo(4,67108864),[org.applog,4] -> PartitionFetchInfo(607,67108864),[org.mobile_movielog,8] -> PartitionFetchInfo(3223,67108864),[org.mobile_push_netevent,6] -> PartitionFetchInfo(13081128,67108864),[org.cmsdealapi,11] -> PartitionFetchInfo(0,67108864),[org.mobile,2] -> PartitionFetchInfo(53029,67108864),[org.mobilerebindinfo,6] -> PartitionFetchInfo(32,67108864),[org.nginx,3] -> PartitionFetchInfo(105261,67108864),[org.mobile_push_applist,3] -> PartitionFetchInfo(162462,67108864),[org.mobile_xmlog,10] -> PartitionFetchInfo(21754,67108864),[org.mobile,6] -> PartitionFetchInfo(65015,67108864),[org.mobile_movielog,9] -> PartitionFetchInfo(132672,67108864),[org.intranet_nginx,11] -> PartitionFetchInfo(4738,67108864),[binlog.user,0] -> PartitionFetchInfo(225872,67108864) (kafka.server.ReplicaFetcherThread)
        java.lang.IllegalArgumentException
        at java.nio.Buffer.limit(Buffer.java:267)
        at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:33)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:83)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:81)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.api.TopicData$.readFrom(FetchResponse.scala:81)
        at kafka.api.FetchResponse$$anonfun$3.apply(FetchResponse.scala:142)
        at kafka.api.FetchResponse$$anonfun$3.apply(FetchResponse.scala:141)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:141)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:112)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

        Gerrit Jansen van Vuuren added a comment -

        I've written a consumer, https://github.com/gerritjvv/kafka-fast. It's running on the same dataset and implements what I said in the previous comment; I do not see any errors yet.

        Gerrit Jansen van Vuuren added a comment -

        It doesn't seem like there is an easy fix.
        I'm trying to write my own consumer that works around this error, i.e. ignores the initial message length and still reads the message sets, even though the actual topic/partition/messageset sequences might go over the initial 4-byte int message size. I will comment on how this goes.

        Jun Rao added a comment -

        Yes, I wasn't planning to fix the issue since I can't think of an easy way. I was just thinking of giving the client the right error message so that the client doesn't confuse this case with an actual log corruption.

        Gerrit Jansen van Vuuren added a comment -

        If I throw an error then what? How can I consume the data in the first place? I'm using all the defaults and the provided stock consumer/producer.

        The problem here is that I haven't configured anything to consume more than 2GB; it's the way the consumer does its fetch that causes the data to go over 2GB.

        So it means that if you use the broker and consumer, and for some reason the consumer does a fetch over 2GB at any time, I'll be unable to consume the data ever, even though no single message is even over 100-200 MB. The only solution left to me is then either to write my own client, or to delete the topic data every time I see this, which happens about 5 seconds after I remove and recreate the topic.

        Jun Rao added a comment -

        To really support fetching more than 2GB of data in a single fetch response requires wire protocol change. One simple thing that we can do immediately is to sanity check the response size. If it's more than 2GB, just throw an error to the client.
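
        A rough sketch of the kind of sanity check being suggested (hypothetical helper, not actual broker code):

        // The wire size field is a signed 32-bit int, so ~2GB is the hard ceiling.
        val maxFetchResponseBytes: Long = Int.MaxValue.toLong

        def checkResponseSize(sizeInBytes: Long): Unit =
          if (sizeInBytes > maxFetchResponseBytes)
            throw new IllegalStateException(
              s"Fetch response of $sizeInBytes bytes exceeds the ~2GB wire-protocol limit; " +
              "reduce the fetch size or the number of partitions per fetch")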

        Gerrit Jansen van Vuuren added a comment -

        Hi, thanks for the response, but this is not really a solution.

        It's obvious that either the way the API does the call is flawed, or the broker should take care of it. I cannot add more consumers just for this issue: my data will grow (maybe it doubles in a year), and then I'd run into the same problem again, which is in the end an integer overflow problem and not a resource problem.

        i.e. beyond a certain amount of data the API fails and I should add more consumers? I've opted for writing my own API that does calls on a topic/partition basis to each broker (a sketch of this idea follows below); so far it's working.
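
        For reference, the same per-partition idea can be sketched with the stock 0.8 SimpleConsumer API (this is not the kafka-fast client; host, port, offsets and sizes are placeholders, and leader lookup/error handling are omitted), so that no single FetchResponse can grow anywhere near 2GB:

        import kafka.api.FetchRequestBuilder
        import kafka.consumer.SimpleConsumer

        // Fetch one topic/partition at a time with a bounded fetch size.
        val consumer = new SimpleConsumer("broker-host", 9092, 30000, 4 * 1024 * 1024, "per-partition-fetcher")
        val request = new FetchRequestBuilder()
          .clientId("per-partition-fetcher")
          .addFetch("mytopic", 0, 0L, 1024 * 1024)   // single partition, 1 MB cap per fetch
          .build()
        val response = consumer.fetch(request)
        val messages = response.messageSet("mytopic", 0)
        consumer.close()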

        Neha Narkhede added a comment -

        As suggested by Jun -

        If a broker is the leader of multiple partitions of a topic, the high level
        consumer will fetch all those partitions in a single fetch request. Then
        the aggregate of the fetched data from multiple partitions could be more
        than 2GB.

        You can try using more consumers in the same consumer group to reduce #
        partitions fetched per consumer.

        Thanks,

        Jun
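
        (As a rough illustration of this aggregation, using the 67108864-byte per-partition fetch size visible in the log above: a single fetch covering just 32 such partitions can already ask for up to 32 × 64 MB = 2 GB, which overflows the signed 32-bit size field in the response.)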


          People

          • Assignee: Ewen Cheslack-Postava
          • Reporter: Gerrit Jansen van Vuuren
          • Votes: 1
          • Watchers: 9
