Uploaded image for project: 'Apache RocketMQ'
  1. Apache RocketMQ
  2. ROCKETMQ-309

Add a new consumer implementation

    XMLWordPrintableJSON

    Details

    • Type: New Feature
    • Status: In Progress
    • Priority: Trivial
    • Resolution: Unresolved
    • Affects Version/s: 4.1.0-incubating
    • Fix Version/s: None
    • Component/s: rocketmq-client
    • Docs Text:
      Hide
      package org.apache.rocketmq.client.consumer;

      import org.apache.rocketmq.client.exception.MQClientException;
      import org.apache.rocketmq.client.log.ClientLogger;
      import org.apache.rocketmq.common.ThreadFactoryImpl;
      import org.apache.rocketmq.common.message.MessageQueue;
      import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
      import org.slf4j.Logger;

      import java.util.Iterator;
      import java.util.Map.Entry;
      import java.util.Set;
      import java.util.concurrent.*;

      /**
       * Schedule service for pull consumer,which use broadcast model and consume message from last offset
       */
      public class MQBroadcastFromLastConsumer {
          private final Logger log = ClientLogger.getLog();

          private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
          private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
                  new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
          private DefaultMQPullConsumer defaultMQPullConsumer;
          private int pullThreadNums;
          private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =
                  new ConcurrentHashMap<String, PullTaskCallback>();
          private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

          private Executor rebalanceExecutor = Executors.newSingleThreadExecutor();

          public MQBroadcastFromLastConsumer(final String consumerGroup, int pullThreadNums) {
              this.pullThreadNums = pullThreadNums;
              this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
              this.defaultMQPullConsumer.setMessageModel(MessageModel.BROADCASTING);
          }

          public void putTask(final String topic, final Set<MessageQueue> mqNewSet) {
              rebalanceExecutor.execute(new Runnable() {
                  @Override
                  public void run() {
                      Iterator<Entry<MessageQueue, PullTaskImpl>> it = MQBroadcastFromLastConsumer.this.taskTable.entrySet().iterator();
                      while (it.hasNext()) {
                          Entry<MessageQueue, PullTaskImpl> next = it.next();
                          if (next.getKey().getTopic().equals(topic)) {
                              if (!mqNewSet.contains(next.getKey())) {
                                  next.getValue().setCancelled(true);
                                  it.remove();
                              }
                          }
                      }

                      for (MessageQueue mq : mqNewSet) {
                          if (!taskTable.containsKey(mq)) {
                              long offset = 0;
                              try {
                                  offset = defaultMQPullConsumer.maxOffset(mq);
                              } catch (MQClientException e) {
                                  log.error("get max offset error:{}", e.getMessage());
                              }
                              if (offset < 0) {
                                  offset = 0;
                              }

                              MQBroadcastFromLastConsumer.this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getOffsetStore().updateOffset(mq, offset, false);
                              PullTaskImpl command = new PullTaskImpl(mq);
                              MQBroadcastFromLastConsumer.this.taskTable.put(mq, command);
                              MQBroadcastFromLastConsumer.this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS);
                          }
                      }
                  }
              });
          }

          public void start() throws MQClientException {
              final String group = this.defaultMQPullConsumer.getConsumerGroup();
              this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
                      this.pullThreadNums,
                      new ThreadFactoryImpl("PullMsgThread-" + group)
              );

              this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);

              this.defaultMQPullConsumer.start();

              log.info("MQBroadcastFromLastConsumer start OK, {} {}",
                      this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
          }

          public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
              this.callbackTable.put(topic, callback);
              this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
          }

          public void shutdown() {
              if (this.scheduledThreadPoolExecutor != null) {
                  this.scheduledThreadPoolExecutor.shutdown();
              }

              if (this.defaultMQPullConsumer != null) {
                  this.defaultMQPullConsumer.shutdown();
              }
          }

          class MessageQueueListenerImpl implements MessageQueueListener {
              @Override
              public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                  log.info("messageQueueChanged,topic:[{}]", topic);
                  MessageModel messageModel =
                          MQBroadcastFromLastConsumer.this.defaultMQPullConsumer.getMessageModel();
                  switch (messageModel) {
                      case BROADCASTING:
                          MQBroadcastFromLastConsumer.this.putTask(topic, mqAll);
                          break;
                      case CLUSTERING:
                          MQBroadcastFromLastConsumer.this.putTask(topic, mqDivided);
                          break;
                      default:
                          break;
                  }
              }
          }

          public void setNamesrvAddr(String namesrvAddr) {
              this.defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
          }

          public void setInstanceName(String instanceName) {
              this.defaultMQPullConsumer.setInstanceName(instanceName);
          }

          class PullTaskImpl implements Runnable {
              private final MessageQueue messageQueue;

              private volatile boolean cancelled = false;

              public PullTaskImpl(final MessageQueue messageQueue) {
                  this.messageQueue = messageQueue;
              }

              @Override
              public void run() {
                  //log.debug("run pull task, if cancel:{}", this.cancelled);
                  String topic = this.messageQueue.getTopic();
                  if (!this.isCancelled()) {
                      PullTaskCallback pullTaskCallback =
                              MQBroadcastFromLastConsumer.this.callbackTable.get(topic);
                      if (pullTaskCallback != null) {
                          final PullTaskContext context = new PullTaskContext();
                          context.setPullConsumer(MQBroadcastFromLastConsumer.this.defaultMQPullConsumer);
                          try {
                              pullTaskCallback.doPullTask(this.messageQueue, context);
                          } catch (Throwable e) {
                              context.setPullNextDelayTimeMillis(1000);
                              log.error("doPullTask Exception", e);
                          }

                          if (!this.isCancelled()) {
                              MQBroadcastFromLastConsumer.this.scheduledThreadPoolExecutor.schedule(this,
                                      context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
                          } else {
                              log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
                          }
                      } else {
                          log.warn("Pull Task Callback not exist , {}", topic);
                      }
                  } else {
                      log.warn("The Pull Task is cancelled, {}", messageQueue);
                  }
              }

              public boolean isCancelled() {
                  return cancelled;
              }

              public void setCancelled(boolean cancelled) {
                  this.cancelled = cancelled;
              }
          }
      }
      Show
      package org.apache.rocketmq.client.consumer;

      import org.apache.rocketmq.client.exception.MQClientException;
      import org.apache.rocketmq.client.log.ClientLogger;
      import org.apache.rocketmq.common.ThreadFactoryImpl;
      import org.apache.rocketmq.common.message.MessageQueue;
      import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
      import org.slf4j.Logger;

      import java.util.Iterator;
      import java.util.Map.Entry;
      import java.util.Set;
      import java.util.concurrent.*;

      /**
       * Schedule service for a pull consumer that uses the broadcast model and starts consuming
       * every newly discovered message queue from that queue's current max offset.
       *
       * <p>Typical use, as far as this class shows: register one {@link PullTaskCallback} per topic
       * via {@link #registerPullTaskCallback(String, PullTaskCallback)}, call {@link #start()}, and
       * call {@link #shutdown()} when the consumer is no longer needed.
       */
      public class MQBroadcastFromLastConsumer {
          private final Logger log = ClientLogger.getLog();

          private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
          /** The pull task currently scheduled for each message queue. */
          private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
                  new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
          private final DefaultMQPullConsumer defaultMQPullConsumer;
          private final int pullThreadNums;
          private final ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =
                  new ConcurrentHashMap<String, PullTaskCallback>();
          /** Created in {@link #start()}; {@code null} until then. */
          private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

          /**
           * Single worker thread on which queue-set changes are applied one after another.
           * Held as an {@link ExecutorService} so that {@link #shutdown()} can stop its thread.
           */
          private final ExecutorService rebalanceExecutor = Executors.newSingleThreadExecutor();

          /**
           * Creates the underlying pull consumer and switches it to the broadcasting message model.
           *
           * @param consumerGroup  consumer group passed to the underlying {@link DefaultMQPullConsumer}
           * @param pullThreadNums core pool size of the executor that runs the pull tasks
           */
          public MQBroadcastFromLastConsumer(final String consumerGroup, int pullThreadNums) {
              this.pullThreadNums = pullThreadNums;
              this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
              this.defaultMQPullConsumer.setMessageModel(MessageModel.BROADCASTING);
          }

          /**
           * Asynchronously reconciles the running pull tasks of {@code topic} with {@code mqNewSet}.
           * The work is handed to the single rebalance thread, so this method returns immediately.
           *
           * @param topic    topic whose queue set changed
           * @param mqNewSet queues that should now be pulled for {@code topic}
           */
          public void putTask(final String topic, final Set<MessageQueue> mqNewSet) {
              try {
                  this.rebalanceExecutor.execute(new Runnable() {
                      @Override
                      public void run() {
                          applyQueueChange(topic, mqNewSet);
                      }
                  });
              } catch (RejectedExecutionException e) {
                  // Only possible once shutdown() has stopped the rebalance executor; keep the
                  // method non-throwing for callers, as it was before the executor could be stopped.
                  log.warn("putTask rejected because the consumer is shut down, topic: {}", topic);
              }
          }

          /**
           * Cancels tasks of {@code topic} whose queue is no longer in {@code mqNewSet} and starts a
           * task, positioned at the queue's max offset, for every queue that has none yet.
           */
          private void applyQueueChange(final String topic, final Set<MessageQueue> mqNewSet) {
              Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
              while (it.hasNext()) {
                  Entry<MessageQueue, PullTaskImpl> next = it.next();
                  boolean sameTopic = next.getKey().getTopic().equals(topic);
                  if (sameTopic && !mqNewSet.contains(next.getKey())) {
                      // The task checks this flag itself and stops rescheduling.
                      next.getValue().setCancelled(true);
                      it.remove();
                  }
              }

              for (MessageQueue mq : mqNewSet) {
                  if (this.taskTable.containsKey(mq)) {
                      continue;
                  }

                  long offset = 0;
                  try {
                      offset = this.defaultMQPullConsumer.maxOffset(mq);
                  } catch (MQClientException e) {
                      // Pass the exception as the trailing argument so the stack trace is logged;
                      // the queue then falls back to offset 0.
                      log.error("get max offset error:{}", e.getMessage(), e);
                  }
                  if (offset < 0) {
                      offset = 0;
                  }

                  // Seed the offset store before the first pull of this queue.
                  // NOTE(review): the meaning of the trailing `false` flag is defined by OffsetStore — confirm.
                  this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getOffsetStore()
                          .updateOffset(mq, offset, false);
                  PullTaskImpl command = new PullTaskImpl(mq);
                  this.taskTable.put(mq, command);
                  this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS);
              }
          }

          /**
           * Creates the pull thread pool, installs the queue listener and starts the underlying
           * pull consumer.
           *
           * @throws MQClientException if the underlying consumer fails to start
           */
          public void start() throws MQClientException {
              final String group = this.defaultMQPullConsumer.getConsumerGroup();
              this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
                      this.pullThreadNums,
                      new ThreadFactoryImpl("PullMsgThread-" + group)
              );

              this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);

              this.defaultMQPullConsumer.start();

              log.info("MQBroadcastFromLastConsumer start OK, {} {}",
                      this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
          }

          /**
           * Registers the callback that performs the pull for every queue of {@code topic}.
           *
           * @param topic    topic to consume
           * @param callback invoked by each pull task of that topic
           */
          public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
              this.callbackTable.put(topic, callback);
              // A null listener is passed here; the actual listener is installed in start().
              this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
          }

          /**
           * Stops the pull thread pool, the underlying consumer and the rebalance thread.
           */
          public void shutdown() {
              if (this.scheduledThreadPoolExecutor != null) {
                  this.scheduledThreadPoolExecutor.shutdown();
              }

              if (this.defaultMQPullConsumer != null) {
                  this.defaultMQPullConsumer.shutdown();
              }

              // Without this the executor's non-daemon worker thread would outlive the consumer.
              this.rebalanceExecutor.shutdown();
          }

          /**
           * Forwards queue-set changes reported by the pull consumer to {@link #putTask}.
           */
          class MessageQueueListenerImpl implements MessageQueueListener {
              @Override
              public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
                  log.info("messageQueueChanged,topic:[{}]", topic);
                  MessageModel messageModel =
                          MQBroadcastFromLastConsumer.this.defaultMQPullConsumer.getMessageModel();
                  switch (messageModel) {
                      case BROADCASTING:
                          // Broadcasting: this instance pulls every queue of the topic.
                          MQBroadcastFromLastConsumer.this.putTask(topic, mqAll);
                          break;
                      case CLUSTERING:
                          // Clustering: only the queues assigned to this instance.
                          MQBroadcastFromLastConsumer.this.putTask(topic, mqDivided);
                          break;
                      default:
                          break;
                  }
              }
          }

          /**
           * Sets the name server address on the underlying pull consumer.
           *
           * @param namesrvAddr name server address
           */
          public void setNamesrvAddr(String namesrvAddr) {
              this.defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
          }

          /**
           * Sets the instance name on the underlying pull consumer.
           *
           * @param instanceName client instance name
           */
          public void setInstanceName(String instanceName) {
              this.defaultMQPullConsumer.setInstanceName(instanceName);
          }

          /**
           * Self-rescheduling pull task bound to one message queue. Each run invokes the topic's
           * {@link PullTaskCallback} once and then schedules itself again unless it was cancelled.
           */
          class PullTaskImpl implements Runnable {
              private final MessageQueue messageQueue;

              /** Written by the rebalance thread and read by pull threads, hence volatile. */
              private volatile boolean cancelled = false;

              public PullTaskImpl(final MessageQueue messageQueue) {
                  this.messageQueue = messageQueue;
              }

              @Override
              public void run() {
                  String topic = this.messageQueue.getTopic();
                  if (this.isCancelled()) {
                      log.warn("The Pull Task is cancelled, {}", messageQueue);
                      return;
                  }

                  PullTaskCallback pullTaskCallback =
                          MQBroadcastFromLastConsumer.this.callbackTable.get(topic);
                  if (pullTaskCallback == null) {
                      // No callback means nothing to do; the task is not rescheduled.
                      log.warn("Pull Task Callback not exist , {}", topic);
                      return;
                  }

                  final PullTaskContext context = new PullTaskContext();
                  context.setPullConsumer(MQBroadcastFromLastConsumer.this.defaultMQPullConsumer);
                  try {
                      pullTaskCallback.doPullTask(this.messageQueue, context);
                  } catch (Throwable e) {
                      // Any callback failure must not end the task; retry after one second.
                      context.setPullNextDelayTimeMillis(1000);
                      log.error("doPullTask Exception", e);
                  }

                  // Re-check: the queue may have been removed while the callback was running.
                  if (this.isCancelled()) {
                      log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
                      return;
                  }
                  MQBroadcastFromLastConsumer.this.scheduledThreadPoolExecutor.schedule(this,
                          context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
              }

              public boolean isCancelled() {
                  return cancelled;
              }

              public void setCancelled(boolean cancelled) {
                  this.cancelled = cancelled;
              }
          }
      }

      Description

      When an application runs the consumer client in broadcast mode inside Docker, it can only pull messages from the first offset, because Docker places the app on a host at random and the locally stored offset file is therefore not reused. However, sometimes we want to pull messages only from the last offset even though the app runs in Docker; MQBroadcastFromLastConsumer can be used in this case.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                yukon Xinyu Zhou
                Reporter:
                l_yy laiyiyu
              • Votes:
                0 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated: