Uploaded image for project: 'Apache RocketMQ'
  1. Apache RocketMQ
  2. ROCKETMQ-309

Add a new consumer implementation

    XMLWordPrintableJSON

Details

    • New Feature
    • Status: In Progress
    • Trivial
    • Resolution: Unresolved
    • 4.1.0-incubating
    • None
    • rocketmq-client
    • Hide
      package org.apache.rocketmq.client.consumer;

      import org.apache.rocketmq.client.exception.MQClientException;
      import org.apache.rocketmq.client.log.ClientLogger;
      import org.apache.rocketmq.common.ThreadFactoryImpl;
      import org.apache.rocketmq.common.message.MessageQueue;
      import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
      import org.slf4j.Logger;

      import java.util.Iterator;
      import java.util.Map.Entry;
      import java.util.Set;
      import java.util.concurrent.*;

      /**
       * Schedule service for pull consumer,which use broadcast model and consume message from last offset
       */
      public class MQBroadcastFromLastConsumer {
          private final Logger log = ClientLogger.getLog();

          private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
          private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
                  new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
          private DefaultMQPullConsumer defaultMQPullConsumer;
          private int pullThreadNums;
          private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =
                  new ConcurrentHashMap<String, PullTaskCallback>();
          private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

          private Executor rebalanceExecutor = Executors.newSingleThreadExecutor();

          public MQBroadcastFromLastConsumer(final String consumerGroup, int pullThreadNums) {
              this.pullThreadNums = pullThreadNums;
              this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
              this.defaultMQPullConsumer.setMessageModel(MessageModel.BROADCASTING);
          }

          public void putTask(final String topic, final Set<MessageQueue> mqNewSet) {
              rebalanceExecutor.execute(new Runnable() {
                  @Override
                  public void run() {
                      Iterator<Entry<MessageQueue, PullTaskImpl>> it = MQBroadcastFromLastConsumer.this.taskTable.entrySet().iterator();
                      while (it.hasNext()) {
                          Entry<MessageQueue, PullTaskImpl> next = it.next();
                          if (next.getKey().getTopic().equals(topic)) {
                              if (!mqNewSet.contains(next.getKey())) {
                                  next.getValue().setCancelled(true);
                                  it.remove();
                              }
                          }
                      }

                      for (MessageQueue mq : mqNewSet) {
                          if (!taskTable.containsKey(mq)) {
                              long offset = 0;
                              try {
                                  offset = defaultMQPullConsumer.maxOffset(mq);
                              } catch (MQClientException e) {
                                  log.error("get max offset error:{}", e.getMessage());
                              }
                              if (offset < 0) {
                                  offset = 0;
                              }

                              MQBroadcastFromLastConsumer.this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getOffsetStore().updateOffset(mq, offset, false);
                              PullTaskImpl command = new PullTaskImpl(mq);
                              MQBroadcastFromLastConsumer.this.taskTable.put(mq, command);
                              MQBroadcastFromLastConsumer.this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS);
                          }
                      }
                  }
              });
          }

          public void start() throws MQClientException {
              final String group = this.defaultMQPullConsumer.getConsumerGroup();
              this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
                      this.pullThreadNums,
                      new ThreadFactoryImpl("PullMsgThread-" + group)
              );

              this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);

              this.defaultMQPullConsumer.start();

              log.info("MQBroadcastFromLastConsumer start OK, {} {}",
                      this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
          }

          public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
              this.callbackTable.put(topic, callback);
              this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
          }

          public void shutdown() {
              if (this.scheduledThreadPoolExecutor != null) {
                  this.scheduledThreadPoolExecutor.shutdown();
              }

              if (this.defaultMQPullConsumer != null) {
                  this.defaultMQPullConsumer.shutdown();
              }
          }

          class MessageQueueListenerImpl implements MessageQueueListener {
              @Override
              public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                  log.info("messageQueueChanged,topic:[{}]", topic);
                  MessageModel messageModel =
                          MQBroadcastFromLastConsumer.this.defaultMQPullConsumer.getMessageModel();
                  switch (messageModel) {
                      case BROADCASTING:
                          MQBroadcastFromLastConsumer.this.putTask(topic, mqAll);
                          break;
                      case CLUSTERING:
                          MQBroadcastFromLastConsumer.this.putTask(topic, mqDivided);
                          break;
                      default:
                          break;
                  }
              }
          }

          public void setNamesrvAddr(String namesrvAddr) {
              this.defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
          }

          public void setInstanceName(String instanceName) {
              this.defaultMQPullConsumer.setInstanceName(instanceName);
          }

          class PullTaskImpl implements Runnable {
              private final MessageQueue messageQueue;

              private volatile boolean cancelled = false;

              public PullTaskImpl(final MessageQueue messageQueue) {
                  this.messageQueue = messageQueue;
              }

              @Override
              public void run() {
                  //log.debug("run pull task, if cancel:{}", this.cancelled);
                  String topic = this.messageQueue.getTopic();
                  if (!this.isCancelled()) {
                      PullTaskCallback pullTaskCallback =
                              MQBroadcastFromLastConsumer.this.callbackTable.get(topic);
                      if (pullTaskCallback != null) {
                          final PullTaskContext context = new PullTaskContext();
                          context.setPullConsumer(MQBroadcastFromLastConsumer.this.defaultMQPullConsumer);
                          try {
                              pullTaskCallback.doPullTask(this.messageQueue, context);
                          } catch (Throwable e) {
                              context.setPullNextDelayTimeMillis(1000);
                              log.error("doPullTask Exception", e);
                          }

                          if (!this.isCancelled()) {
                              MQBroadcastFromLastConsumer.this.scheduledThreadPoolExecutor.schedule(this,
                                      context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
                          } else {
                              log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
                          }
                      } else {
                          log.warn("Pull Task Callback not exist , {}", topic);
                      }
                  } else {
                      log.warn("The Pull Task is cancelled, {}", messageQueue);
                  }
              }

              public boolean isCancelled() {
                  return cancelled;
              }

              public void setCancelled(boolean cancelled) {
                  this.cancelled = cancelled;
              }
          }
      }
      Show
      package org.apache.rocketmq.client.consumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.slf4j.Logger; import java.util.Iterator; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.*; /**  * Schedule service for pull consumer,which use broadcast model and consume message from last offset  */ public class MQBroadcastFromLastConsumer {     private final Logger log = ClientLogger.getLog();     private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();     private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =             new ConcurrentHashMap<MessageQueue, PullTaskImpl>();     private DefaultMQPullConsumer defaultMQPullConsumer;     private int pullThreadNums;     private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =             new ConcurrentHashMap<String, PullTaskCallback>();     private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;     private Executor rebalanceExecutor = Executors.newSingleThreadExecutor();     public MQBroadcastFromLastConsumer(final String consumerGroup, int pullThreadNums) {         this.pullThreadNums = pullThreadNums;         this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);         this.defaultMQPullConsumer.setMessageModel(MessageModel.BROADCASTING);     }     public void putTask(final String topic, final Set<MessageQueue> mqNewSet) {         rebalanceExecutor.execute(new Runnable() {             @Override             public void run() {                 Iterator<Entry<MessageQueue, PullTaskImpl>> it = MQBroadcastFromLastConsumer.this.taskTable.entrySet().iterator();                 while (it.hasNext()) {                     Entry<MessageQueue, 
PullTaskImpl> next = it.next();                     if (next.getKey().getTopic().equals(topic)) {                         if (!mqNewSet.contains(next.getKey())) {                             next.getValue().setCancelled(true);                             it.remove();                         }                     }                 }                 for (MessageQueue mq : mqNewSet) {                     if (!taskTable.containsKey(mq)) {                         long offset = 0;                         try {                             offset = defaultMQPullConsumer.maxOffset(mq);                         } catch (MQClientException e) {                             log.error("get max offset error:{}", e.getMessage());                         }                         if (offset < 0) {                             offset = 0;                         }                         MQBroadcastFromLastConsumer.this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getOffsetStore().updateOffset(mq, offset, false);                         PullTaskImpl command = new PullTaskImpl(mq);                         MQBroadcastFromLastConsumer.this.taskTable.put(mq, command);                         MQBroadcastFromLastConsumer.this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS);                     }                 }             }         });     }     public void start() throws MQClientException {         final String group = this.defaultMQPullConsumer.getConsumerGroup();         this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(                 this.pullThreadNums,                 new ThreadFactoryImpl("PullMsgThread-" + group)         );         this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);         this.defaultMQPullConsumer.start();         log.info("MQBroadcastFromLastConsumer start OK, {} {}",                 this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);     }     public void 
registerPullTaskCallback(final String topic, final PullTaskCallback callback) {         this.callbackTable.put(topic, callback);         this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);     }     public void shutdown() {         if (this.scheduledThreadPoolExecutor != null) {             this.scheduledThreadPoolExecutor.shutdown();         }         if (this.defaultMQPullConsumer != null) {             this.defaultMQPullConsumer.shutdown();         }     }     class MessageQueueListenerImpl implements MessageQueueListener {         @Override         public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {             log.info("messageQueueChanged,topic:[{}]", topic);             MessageModel messageModel =                     MQBroadcastFromLastConsumer.this.defaultMQPullConsumer.getMessageModel();             switch (messageModel) {                 case BROADCASTING:                     MQBroadcastFromLastConsumer.this.putTask(topic, mqAll);                     break;                 case CLUSTERING:                     MQBroadcastFromLastConsumer.this.putTask(topic, mqDivided);                     break;                 default:                     break;             }         }     }     public void setNamesrvAddr(String namesrvAddr) {         this.defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);     }     public void setInstanceName(String instanceName) {         this.defaultMQPullConsumer.setInstanceName(instanceName);     }     class PullTaskImpl implements Runnable {         private final MessageQueue messageQueue;         private volatile boolean cancelled = false;         public PullTaskImpl(final MessageQueue messageQueue) {             this.messageQueue = messageQueue;         }         @Override         public void run() {             //log.debug("run pull task, if cancel:{}", this.cancelled);             String topic = this.messageQueue.getTopic();             if (!this.isCancelled()) 
{                 PullTaskCallback pullTaskCallback =                         MQBroadcastFromLastConsumer.this.callbackTable.get(topic);                 if (pullTaskCallback != null) {                     final PullTaskContext context = new PullTaskContext();                     context.setPullConsumer(MQBroadcastFromLastConsumer.this.defaultMQPullConsumer);                     try {                         pullTaskCallback.doPullTask(this.messageQueue, context);                     } catch (Throwable e) {                         context.setPullNextDelayTimeMillis(1000);                         log.error("doPullTask Exception", e);                     }                     if (!this.isCancelled()) {                         MQBroadcastFromLastConsumer.this.scheduledThreadPoolExecutor.schedule(this,                                 context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);                     } else {                         log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);                     }                 } else {                     log.warn("Pull Task Callback not exist , {}", topic);                 }             } else {                 log.warn("The Pull Task is cancelled, {}", messageQueue);             }         }         public boolean isCancelled() {             return cancelled;         }         public void setCancelled(boolean cancelled) {             this.cancelled = cancelled;         }     } }

    Description

      When an application running in Docker uses the consumer client in broadcast mode, it can only pull messages starting from the first offset, because Docker places the app on a host at random and the local offset store file is therefore not reused. However, sometimes we want to pull messages starting from the last offset even though the app runs in Docker; MQBroadcastFromLastConsumer can be used in this case.

      Attachments

        Issue Links

          Activity

            People

              yukon Xinyu Zhou
              l_yy laiyiyu
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated: