  Samza / SAMZA-374

Need to be able to change SSP Grouper


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: container
    • Labels:
      None

      Description

      I recently ran a job with checkpointing enabled, using the default grouper (group by partition). I then wanted to increase parallelism, so I switched the grouper to group by SystemStreamPartition (SSP). This caused the container to get wedged in the following retry loop:
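To make the parallelism change concrete, here is a simplified, hypothetical Java model of the two groupers named in the log (GroupByPartitionFactory and GroupBySystemStreamPartitionFactory). The `Ssp` class, the stream names `clicks` and `views`, and the task-name format are assumptions for illustration, not Samza's actual classes or naming.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of the two groupers named in the log.
// These are NOT Samza's classes; the task names are illustrative only.
public class GrouperSketch {

    // Minimal stand-in for a SystemStreamPartition.
    static final class Ssp {
        final String system;
        final String stream;
        final int partition;

        Ssp(String system, String stream, int partition) {
            this.system = system;
            this.stream = stream;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return system + "." + stream + "." + partition;
        }
    }

    // Group by partition: all SSPs that share a partition number go to one task.
    static Map<String, List<Ssp>> groupByPartition(List<Ssp> ssps) {
        Map<String, List<Ssp>> tasks = new TreeMap<>();
        for (Ssp ssp : ssps) {
            tasks.computeIfAbsent("Partition " + ssp.partition, k -> new ArrayList<>()).add(ssp);
        }
        return tasks;
    }

    // Group by SSP: every SSP becomes its own task, which increases parallelism.
    static Map<String, List<Ssp>> groupBySsp(List<Ssp> ssps) {
        Map<String, List<Ssp>> tasks = new TreeMap<>();
        for (Ssp ssp : ssps) {
            tasks.computeIfAbsent(ssp.toString(), k -> new ArrayList<>()).add(ssp);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Two hypothetical input streams with two partitions each.
        List<Ssp> inputs = Arrays.asList(
                new Ssp("kafka", "clicks", 0), new Ssp("kafka", "clicks", 1),
                new Ssp("kafka", "views", 0), new Ssp("kafka", "views", 1));

        System.out.println("group by partition: " + groupByPartition(inputs).keySet());
        System.out.println("group by SSP:       " + groupBySsp(inputs).keySet());
    }
}
```

With two streams of two partitions each, grouping by partition yields two tasks (`Partition 0`, `Partition 1`) while grouping by SSP yields four. Because the set of tasks changes, checkpoints and the changelog partition mapping written under the old task names no longer line up with the new tasks, which is presumably what the hard failure guards against.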

      2014-08-08 18:56:06 VerifiableProperties [INFO] Verifying properties
      2014-08-08 18:56:06 VerifiableProperties [INFO] Property client.id is overridden to samza_checkpoint_manager-repartition_by_treeid-i001-1407524103759-2
      2014-08-08 18:56:06 VerifiableProperties [INFO] Property metadata.broker.list is overridden to kafka-vip-e:10251
      2014-08-08 18:56:06 VerifiableProperties [INFO] Property request.timeout.ms is overridden to 60000
      2014-08-08 18:56:06 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:kafka-vip-e,port:10251 with correlation id 7 for 1 topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
      2014-08-08 18:56:06 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 60000 (requested 60000), SO_RCVBUF = 43690 (requested -1), SO_SNDBUF = 102400 (requested 102400).
      2014-08-08 18:56:06 SyncProducer [INFO] Connected to kafka-vip-e:10251 for producing
      2014-08-08 18:56:06 SyncProducer [INFO] Disconnecting from kafka-vip-e:10251
      2014-08-08 18:56:06 ClientUtils$ [DEBUG] Successfully fetched metadata for 1 topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
      2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Connecting to leader app196.:10251 for topic __samza_checkpoint_ver_1_for_my-job_i001 and to fetch all checkpoint messages.
      2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Got offset 13 for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0. Attempting to fetch messages for changelog partition mapping.
      2014-08-08 18:56:06 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
      2014-08-08 18:56:06 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 60000 (requested 60000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 8192 (requested -1).
      2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Get latest offset 80626 for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0.
      2014-08-08 18:56:07 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
      2014-08-08 18:56:07 KafkaCheckpointManager [WARN] While trying to read last changelog partition mapping entry for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0: org.apache.samza.SamzaException: Exception while deserializing checkpoint key. Retrying.
      2014-08-08 18:56:07 KafkaCheckpointManager [DEBUG] Exception detail:
      org.apache.samza.SamzaException: Exception while deserializing checkpoint key
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:177)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:300)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:292)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at kafka.message.MessageSet.foreach(MessageSet.scala:67)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:292)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:254)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:253)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:240)
      	at org.apache.samza.util.Util$.getTaskNameToChangeLogPartitionMapping(Util.scala:280)
      	at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
      	at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
      	at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
      Caused by: org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues: Checkpoint key's SystemStreamPartition Grouper factory (org.apache.samza.container.grouper.stream.GroupByPartitionFactory) does not match value from current configuration (org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory).  This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported.
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:170)
      	... 15 more
      2014-08-08 18:56:17 VerifiableProperties [INFO] Verifying properties
      2014-08-08 18:56:17 VerifiableProperties [INFO] Property client.id is overridden to samza_checkpoint_manager-repartition_by_treeid-i001-1407524103759-2
      2014-08-08 18:56:17 VerifiableProperties [INFO] Property metadata.broker.list is overridden to kafka-vip-e:10251
      2014-08-08 18:56:17 VerifiableProperties [INFO] Property request.timeout.ms is overridden to 60000
      2014-08-08 18:56:17 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:kafka-vip-e,port:10251 with correlation id 8 for 1 topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
      2014-08-08 18:56:17 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 60000 (requested 60000), SO_RCVBUF = 43690 (requested -1), SO_SNDBUF = 102400 (requested 102400).
      2014-08-08 18:56:17 SyncProducer [INFO] Connected to kafka-vip-e:10251 for producing
      2014-08-08 18:56:17 SyncProducer [INFO] Disconnecting from kafka-vip-e:10251
      2014-08-08 18:56:17 ClientUtils$ [DEBUG] Successfully fetched metadata for 1 topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
      2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Connecting to leader app196.:10251 for topic __samza_checkpoint_ver_1_for_my-job_i001 and to fetch all checkpoint messages.
      2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Got offset 14 for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0. Attempting to fetch messages for changelog partition mapping.
      2014-08-08 18:56:17 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
      2014-08-08 18:56:17 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 60000 (requested 60000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 8192 (requested -1).
      2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Get latest offset 80626 for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0.
      2014-08-08 18:56:17 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
      2014-08-08 18:56:17 KafkaCheckpointManager [WARN] While trying to read last changelog partition mapping entry for topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0: org.apache.samza.SamzaException: Exception while deserializing checkpoint key. Retrying.
      2014-08-08 18:56:17 KafkaCheckpointManager [DEBUG] Exception detail:
      org.apache.samza.SamzaException: Exception while deserializing checkpoint key
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:177)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:300)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:292)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at kafka.message.MessageSet.foreach(MessageSet.scala:67)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:292)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:254)
      	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:253)
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:240)
      	at org.apache.samza.util.Util$.getTaskNameToChangeLogPartitionMapping(Util.scala:280)
      	at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
      	at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
      	at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
      Caused by: org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues: Checkpoint key's SystemStreamPartition Grouper factory (org.apache.samza.container.grouper.stream.GroupByPartitionFactory) does not match value from current configuration (org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory).  This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported.
      	at org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:170)
      	... 15 more
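The log shows the mismatch being thrown while deserializing a checkpoint key, caught, and retried (`Retrying.`, with `ExponentialSleepStrategy` in the stack). A minimal sketch, assuming a plain string comparison of factory class names and assumed backoff parameters (100 ms initial delay, doubling, 10 s cap), shows why such a loop cannot make progress: the stored and configured values never change between attempts. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the failure mode in the log above. The names and
// backoff parameters are assumptions for illustration, not Samza's code.
public class RetryLoopSketch {

    static final String STORED = "org.apache.samza.container.grouper.stream.GroupByPartitionFactory";
    static final String CONFIGURED = "org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory";

    static class GrouperMismatchException extends RuntimeException {
        GrouperMismatchException(String stored, String configured) {
            super("stored grouper " + stored + " does not match configured grouper " + configured);
        }
    }

    // Stand-in for the check made while deserializing each checkpoint key.
    static void validateKey(String storedFactory, String configuredFactory) {
        if (!storedFactory.equals(configuredFactory)) {
            throw new GrouperMismatchException(storedFactory, configuredFactory);
        }
    }

    // Retries validateKey with exponential backoff. Unlike the loop in the log,
    // attempts are capped so this sketch terminates, and delays are recorded
    // instead of slept. Returns the delay (ms) scheduled after each failure.
    static List<Long> readWithRetries(String stored, String configured, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        long delayMs = 100;               // assumed initial delay
        final long maxDelayMs = 10_000;   // assumed cap
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                validateKey(stored, configured);
                return delays;            // success: stop retrying
            } catch (GrouperMismatchException e) {
                // Nothing the retry loop controls changes between attempts,
                // so a mismatch fails identically every time.
                delays.add(delayMs);
                delayMs = Math.min(delayMs * 2, maxDelayMs);
            }
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println("mismatch: " + readWithRetries(STORED, CONFIGURED, 10));
        System.out.println("match:    " + readWithRetries(STORED, STORED, 10));
    }
}
```

For the mismatched factories, every one of the ten attempts fails and the recorded delays grow 100, 200, 400, ... until they hit the 10000 ms cap; for matching factories the read succeeds on the first attempt with no delays. A deterministic error like this is arguably better surfaced as fatal than retried.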
      

      I'm knowingly breaking grouping semantics because my job doesn't depend on them. As I recall, this was discussed in SAMZA-123: we were all worried about people accidentally breaking their state or grouping, so we hard-fail when the grouper is changed. The problem is that I can't change the checkpoint topic name, and it isn't easy for me to delete the checkpoint messages in the topic, so I'm kind of stuck.
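One way to express what this issue asks for is an explicit opt-in that lets a job knowingly accept a grouper change while keeping the hard failure as the default. The sketch below is hypothetical: `job.systemstreampartition.grouper.allow.change` is an invented key, not a Samza setting, and the issue itself was resolved as Won't Fix. It also assumes `job.systemstreampartition.grouper.factory` is the key that selects the grouper.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an explicit opt-in for changing the grouper.
// ALLOW_CHANGE is an invented key for illustration; it is NOT a Samza setting.
public class GrouperChangeOptIn {

    // Assumed to be the key that selects the grouper factory.
    static final String GROUPER_FACTORY = "job.systemstreampartition.grouper.factory";
    // Hypothetical override flag.
    static final String ALLOW_CHANGE = "job.systemstreampartition.grouper.allow.change";

    enum Decision { MATCH, OVERRIDDEN, FAIL }

    // Compares the factory recorded in the checkpoint with the configured one.
    static Decision check(String storedFactory, Map<String, String> config) {
        String configured = config.get(GROUPER_FACTORY);
        if (storedFactory.equals(configured)) {
            return Decision.MATCH;
        }
        if (Boolean.parseBoolean(config.getOrDefault(ALLOW_CHANGE, "false"))) {
            // The user has explicitly accepted broken grouping semantics.
            System.err.println("WARN: grouper changed from " + storedFactory + " to " + configured);
            return Decision.OVERRIDDEN;
        }
        // Default remains a hard failure; a caller would throw here rather than retry.
        return Decision.FAIL;
    }

    public static void main(String[] args) {
        String stored = "org.apache.samza.container.grouper.stream.GroupByPartitionFactory";
        Map<String, String> config = new HashMap<>();
        config.put(GROUPER_FACTORY, "org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory");

        System.out.println(check(stored, config)); // FAIL
        config.put(ALLOW_CHANGE, "true");
        System.out.println(check(stored, config)); // OVERRIDDEN (warning on stderr)
    }
}
```

Such a flag only skips the check; it does nothing to reconcile existing checkpoints or changelog partition mappings with the new tasks, which is the risk raised in SAMZA-123.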

              People

              • Assignee: Unassigned
              • Reporter: Chris Riccomini (criccomini)
              • Votes: 0
              • Watchers: 4
