如何基于RabbitMQ实现如何解除低优先级队列列

RabbitMQ开源企业级消息队列系统实现方案(单机版) - Linux服务器 - 次元立方网 - 电脑知识与技术互动交流平台
RabbitMQ开源企业级消息队列系统实现方案(单机版)
RabbitMQ安装配置文档
一.rabbitMQ介绍
1.简单说明
RabbitMQ是流行的开源消息队列系统,用erlang语言开发;它是AMQP(高级消息队列协议)的标准实现,是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
2.实现模式图
3.几个主要概念说明
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
4.消息队列的使用过程
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为&abc&,那么客户端提交的消息,只有设置了key为&abc&的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号&#&匹配一个或多个词,符号&*&匹配正好一个词。例如&abc.#&匹配&abc.def.ghi&,&abc.*&只匹配&abc.def&。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
  (1)exchange持久化,在声明时指定durable =& 1
  (2)queue持久化,在声明时指定durable =& 1
  (3)消息持久化,在投递时指定delivery_mode =& 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
二.安装配置
1.RabbitMQ安装与配置
[root@linuxblind zdh]# wget -O /etc/yum.repos.d/epel-erlang.repo \
http://binaries./rpm/centos/erlang_solutions.repo
[root@linuxblind zdh]# yum install esl-erlang
[root@linuxblind sbin]# erl
Erlang R15B03 (erts-5.9.3.1) [source][64-bit] [async-threads:0] [hipe] [kernel-poll:false]
Eshell V5.9.3.1 &(abort with ^G)
1& io:format(&hello world~n&).
hello world & &&-----表示安装成功
[root@linuxblind zdh]# wget \
/releases/rabbitmq-server/v3.1.1/rabbitmq-server-generic-unix-3.1.1.tar.gz
[root@linuxblind zdh]# tar zxf rabbitmq-server-generic-unix-3.1.1.tar.gz -C/usr/local/
[root@linuxblind zdh]# cd /usr/local
[root@linuxblind zdh]# mv rabbitmq-server-3.1.1 rabbitmq
#配置rabbitmq服务的环境变量
[root@linuxblind rabbitmq]# vim etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODE_IP_ADDRESS=192.168.245.129
RABBITMQ_NODE_PORT=5672
RABBITMQ_LOG_BASE=/u1/logs/rabbitmq/log
RABBITMQ_MNESIA_BASE=/u1/logs/rabbitmq/mnesia
RABBITMQ_PLUGINS_DIR=/usr/local/rabbitmq/plugins
#配置rabbitmqweb管理插件
[root@gy02 rabbitmq]#vim etc/rabbitmq/rabbitmq.config
[{rabbitmq_management,
&[{listener, [{port, 15672},
& & & & & {ip, &192.168.245.129&}
& & & & &]}
2. rabbitMQ服务的启动与关闭
#启动管理界面:http://ip:15672/初始用户名:guest 密码:guest
[root@linuxblind zdh]# /usr/local/rabbitmq/sbin/rabbitmq-plugins enable \
rabbitmq_management
& & & [root@linuxblindzdh]# /usr/local/rabbitmq/sbin/rabbitmq-server -detached
& & & #关闭服务
& & & [root@linuxblind zdh]#/usr/local/rabbitmq/sbin/rabbitmqctl stop
& & & #查看状态
& & & [root@linuxblind zdh]#/usr/local/rabbitmq/sbin/rabbitmqctl status
& & & 注:线上安装目录与命令同上(在160服务器上)
& & & #启动时报错
& & & 问题:
& & & & & =INFO REPORT====27-Jun-:38 ===
& &Error description:
& & & & &{could_not_start_tcp_listener,{&192.168.0.107&,5672}}
& & & 解决办法:
& & & & [root@gyv7 sbin]#netstat -anpl | grep 5672
&tcp &0 &0 &0.0.0.0:.0.0:* & LISTEN & & 1595/qpidd & & & & &
&[root@gyv7 sbin]#service qpidd stop
& Stopping Qpid AMQPdaemon: & & & & & & & & & & &[ OK &]
&[root@gyv7 sbin]#chkconfig --level 35 qpidd off
3. 其他命令与测试程序(perl[含环境配置])
& & & 3.1 测试前准备
#创建测试程序目录并进入
[root@linuxblind zdh]# mkdir /u1/cd /u1/scripts
#创建vhost
[root@linuxblind scripts]# /usr/local/rabbitmq/sbin/rabbitmqctl \
add_vhost /pyhtest
#创建用户及密码
& [root@linuxblind scripts]# /usr/local/rabbitmq/sbin/rabbitmqctl \
& & & add_user pyh pyh1234
& & &#设置pyh用户对/pyhtest这个vhost拥有全部权限
& [root@linuxblind scripts]# /usr/local/rabbitmq/sbin/rabbitmqctl \
& & &set_permissions -p /pyhtest pyh &.*& &.*& &.*&
& & &#更多命令
& [root@linuxblind scripts]# /usr/local/rabbitmq/sbin/rabbitmqctl &help
[ 注:后面三个&*&代表pyh用户拥有对/pyhtest的配置、写、读全部权限]
3.2 测试环境配置
[root@linuxblindscripts]# wget \
http://search.cpan.org/CPAN/authors/id/J/JE/JESUS/Net-RabbitMQ-0.2.0.tar.gz
[root@linuxblindscripts]# tar zxf Net-RabbitMQ-0.2.0.tar.gz
[root@linuxblindscripts]# cd Net-RabbitMQ-0.2.0
[root@linuxblindscripts]# perl Makefile.PL
[root@linuxblindscripts]# make && make install
[root@linuxblindscripts]# wget \
http://66.39.76.93/authors/id/C/CA/CAUGUSTIN/UUID-Tiny-1.03.tar.gz
[root@linuxblind scripts]# tar zxfUUID-Tiny-1.03.tar.gz
[root@linuxblindscripts]# cd UUID-Tiny-1.03
[root@linuxblindscripts]# perl Makefile.PL
[root@linuxblindscripts]# make && make install
3.3 测试程序
[root@linuxblindscripts]# cat producer.pl
#!/usr/bin/perl
use Net::RabbitMQ;
use UUID::T
my $channel = 1000; & & & #channel ID,可以随意指定,只要不冲突
my $queuename =&pyh_queue&; & & & & & &#队列名
my $exchange =&pyh_exchange&; & & & & & #交换机名
my $routing_key =&test&; & & & #routingkey
my $mq =Net::RabbitMQ-&new(); & & & & & #创建一个RabbitMQ对象
# 建立连接
$mq-&connect(&localhost&,{vhost=&&/pyhtest&,user=&&pyh&,pass=&&pyh1234&});
$mq-&channel_open($channel); & & &# 打开一个channel
$mq-&exchange_declare($channel,$exchange,{durable=&1}); & & & # 声明一个持久化的交换机
$mq-&queue_declare($channel,$queuename,{durable=&1}); & & & & # 声明一个持久化的队列
# 使用routing key在交换机和队列间建立绑定
$mq-&queue_bind($channel,$queuename,$exchange,$routing_key);
for(my$i=0;$i&1000;$i++){
& my $string = create_UUID_as_string(UUID_V1); & & # 产生一条UUID作为消息主体
& # 将消息结合key以持久化模式投递到交换机
& $mq-&publish($channel,$routing_key,$string,{exchange=&$exchange},{delivery_mode=&2});
$mq-&disconnect();
[root@linuxblindscripts]#catconsumer.pl
#!/usr/bin/perl
use Net::RabbitMQ;
my $channel = 1001;
my $queuename =&pyh_queue&;
my $mq = Net::RabbitMQ-&new();
$mq-&connect(&localhost&,{vhost=&&/pyhtest&,user=&&pyh&,pass=&&pyh1234&});
$mq-&channel_open($channel);
& my $hashref =$mq-&get($channel,$queuename);
& last unless defined $
& print$hashref-&{message_count},&:&,$hashref-&{body},&\n&;
$mq-&disconnect();
#查看消息队列数
[root@linuxblind sbin]# ./rabbitmqctllist_queues -p /pyhtest
Listing queues ...
pyh_queue & & & 100000
[ 结果在/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@linuxblind/msg_store_persistent目录下产生<span style="color: #0K &0.rdq文件; 并且在http://192.168.245.129:15672/#/页面可以查看msg的生产和消费的变化]
延伸阅读:
1 前言Ansible是自动化运维的工具,基于Python开发,...
本教程为 李华明 编著的iOS-Cocos2d游戏开发系列教程:教程涵盖关于i......
专题主要学习DirectX的初级编程入门学习,对Directx11的入门及初学者有......
&面向对象的JavaScript&这一说法多少有些冗余,因为JavaScript 语言本......
Windows7系统专题 无论是升级操作系统、资料备份、加强资料的安全及管......消息队列中间件的技术选型分析
【http://cloudate.net/?p=/04/25 & &消息队列
消息队列中间件是互联网行业不可或缺的一项基本技术,在高并发消峰,非关键业务异步化,通知系统,监控数据推送等场景下是必不可少的,下文为转载文章,具体出处不详。
个人很喜欢ZeroMQ,非企业级的消息中间件,具有及低延迟-微秒级,使用简单灵活可嵌入等特性,性能报告请参考官网:
消息中间件是一种由消息传送机制或消息队列模式组成的中间件技术,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。目前业界有很多的MQ产品,像RabbitMQ、ActiveMQ、ZeroMQ等都是极好的消息中间件,但是我们在项目中该选择哪个更适合呢?本文针对以下几种消息队列产品作了评估比较:RabbitMQ、ZeroMQ、ActiveMQ、MSMQ、Redis、memcacheQ
题外话:这里我们可以先思考个小问题&Web应用中为什么会需要消息队列服务?&在高并发环境下,由于来不及同步处理,请求往往会发生堵塞(主要原因),比如说,大量的insert,update之类的请求同时到达mysql,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。是AMQP协议领先的一个实现,它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队。对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。此特性使得RabbitMQ易于使用和部署,适宜于很多场景如路由、负载均衡或消息持久化等,用消息队列只需几行代码即可搞定。但是,这使得它的可扩展性差,速度较慢,因为中央节点增加了延迟,消息封装后也比较大。如需配置RabbitMQ则需要在目标机器上安装Erlang环境。
&OMQ(ZeroMQ)号称最快的消息队列系统,尤其针对大吞吐量的需求场景。是一个非常轻量级的消息系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常可以发现它。与RabbitMQ相比,ZeroMQ支持许多高级消息场景,但是你必须实现ZeroMQ框架中的各个块(比如Socket或Device等)。
&OMQ(ZeroMQ)能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。其中,Twitter的Storm中使用ZeroMQ作为数据流的传输。ZeroMQ非常灵活,但是你必须学习它的80页的手册(如果你要写一个分布式系统,一定要阅读它)。
ZeroMQ没有中间件架构,不需要任何服务进程和运行。事实上,你的应用程序端点扮演了这个服务角色。这让部署起来非常简单,但担心的是,你没有地方可以观察它是否有问题出现。就目前了解到的,ZeroMQ仅提供非持久性的队列。你可以在需要的地方实现自己的审计和数据恢复功能。MSMQ这是微软的产品里唯一被认为有价值的东西。如果MSMQ能证明可以应对这种任务,他们将选择使用它。关键是这个东西并不复杂,除了接收和发送,没有别的;它有一些硬性限制,比如最大消息体积是4MB。然而,通过和一些像MassTransit 或 NServiceBus这样的软件的连接,它完全可以解决这些问题。
Jafka/Kafkakafka(能将消息分散到不同的节点上)是LinkedIn于2010年12月开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
Apache ActiveMQActiveMQ居于两者(RabbitMQ & ZeroMQ)之间,类似于ZemoMQ,它可以部署于代理模式和P2P模式。类似于RabbitMQ,它易于实现高级场景,而且只需付出低消耗。ActiveMQ被誉为Java世界的中坚力量。它有很长的历史,而且被广泛的使用。它还是跨平台的,给那些非微软平台的产品提供了一个天然的集成接入点。然而,它只有跑过了MSMQ才有可能被考虑。如需配置ActiveMQ则需要在目标机器上安装Java环境。
需要注意一点的是ActiveMQ的下一代产品为Apollo,Apollo以ActiveMQ原型为基础,是一个更快、更可靠、更易于维护的消息代理工具。Apache称Apollo为最快、最强健的STOMP(Streaming Text Orientated Message Protocol,流文本定向消息协议)服务器。Apollo的特性如下:支持Stomp 1.0和Stomp 1.1协议主题和队列队列浏览器主题持久订阅镜像队列可靠的消息传递消息过期和交换消息选择器JAAS验证基于ACL的授权支持SSL/TLS,证书验证REST Management API
Redis是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
MemcacheQ持久化消息队列memcacheq(简称mcq)是一个轻量级的消息队列,MemcacheQ的特性:1 简单易用2 处理速度快3 多条队列4 并发性能好5 与memcache的协议兼容。这就意味着只要装了memcache的extension就可以了,不需要额外的插件。6 在zend framework中使用也很方便。
最终,这几个产品:1. 都有各自客户端API或支持多种编程语言;2. 都有大量的文档;3. 都提供了积极的支持。4. ActiveMQ、RabbitMQ、MSMQ、Redis都需要启动服务进程,这些都可以监控和配置,其他几个就有问题了5. 都相对提供了良好的可靠性(一致性)、扩展性和负载均衡,当然还有性能
这里就不瞎扯了,下面附上一组从网上截取的测试结果。显示的是发送和接受的每秒钟的消息数。整个过程共产生1百万条1K的消息。测试的执行是在一个Windows Vista单机上进行的。
就像你看到的,ZeroMQ和其它的不是一个级别。它的性能惊人的高。尽管这样,但这个产品不提供消息持久化、无法方便存储及监控中间过程,需要自己实现审计和数据恢复,因此在易用性和HA上不是令人满意。结论很清楚:如果你希望一个应用程序发送消息越快越好,你选择ZeroMQ。当你不太在意偶然会丢失某些消息的情况下更有价值。
本文博主未来往事更希望(也不是很希望很希望)选择使用的是Rabbit,Rabbitmq内置了ha,如果组建cluster,负载均衡之类的问题就无需担忧了同时可以设置队列镜像。但这种事情是应该做更多的测试,你最终会有一个最爱,我所听到的、读到的各种关于Rabbit的事情让我觉得它应该是最佳选择。
其他一些队列列表,这里就不再一一分析,如果需要了解更多可自行google。
> 本站内容系网友提交或本网编辑转载,其目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请及时与本网联系,我们将在第一时间删除内容!
一.简介 EQueue是一个参照RocketMQ实现的开源消息队列中间件,兼容Mono,具体可以参看作者的文章&分享一个c#写的开源分布式消息队列equeue&.项目开源地址:/tangxuehua/equeue,项目中包含了队列的全部源代码以及如何使用的示例. 二.安装EQueue Producer.Cons ...
分布式的基础:一致性哈希
路由算法的一致性hash http://www.jiacheo.org/blog/174 /articles/vQVbmai /huangxincheng/p/3708316.html
redis 和 mongodb的使用,要能自己写
bin/zkServer.sh start /home/guym/down/kafka_2.8.0-0.8.0/config/zookeeper.properties&bin/kafka-server-start.sh config/server.propertiesbin/kafka-create-topic.sh --zookeeper loca ...
本文地址: http://blog.csdn.net/wangjia184/article/details/本文演示从1个zookeeper+1个kafka broker到3个zookeeper+2个kafka broker集群的配置过程.kafka依赖于zookeeper, 首先下载zookeeper和kafka$ wget http:// ...
1.写在前面
本来一年前的时候还打算以那篇面经为契机,开始自己写博客的习惯,结果后来一拖再拖,虽然evernote里面积攒了不少东西,但是发现想整理成博客真的是太累了,毕设的时候觉得累没整理,刚到公司做mini项目觉得累没整理,后来刚进工作室熟悉环境觉得累没整理,不知不觉就一年没写博客了,囧.
为什么想起来写这样一篇文章呢?其实主要还是两 ...
Beanstalkd 是一个高性能的消息队列中间件,本博文宅鸟将介绍一下这个东东的使用. 一.先通过概念让大家了解Beanstalkd的特性和工作场景. Beanstalkd 是一个轻量级消息中间件,它最大特点是将自己定位为基于管道 (tube) 和任务 (job) 的工作队列 (work-queue): Beanstalkd 支持任务优先级 (priori ...
消息队列技术是分布式应用间交换信息的一种技术.消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走.通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置.或在继续执行前不需要等待接收程序接收此消息.
消息中间件概述 消息队列技术是分布式应用间交换信息的一种技术.消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走.通过消 ...
转自:/blog/286724 消息队列技术是分布式应用间交换信息的一种技术.消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走.通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置.或在继续执行前不需要等待接收程序接收此消息. 消息中间件概述 消息队列技术是分布式应用间交换信息的 ...如何基于RabbitMQ实现优先级队列_百度知道
如何基于RabbitMQ实现优先级队列
我有更好的答案
概述 由于种种原因,RabbitMQ到目前为止,官方还没有实现优先级队列,只实现了Consumer的优先级处理。 但是,迫于种种原因,应用层面上又需要优先级队列,因此需求来了:如何为RabbitMQ加入优先级队列特性。 查询资料后,得知RabbitMQ虽然官方没有支持此特性,但是社区已经有相关优先级队列插件了,并且这个插件被列在RabbitMQ官方网站中了。 地址如下:/community-plugins.html
插件安装 不要立刻下载这个url中的那个链接,要先根据你想要更新目标的rabbitmq版本再去另外一个地方下载相应插件,如:
会列出两大版本的插件目录(选择对应目录进入下载,否则会报错...):
插件如何安装? 进入rabbitmq安装目录,进入plugins目录,将上面这个ez文件拷贝到plugins目录中,然后运行命令...
其他类似问题
为您推荐:
优先级的相关知识
等待您来回答
下载知道APP
随时随地咨询
出门在外也不愁如何基于RabbitMQ实现优先级队列_百度知道
如何基于RabbitMQ实现优先级队列
我有更好的答案
概述 由于种种原因,RabbitMQ到目前为止,官方还没有实现优先级队列,只实现了Consumer的优先级处理。 但是,迫于种种原因,应用层面上又需要优先级队列,因此需求来了:如何为RabbitMQ加入优先级队列特性。 查询资料后,得知RabbitMQ虽然官方没有支持此特性,但是社区已经有相关优先级队列插件了,并且这个插件被列在RabbitMQ官方网站中了。 地址如下:/community-plugins.html
插件安装 不要立刻下载这个url中的那个链接,要先根据你想要更新目标的rabbitmq版本再去另外一个地方下载相应插件,如:
会列出两大版本的插件目录(选择对应目录进入下载,否则会报错...):
插件如何安装? 进入rabbitmq安装目录,进入plugins目录,将上面这个ez文件拷贝到plugins目录中,然后运行命令...
其他类似问题
为您推荐:
优先级的相关知识
等待您来回答
下载知道APP
随时随地咨询
出门在外也不愁}

我要回帖

更多关于 优先级队列实现 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信