如果消息是持久化的,activemq收到消息后会存储在持久性cursor中。对于非持久化消息,会存储在File Cursor中。从名称上File Cursor是持久性cursor,实际上activemq把FilePendingMessageCursor作为非持久性cursor。File Cursor首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中,这样就不会因为有大量消息没有被消费而导致OOM。当消息consumer启动时,mq收到消息会激活broker上的DurableTopicSubscrition,该subscription会检查当前的消息cursor里是否有消息,如果有就会dispatch这些消息,这样consumer就能收到之前发送的消息了。
由于同一个topic会有多个消息cursor,所以判断cursor是否有消息时会轮询cursor队列,具体代码:
StoreDurableSubscriberCursor
public synchronized boolean hasNext() { boolean result = true; if (result) { try { currentCursor = getNextCursor(); //A } catch (Exception e) { LOG.error("Failed to get current cursor ", e); throw new RuntimeException(e); } result = currentCursor != null ? currentCursor.hasNext() : false; } return result; }
代码逻辑分析:得到cursor队列中下一个cursor,然后判断该cursor是否有消息。下面看getNextCursor的代码:
protected synchronized PendingMessageCursor getNextCursor() throws Exception { if (currentCursor == null || currentCursor.isEmpty()) { // E currentCursor = null; for (PendingMessageCursor tsp : storePrefetches) { if (tsp.hasNext()) { currentCursor = tsp; break; } } // round-robin if (storePrefetches.size()>1) { PendingMessageCursor first = storePrefetches.remove(0); storePrefetches.add(first); } } return currentCursor; }
代码逻辑:判断cursor是否为空,如果为null或者empty,如果是,则从cursor队列中获得下一个PendingMessageCursor;如果该cursor的hasNext()为true,则跳出循环。下面会对cursor队列做Load balance处理,将第一个cursor放到队列尾部。这样设计的目的是保证所有的cursor都会被访问到。
下面看isEmpty()的实现:
AbstractStoreCursor
public final synchronized boolean isEmpty() { return size == 0; }
下面看hasNext()的实现:
AbstractStoreCursor
public final synchronized boolean hasNext() { if (batchList.isEmpty()) { try { fillBatch(); } catch (Exception e) { LOG.error(this + " - Failed to fill batch", e); throw new RuntimeException(e); } } ensureIterator(); return this.iterator.hasNext(); }
从实现来看,isEmpty和hasNext的判断方式不同,在极端情况下两者的返回值可能会不同,可能isEmpty为true而hasNext为false。
在激活DurableTopicSubscrition的时候,执行顺序是先检查cursor中消息数量,然后dispatch消息。由于在代码E处可能发现当前cursor不是empty,于是将该cursor返回,这样在代码A处会获得持久cursor,该cursor中可能会没有消息,不会继续dispatch消息。这样就导致在cursor队列中的非持久cursor始终无法获得执行的机会,导致非持久消息不会被dispatch。
解决方案,在getNextCursor()中增加hasNext()的判断,保证cursor在null或者empty或者没有元素的情况下会轮转到下一cursor:
protected synchronized PendingMessageCursor getNextCursor() throws Exception { if (currentCursor == null || currentCursor.isEmpty() || !currentCursor.hasNext()) { …… } return currentCursor; }
相关推荐
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
本人在学习activemq,然后 测试完成的demo, 包含了queue,topic,持久化到mysql,订阅模式,包好用
在网上找了很多的topic持久化的Demo做了很多的测试,现把熟肉呈上。
详细描述了ActiveMQ消息过期-时间设置和自动清除解决方案。
该文档详细描述了linux环境下的 Activemq 持久化、集群环境的搭建步骤,以及测试步骤
NULL 博文链接:https://showlike.iteye.com/blog/2000117
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
ActiveMQ配置Mysql8为持久化方式所需Jar包
activemq消息持久化所需Jar包,详情请参见博文:http://blog.csdn.net/l1028386804/article/details/68997105
activemq消息持久化所需Jar包,亲测可用https://blog.csdn.net/weixin_42109071/article/details/91349406
activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml
ActiveMQ队列消息过期时间设置和自动清除解决方案.docx
ActiveMQ高并发处理方案ActiveMQ高并发处理方案 超级字数补丁超级字数补丁
主要介绍了ActiveMQ持久化机制代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
一个订阅通道,支持多个客户端监听,当某个客户端掉线后,再上线的时候可以收到它没有接收到的消息。
Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明
——学习参考资料:仅用于个人学习使用! 本代码仅作学习交流,切勿用于商业用途,否则后果自负。若涉及侵权,请联系,会尽快处理! 未进行详尽测试,请自行调试!
NULL 博文链接:https://768992698.iteye.com/blog/2343328
activemq 虚拟topic配置,可以将一个 topic转发为多个队列和多个topic或者将一个队列转发为多个topic和多个队列