WebSphere MQ 接收發(fā)送
我們提供的服務有:成都做網(wǎng)站、成都網(wǎng)站制作、成都外貿(mào)網(wǎng)站建設、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認證、成安ssl等。為千余家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務,是有科學管理、有技術的成安網(wǎng)站制作公司
添加mq jar
類介紹:
SendMSG:消息發(fā)送類。
Main():主方法。
SendMSG():消息發(fā)送方法。
方法描述:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package test;
public class SendMSG{
MQEnvironment.hostname = "192.168.10.201";
//通道類型為服務器連接通道
MQEnvironment.channel = "tongdao";
MQEnvironment.CCSID = 1381;
//消息隊列端口號
MQEnvironment.port = 10618;
try{
//建立隊列管理器QM_SERVER為隊列管理器名稱
MQQueueManager qMgr = new MQQueueManager("test");
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF|MQC.MQOO_OUTPUTMQC.MQOO_INQUIRE;//建立隊列INITQ隊列名稱INITQ為本地隊列
MQQueue queue = qMgr.accessQueue("wanghui",openOptions,null,null,null);
System.out.println("成功建立通道");
MQMessage message = new MQMessage();
message.format = MQC.MQFMT_STRING;
message.characterSet = 1381;
message.writeString("王輝");
message.expiry = -1;//設置消息用不過期
queue.put(message);//將消息放入隊列
queue.close();//關閉隊列
qMgr.disconnect();//斷開連接
}catch(EOFExceptione){
e.printStackTrace();
}catch(MQExceptione){
e.printStackTrace();
}catch(Exceptione){
e.printStackTrace();
}
}
ReceiveMSG:消息接收類。
Main():主方法。
ReceiveMSG():消息接收方法。
public class ReceiveMSG {
MQEnvironment.hostname="192.168.10.201";//通道類型為服務器連接通道
MQEnvironment.channel="tongdao";
MQEnvironment.CCSID=1381;
MQEnvironment.port=10618;
try{
//建立隊列管理器QM_SERVER為隊列管理器名稱
MQQueueManager qMgr = new MQQueueManager("test");
int openOptions=MQC.MQOO_INPUT_AS_Q_DEF|MQC.MQOO_OUTPUT|MQC.MQOO_INQUIRE;//建立隊列INITQ隊列名稱INITQ為本地隊列
MQQueue queue=qMgr.accessQueue("wanghui",openOptions,null,null,null);
System.out.println("成功建立通道");
MQMessage message= new MQMessage();
message.format=MQC.MQFMT_STRING;
message.characterSet=1381;
//從隊列中獲取消息
MQGetMessage Optionspmo=new MQGetMessageOptions();
queue.get(message,pmo);
Stringchars=message.readLine();
System.out.println(chars);
queue.close();//關閉隊列
qMgr.disconnect();//斷開連接
}catch(EOFExceptione){
e.printStackTrace();
}catch(MQExceptione){
e.printStackTrace();
}catch(Exceptione){
e.printStackTrace();
}
}
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.fxun.plant.vo.CommandVO;
public class ProducerTool extends Thread {
private Destination destination;
// private int messageCount = 500;
long sleepTime = 0;
// private boolean verbose = true;
// private int messageSize = 255;
private long timeToLive = 0; // 消息存活時間
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject; // subject的名字,默認是TOOL.DEFAULT
// private boolean topic;
private boolean transacted = false; // 是否采用事務
// private boolean persistent = false;
private P2PQueue p2pQueue;
public ProducerTool(String user, String password, String url, String subject) {
this.user = user;
this.password = password;
this.url = url;
this.subject = subject;
}
public void run() {
Connection connection = null;
try {
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
connection.start();
// Create the session
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
// Create the producer.
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setTimeToLive(timeToLive);
CommandVO commandVO = null;
int size = 0;
while (true) {
size = p2pQueue.getSize();
if (size 0) {
BytesMessage message = session.createBytesMessage();
message.writeInt(size);
for (int i = 0; i size; i++) {
commandVO = p2pQueue.pool();
if(commandVO == null) {
message.writeInt(0);
} else {
message.writeInt(commandVO.getCountSize());
message.writeInt(commandVO.getCommand()); // 指令
message.writeBytes(commandVO.getContent()); // 內(nèi)容
}
}
producer.send(message);
}
Thread.sleep(300);
}
// Use the ActiveMQConnection interface to dump the connection
// stats.
// ActiveMQConnection c = (ActiveMQConnection) connection;
// c.getConnectionStats().dump(new IndentPrinter());
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
public void setPassword(String pwd) {
this.password = pwd;
}
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
public void setSubject(String subject) {
this.subject = subject;
}
public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public void setUrl(String url) {
this.url = url;
}
public void setUser(String user) {
this.user = user;
}
public P2PQueue getP2pQueue() {
return p2pQueue;
}
public void setP2pQueue(P2PQueue p2pQueue) {
this.p2pQueue = p2pQueue;
}
}
原代碼都發(fā)給你
下面是RabbitMQ的消息確認機制:“為了確保消息不會丟失,RabbitMQ支持消息確認機制??蛻舳嗽诮邮艿较⒉⑻幚硗旰?,可以發(fā)送一個ack消息給RabbitMQ,告訴它該消息可以安全的刪除了。假如客戶端在發(fā)送ack之前意外死掉了,那么RabbitMQ會將消息投遞到下一個consumer客戶端。如果有多個consumer客戶端,RabbitMQ在投遞消息時是輪詢的。RabbitMQ如何判斷客戶端死掉了?唯一根據(jù)是客戶端連接是否斷開。這里沒有超時機制,也就是說客戶端可以處理一個消息很長時間,只要沒斷開連接,RabbitMQ就一直等待ack消息?!蔽椰F(xiàn)在遇到的問題是這樣的:我這邊有幾條線程去消息隊列里取數(shù)據(jù),但是會有異常數(shù)據(jù)導致線程掛掉,就是上邊的“客戶端在發(fā)送ack之前意外死掉了”,RabbitMQ會將消息投遞到下一個consumer客戶端,這樣一條異常數(shù)據(jù)會把我的所有線程掛掉,我現(xiàn)在想實現(xiàn)這樣的功能:如果有異常數(shù)據(jù)導致進程掛掉,那么我不讓RabbitMQ將這條消息投遞到下一個consumer客戶端,而是放到另一個地方或者另外處理,請問該如何實現(xiàn)呢?
{
//前面是準備管理器和隊列
MQQueueManager qMgr = new MQQueueManager(qManager);
int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;
MQQueue queue = qMgr.accessQueue(qName, openOptions);
MQMessage rcvMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;
//讀取五秒超時,這里目的是要有個讀取阻塞,和Socket編程類似。
gmo.waitInterval = 5000;
queue.get(rcvMessage, gmo);
//后面就是操作消息的部分【略】
}catch(Exception e){{
//前面是準備管理器和隊列
MQQueueManager qMgr = new MQQueueManager(qManager);
int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;
MQQueue queue = qMgr.accessQueue(qName, openOptions);
MQMessage rcvMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;
//讀取五秒超時,這里目的是要有個讀取阻塞,和Socket編程類似。
gmo.waitInterval = 5000;
queue.get(rcvMessage, gmo);
//后面就是操作消息的部分【略】
}catch(Exception e){