activimq怎样开启rabbitmq 优先级级

Posts - 52,
Articles - 0,
Comments - 1
08:24 by 安松, ... 阅读,
1. ActiviMQ是实现JMS接口和规范的消息中间件(Provider),
2. JMS,Java Message Service, java消息服务,是JavaEE中的一个技术
3. JMS规范:JMS定义了Java中访问消息中间件的接口,并没有给予实现,实现JMS接口的消息中间件成为JMS Provider,例如ActiveMQ
4. JMS message:JMS的消息,JMS消息由一下三部分组成:
①消息头:每个消息头字段都有相应的getter和sertter方法
& 消息头包含消息的识别信息和路由信息,消息头包含的一些标准的属性如下
& &(1)JMSDestination:由send方法设置,消息的目的地,主要指示Queue和Topic,自动分配
& &(2)JMSDeliveryMode:由send方法设置,传送模式。有两种:持久化模式和非持久化模式。一条持久性的消息应该被传送“一次仅仅一次”,这意味着如果JMS提供者出现故障,该消息并不会丢失,他会在服务器恢复之后再次传递。一个非持久化的消息最多会传送一次,这意味着这服务器出现故障,该消息将永远丢失。自动分配
& &(3)JMSExpiration:由send方法设置,消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值,如果timeToLive值等于零JMSExpiration被设置为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除,自动分配
& &(4)JMSPriority:由send方法设置:消息优先级,从0-9十个级别,0-4是普通消息,5-9是加急消息。JMS不要求JMS Provider严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。默认是4级。自动分配
& &(5)JMSMessageID:由send方法设置,唯一识别每个消息的表示,有JMSProvider产生,自动分配
& &(6)JMSTimestamp:由客户端设置,一个JMsProvider在调用send方法时自动设置的,它的消息被发送和消费者实际接收的时间差,自动分配
& &(7)JMSCorrelationID:由客户端设置,用来连接到另外一个消息,典型的应用是在回复消息中链接到院消息,在大多数情况下,JMSCorrelationID用于将一条消息标记为对JMSMessageID标示的上一条消息的应答,不过JMSCorrelationID可以是任何值,不仅仅是JMSMessageID。由开发者设置
& &(8)JMSReplyTo:由客户端设置,提供本消息回复消息的目的地址,由开发者设置
& &(9)JMSType:由客户端设置,消息类型的识别符,有开发者设置
& &(10)JMSRedelivered:由JMS Provider设置,如果一个客户端收到一个设置了JMSRedelivered属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。如果该消息被重新传送,JMSRedelivered=true反之,JMSRedelivered=false。自动设置
②消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性
包含三中类型
(1)应用程序设置和添加的属性,比如:
Message.setStringProperty("username",username);
&(2)Jms定义属性
使用"JMSX"作为属性名的前缀,connection.getMetaData().getJMSXPropertyNames(),方法返回所有连接支持的JMS属性的名字。
& &1. JMSXUserID:发送消息的用户标识,发送时提供商设置
& &2. JMSAppID:发送消息的应用标识,发送时提供商设置
& &3. JMSXDeliveryCount:转发消息重试次数,第一次是1,第二次是2,...,发送时提供商设置
& &4. JMSXGroupID:消息所在消息组的标识,由客户端设置等等 (72-33m)&
(3)JMS供应商特定的属性
③消息体:封装具体的消息数据
JMS api定义了5中消息体格式,也叫消息类型,可以使用不同形式发送,接受数据,并可以兼容现有的消息格式。包括TextMessage、MapMessage、bytesMessage、StreamMessage、ObjectMessage。
5. JMS producer:消息生产者,创建和发送JMS消息的客户端应用
6. JMS consumer:消息消费者,接收和处理JMS消息的客户端应用
& 消息的消费可以采用以下两种方法之一:
&①:同步消费:通过调用消费者的receive方法从目的地中显示提取消息,receive方法可以一直阻塞到消息到达
&②:异步消费:客户可以为消费者注册一个消息监听,以定义在消息到达时所采取的动作
7. & &A-----m----&AMQ------m----&B &&
& &A:消息生产者
& &B:消息消费者
& &AMQ:消息中间件(ActiveMQ)
8. JMS domains:消息传递域,JMS规范中定义了两种消息传递域:点对点(point-to-point:简写成PTP)消息传递域和发布/订阅消息传递域(publish/subscribe,简写成pub/sub)
(1)点对点消息传递域的特点如下:
①每个消息只能有一个消费者
②消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候处于运行状态,他都是可以提取消息
9. 发布/订阅消息传递域的特点如下:
① 每个消息可以有多个消费者
②生产者和消费者之间有时间上的相关性,订阅一个主题的消费者只能消费自它订阅之后的发布的消息,JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息
10. 在点对点消息传递域中,目的地被称为queue;在发布/订阅消息传递域中,目的地被称为主题(topic)。
11.ConnectionFactory:连接工厂,用来创建连接对象,以连接到JMS的provider
12.JMS Connection:封装客户和JMS提供者之间的一个虚拟的连接
13.JMS Session:是生产和消费消息的一个单线程上下文
& 会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
14. Destination:消息发送到的目的地(queue/topic)
15.Acknowledge: 签收
16.Transacted:事务
17.JMS client:用来收发消息的java应用
18.Non-JMS client:使用JMS provider本地API写的应用,用来替换JMS API实现收发消息的功能 ,通常会提供其它的一些特性,比如:CORBA、RMI等。
19.Administred objects:预定义的JMS对象,通常在provider规范中有定义,提供给JMS客户端来访问,比如:ConnectionFactory和DestinationApache ActiveMQ ™ -- How can I support priority queues
How can I support priority queues?
Use Message Priority
A common requirement is to support
so high priority messages are consumed before low priority.
In version 5.4 priority queues are supported. Both the message cursors and the message stores (KahaDB and JDBC) support message priority. The support is disabled by default so it needs to be be enabled using
through xml configuration, in the example below, 'prioritizedMessages' is enabled for all queues.
&destinationPolicy&
&policyMap&
&policyEntries&
&policyEntry queue="&" prioritizedMessages="true"/&
The full range of priority values (0-9) are supported by the
message store. For
three priority categories are supported, Low (& 4), Default (= 4) and High (& 4).
Since the message cursors (and client side) implement strict ordering of priorities, it's possible to observe strict priority ordering if message dispatching can happen from the cache and not have to hit the disk (i.e., your consumers are fast enough to keep up with producers), or if you're using non-persistent messages that never have to flush to disk (using the FilePendingMessageCursor). However, once you hit a situation where consumers are slow, or producers are just significantly faster, you'll observe that the cache will fill up (possibly with lower priority messages) while higher priority messages get stuck on disk and not available until they're paged in. In this case, you can make a decision to tradeoff optimized message dispatching for priority enforcement. You can disable the cache, message expiration check, and lower you consumer prefetch to 1 to ensure getting the high priority messages from the store ahead of lower priority messages Note, this sort of tradeoff can have significant performance implications, so you must test your scenarios thoroughly. :
&destinationPolicy&
&policyMap&
&policyEntries&
&policyEntry queue="&" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" /&
Alternative strategies
Use Selectors
You can have say 100 consumers using a selector to find the high priority stuff
JMSPriority & 6
then have 50 consumers doing average or above
JMSPriority &= 4
Then say 10 consumers consuming all messages (so all priorities). Then this way you'll have a pool of threads always processing high priority messages - giving you very efficient priority based dispatching of messages without ActiveMQ having to batch up messages and reorder them before dispatching them.
Use Resequencer
You can reorder messages on some input queue A and send them to queue B in sorted order to avoid having to change your clients. This avoids the need to use selectors in your application as shown above.
To do this use the
The Apache Software Foundation.
Apache ActiveMQ, ActiveMQ, Apache, the Apache feather logo, and the Apache ActiveMQ project logo are trademarks of The Apache Software Foundation.
All other marks mentioned may be trademarks or registered trademarks of their respective owners.博客分类:
============================================================================
原创作品,允许转载。转载时请务必以超链接形式标明原始出处、以及本声明。
请注明转自:
============================================================================
最近有个需求是要使用activeMQ作为一个完全优先级队列,且里面的任务都是耗时很长的任务,主要特点如下:
1. 多个producer,多个consumer
2.consumer取来消息后,会去执行一些长时间的任务,期间阻塞consumer
3.没有消费的消息,中间可能会修改优先级
4.优先级高的消息必须先被消费
在使用过程中,发生了一些意外状况,因为长时间的任务,导致activeMQ判定consumer为 导致在取消息的时候不是完全按照优先级来取消息。解决方案如下:
首先,在activemq.xml中配置,使其支持优先级队列以及针对Slow-Consumer做一些策略。
&destinationPolicy&
&policyMap&
&policyEntries&
&policyEntry queue="&"
producerFlowControl="false" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" /&
&policyEntry queue="&" strictOrderDispatch="false" /&
&policyEntry queue="&" &
&pendingMessageLimitStrategy&
&constantPendingMessageLimitStrategy limit="0"/&
&/pendingMessageLimitStrategy&
&messageEvictionStrategy&
&oldestMessageWithLowestPriorityEvictionStrategy/&
&/messageEvictionStrategy&
&/policyEntry&
&/policyEntries&
&/policyMap&
&/destinationPolicy&
其次,因为每个消息都是长时间的操作,一定要等消息里的命令完全执行完毕后,再向ActiveMQ发送ACK,这样就可以保证所有的消息都是按照优先级来消费的。
浏览: 107345 次
来自: 北京
3.没有消费的消息,中间可能会修改优先级 是怎么实 ...
bewithme 写道我操,毕业6年就有50万,还有20万公积 ...
qindongliang1922 写道恭喜,有房族了谢谢~小破 ...
我操,毕业6年就有50万,还有20万公积金,请问你是哪个单位的 ...
恭喜,有房族了
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'一 .安装运行ActiveMQ:
1.下载activemq
tar -xf apache-activemq-5.9.0-bin.tar.gz
[zcw@g1 ~]$ cd apache-activemq-5.9.0
[zcw@g1 apache-activemq-5.9.0]$ cd bin/
[zcw@g1 bin]$ activemq start
三种运行方式:
(1)普通启动 ./activemq start
(2)启动并指定日志文件 ./activemq
start &tmp/smlog
(3)后台启动方式nohup
./activemq start &/tmp/smlog
前两种方式下在命令行窗口关闭时或者ctrl+c时导致进程退出,采用后台启动方式则可以避免这种情况
管理后台为:
http://ip:8161/admin/
4.检查已经启动
&ActiveMQ默认采用61616端口提供JMS服务,使用8161端口提供管理控制台服务,执行以下命令以便检验是否已经成功启动ActiveMQ服务。
打开端口:nc -lp 61616 &
查看哪些端口被打开 netstat -anp
查看61616端口是否打开: netstat -an |
grep 61616
检查是否已经启动:
(1).查看控制台输出或者日志文件&
(2).直接访问activemq的管理页面:
如果开启方式是使用(1)或(2),则直接ctrl+c或者关闭对应的终端即可&
如果开启方式是(3),则稍微麻烦一点:&
先查找到activemq对应的进程:&
ps -ef | grep activemq&
然后把对应的进程杀掉,假设找到的进程编号为 168168&
kill 168168&
二.创建ActiveMQ的Eclipse项目并运行
1)P2P方式
到中心仓库(
http://search.maven.org/
)里面找到:activemq-core
&project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"&
&modelVersion&4.0.0&/modelVersion&
&groupId&net.datafans.exercise.rockmq&/groupId&
&artifactId&TestJMS&/artifactId&
&version&0.0.1-SNAPSHOT&/version&
&packaging&jar&/packaging&
&name&TestJMS&/name&
&url&http://maven.apache.org&/url&
&properties&
&project.build.sourceEncoding&UTF-8&/project.build.sourceEncoding&
&/properties&
&dependencies&
&dependency&
&groupId&junit&/groupId&
&artifactId&junit&/artifactId&
&version&3.8.1&/version&
&scope&test&/scope&
&/dependency&
&dependency&
&groupId&org.apache.activemq&/groupId&
&artifactId&activemq-core&/artifactId&
&version&5.7.0&/version&
&/dependency&
&/dependencies&
&/project&
package net.datafans.exercise.rockmq.TestJMS;
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.DeliveryM
import javax.jms.D
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Sender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
// Destination :消息的目的地;消息发送给谁.
// MessageProducer:消息发送者
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://ip:61616");
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i &= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
Receiver:
package net.datafans.exercise.rockmq.TestJMS;
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.D
import javax.jms.MessageC
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
// Destination :消息的目的地;消息发送给谁.
// 消费者,消息接收者
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://ip:61616");
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
测试过程:
&2)Pub/Sub模式
package net.datafans.exercise.rockmq.TestJMS;
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.DeliveryM
import javax.jms.D
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TextM
import javax.jms.T
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Pub {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
// Destination :消息的目的地;消息发送给谁.
// MessageProducer:消息发送者
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://ip:61616");
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = session.createTextMessage();
message.setText("message_hello_chenkangxian");
producer.send(message);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i &= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
package net.datafans.exercise.rockmq.TestJMS;
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.D
import javax.jms.JMSE
import javax.jms.M
import javax.jms.MessageC
import javax.jms.MessageL
import javax.jms.S
import javax.jms.TextM
import javax.jms.T
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Sub {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
// Destination :消息的目的地;消息发送给谁.
// 消费者,消息接收者
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://ip:61616");
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// TODO Auto-generated method stub
TextMessage tm = (TextMessage)
System.out.println(tm.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
PS:代码都写出来了只是不知道为啥测试Pub/Sub模式一直都出不来&
3.Request和Reply模式
* @author mike
* @date Apr 17, 2014
package net.datafans.exercise.rockmq.TestJMS;
class CONFIG {
public static final java.lang.String AUTHOR = "Mike Tang";
public static final java.lang.String QUEUE_NAME = "REQUEST AND REPLY";
public static void introduce() {
java.lang.StringBuilder builder = new java.lang.StringBuilder();
builder.append("This is a simple example to show how to write a \n");
builder.append("request and reply pattern program with Apache ActiveMQ.\n");
builder.append("which is not support such pattern.\n\n");
builder.append("
-------- By Mike Tang\n");
builder.append("
at Soochow University\n");
System.out.println(builder.toString());
public static void introduceServer() {
System.out.println("Wait for the client send messages, and you will see something");
public static void introduceClient() {
System.out.println("Input something and ENTER, you will get the reply.\n"
+ "and input 'exit' stop the program.");
MessageClient
package net.datafans.exercise.rockmq.TestJMS;
import java.util.S
import java.util.UUID;
import javax.jms.C
import javax.jms.DeliveryM
import javax.jms.D
import javax.jms.JMSE
import javax.jms.M
import javax.jms.MessageC
import javax.jms.MessageL
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TemporaryQ
import javax.jms.TextM
import org.apache.activemq.ActiveMQConnectionF
public class MessageClient {
static Session
static MessageConsumer consumer
static MessageProducer producer
static TemporaryQueue
temporaryQueue = null;
public void setURL(String brokerurl) {
this.brokerurl =
public void start() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerurl);
Connection connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(CONFIG.QUEUE_NAME);
temporaryQueue = session.createTemporaryQueue();
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
consumer = session.createConsumer(temporaryQueue);
consumer.setMessageListener(new MMessageListener());
public void request(String request) throws JMSException {
System.out.println("REQUEST TO : " + request);
TextMessage textMessage = session.createTextMessage();
textMessage.setText(request);
textMessage.setJMSReplyTo(temporaryQueue);
textMessage.setJMSCorrelationID(UUID.randomUUID().toString());
MessageClient.producer.send(textMessage);
private static class MMessageListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
String messageText = ((TextMessage) message).getText();
System.out.println("REPLY FROM : " + messageText.toUpperCase());
} catch (JMSException e) {
e.printStackTrace();
// -----------------------------------------------------------------------
public static void main(String[] args) {
CONFIG.introduce();
CONFIG.introduceClient();
MessageClient client = new MessageClient();
client.setURL("tcp://ip:61616");
Scanner scanner = new Scanner(System.in);
client.start();
System.out.println("-----------------------------------------");
System.out.println("|
Client Start!
System.out.println("-----------------------------------------");
String message = "";
int i = 0;
while (!(message = scanner.next()).equals("exit")) {
client.request(message + " : " + i++);
scanner.close();
} catch (JMSException e) {
e.printStackTrace();
MessageServer
package net.datafans.exercise.rockmq.TestJMS;
import javax.jms.C
import javax.jms.DeliveryM
import javax.jms.D
import javax.jms.JMSE
import javax.jms.M
import javax.jms.MessageC
import javax.jms.MessageL
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQConnectionF
import org.apache.activemq.broker.BrokerS
public class MessageServer {
static BrokerService
brokerService = null;
static MessageConsumer consumer
static MessageProducer producer
static Session
public void setURL(String brokerurl) {
this.brokerurl =
public void start() throws JMSException {
createBroker(brokerurl);
setConsumer(brokerurl);
public void createBroker(String brokerurl) {
brokerService = new BrokerService();
brokerService.setUseJmx(false);
brokerService.setPersistent(false);
brokerService.addConnector(brokerurl);
brokerService.start();
} catch (Exception e) {
e.printStackTrace();
public void setConsumer(String url) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(CONFIG.QUEUE_NAME);
producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MMessageListener());
private static class MMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage response = session.createTextMessage();
if (message instanceof TextMessage) {
String messageText = ((TextMessage) message).getText();
response.setText(handleMessage(messageText));
System.out.println("REQUEST FROM " + messageText.toUpperCase());
response.setJMSCorrelationID(message.getJMSCorrelationID());
producer.send(message.getJMSReplyTo(), response);
} catch (JMSException e) {
e.printStackTrace();
private String handleMessage(String text) {
return "RESPONSE TO " + text.toUpperCase();
// -----------------------------------------------------------------
public static void main(String[] args) {
CONFIG.introduce();
CONFIG.introduceServer();
MessageServer server = new MessageServer();
server.setURL("tcp://ip:61616");
server.start();
System.out.println("-----------------------------------------");
System.out.println("|
Server Start!
System.out.println("-----------------------------------------");
} catch (JMSException e) {
e.printStackTrace();
阅读(...) 评论()}

我要回帖

更多关于 activemq 优先级 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信