`

activemq 持久化topic处理过程及其消息游标轮转问题的解决方案

阅读更多

    如果消息是持久化的,activemq收到消息后会存储在持久性cursor中。对于非持久化消息,会存储在File Cursor中。从名称上File Cursor是持久性cursor,实际上activemq把FilePendingMessageCursor作为非持久性cursor。File Cursor首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中,这样就不会因为有大量消息没有被消费而导致OOM。当消息consumer启动时,mq收到消息会激活broker上的DurableTopicSubscrition,该subscription会检查当前的消息cursor里是否有消息,如果有就会dispatch这些消息,这样consumer就能收到之前发送的消息了。

 

    由于同一个topic会有多个消息cursor,所以判断cursor是否有消息时会轮询cursor队列,具体代码:

StoreDurableSubscriberCursor 

 

public synchronized boolean hasNext() {
        boolean result = true;
        if (result) {
            try {
                currentCursor = getNextCursor();    //A
            } catch (Exception e) {
                LOG.error("Failed to get current cursor ", e);
                throw new RuntimeException(e);
            }
            result = currentCursor != null ? currentCursor.hasNext() : false;
        }
        return result;
    }
 

 

代码逻辑分析:得到cursor队列中下一个cursor,然后判断该cursor是否有消息。下面看getNextCursor的代码:

 

 

protected synchronized PendingMessageCursor getNextCursor() throws Exception {
        if (currentCursor == null || currentCursor.isEmpty()) {  // E
            currentCursor = null;
            for (PendingMessageCursor tsp : storePrefetches) {
                if (tsp.hasNext()) {                              
                    currentCursor = tsp;
                    break;
                }
            }
            // round-robin
            if (storePrefetches.size()>1) {
                PendingMessageCursor first = storePrefetches.remove(0);
                storePrefetches.add(first);
            }
        }
        return currentCursor;
    }
 

 

代码逻辑:判断cursor是否为空,如果为null或者empty,如果是,则从cursor队列中获得下一个PendingMessageCursor;如果该cursor的hasNext()为true,则跳出循环。下面会对cursor队列做Load balance处理,将第一个cursor放到队列尾部。这样设计的目的是保证所有的cursor都会被访问到。

 

下面看isEmpty()的实现:

 

AbstractStoreCursor

 public final synchronized boolean isEmpty() {

        return size == 0;
    }

 

下面看hasNext()的实现: 

AbstractStoreCursor

public final synchronized boolean hasNext() {
        if (batchList.isEmpty()) {
            try {
                fillBatch();
            } catch (Exception e) {
                LOG.error(this + " - Failed to fill batch", e);
                throw new RuntimeException(e);
            }
        }
        ensureIterator();
        return this.iterator.hasNext();
    }

 

 从实现来看,isEmpty和hasNext的判断方式不同,在极端情况下两者的返回值可能会不同,可能isEmpty为true而hasNext为false。

 

在激活DurableTopicSubscrition的时候,执行顺序是先检查cursor中消息数量,然后dispatch消息。由于在代码E处可能发现当前cursor不是empty,于是将该cursor返回,这样在代码A处会获得持久cursor,该cursor中可能会没有消息,不会继续dispatch消息。这样就导致在cursor队列中的非持久cursor始终无法获得执行的机会,导致非持久消息不会被dispatch。

 

解决方案,在getNextCursor()中增加hasNext()的判断,保证cursor在null或者empty或者没有元素的情况下会轮转到下一cursor:

 

protected synchronized PendingMessageCursor getNextCursor() throws Exception {
        if (currentCursor == null || currentCursor.isEmpty() || !currentCursor.hasNext()) {
            ……
        }
        return currentCursor;
    }

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics