-
延迟和死信队列的配置 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列@Configuration@Datapublic class RabbitMQConfig { /** * 交换机 */ private String orderEventExchange="order.event.exchange"; /** * 延迟队列, 不能被监听消费 */ private String orderCloseDelayQueue="order.close.delay.queue"; /** * 关单队列, 延迟队列的消息过期后转发的队列,被消费者监听 */ private String orderCloseQueue="order.close.queue"; /** * 进入延迟队列的路由key */ private String orderCloseDelayRoutingKey="order.close.delay.routing.key"; /** * 进入死信队列的路由key,消息过期进入死信队列的key */ private String orderCloseRoutingKey="order.close.routing.key"; /** * 过期时间 毫秒,临时改为1分钟定时关单 */ private Integer ttl=1000*60; /** * 消息转换器 * @return */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 创建交换机 Topic类型,也可以用dirct路由 * 一般一个微服务一个交换机 * @return */ @Bean public Exchange orderEventExchange(){ return new TopicExchange(orderEventExchange,true,false); } /** * 延迟队列 */ @Bean public Queue orderCloseDelayQueue(){ Map<String,Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange",orderEventExchange); args.put("x-dead-letter-routing-key",orderCloseRoutingKey); args.put("x-message-ttl",ttl); return new Queue(orderCloseDelayQueue,true,false,false,args); } /** * 死信队列,普通队列,用于被监听 */ @Bean public Queue orderCloseQueue(){ return new Queue(orderCloseQueue,true,false,false); } /** * 第一个队列,即延迟队列的绑定关系建立 * @return */ @Bean public Binding orderCloseDelayBinding(){ return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseDelayRoutingKey,null); } /** * 死信队列绑定关系建立 * @return */ @Bean public Binding orderCloseBinding(){ return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseRoutingKey,null); }}异常队列配置类public class RabbitMQErrorConfig { @Autowired RabbitTemplate rabbitTemplate; /** * 异常交换机 */ private String orderErrorExchange = "order.error.exchange"; /** * 异常队列 */ private String orderErrorQueue = "order.error.queue"; /** * 异常routing.key */ private String orderErrorRoutingKey = "order.error.routing.key"; /** * 异常交换机 * @return */ @Bean public TopicExchange errorTopicExchange(){ return new TopicExchange(orderErrorExchange,true,false); } /** * 异常队列 * @return */ @Bean public Queue errorQueue(){ return new Queue(orderErrorQueue,true); } /** * 队列交换机进行绑定 * @param errorQueue * @return */ @Bean public Binding BindingErrorQueueAndExchange(Queue errorQueue,TopicExchange errorTopicExchange){ return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(orderErrorRoutingKey); } /** * 配置 RepublishMessageRecoverer * 用途:消息重试一定次数后,用特定的routingKey转发到指定的交换机中,方便后续排查和告警 * * 顶层是 MessageRecoverer接口,多个实现类 * * @return */ @Bean public MessageRecoverer messageRecoverer(){ return new RepublishMessageRecoverer(rabbitTemplate,orderErrorExchange,orderErrorRoutingKey); }}转载自https://www.cnblogs.com/xietingwei/p/17604829.html
-
RabbitMQ安装 PS:系统版本:linux-CentOS_7.8且需要部署Dockerdocker run -d --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:3.8.15-management#网络安全组记得开放端口4369 erlang 发现口5672 client 端通信口15672 管理界面 ui 端口25672 server 间内部通信口访问管理界面ip:15672依赖引入<!--引入AMQP--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>application.xmlspring: ##----------rabbit配置-------------- rabbitmq: host: 192.168.75.146 port: 5672 virtual-host: dev username: admin password: password listener: simple: #消息确认方式,manual(手动ack) 和auto(自动ack) 。消息队列重试到达次数进入异常交换机--为实现,该策略要为auto acknowledge-mode: auto retry: #开启重试,消费者代码不能添加try catch捕获不往外抛异常 enabled: true #最大重试次数 max-attempts: 4 # 重试消息的时间间隔,5秒 max-interval: 5000RabbitMQ配置文件 (一个交换机,两个队列,routingKey匹配规则适用于两队列)@Configuration@Datapublic class RabbitMQConfig { /** * 交换机 */ private String shortLinkEventExchange="short_link.event.exchange"; /** * 创建交换机 Topic类型 * 一般一个微服务一个交换机 * @return */ @Bean public Exchange shortLinkEventExchange(){ return new TopicExchange(shortLinkEventExchange,true,false); //return new FanoutExchange(shortLinkEventExchange,true,false); } //新增短链相关配置==================================== /** * 新增短链 队列 */ private String shortLinkAddLinkQueue="short_link.add.link.queue"; /** * 新增短链映射 队列 */ private String shortLinkAddMappingQueue="short_link.add.mapping.queue"; /** * 新增短链具体的routingKey,【发送消息使用】 */ private String shortLinkAddRoutingKey="short_link.add.link.mapping.routing.key"; /** * topic类型的binding key,用于绑定队列和交换机,是用于 link 消费者 */ private String shortLinkAddLinkBindingKey="short_link.add.link.*.routing.key"; /** * topic类型的binding key,用于绑定队列和交换机,是用于 mapping 消费者 */ private String shortLinkAddMappingBindingKey="short_link.add.*.mapping.routing.key"; /** * 新增短链api队列和交换机的绑定关系建立 */ @Bean public Binding shortLinkAddApiBinding(){ return new Binding(shortLinkAddLinkQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddLinkBindingKey,null); } /** * 新增短链mapping队列和交换机的绑定关系建立 */ @Bean public Binding shortLinkAddMappingBinding(){ return new Binding(shortLinkAddMappingQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddMappingBindingKey,null); } /** * 新增短链Link普通队列,用于被监听 */ @Bean public Queue shortLinkAddLinkQueue(){ return new Queue(shortLinkAddLinkQueue,true,false,false); } /** * 新增短链mapping 普通队列,用于被监听 */ @Bean public Queue shortLinkAddMappingQueue(){ return new Queue(shortLinkAddMappingQueue,true,false,false); }对应的两个消费者@Component@Slf4j//@RabbitListener(queues = "short_link.add.link.queue")@RabbitListener(queuesToDeclare = { @Queue("short_link.add.link.queue") })public class ShortLinkAddLinkMQListener { @Autowired private ShortLinkService shortLinkService; @RabbitHandler public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException { log.info("ShortLinkAddLinkMQListener message:{}",message); try{ eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_LINK.name()); shortLinkService.handlerAddShortLink(eventMessage); }catch (Exception e){ // 处理业务异常,等其他操作 log.error("消费失败:{}",eventMessage); throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION); } log.info("消费成功:{}",eventMessage); //确认消息消费成功// channel.basicAck(tag,false); }}@Component@Slf4j//@RabbitListener(queues = "short_link.add.mapping.queue")@RabbitListener(queuesToDeclare = { @Queue("short_link.add.mapping.queue") })public class ShortLinkAddMappingMQListener { @Autowired private ShortLinkService shortLinkService; @RabbitHandler public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException { log.info("ShortLinkAddMappingMQListener message:{}",message); try{ eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_MAPPING.name()); shortLinkService.handlerAddShortLink(eventMessage); }catch (Exception e){ // 处理业务异常,等其他操作 log.error("消费失败:{}",eventMessage); throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION); } log.info("消费成功:{}",eventMessage); //确认消息消费成功// channel.basicAck(tag,false); }}实战,这里直接在controller层中编码。(应该在Service实现)@RestController@RequestMapping("/api/link/v1")public class ShortLinkController { @Autowired private ShortLinkService shortLinkService; @Autowired private RabbitTemplate rabbitTemplate; /** * 新增短链 * @param shortLinkAddRequest * @return */ @PostMapping("add") public JsonData createShortLink(@RequestBody ShortLinkAddRequest shortLinkAddRequest){ //参数:交换机、匹配规则Key、信息对象 EventMessage eventMessage = EventMessage.builder().[设置对象各字段信息] .build(); //生成MQ。 rabbitTemplate.convertAndSend(rabbitMQConfig.getShortLinkEventExchange(), rabbitMQConfig.getShortLinkAddRoutingKey(), eventMessage); return jsonData; } }转载自https://www.cnblogs.com/xietingwei/p/17594180.html
-
我们都知道,消息从生产端到消费端消费要经过3个步骤:生产端发送消息到RabbitMQ;RabbitMQ发送消息到消费端;消费端消费这条消息;这3个步骤中的每一步都有可能导致消息丢失,消息丢失不可怕,可怕的是丢失了我们还不知道,所以要有一些措施来保证系统的可靠性。这里的可靠并不是一定就100%不丢失了,磁盘损坏,机房爆炸等等都能导致数据丢失,当然这种都是极小概率发生,能做到99.999999%消息不丢失,就是可靠的了。下面来具体分析一下问题以及解决方案。微信搜索公众号:Java后端编程,回复:java 领取资料 。生产端可靠性投递生产端可靠性投递,即生产端要确保将消息正确投递到RabbitMQ中。生产端投递的消息丢失的原因有很多,比如消息在网络传输的过程中发生网络故障消息丢失,或者消息投递到RabbitMQ时RabbitMQ挂了,那消息也可能丢失,而我们根本不知道发生了什么。针对以上情况,RabbitMQ本身提供了一些机制。事务消息机制事务消息机制由于会严重降低性能,所以一般不采用这种方法,我就不介绍了,而采用另一种轻量级的解决方案:confirm消息确认机制。confirm消息确认机制什么是confirm消息确认机制?顾名思义,就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。通过下面这句代码来开启确认模式:channel.confirmSelect();// 开启发送方确认模式复制然后异步监听确认和未确认的消息:channel.addConfirmListener(new ConfirmListener() { //消息正确到达broker @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("已收到消息"); //做一些其他处理 } //RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未确认消息,标识:" + deliveryTag); //做一些其他处理,比如消息重发等 }});复制复制复制复制复制复制复制复制复制复制复制复制复制复制这样就可以让生产端感知到消息是否投递到RabbitMQ中了,当然这样还不够,稍后我会说一下极端情况。消息持久化那消息持久化呢?我们知道,RabbitMQ收到消息后将这个消息暂时存在了内存中,那这就会有个问题,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。那如何持久化呢?message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。所有需要给exchange、queue和message都进行持久化:exchange持久化://第三个参数true表示这个exchange持久化channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);复制复制queue持久化://第二个参数true表示这个queue持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);复制复制message持久化://第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));复制复制这样,如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。到此,RabbitMQ提供的几种机制都介绍完了,但这样还不足以保证消息可靠性投递RabbitMQ中,上面我也提到了会有极端情况,比如RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,就不好做接下来的处理。所以除了RabbitMQ提供的一些机制外,我们自己也要做一些消息补偿机制,以应对一些极端情况。接下来我就介绍其中的一种解决方案——消息入库。消息入库消息入库,顾名思义就是将要发送的消息保存到数据库中。首先发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示生产端将消息发送给了RabbitMQ但还没收到确认。在生产端收到确认后将status设为1,表示RabbitMQ已收到消息。这里有可能会出现上面说的两种情况,所以生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认这边定时器刚好检索到这条status=0的消息,所以给个时间)还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以做一个最大重发次数,超过就做另外的处理。这样消息就可以可靠性投递到RabbitMQ中了,而生产端也可以感知到了。消费端消息不丢失既然已经可以让生产端100%可靠性投递到RabbitMQ了,那接下来就该看看消费端的了,如何让消费端不丢失消息。默认情况下,以下3种情况会导致消息丢失:在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。其实,上述3中情况导致消息丢失归根结底是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。所以就需要将自动ack机制改为手动ack机制。消费端手动确认消息:DeliverCallback deliverCallback = (consumerTag, delivery) -> { try { //接收到消息,做处理 //手动确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (Exception e) { //出错处理,这里可以让消息重回队列重新发送或直接丢弃消息 }};//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});复制复制复制复制复制复制复制复制复制复制复制这样,当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息;一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息。如果RabbitMQ一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机(RabbitMQ会自己感知到),则RabbitMQ会安排该消息重新进入队列(放在队列头部),等待投递给下一个消费者,当然也有能还是原来的那个消费端,当然消费端也需要确保幂等性。好了,到此从生产端到RabbitMQ再到消费端的全链路,就可以保证数据的不丢失。转自:https://mp.weixin.qq.com/s/PkL88EHtaxeo1HF2Xtno7g
-
1、消息整体处理过程这里我们将消息的整体处理阶段分为3个阶段进行分析:Producer发送消息阶段。Broker处理消息阶段。Consumer消费消息阶段。Producer发送消息阶段发送消息阶段涉及到Producer到broker的网络通信,因此丢失消息的几率一定会有,那RocketMQ在此阶段用了哪些手段保证消息不丢失了(或者说降低丢失的可能性)。手段一:提供SYNC的发送消息方式,等待broker处理结果。RocketMQ提供了3种发送消息方式,分别是:同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer只负责把请求发出去,而不处理响应结果。我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式。手段二:发送消息如果失败或者超时,则重新发送。发送重试源码如下,本质其实就是一个for循环,当发送消息发生异常的时候重新循环发送。默认重试3次,重试次数可以通过producer指定。手段三:broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。如果broker只有一个节点,则broker宕机了,即使producer有重试机制,也没用,因此利用多主模式,当某台broker宕机了,换一台broker进行投递。总结producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,同步等待发送结果,利用同步发送+重试机制+多个master节点,尽可能减小消息丢失的可能性。Broker处理消息阶段手段四:提供同步刷盘的策略public enum FlushDiskType { SYNC_FLUSH, //同步刷盘 ASYNC_FLUSH//异步刷盘(默认) }我们知道,当消息投递到broker之后,会先存到page cache,然后根据broker设置的刷盘策略是否立即刷盘,也就是如果刷盘策略为异步,broker并不会等待消息落盘就会返回producer成功,也就是说当broker所在的服务器突然宕机,则会丢失部分页的消息。手段五:提供主从模式,同时主从支持同步双写即使broker设置了同步刷盘,如果主broker磁盘损坏,也是会导致消息丢失。因此可以给broker指定slave,同时设置master为SYNC_MASTER,然后将slave设置为同步刷盘策略。此模式下,producer每发送一条消息,都会等消息投递到master和slave都落盘成功了,broker才会当作消息投递成功,保证消息不丢失。总结在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略+同步slave策略+主从的方式解决丢失消息的可能。Consumer消费消息阶段手段六:consumer默认提供的是At least Once机制从producer投递消息到broker,即使前面这些过程保证了消息正常持久化,但如果consumer消费消息没有消费到也不能理解为消息绝对的可靠。因此RockerMQ默认提供了At least Once机制保证消息可靠消费。何为At least Once?Consumer先pull 消息到本地,消费完成后,才向服务器返回ack。通常消费消息的ack机制一般分为两种思路:先提交后消费;先消费,消费成功后再提交;思路一可以解决重复消费的问题但是会丢失消息,因此Rocketmq默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。手段七:消费消息重试机制当消费消息失败了,如果不提供重试消息的能力,则也不能算完全的可靠消费,因此RocketMQ本身提供了重新消费消息的能力。总结consumer端要保证消费消息的可靠性,主要通过At least Once+消费重试机制保证。2、如何保证消息不被重复消费回答这个问题,首先你别听到重复消费这个事儿,就一无所知吧,你先大概说一说可能会有哪些重复消费的问题。首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。有这么个场景。数据 1/2/3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。消费者从 kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。那么此时消费过的数据 1/2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功,那么数据 1/2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。举个例子吧。假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。所以第二个问题来了,怎么保证消息队列消费的幂等性?其实还是得结合业务来思考,我这里给几个思路:比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
-
4.路由模式(精确匹配)路由模式(Routing)的特点:该模式的交换机为direct,意思为定向发送,精准匹配。队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key) 消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息。生产者将消息发送到direct交换器,同时生产者在发送消息的时候会指定一个路由key,而在绑定队列和交换器的时候又会指定一个路由key,那么消息只会发送到相应routing key相同的队列,然后由监听该队列的消费者进行消费消息。模型如下图所示:5.Topic模式(模糊匹配)Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。但是Topic类型的Exchange可以让队列在绑定Routing key的时候使用通配符进行匹配,也就是模糊匹配,这样与之前的模式比起来,它更加的灵活!Topic主题模式的Routingkey一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如:log.insert ,它的通配符规则如下:*:匹配不多不少恰好1个词#:匹配0或多个单词图解:红色Queue:绑定的是usa.# ,因此凡是以usa.开头的routing key都会被匹配到黄色Queue:绑定的是#.news ,因此凡是以.news结尾的 routing key都会被匹配总结 Topic主题模式需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。Topic主题模式可以实现Publish/Subscribe发布与订阅模式 和Routing路由模式的功能;只是Topic在配置routing key 的时候可以使用通配符,所以更加灵活。参考:cid:link_0
-
1.简单一对一模式该模式是个一对一模式,只有一个生产者Producer(用于生产消息),一个队列Queue(用于存储消息),一个消费者Consumer (用于接收消息)。使用的是默认交换机(AMQP default)。简单模式的不足之处:该模式是一对一,一个生产者向一个队列中发送消息,一个消费者从绑定的队列中获取消息,这样耦合性过高,如果有多个消费者想消费队列中信息就无法实现了。2.工作模式工作模式也被称为任务模型(Task Queues)。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行。这种模式只有一个生产者Producer,一个用于存储消息的队列 Queue、多个消费者Consumer用于接收消息。工作队列模式的特点有三:一个生产者,一个队列,多个消费者同时竞争消息任务量过高时可以提高工作效率消费者获得的消息是无序的3.发布订阅模式发布订阅模式(Publish/Subscribe):该模式需要涉及到交换机了,也可以称它为广播模式,消息通过交换机广播到所有与其绑定的队列中。一个消费者将消息首先发送到交换机上(这里的交换机类型为fanout),然后交换机绑定到多个队列,这样每个发到fanout类型交换器的消息会被分发到所有的队列中,最后被监听该队列的消费者所接收并消费。如下图所示: 四种交换机的类型:direct(直连):消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。fanout(广播):把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。topic(主题):通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。匹配规则:① RoutingKey 和 BindingKey 为一个 点号 '.' 分隔的字符串。比如: stock.usd.nyse;可以放任意的key在routing_key中,当然最长不能超过255 bytes。② BindingKey可使用 * 和 # 用于做模糊匹配:*匹配一个单词,#匹配0个或者多个单词;headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外headers交换器和direct交换器完全一致,但性能差很多,目前几乎用不到了。发布订阅模式引入了交换机的概念,所以相对前面的类型更加灵活广泛一些。这种模式需要设置类型为fanout的交换机,并且将交换机和队列进行绑定,当消息发送到交换机后,交换机会将消息发送到所有绑定的队列,最后被监听该队列的消费者所接收并消费。发布订阅模式也可以叫广播模式,不需要RoutingKey的判断。发布订阅模式与工作队列模式的区别:工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。参考:cid:link_0
-
1.添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.配置propertiesspring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin3.定义RabbitConfigpackage com.example.sendmsg; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue queue(){ return new Queue("ch1");//定义队列 } }4.定义消息接收方法package com.example.sendmsg; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class MQReceiver { @RabbitListener(queues = "ch1",ackMode = "MANUAL")//手动应答 @RabbitHandler public void recv(Message msg, Channel channel) throws IOException, InterruptedException { System.out.println("recv1 msg:"+msg.getMessageProperties().getDeliveryTag()+" "+new String(msg.getBody())); channel.basicAck(msg.getMessageProperties().getDeliveryTag(),true); } @RabbitListener(queues = "ch1",ackMode = "AUTO")//自动应答 @RabbitHandler public void recv2(Message msg, Channel channel) throws IOException, InterruptedException { System.out.println("recv2 msg:"+msg.getMessageProperties().getDeliveryTag()+" "+new String(msg.getBody())); } }5.定义消息发送方法package com.example.sendmsg.controller; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; @Controller public class SendController { @Resource private AmqpTemplate amqpTemplate; @GetMapping("/testsend") @ResponseBody public String testSend(){ amqpTemplate.convertAndSend("ch1","hello mq"); return "send ok"; } }
-
那位大神,给解决一下,现在是rabbitmq消息调用了手动回执,但是消息还是重回队列,又被消费了一次。该如何解决。
-
按照这个帖子https://bbs.huaweicloud.com/forum/thread-96478-1-1.html 移植rabbitmq后出现报错:Error:{error,{missing_dependencies,[crypto,ssl], [amqp10_client,cowboy,cowlib,rabbitmq_aws, rabbitmq_management,rabbitmq_management_agent, rabbitmq_shovel,rabbitmq_trust_store]}} Stacktrace [{rabbit_plugins,ensure_dependencies,1, [{file,"src/rabbit_plugins.erl"},{line,263}]}, {'Elixir.RabbitMQ.CLI.Plugins.Helpers',list,1, [{file,"lib/rabbitmq/cli/plugins/plugins_helpers.ex"}, {line,49}]}, {'Elixir.RabbitMQ.CLI.Plugins.Helpers',validate_plugins,2, [{file,"lib/rabbitmq/cli/plugins/plugins_helpers.ex"}, {line,121}]}, {'Elixir.RabbitMQ.CLI.Plugins.Commands.EnableCommand',run,2, [{file,"lib/rabbitmq/cli/plugins/commands/enable_command.ex"}, {line,74}]}, {'Elixir.RabbitMQCtl',maybe_run_command,3, [{file,"lib/rabbitmqctl.ex"},{line,106}]}, {'Elixir.RabbitMQCtl','-exec_command/2-fun-0-',5, [{file,"lib/rabbitmqctl.ex"},{line,73}]}, {'Elixir.RabbitMQCtl',main,1, [{file,"lib/rabbitmqctl.ex"},{line,36}]}, {'Elixir.Kernel.CLI','-exec_fun/2-fun-0-',3, [{file,"lib/kernel/cli.ex"},{line,105}]}]查了以后发现是缺少erlang-openssl但是没找到kylin可用的,想请教各位大佬知道怎么解决这个问题吗?谢谢
-
如题https://repo.huaweicloud.com/rabbitmq-server/
-
大家好,本篇文章主要讲的是docker安装RabbitMQ详细步骤,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览目录• 1.查找镜像 • 2.下载RabbitMQ镜像 • 3.创建并启动RabbitMQ容器 • 4.进入容器交互页面 • 5.下载插件 • 6.阿里云控制台 开放端口号 • 7.登录 1.查找镜像1docker search rabbitmq2.下载RabbitMQ镜像直接下载最新的镜像如果需要下载其他版本 自行Docker官网查看并添加版本号再下载12345# 下载镜像docker pull rabbitmq #查看镜像docker images3.创建并启动RabbitMQ容器第一个-p :用于页面访问使用第二个-p :用于生产和消费端使用(也就是再代码里使用)1docker run -id --hostname myrabbit --name rabbitmq1 -p 15672:15672 -p 5672:5672 rabbitmq4.进入容器交互页面1docker exec -it rabbitmq1 /bin/bash5.下载插件1rabbitmq-plugins enable rabbitmq_management6.阿里云控制台 开放端口号注意:开放两个端口号7.登录ip+端口号访问账号密码均为:guest到此这篇关于docker安装RabbitMQ详细步骤的文章就介绍到这了转载自https://www.jb51.net/article/233585.htm
-
鲲鹏服务器部署RabbitMQ环境:鲲鹏ECS + openEuler 20.03 安装:1、配置安装环境:Erlang20.3及以上版本安装依赖包:yum install libtool libtool-ltdl-devel libevent-devel lua ncurses-devel openssl-devel flex获取源码包并编译安装:cd /homewget http://erlang.org/download/otp_src_20.3.tar.gztar -zxvf otp_src_20.3.tar.gzcd otp_src_20.3./configure && make && make install运行验证:输入 erl ,回显信息如下即为成功2、安装RabbitMQcd /homewget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_10/rabbitmq-server-generic-unix-3.6.10.tar.xztar xvf rabbitmq-server-generic-unix-3.6.10.tar.xzvim /etc/profile ,在文件末尾增加环境变量 : export PATH=$PATH:/home/rabbitmq_server-3.6.10/sbinsource /etc/profile3、运行及验证启动服务 rabbitmq-server -detached查看运行状态 rabbitmqctl status页面访问验证:rabbitmq-plugins enable rabbitmq_managementrabbitmqctl stoprabbitmq-server -detached访问地址: http://ip:15672/(如无法访问可检查ECS对应安全组策略是否开放15672端口)登录验证:默认账号:guest/guest 如无法登陆,可尝试新建用户登录操作:rabbitmqctl add_user zzz 123456 ----新建用户,用户名zzz 密码123456rabbitmqctl set_user_tags zzz administrator ----为新用户zzz 设置管理员权限rabbitmqctl list_users ----可查看用户列表使用新用户zzz/123456,登录验证
-
docker环境部署RabbitMQ环境:CCE服务 查询rabbitMQ镜像docker search rabbitmq:management拉取镜像docker pull rabbitmq:management查看镜像列表:docker images启动RabbitMQdocker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management页面访问验证:docker ps ----获取容器IDdocker exec -it CONTAINERID /bin/bash ----在运行的容器中执行命令rabbitmq-plugins enable rabbitmq_management访问验证:地址: http://ip:15672/ 默认账号密码: guest/guest(如无法访问可检查ECS对应安全组策略是否开放15672端口或重启容器重试)
-
【功能模块】【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
-
我想问一下,rabbitmq在publish时是不是可以不用定义和绑定队列 ?下面这个例子好像就没有绑定队列const bson = require('bson')var amqp = require('amqplib/callback_api');var uploadDataExchange = "iothub.events.upload_data"var updateStatusExchange = "iothub.events.update_status"var commandRespExchange = "iothub.events.cmd_resp"var dataRequestRespExchange = "iothub.events.data_request"var currentChannel = null;amqp.connect(process.env.RABBITMQ_URL, function (error0, connection) { if (error0) { console.log(error0); } else { connection.createChannel(function (error1, channel) { if (error1) { console.log(error1) } else { currentChannel = channel; channel.assertExchange(uploadDataExchange, 'direct', {durable: true}) channel.assertExchange(updateStatusExchange, 'direct', {durable: true}) channel.assertExchange(commandRespExchange, 'direct', {durable: true}) } }); }});static notifyUpdateStatus({productName, deviceName, deviceStatus}){ var data = bson.serialize({ device_name: deviceName, device_status: deviceStatus }) if(currentChannel != null) { currentChannel.publish(updateStatusExchange, productName, data, { persistent: true }) } }
推荐直播
-
OpenHarmony应用开发之网络数据请求与数据解析
2025/01/16 周四 19:00-20:30
华为开发者布道师、南京师范大学泰州学院副教授,硕士研究生导师,开放原子教育银牌认证讲师
科技浪潮中,鸿蒙生态强势崛起,OpenHarmony开启智能终端无限可能。当下,其原生应用开发适配潜力巨大,终端设备已广泛融入生活各场景,从家居到办公、穿戴至车载。 现在,机会敲门!我们的直播聚焦OpenHarmony关键的网络数据请求与解析,抛开晦涩理论,用真实案例带你掌握数据访问接口,轻松应对复杂网络请求、精准解析Json与Xml数据。参与直播,为开发鸿蒙App夯实基础,抢占科技新高地,别错过!
回顾中 -
Ascend C高层API设计原理与实现系列
2025/01/17 周五 15:30-17:00
Ascend C 技术专家
以LayerNorm算子开发为例,讲解开箱即用的Ascend C高层API
回顾中
热门标签