ActiveMQBytesMessage类型的消息在特殊情况下会丢失数据,就是在被拷贝前设置消息的某个属性。下面是测试代码:
producer代码
MessageProducer producer; //initialize Connection, Session, Producer ...... byte[] bs = "bytes message".getBytes(); BytesMessage message = session.createBytesMessage(); message.writeBytes(bs); for(int i=0; i< 10; i++){ message.setLongProperty("sendTime", System.currentTimeMillis()); try{ producer.send(message); }catch(Exception e){ e.printStackTrace(); } }
Consumer代码
MessageConsumer consumer //initailize Connection, Session, MessageConsumer for(int i=0; i<10; i++){ ActiveMQBytesMessage message = (ActiveMQBytesMessage)consumer.receive(60*1000); long sendTime = message.getLongProperty("sendTime"); System.out.println("sendtime:" + sendTime); ByteSequence bs = message.getMessage().getContent(); System.out.println("bytes data:" + new String(bs.getData())); }
期待的结果:
consumer在所有接收到的消息中得到bytes数据
实际结果:
只有第一条消息有bytes数据,其他消息都丢失了bytes数据,而long property的值都没有丢失。
分析:
ActiveMQ在发送消息的时候会拷贝消息,实际发送的是消息的拷贝。在拷贝之前会调用storeContent() ,ActiveMQBytesMessage中的属性DataOutputStream dataOut 会被关闭,dataOut 会被置为null,dataOut中的值会被set到content属性中。如果不设置消息的属性,这个逻辑是没有问题的。当设置消息属性时,setLongProperty 方法中会调用setObjectProperty() ,然后调用initializeWriting(),在initializeWriting()中DataOutputStream dataOut会再次被创建。这样,当消息第二次被拷贝的时候,DataOutputStream dataOut不是null,而是EMPTY。由于不是null,dataOut的值会被set到content,这样content的值就被清空了。
根据JMS规范的要求:
3.9 Access to Sent Messages After sending a message, a client may retain and modify it without affecting the message that has been sent. The same message object may be sent multiple times. During the execution of its sending method, the message must not be changed by the client. If it is modified, the result of the send is undefined.
消息发送后,原消息不应该受到影响。根据这个要求,目前的实现是个bug
建议:
在initializeWriting()中把content的数据设置到DataOutputStream dataOut中,这样就保持消息中bytes数据不变。
我的fix代码
ActiveMQBytesMessage : private void initializeWriting() throws JMSException { 669 The original code ...... 701 //fix code if(this.content !=null && this.content.length >0){ try { this.dataOut.write(this.content.getData()); } catch(IOException ioe) { throw JMSExceptionSupport.create(ioe); } } 702 }
这个bug 已经提交给APACHE,还没有被修复。
更新:该bug已经被修复,最新的Patch:
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java index 6de35aa..923e0e1 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java @@ -856,6 +856,42 @@ } this.dataOut = new DataOutputStream(os); } + + restoreOldContent(); + } + + private void restoreOldContent() throws JMSException { + // For a message that already had a body and was sent we need to restore the content + // if the message is used again without having its clearBody method called. + if (this.content != null && this.content.length > 0) { + try { + ByteSequence toRestore = this.content; + if (compressed) { + InputStream is = new ByteArrayInputStream(toRestore); + int length = 0; + try { + DataInputStream dis = new DataInputStream(is); + length = dis.readInt(); + dis.close(); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + is = new InflaterInputStream(is); + DataInputStream input = new DataInputStream(is); + + byte[] buffer = new byte[length]; + input.readFully(buffer); + toRestore = new ByteSequence(buffer); + } + + this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength()); + // Free up the buffer from the old content, will be re-written when + // the message is sent again and storeContent() is called. + this.content = null; + } catch (IOException ioe) { + throw JMSExceptionSupport.create(ioe); + } + } } protected void checkWriteOnlyBody() throws MessageNotReadableException { diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java index f9dda6c..e809d03 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java @@ -118,6 +118,7 @@ protected transient DataInputStream dataIn; protected transient int remainingBytes = -1; + @Override public Message copy() { ActiveMQStreamMessage copy = new ActiveMQStreamMessage(); copy(copy); @@ -132,6 +133,7 @@ copy.dataIn = null; } + @Override public void onSend() throws JMSException { super.onSend(); storeContent(); @@ -151,10 +153,12 @@ } } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public String getJMSXMimeType() { return "jms/stream-message"; } @@ -171,6 +175,7 @@ * due to some internal error. */ + @Override public void clearBody() throws JMSException { super.clearBody(); this.dataOut = null; @@ -191,6 +196,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public boolean readBoolean() throws JMSException { initializeReading(); try { @@ -233,6 +239,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public byte readByte() throws JMSException { initializeReading(); try { @@ -282,6 +289,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public short readShort() throws JMSException { initializeReading(); try { @@ -335,6 +343,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public char readChar() throws JMSException { initializeReading(); try { @@ -382,6 +391,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public int readInt() throws JMSException { initializeReading(); try { @@ -438,6 +448,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public long readLong() throws JMSException { initializeReading(); try { @@ -496,6 +507,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public float readFloat() throws JMSException { initializeReading(); try { @@ -544,6 +556,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public double readDouble() throws JMSException { initializeReading(); try { @@ -596,6 +609,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public String readString() throws JMSException { initializeReading(); try { @@ -696,6 +710,7 @@ * @see #readObject() */ + @Override public int readBytes(byte[] value) throws JMSException { initializeReading(); @@ -769,6 +784,7 @@ * @see #readBytes(byte[] value) */ + @Override public Object readObject() throws JMSException { initializeReading(); try { @@ -849,6 +865,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBoolean(boolean value) throws JMSException { initializeWriting(); try { @@ -867,6 +884,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeByte(byte value) throws JMSException { initializeWriting(); try { @@ -885,6 +903,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeShort(short value) throws JMSException { initializeWriting(); try { @@ -903,6 +922,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeChar(char value) throws JMSException { initializeWriting(); try { @@ -921,6 +941,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeInt(int value) throws JMSException { initializeWriting(); try { @@ -939,6 +960,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeLong(long value) throws JMSException { initializeWriting(); try { @@ -957,6 +979,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeFloat(float value) throws JMSException { initializeWriting(); try { @@ -975,6 +998,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeDouble(double value) throws JMSException { initializeWriting(); try { @@ -993,6 +1017,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeString(String value) throws JMSException { initializeWriting(); try { @@ -1019,6 +1044,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBytes(byte[] value) throws JMSException { writeBytes(value, 0, value.length); } @@ -1039,6 +1065,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBytes(byte[] value, int offset, int length) throws JMSException { initializeWriting(); try { @@ -1062,6 +1089,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeObject(Object value) throws JMSException { initializeWriting(); if (value == null) { @@ -1102,6 +1130,7 @@ * @throws JMSException if an internal error occurs */ + @Override public void reset() throws JMSException { storeContent(); this.bytesOut = null; @@ -1111,7 +1140,7 @@ setReadOnlyBody(true); } - private void initializeWriting() throws MessageNotWriteableException { + private void initializeWriting() throws JMSException { checkReadOnlyBody(); if (this.dataOut == null) { this.bytesOut = new ByteArrayOutputStream(); @@ -1122,6 +1151,19 @@ os = new DeflaterOutputStream(os); } this.dataOut = new DataOutputStream(os); + } + + // For a message that already had a body and was sent we need to restore the content + // if the message is used again without having its clearBody method called. + if (this.content != null && this.content.length > 0) { + try { + this.dataOut.write(this.content.getData(), this.content.getOffset(), this.content.getLength()); + // Free up the buffer from the old content, will be re-written when + // tbe message is sent again and storeContent() is called. + this.content = null; + } catch (IOException ioe) { + throw JMSExceptionSupport.create(ioe); + } } } @@ -1153,6 +1195,7 @@ super.compress(); } + @Override public String toString() { return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }"; }
相关推荐
详细描述了ActiveMQ消息过期-时间设置和自动清除解决方案。
activemq书籍及工具 activemq书籍及工具 activemq书籍及工具 activemq书籍及工具 activemq书籍及工具
NULL 博文链接:https://7wolfs.iteye.com/blog/1246365
ActiveMQ队列消息过期时间设置和自动清除解决方案.docx
activemq 传送数据流发送文件,仅供参考
ActiveMQ开发规范及方案
本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可; 请将本maven项目引入你自己的maven项目中(在你自己的pom.xml文件中配置这个项目的gourp和id以及版本号)...
activemq activeMq笔记.docx
ActiveMQ(中文)参考手册 大名鼎鼎的 JMS 实现 Apache ActiveMQ 介绍文档
ActiveMQ_in_Action_中文ActiveMQ_in_Action_中文ActiveMQ_in_Action_中文ActiveMQ_in_Action_中文
支持持久化的采用Spring整合activeMQ与quartz的JMS数据同步实例,包含依赖的jar包
ActiveMQ中文手册,可以帮助你的开发。
apache-activemq Linux版本
项目nettythird为springboot整合activemq,项目nettyserver为开启一个netty服务,简单进行了超时问题的解决,以及如何向客户端定向发送数据问题,可以当作一个样例,编辑可直接运行,测试类在server项目下,需要一个...
使用C# 下activemq 接收数据流的例子
最新activemq-cpp开发手册!
包括1、ActiveMQ java实例 2、ActiveMQ Spring结合实例 3、代码亲测,无问题。 4、资源分5分绝对值 注意:请先安装ActiveMQ 服务。
赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...
activeMQ-API.rar