Samza / SAMZA-374

Need to be able to change SSP Grouper

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: container
    • Labels: None

    Description

      I recently ran a job with checkpointing enabled. The default grouper was used (group by partition). I then decided that I wanted to increase parallelism, so I set the grouper to group by SSP. This caused the container to get wedged in this loop:

      2014-08-08 18:56:06 VerifiableProperties [INFO] Verifying properties
      2014-08-08 18:56:06 VerifiableProperties [INFO] Property client.id is overridden to samza_checkpoint_manager-repartition_by_treeid-i001-1407524103759-2
      2014-08-08 18:56:06 VerifiableProperties [INFO] Property metadata.broker.list is overridden to kafka-vip-e:10251
      2014-08-08 18:56:06 VerifiableProperties [INFO] Property request.timeout.ms is overridden to 60000
      2014-08-08 18:56:06 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:kafka-vip-e,port:10251 with correlation id 7 for 1 topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
      2014-08-08 18:56:06 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 60000 (requested 60000), SO_RCVBUF = 43690 (requested -1), SO_SNDBUF = 102400 (requested 102400).
      2014-08-08 18:56:06 SyncProducer [INFO] Connected to kafka-vip-e:10251 for producing
      2014-08-08 18:56:06 SyncProducer [INFO] Disconnecting from kafka-vip-e:10251
      2014-08-08 18:56:06 ClientUtils$ [DEBUG] Successfully fetched metadata for 1 topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
      2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Connecting to leader app196.:10251 for topic __samza_checkpoint_ver_1_for_my-job_i001 and to fetch all checkpoint messages.
      2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Got offset 13 for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0. Attempting to fetch messages for changelog partition mapping.
      2014-08-08 18:56:06 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
      2014-08-08 18:56:06 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 60000 (requested 60000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 8192 (requested -1).
      2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Get latest offset 80626 for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0.
      2014-08-08 18:56:07 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
      2014-08-08 18:56:07 KafkaCheckpointManager [WARN] While trying to read last changelog partition mapping entry for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0: org.apache.samza.SamzaException: Exception while deserializing checkpoint key. Retrying.
      2014-08-08 18:56:07 KafkaCheckpointManager [DEBUG] Exception detail:
      org.apache.samza.SamzaException: Exception while deserializing checkpoint key
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:177)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:300)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:292)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at kafka.message.MessageSet.foreach(MessageSet.scala:67)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:292)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:254)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:253)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:240)
      	at org.apache.samza.util.Util$.getTaskNameToChangeLogPartitionMapping(Util.scala:280)
      	at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
      	at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
      	at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
      Caused by: org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues: Checkpoint key's SystemStreamPartition Grouper factory (org.apache.samza.container.grouper.stream.GroupByPartitionFactory) does not match value from current configuration (org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory).  This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported.
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:170)
      	... 15 more
      2014-08-08 18:56:17 VerifiableProperties [INFO] Verifying properties
      2014-08-08 18:56:17 VerifiableProperties [INFO] Property client.id is overridden to samza_checkpoint_manager-repartition_by_treeid-i001-1407524103759-2
      2014-08-08 18:56:17 VerifiableProperties [INFO] Property metadata.broker.list is overridden to kafka-vip-e:10251
      2014-08-08 18:56:17 VerifiableProperties [INFO] Property request.timeout.ms is overridden to 60000
      2014-08-08 18:56:17 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:kafka-vip-e,port:10251 with correlation id 8 for 1 topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
      2014-08-08 18:56:17 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 60000 (requested 60000), SO_RCVBUF = 43690 (requested -1), SO_SNDBUF = 102400 (requested 102400).
      2014-08-08 18:56:17 SyncProducer [INFO] Connected to kafka-vip-e:10251 for producing
      2014-08-08 18:56:17 SyncProducer [INFO] Disconnecting from kafka-vip-e:10251
      2014-08-08 18:56:17 ClientUtils$ [DEBUG] Successfully fetched metadata for 1 topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
      2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Connecting to leader app196.:10251 for topic __samza_checkpoint_ver_1_for_my-job_i001 and to fetch all checkpoint messages.
      2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Got offset 14 for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0. Attempting to fetch messages for changelog partition mapping.
      2014-08-08 18:56:17 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
      2014-08-08 18:56:17 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 60000 (requested 60000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 8192 (requested -1).
      2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Get latest offset 80626 for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0.
      2014-08-08 18:56:17 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
      2014-08-08 18:56:17 KafkaCheckpointManager [WARN] While trying to read last changelog partition mapping entry for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0: org.apache.samza.SamzaException: Exception while deserializing checkpoint key. Retrying.
      2014-08-08 18:56:17 KafkaCheckpointManager [DEBUG] Exception detail:
      org.apache.samza.SamzaException: Exception while deserializing checkpoint key
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:177)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:300)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:292)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at kafka.message.MessageSet.foreach(MessageSet.scala:67)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:292)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:254)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:253)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:240)
      	at org.apache.samza.util.Util$.getTaskNameToChangeLogPartitionMapping(Util.scala:280)
      	at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
      	at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
      	at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
      Caused by: org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues: Checkpoint key's SystemStreamPartition Grouper factory (org.apache.samza.container.grouper.stream.GroupByPartitionFactory) does not match value from current configuration (org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory).  This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported.
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:170)
      	... 15 more
      

      I'm knowingly breaking grouping semantics because my job doesn't need them. As I recall, this was discussed in SAMZA-123, and we were all worried about people accidentally breaking their state/grouping, so we hard fail when the grouper is changed. The problem is, I can't change the checkpoint topic name, nor is it easy for me to delete the checkpoint messages in the topic, so I'm kind of stuck.
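
To make the hard-fail behavior concrete, here is a minimal sketch of the kind of check the error message above describes: the grouper factory recorded in a checkpoint key is compared with the one in the current configuration, and a mismatch raises an exception. This is an illustration only, not Samza's actual code. The class `GrouperCheckSketch`, the method `verifyGrouper`, and the exception `GrouperMismatchException` are hypothetical names; only the two factory class names are taken from the log.

```java
import java.util.Objects;

// Illustrative sketch only. All names in this class are hypothetical and
// are not part of Samza's API.
public class GrouperCheckSketch {

    // Factory class names as they appear in the error message in the log.
    static final String BY_PARTITION =
        "org.apache.samza.container.grouper.stream.GroupByPartitionFactory";
    static final String BY_SSP =
        "org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory";

    // Hypothetical stand-in for the exception raised on a grouper mismatch.
    static class GrouperMismatchException extends RuntimeException {
        GrouperMismatchException(String message) {
            super(message);
        }
    }

    // Fails hard when the factory stored with the checkpoint differs from
    // the factory in the current configuration.
    static void verifyGrouper(String checkpointedFactory, String configuredFactory) {
        if (!Objects.equals(checkpointedFactory, configuredFactory)) {
            throw new GrouperMismatchException(
                "Checkpoint key's grouper factory (" + checkpointedFactory
                    + ") does not match value from current configuration ("
                    + configuredFactory + ")");
        }
    }

    public static void main(String[] args) {
        // Same grouper on both runs: the check passes silently.
        verifyGrouper(BY_PARTITION, BY_PARTITION);

        // Grouper changed between runs: the check throws. If the caller
        // retries on this exception, it can never succeed, because the old
        // checkpoint messages are still in the topic.
        try {
            verifyGrouper(BY_PARTITION, BY_SSP);
        } catch (GrouperMismatchException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Because the mismatch is determined entirely by data already written to the checkpoint topic, retrying cannot resolve it, which is consistent with the repeating log output above.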

            People

              Assignee: Unassigned
              Reporter: Chris Riccomini (criccomini)
              Votes: 0
              Watchers: 3
