却定是11mqmq不能在电视剧收视率排行榜了,首页弹出的wwW11mqmqcom提示框呢?

NET环境中使用RabbitMQ_C#应用_动态网站制作指南
NET环境中使用RabbitMQ
来源:人气:1520
一 环境搭建
首先,由于RabbitMQ使用Erlang编写的,需要运行在Erlang运行时环境上,所以在安装RabbitMQ Server之前需要安装Erlang 运行时环境,可以到Erlang官网下载对应平台的安装文件。如果没有安装运行时环境,安装RabbitMQ Server的时候,会提示需要先安装Erlang环境。 安装完成之后,确保已经将Erlang的安装路径注册到系统的环境变量中。安装完Erlang之后,这个环境会自动设置,如果没有:按照下图进行设置。
然后,去RabbitMQ官网下载RabbitMQ Server服务端程序,选择合适的平台版本下载。安装完成之后,就可以开始使用了。
现在就可以对RabbitMQ Server进行配置了。
首先,切换到RabbitMQ Server的安装目录:
在sbin下面有很多batch文件,用来控制RabbitMQ Server。
最简单的方式是使RabbitMQ以Windows Service的方式在后台运行,所以我们需要以管理员权限打开cmd,然后切换到sbin目录下,执行这三条命令即可:
rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start
现在RabbitMQ的服务端已经启动起来了(如果启动失败,请检查安装后是否服务已经启动,如果没有,则可能是因为安装版本的原因)。
下面可以使用sbin目录下面的rabbitmqctl.bat这个脚本来查看和控制服务端状态的,在cmd中直接运行rabbitmqctl status。如果不是看到以下结果:,需要到C:\Windows目录下,将.erlang.cookie文件,拷贝到用户目录下 C:\Users\{用户名},这是Erlang的Cookie文件,允许与Erlang进行交互:
RabbitMQ Server上面也有用户概念,安装好之后,使用rabbitmqctl list_users命令,可以看到上面目前的用户:
可以使用下面的命令来添加用户并设置权限:
rabbitmqctl
add_user &test
rabbitmqctl
set_permissions
rabbitmqctl
set_user_tags test administrator
上面的一条命令添加了一个名为test的用户,并设置了密码123456,下面的命令为用户test分别授予对所有消息队列的配置、读和写的权限。
现在我们可以将默认的guest用户删掉,使用下面的命令即可:
rabbitmqctl delete_user guest
如果要修改密码,可以使用下面的命令:
rabbitmqctl change_pass {username}
{newpassowrd}
二 开始使用
在.NET中使用RabbitMQ需要下载RabbitMQ的客户端程序集,可以到官网下载,下载解压后就可以得到RabbitMQ.Client.dll,这就是RabbitMQ的客户端。
在使用RabitMQ之前,需要对下面的几个基本概念说明一下:
RabbitMQ是一个消息代理。他从消息生产者(oducers)那里接收消息,然后把消息送给消息消费者(consumer)在发送和接受之间,他能够根据设置的规则进行路由,缓存和持久化。
一般提到RabbitMQ和消息,都用到一些专有名词。
生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用"P"来表示:
队列(queue)就是邮箱的名称。消息通过你的应用程序和RabbitMQ进行传输,它们只能存储在队列(queue)中。 队列(queue)容量没有限制,你要存储多少消息都可以&&基本上是一个无限的缓冲区。多个生产者(producers)能够把消息发送给同一个队列,同样,多个消费者(consumers)也能从同一个队列(queue)中获取数据。队列可以画成这样(图上是队列的名称):
消费(Consuming)和获取消息是一样的意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它画作"C":
通常,消息生产者,消息消费者和消息代理不在同一台机器上。
2.1 Hello World
为了展示RabbitMQ的基本使用,我们发送一个HelloWorld消息,然后接收并处理。
首先创建一个控制台程序,用来将消息发送到RabbitMQ的消息队列中,代码如下:
首先,需要创建一个ConnectionFactory,设置目标,由于是在本机,所以设置为localhost,如果RabbitMQ不在本机,只需要设置目标机器的地址或者机器名称即可,然后设置前面创建的用户名test和密码123456。
紧接着要创建一个Channel,如果要发送消息,需要创建一个队列,然后将消息发布到这个队列中。在创建队列的时候,只有RabbitMQ上该队列不存在,才会去创建。消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化和然后转化为二进制数组。
现在客户端发送代码已经写好了,运行之后,消息会发布到RabbitMQ的消息队列中,现在需要编写服务端的代码连接到RabbitMQ上去获取这些消息。
同样,创建一个名为Receive的服务端控制台应用程序,服务端代码如下:
和发送一样,首先需要定义连接,然后声明消息队列。要接收消息,需要定义一个Consume,然后从消息队列中不断Dequeue消息,然后处理。
现在发送端和接收端的代码都写好了,运行发送端,发送消息:
现在,名为hello的消息队列中,发送了一条消息。这条消息存储到了RabbitMQ的服务器上了。使用rabbitmqctl 的list_queues可以查看所有的消息队列,以及里面的消息个数,可以看到,目前Rabbitmq上只有一个消息队列,里面只有一条消息:
rabbitmqctl list_queues
Listing queues ...
现在运行接收端程序:
可以看到,已经接受到了客户端发送的Hello World,现在再来看RabitMQ上的消息队列信息:
rabbitmqctl list_queues
Listing queues ...
可以看到,hello这个队列中的消息队列个数为0,这表示,当接收端,接收到消息之后,RabbitMQ上就把这个消息删掉了。
2.2 工作队列
前面的例子展示了如何往一个指定的消息队列中发送和收取消息。现在我们创建一个工作队列(work queue)来将一些耗时的任务分发给多个工作者(workers):
工作队列(work queues, 又称任务队列Task Queues)的主要思想是为了避免立即执行并等待一些占用大量资源、时间的操作完成。而是把任务(Task)当作消息发送到队列中,稍后处理。一个运行在后台的工作者(worker)进程就会取出任务然后处理。当运行多个工作者(workers)时,任务会在它们之间共享。
这个在网络应用中非常有用,它可以在短暂的HTTP请求中处理一些复杂的任务。在一些实时性要求不太高的地方,我们可以处理完主要操作之后,以消息的方式来处理其他的不紧要的操作,比如写日志等等。
在第一部分,发送了一个包含&Hello World!&的字符串消息。现在发送一些字符串,把这些字符串当作复杂的任务。这里使用time.sleep()函数来模拟耗时的任务。在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。比如"Hello..."就会耗时3秒钟。
对之前示例的send.cs做些简单的调整,以便可以发送随意的消息。这个程序会按照计划发送任务到我们的工作队列中。
static void Main(string[] args)
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "test";
factory.Password = "123456";
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare("hello", false, false, false, null);
string message = GetMessage(args);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "hello", properties, body);
Console.WriteLine(" set {0}", message);
Console.ReadKey();
private static string GetMessage(string[] args)
return ((args.Length & 0) ? string.Join(" ", args) : "Hello World!");
加粗部分是经过修改过了的。
接着我们修改接收端,让他根据消息中的逗点的个数来Sleep对应的秒数:
static void Main(string[] args)
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "test";
factory.Password = "123456";
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare("hello", false, false, false, null);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", true, consumer);
while (true)
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.B
var message = Encoding.UTF8.GetString(body);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine("Received {0}", message);
Console.WriteLine("Done");
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。
现在,我们先启动两个接收端,等待接受消息,然后启动一个发送端开始发送消息。
在cmd条件下,发送了5条消息,每条消息后面的逗点表示该消息需要执行的时长,来模拟耗时的操作。
然后可以看到,两个接收端依次接收到了发出的消息:
默认,RabbitMQ会将每个消息按照顺序依次分发给下一个消费者。所以每个消费者接收到的消息个数大致是平均的。 这种消息分发的方式称之为轮询(round-robin)。
2.3 消息响应
当处理一个比较耗时得任务的时候,也许想知道消费者(consumers)是否运行到一半就挂掉。在当前的代码中,当RabbitMQ将消息发送给消费者(consumers)之后,马上就会将该消息从队列中移除。此时,如果把处理这个消息的工作者(worker)停掉,正在处理的这条消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。
我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望该消息会重新发送给其他的工作者(worker)。
为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ才会释放并删除这条消息。
如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。
消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
消息响应默认是开启的。在之前的例子中使用了no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。
channel.BasicConsume("hello", false, consumer);
while (true)
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.B
var message = Encoding.UTF8.GetString(body);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine("Received {0}", message);
Console.WriteLine("Done");
channel.BasicAck(ea.DeliveryTag, false);
现在,可以保证,即使正在处理消息的工作者被停掉,这些消息也不会丢失,所有没有被应答的消息会被重新发送给其他工作者.
一个很常见的错误就是忘掉了BasicAck这个方法,这个错误很常见,但是后果很严重. 当客户端退出时,待处理的消息就会被重新分发,但是RabitMQ会消耗越来越多的内存,因为这些没有被应答的消息不能够被释放。调试这种case,可以使用rabbitmqct打印messages_unacknoledged字段。
rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
2.4 消息持久化
前面已经搞定了即使消费者down掉,任务也不会丢失,但是,如果RabbitMQ Server停掉了,那么这些消息还是会丢失。
当RabbitMQ Server 关闭或者崩溃,那么里面存储的队列和消息默认是不会保存下来的。如果要让RabbitMQ保存住消息,需要在两个地方同时设置:需要保证队列和消息都是持久化的。
首先,要保证RabbitMQ不会丢失队列,所以要做如下设置:
bool durable =
channel.QueueDeclare("hello", durable, false, false, null);
虽然在语法上是正确的,但是在目前阶段是不正确的,因为我们之前已经定义了一个非持久化的hello队列。RabbitMQ不允许我们使用不同的参数重新定义一个已经存在的同名队列,如果这样做就会报错。现在,定义另外一个不同名称的队列:
bool durable =
channel.queueDeclare("task_queue", durable, false, false, null);
queueDeclare 这个改动需要在发送端和接收端同时设置。
现在保证了task_queue这个消息队列即使在RabbitMQ Server重启之后,队列也不会丢失。 然后需要保证消息也是持久化的, 这可以通过设置IBasicProperties.SetPersistent 为true来实现:
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
需要注意的是,将消息设置为持久化并不能完全保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,但是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要消息队列持久化的强保证,可以使用publisher confirms
2.5 公平分发
你可能会注意到,消息的分发可能并没有如我们想要的那样公平分配。比如,对于两个工作者。当奇数个消息的任务比较重,但是偶数个消息任务比较轻时,奇数个工作者始终处理忙碌状态,而偶数个工作者始终处理空闲状态。但是RabbitMQ并不知道这些,他仍然会平均依次的分发消息。
为了改变这一状态,我们可以使用basicQos方法,设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工作者发送多于1个的消息,或者换句话说。在一个工作者还在处理消息,并且没有响应消息之前,不要给他分发新的消息。相反,将这条新的消息发送给下一个不那么忙碌的工作者。
channel.BasicQos(0, 1, false);
2.6 完整实例
现在将所有这些放在一起:
发送端代码如下:
static void Main(string[] args)
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "test";
factory.Password = "123456";
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
bool durable =
channel.QueueDeclare("task_queue", durable, false, false, null);
string message = GetMessage(args);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "task_queue", properties, body);
Console.WriteLine(" set {0}", message);
Console.ReadKey();
private static string GetMessage(string[] args)
return ((args.Length & 0) ? string.Join(" ", args) : "Hello World!");
接收端代码如下:
static void Main(string[] args)
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "test";
factory.Password = "123456";
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
bool durable =
channel.QueueDeclare("task_queue", durable, false, false, null);
channel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("task_queue", false, consumer);
while (true)
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.B
var message = Encoding.UTF8.GetString(body);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine("Received {0}", message);
Console.WriteLine("Done");
channel.BasicAck(ea.DeliveryTag, false);
三 管理界面
RabbitMQ还有一个管理界面,通过该界面可以查看RabbitMQ Server 当前的状态,该界面是以插件形式提供的,并且在安装RabbitMQ的时候已经自带了该插件。需要做的是在RabbitMQ控制台界面中启用该插件,命令如下:
rabbitmq-plugins enable rabbitmq_management
现在,在浏览器中输入 http://server-name:15672/ server-name换成机器地址或者域名,如果是本地的,直接用localhost在输入之后,弹出登录界面,使用我们之前创建的用户登录。
在该界面上可以看到当前RabbitMQServer的所有状态。
本文简单介绍了消息队列的相关概念,并介绍了RabbitMQ消息代理的基本原理以及在Windows 上如何安装RabbitMQ和在.NET中如何使用RabbitMQ。消息队列在构建分布式系统和提高系统的可扩展性和响应性方面有着很重要的作用,希望本文对您了解消息队列以及如何使用RabbitMQ有所帮助。
五 参考文献
http://www.infoq.com/cn/articles/message-based-distributed-architecture
http://www.rabbitmq.com/getstarted.html
http://www.codethinked.com/using-rabbitmq-with-c-and-net
http://www.infoq.com/cn/articles/AMQP-RabbitMQ
http://www.infoq.com/cn/articles/ebay-scalability-best-practices
优质网站模板博主最新文章
博主热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)博客分类:
此远程接口调用是基于RPC的
先来看看提供暴露接口方法的配置
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"&
&bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory"&
&constructor-arg value="localhost" /&
&property name="username" value="guest" /&
&property name="password" value="guest" /&
&bean id="amqpAdmin"
class="org.springframework.amqp.rabbit.core.RabbitAdmin"&
&constructor-arg ref="connectionFactory" /&
&bean id="rabbitTemplate"
class="org.springframework.amqp.rabbit.core.RabbitTemplate"&
&constructor-arg ref="connectionFactory"&&/constructor-arg&
&bean id="testService" class="com.abin.test.TestServiceImpl"&&/bean&
class="org.springframework.amqp.rabbit.remoting.RabbitInvokerServiceExporter"&
&property name="connectionFactory" ref="connectionFactory" /&
&property name="serviceInterface"
value="com.abin.test.TestService" /&
&property name="service" ref="testService" /&
&property name="exchange" value="service_exhange" /&
&property name="exchangeTypes" value="topic" /&
&property name="routingKey" value="routing.example.service" /&
&property name="queueName" value="incoming_queue_name" /&
&property name="poolsize" value="5" /&
RabbitInvokerServiceExporter类用于把接口services放到一个类型为“direct”的queue或者exchange中,并处理远程接口调用的回调。
远程调用配置如下:
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"&
&!-- 创建connectionFactory --&
&bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory"&
&constructor-arg value="localhost" /&
&property name="username" value="guest" /&
&property name="password" value="guest" /&
&!-- 创建rabbitAdmin 代理类 --&
&bean id="rabbitAdmin"
class="org.springframework.amqp.rabbit.core.RabbitAdmin"&
&constructor-arg ref="connectionFactory" /&
&!-- 创建rabbitTemplate 消息模板类 --&
&bean id="rabbitTemplate"
class="org.springframework.amqp.rabbit.core.RabbitTemplate"&
&constructor-arg ref="connectionFactory"&&/constructor-arg&
&bean id="testService"
class="org.springframework.amqp.rabbit.remoting.RabbitInvokerProxyFactoryBean"&
&property name="connectionFactory" ref="connectionFactory" /&
&property name="serviceInterface"
value="com.abin.test.TestService"&
&/property&
&property name="exchange" value="service_exhange" /&
&property name="exchangeTypes" value="topic" /&
&property name="routingKey" value="routing.example.service" /&
&bean id="testAction" class="com.abin.action.TestAction"&
&property name="testService" ref="testService" /&
RabbitInvokerProxyFactoryBean类通过拦截器方法调用在rabbitmq中已提供的远程接口信息。
上述用到的程序在附件中。还可以参考
下载次数: 372
下载次数: 339
wubin850219
浏览: 192833 次
来自: 南昌
这个项目不能运行啊
有没有能跑起来的。。。。。。
jar包配置没有啊
tianhuilove 写道谢谢分享我试了一下你的程序,sen ...
看了同步和异步的这两个例子,有点不明白,在代码上哪里可以体现同 ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'什么是消息中间件,例如IBM MQ,能否用通俗的语言帮忙解释一下。。_百度知道
什么是消息中间件,例如IBM MQ,能否用通俗的语言帮忙解释一下。。
我有更好的答案
其他类似问题
您可能关注的内容
中间件的相关知识
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。> 博客详情
理解 Ack,设置为手动 Ack,如何在异常时,进行数据回返,我们再次不理解基础的发送和接受的功能,官网的实例已经很满足学习的要求了,其实在队列的配置中,最复杂的也就是消费者的逻辑,我这边讲解的适用于开发大型网站,对数据的处理要非常的谨慎的,如果是简单学习,不建议看。
概念性解读(Ack的灵活)
&&&&&&&&&首先啊,有的人不是太理解这个Ack是什么,讲的接地气一点,其实就是一个通知,怎么说呢,当我监听消费者,正常情况下,不会出异常,但是如果是出现了异常,甚至是没有获取的异常,那是不是这条数据就会作废,但是我们肯定不希望这样的情况出现,我们想要的是,如果在出现异常的时候,我们识别到,如果确实是一个不良异常,肯定希望数据重新返回队列中,再次执行我们的业务逻辑代码,此时我就需要一个Ack的通知,告诉队列服务,我是否已经成功处理了这条数据,而如果不配置Ack的话呢,我测试过他会自动的忽略,也就是说此时的服务是no_ack=true的模式,就是说只要我发现你是消费了这个数据,至于异常不异常的,我不管了。通知Ack机制就是这么来的,更加灵活的,我们需要Ack不自动,而是手动,这样做的好处,就是使得我们开发人员更加人性化或者灵活的来处理我们的业务罗杰代码,更加方便的处理异常的问题以及数据的返回处理等。下面是通话机制的四条原则:
Basic.Ack 发回给 RabbitMQ 以告知,可以将相应 message 从 RabbitMQ 的消息缓存中移除。
Basic.Ack 未被 consumer 发回给 RabbitMQ 前出现了异常,RabbitMQ 发现与该 consumer 对应的连接被断开,之后将该 message 以轮询方式发送给其他 consumer (假设存在多个 consumer 订阅同一个 queue)。
在 no_ack=true 的情况下,RabbitMQ 认为 message 一旦被 deliver 出去了,就已被确认了,所以会立即将缓存中的 message 删除。所以在 consumer 异常时会导致消息丢失。
来自 consumer 侧的 Basic.Ack 与 发送给 Producer 侧的 Basic.Ack 没有直接关系。
正题部分(配置手动Ack,实现异常消息回滚)
A.&在消费者端的mq配置文件上添加,配置 &关键代码为 acknowledeg = "manual",意为表示该消费者的ack方式为手动(此时的queue已经和生产者的exchange通过某个routeKey绑定了)
&rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"&
&rabbit:listener queues="queue_xxx" ref="MqConsumer"/&
&rabbit:listener queues="queue_xxx" ref="MqConsumer2"/&
&/rabbit:listener-container&
B.&新建一个类 MqConsumer ,并实现接口 &ChannelAwareMessageListener ,实现onMessage方法,不需要指定方法。
springAMQP中已经实现了一个功能,如果该监听器已经实现了下面2个接口,则直接调用onMessage方法
C.&关键点在实现了ChannelAwareMessageListener的onMessage方法后,会有2个参数。
一个是message(消息实体),一个是channel就是当前的通道,很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
其中deliveryTag是tag的id,由生产者生成。第二个参数我其实也没理解用途,暂时还没有模拟出场景,所以先不讨论。
同样的,如果要Nack或者拒绝消息(reject)的时候,也是调用channel里面的basicXXX方法就可以了(当然要制定tagId)。注意如果抛异常或Nack(并且requeue为true),消息会一直重新入队列,一不小心就会重复一大堆消息不断出现~。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,并重新回到队列,api里面解释得很清楚
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
D.&针对上面所描述的情况,我们在搭建一个消息队列时候,我们的思路应该是这样的,首先,我们要启动ack的手动方式,紧接着,我们处理代码逻辑,如果发生了异常信息,我们首先通知到ack,我已经表示接受到这条数据了,你可以进行删除了,不需要让他自动的重新进入队列中,然后,我们启用一个错误处理,手动将其重新插入队列中,在此之前,有几个类和Api一起来看一下。
& & 1.&SimpleMessageListenerContainer
& & 这个是我们的基础监听,他的作用就是队列的总监听,可以为其配置ack模式,异常处理类等。。
& & 2.&org.springframework.amqp.support.converter.SimpleMessageConverter
& & 这个类和下面的Converter类很容易搞混淆,这个类的作用是可以解析队列中的 message 转 obj
& & 3.&org.springframework.amqp.rabbit.retry.MessageRecoverer
& & 这个接口,需要我们开发者自定义实现,其中的一个方法recover(Message message, Throwable cause),就可以看出来他是干嘛的,就是说在监听出错,也就是没有抓取的异常而是抛出的异常会触发该方法,我们就会在这个接口的实现中,将消息重新入队列
& & 4.&org.springframework.util.ErrorHandler
& & 这个接口也是在出现异常时候,会触发他的方法
E.& 完整实例
& & 1. spring配置队列xml
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd"&
&!-- 连接服务配置 --&
&rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
password="${rabbitmq.password}" channel-cache-size="${rabbitmq.channel.cache.size}" /&
&!-- 设置Ack模式为手动 --&
&bean id="ackManual"
class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean"&
&property name="staticField"
value="org.springframework.amqp.core.AcknowledgeMode.MANUAL" /&
&!-- 异常处理,记录异常信息 --&
&bean id="mqErrorHandler" class="com.zefun.wechat.utils.MQErrorHandler"/&
&!-- 将类自动注入,可解析msg信息 --&
&bean id="msgConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" /&
&!-- 创建rabbitAdmin 代理类 --&
&rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/&
&rabbit:admin connection-factory="connectionFactory" /&
&!-- 创建SimpleMessageListenerContainer的理想通道,主要实现异常事件处理逻辑 --&
&bean id="retryOperationsInterceptorFactoryBean"
class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean"&
&property name="messageRecoverer"&
&bean class="com.zefun.wechat.utils.MQRepublishMessageRecoverer"/&
&/property&
&property name="retryOperations"&
&bean class="org.springframework.retry.support.RetryTemplate"&
&property name="backOffPolicy"&
class="org.springframework.retry.backoff.ExponentialBackOffPolicy"&
&property name="initialInterval" value="500" /&
&property name="multiplier" value="10.0" /&
&property name="maxInterval" value="10000" /&
&/property&
&/property&
&!-- 定义队列,在下面的交换机中引用次队列,实现绑定 --&
&rabbit:queue id="queue_system_error_logger_jmail" name="${rabbitmq.system.out.log.error.mail}" durable="true"
auto-delete="false" exclusive="false" /&
&!--路由设置 将队列绑定,属于direct类型 --&
&rabbit:direct-exchange id="directExchange"
name="directExchange" durable="true" auto-delete="false"&
&rabbit:bindings&
&rabbit:binding queue="queue_system_error_logger_jmail" key="${rabbitmq.system.out.log.error.mail}" /&
&/rabbit:bindings&
&/rabbit:direct-exchange&
&!-- logger 日志发送功能 --&
&bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"&
&property name="connectionFactory" ref="connectionFactory" /&
&property name="acknowledgeMode" ref="ackManual" /&
&property name="queueNames" value="${rabbitmq.system.out.log.error.mail}" /&
&property name="messageListener"&
&bean class="com.zefun.wechat.listener.SystemOutLogErrorMessageNoitce" /&
&/property&
&property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers}" /&
&property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" /&
&property name="errorHandler" ref="mqErrorHandler" /&
& & 2. MessageRecoverer 配置,将小心重新入队列
package com.zefun.wechat.
import java.io.PrintW
import java.io.StringW
import java.util.M
import org.apache.log4j.L
import org.springframework.amqp.core.M
import org.springframework.amqp.rabbit.core.RabbitT
import org.springframework.amqp.rabbit.retry.MessageR
import org.springframework.amqp.support.converter.MessageC
import org.springframework.beans.factory.annotation.A
public class MQRepublishMessageRecoverer implements MessageRecoverer {
private static final Logger logger = Logger.getLogger(MQRepublishMessageRecoverer.class);
@Autowired
private RabbitTemplate rabbitT
@Autowired
private MessageConverter msgC
public void recover(Message message, Throwable cause) {
Map&String, Object& headers = message.getMessageProperties().getHeaders();
headers.put("x-exception-stacktrace", getStackTraceAsString(cause));
headers.put("x-exception-message", cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange());
headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey());
this.rabbitTemplate.send(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message);
logger.error("handler msg (" + msgConverter.fromMessage(message) + ") err, republish to mq.", cause);
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
& & 3. MQErrorHandler 写法,在出现异常时,记录异常
package com.zefun.wechat.
import java.lang.reflect.F
import java.util.D
import org.apache.commons.lang.reflect.FieldU
import org.apache.log4j.L
import org.springframework.amqp.core.M
import org.springframework.amqp.support.converter.MessageC
import org.springframework.beans.factory.annotation.A
import org.springframework.util.ErrorH
import com.zefun.wechat.service.RedisS
public class MQErrorHandler implements ErrorHandler {
private static final Logger logger = Logger.getLogger(MQErrorHandler.class);
@Autowired
private RedisService redisS
@Autowired
private MessageConverter msgC
public void handleError(Throwable cause) {
Field mqMsgField = FieldUtils.getField(MQListenerExecutionFailedException.class, "mqMsg", true);
if (mqMsgField != null) {
Message mqMsg = (Message) mqMsgField.get(cause);
Object msgObj = msgConverter.fromMessage(mqMsg);
logger.error("handle MQ msg: " + msgObj + " failed, record it to redis.", cause);
redisService.zadd(App.MsgErr.MQ_MSG_ERR_RECORD_KEY, new Double(new Date().getTime()), msgObj.toString());
} catch (Exception e) {
e.printStackTrace();
logger.error("An error occurred.", cause);
& & 4.&SystemOutLogErrorMessageNoitce 实现&ChannelAwareMessageListener接口,处理邮件服务
package com.zefun.wechat.
import javax.mail.internet.MimeM
import org.apache.log4j.L
import org.springframework.amqp.core.M
import org.springframework.amqp.rabbit.core.ChannelAwareMessageL
import org.springframework.amqp.support.converter.MessageConversionE
import org.springframework.amqp.support.converter.MessageC
import org.springframework.beans.factory.annotation.A
import org.springframework.mail.javamail.JavaMailSenderI
import org.springframework.mail.javamail.MimeMessageH
import com.rabbitmq.client.C
import com.zefun.wechat.utils.A
import net.sf.json.JSONO
public class SystemOutLogErrorMessageNoitce
implements ChannelAwareMessageListener {
private static final Logger logger = Logger.getLogger(MemberWechatMessageTextNoitce.class);
@Autowired
private MessageConverter msgC
/** logger b */
@Autowired
private JavaMailSenderImpl senderI
public void onMessage(Message message, Channel channel) throws Exception {
Object obj =
obj = msgConverter.fromMessage(message);
} catch (MessageConversionException e) {
logger.error("convert MQ message error.", e);
} finally {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (deliveryTag != App.DELIVERIED_TAG) {
channel.basicAck(deliveryTag, false);
message.getMessageProperties().setDeliveryTag(App.DELIVERIED_TAG);
logger.info("revice and ack msg: " + (obj == null ? message : new String((byte[]) obj)));
if (obj == null) {
JSONObject map = JSONObject.fromObject(obj);
sendMailSystemLoggerError(map.getString("date"), map.getString("subject"), map.getString("domain"), map.getString("requestURL"), map.getString("message"));
* jmail logger
* @author 小高
* @date 日 下午3:24:46
* @param date
* @param subject
* @param domain
* @param message
logger日志
* @param requestURL
* @throws Exception
public void sendMailSystemLoggerError(String date, String subject, String domain, String requestURL, String message) throws Exception{
MimeMessage mailMessage = this.senderImpl.createMimeMessage();
MimeMessageHelper messageHelper = new MimeMessageHelper(mailMessage, true);
messageHelper.setTo("");
messageHelper.setFrom("");
messageHelper.setSubject(date + " 系统异常");
String msg = "&p&异常时间:" + date + "&/p&&p&门店企业:" + subject + "&/p&"
+ "&p&部署环境:" + domain + "&/p&&p&异常连接:" + requestURL + "&/p&"
+ "&p&异常内容:&/p&" +
messageHelper.setText("&html&&head&&/head&&body&" + msg + "&/body&&/html&", true);
senderImpl.send(mailMessage);
logger.info("jmail push message success");
E.&rabbitMq中文文档,方便查阅API&
什么问题呢?
获取到的是生产者在发送的时候生成的tag
这两个是我个人的配置,看你自己的命名规则就可以了
看一下服务商收到了数据么
检查一下在端口上面的数据,是否到达了rabbit上,在查看Java的程序中的数据。
支付宝支付
微信扫码支付
打赏金额: ¥
已支付成功
打赏金额: ¥}

我要回帖

更多关于 2017收视率最高的韩剧 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信