我的需求是这样的:
需要用 ActiveMQ 在客户端与客户端之间传递大文件。我写了两个客户端:一个客户端用 JMS 创建 BlobMessage 来发送文件,另一个客户端用 ActiveMQ 提供的 C++ 接口 CMS 来接收文件数据。发送端(producer)已经显示发送成功,在 ActiveMQ 的 broker 上也能看到相应的 Queue 名。接收端(consumer)能接收到发送过来的 BlobMessage,并且通过 getMarshalledProperties() 函数可以得到它的 properties,但是无法得到文件内容:getContent() 函数返回为空,不知道是什么原因。
JMS代码:
// Sender and MessageHeader are project classes that are not shown here.
// Every call on the sender returns a status string, which is printed as-is.
Sender sender = new Sender("10.1.72.35", 41112);
String connectStatus = sender.connect();
System.out.print(connectStatus);

// Header and payload file for the message that is about to be sent.
MessageHeader messageHeader = new MessageHeader("BABJ", 3600, "QueneName", 200, "1030", 300, "dataID", "20150825", "SendText.txt", "D:/data");
File payloadFile = new File("D:\\data\\SendText.txt");

// Send the file with the "FILE" type tag, then close the sender.
String sendStatus = sender.sendMessage(messageHeader, "FILE", payloadFile);
System.out.print(sendStatus);
String disconnectStatus = sender.disconnect();
System.out.print(disconnectStatus);
CMS端接收消息函数(RecvMessage)的代码:
void CMQReceiver::RecvMessage()
{
MessageHeader header;
long nFileSize = 0;
// 消费者没有创建成功
if (consumer == NULL)
{
return;
}
try
{
Message *message = consumer->receiveNoWait();//consumer->receive();
if (message == NULL)
{
return;
}
//commands::ActiveMQBlobMessage *testMsg = new commands::ActiveMQBlobMessage();
//bool bFlg = util::ActiveMQMessageTransformation::transformMessage(message, amqConnection, (commands::Message **)&testMsg);
//if (bFlg)
//{
// string strName = testMsg->getName();
//}
//ActiveMQConsumer *mqConsumer = dynamic_cast<ActiveMQConsumer*>(consumer);
//Message *message = mqConsumer->receiveNoWait();
/*************************************** BLOB MESSAGE **************************************************/
commands::ActiveMQBlobMessage* blobMessage = dynamic_cast<commands::ActiveMQBlobMessage*>(message);
if (blobMessage)
{
commands::Message* cmdMsg = (commands::Message*)blobMessage;
if (cmdMsg)
{
std::vector<unsigned char> properties = cmdMsg->getMarshalledProperties();
LIST_PROPERTIES lstProperties;
int nErr = ParseProperties(properties, lstProperties);
if (nErr == 0)
{
// 解析属性消息
getMessageHeader(lstProperties, header, nFileSize);
// 获取数据
if (nFileSize > 0)
{
// 获取数据
//cms::Message* cmsMsg = (cms::Message*)blobMessage->clone();
//BytesMessage *byteMsg = (BytesMessage*)cmsMsg;
//unsigned char * body = byteMsg->getBodyBytes();
//if (body)
//{
// int a = 0;
//}
std::vector<unsigned char> fileContent;
fileContent = blobMessage->getContent();
}
}
}
}
/****************************************OBJCET MESSAGE ******************************************************/
commands::ActiveMQObjectMessage* objectMessage = dynamic_cast<commands::ActiveMQObjectMessage*>(message);
if (objectMessage)
{
commands::Message* cmdMsg = (commands::Message*)objectMessage;
if (cmdMsg)
{
// 属性
std::vector<unsigned char> properties = cmdMsg->getMarshalledProperties();
LIST_PROPERTIES lstProperties;
int nErr = ParseProperties(properties, lstProperties);
if (nErr == 0)
{
// 解析属性消息
getMessageHeader(lstProperties, header, nFileSize);
// 获取Bean数据
}
// Bean内容(得到的内容是序列化的数据)
std::vector<unsigned char> content = cmdMsg->getContent();
// 执行反序列化解析
}
}
}
catch (CMSException& e)
{
e.printStackTrace();
string msg = e.getMessage();
}
}
references:
http://activemq.apache.org/can-i-send-really-large-files-over-activemq.html
http://activemq.apache.org/blob-messages.html
http://markmail.org/message/7cwjpekvbgdoigdg
http://activemq.apache.org/cms/example.html
http://activemq.apache.org/cms/cms-api-overview.html