https://www.rabbitmq.com/dlx.html

https://blog.csdn.net/weixin_44688301/article/details/116237294

https://www.yii666.com/blog/67356.html

Rbmqphp死信队列2RabbitMQ的逝世信队列详解 PHP

一、去世行列步队是什么

“去世信”是RabbitMQ中的一种机制,当你在消费时,如果行列步队里的涌现以下情形:

> 被否定确认(被谢绝),利用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。

> 在行列步队的存活韶光超过设置的生存韶光(TTL)韶光。

> 行列步队的数量已经超过最大行列步队长度。

那么该将成为“去世信”。

“去世信”会被RabbitMQ进行分外处理,如果配置了去世信行列步队信息,那么该将会被丢进去世信行列步队中,如果没有配置,则该将会被丢弃。

去世信交流机(dlx)是正常的交流机,能够在任何行列步队上被指定。
实在去世信交流机和一样平常的交流机没啥差异,只是添加了去世信交流机的属性。

如果行列步队上存在去世信, RabbitMq 会将去世信投递到设置的 去世信交流机DLX 上去 ,然后被路由到一个行列步队上,这个行列步队,便是去世信行列步队。

把稳,并不是直接声明一个公共的去世信行列步队,然后去世信就自己跑到去世信行列步队里去了。

而是为每个须要利用去世信的业务行列步队配置一个去世信交流机,这里同一个项目的去世信交流机可以共用一个,然后为每个业务行列步队分配一个单独的路由key。

有了去世信交流机和路由key后,接下来,就像配置业务行列步队一样,配置去世信行列步队,然后绑定在去世信交流机上。

也便是说,去世信行列步队并不是什么分外的行列步队,只不过是绑定在去世信交流机上的行列步队。
去世信交流机也不是什么分外的交流机,只不过是用来接管去世信的交流机,

以是可以为任何类型【Direct、Fanout、Topic】。

一样平常来说,会为每个业务行列步队分配一个独占的路由key,并对应的配置一个去世信行列步队进行监听,也便是说,一样平常会为每个主要的业务行列步队配置一个去世信行列步队。

去世信行列步队的运用

> 担保不会丢失,担保数据的完全性;

> 可以借助延时消费的特性完成特定的功能(比如订单天生但是未支付,超过30分钟自动取消的业务场景)

1. 当在一个行列步队中变成去世信之后,它能重新被发送到另一个交流机中,这个交流机便是 去世信交流机,绑定 去世信交流机(DLX Exchange) 的行列步队就称之为去世信行列步队

2. 去世信行列步队同其他的行列步队一样都是普通的行列步队。

3. 在RabbitMQ中并没有特定的"去世信行列步队"类型,而是通过配置,将实在现。

设置去世信行列步队须要设置以下2个属性

交流机 x-dead-letter-exchange

路由键 x-dead-letter-routing-key

下面开始演示去世信行列步队的案例

(PHP)

消费生产者

// 去世信行列步队publicfunctiondeadMq(){ $data='去世信行列步队测试-被谢绝去世信行列步队'; / 正常行列步队 / $this->channel->exchange_declare('my-logs','direct',false,false,false); $args=newAMQPTable([ // 信息过期韶光 'x-message-ttl' =>20000, 'x-dead-letter-exchange' =>'dead-exc', 'x-dead-letter-routing-key'=>'dead-key' ]); // 通过行列步队额外参数设置过期期间等配置 $this->channel->queue_declare('user-log-1',false,true,false,false,false,$args); $this->channel->queue_bind('user-log-1','my-logs','user'); / 去世信行列步队的配置 / // 1、声明去世信交流机 $this->channel->exchange_declare('dead-exc','direct',false,false,false); // 2、声明去世信行列步队 $this->channel->queue_declare('dead-log-queue',false,true,false,false); // 3、去世信行列步队与去世信交流机绑定 $this->channel->queue_bind('dead-log-queue','dead-exc','dead-key'); // 正常行列步队发送 $msg=newAMQPMessage($data, ['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]); $this->channel->basic_publish($msg,'my-logs','user'); $this->channel->close(); $this->mqConnection->close();}

上述代码声明两个与之干系的交流机与行列步队。
一个是正常业务的行列步队,一个是去世信行列步队。

1、正常行列步队业务逻辑中,通过额外参数配置过期韶光等,让其成为去世信行列步队;

2、通过 x-dead-letter-exchange 和 x-dead-letter-routing-key 配置信息交流机名称与路由键;

3、配置去世信行列步队,交流机是 x-dead-letter-exchange 设置的值;

4、去世信行列步队绑定去世信交流机并设置路由键。

者(正常业务)

<?phpnamespaceApp\Console\Commands;useIlluminate\Console\Command;usePhpAmqpLib\Connection\AMQPStreamConnection;//正常者classLogMqextendsCommand{ protected$signature='mq:log'; protected$description='Command description'; publicfunctionhandle():void { $connection=newAMQPStreamConnection('localhost',5672,'test','123456','sms'); $channel=$connection->channel(); $channel->exchange_declare('my-logs','direct',false,false,false); list($queueName, ,)=$channel->queue_declare("",false,false,true,false); $channel->queue_bind($queueName,'my-logs','user'); $callback=function($msg) { echo'输出: '.$msg->body.PHP_EOL; $msg->ack(); }; $channel->basic_qos(null,1,null); $channel->basic_consume($queueName,'',false,false,false,false,$callback); while($channel->is_open()) { $channel->wait(); } $channel->close(); $connection->close(); }}

这是一个正常业务的消费者。
后续我们改动这个消费者来实现去世信行列步队。

去世信行列步队消费者

<?phpnamespaceApp\Console\Commands;useIlluminate\Console\Command;usePhpAmqpLib\Connection\AMQPStreamConnection;classDeadMqLogextendsCommand{ protected$signature='mq:dead'; protected$description='Command description'; publicfunctionhandle():void { $connection=newAMQPStreamConnection('localhost',5672,'test','123456','ziruchu'); $channel=$connection->channel(); $channel->exchange_declare('dead-exc','direct',false,false,false); $channel->queue_bind('dead-log-queue','dead-exc','dead-key'); $callback=function($msg) { echo'从去世信行列步队中输出的: '.$msg->body.PHP_EOL; $msg->ack(); }; $channel->basic_qos(null,1,null); $channel->basic_consume('dead-log-queue','',false,false,false,false,$callback); while($channel->is_open()) { $channel->wait(); } $channel->close(); $connection->close(); }}

这是一个去世信行列步队消费者,当有去世信消费时就会走到这里

去世信行列步队演示

A. 谢绝吸收

1、修正 LogMq.php 正常消费者代码

// 未变革代码$callback=function($msg) { // 谢绝吸收 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);};// 未变革代码

2、效果

# 生产者发布$curlhttp://la10test.test/mq/dead# 正常消费者$phpartisanmq:log# 去世信行列步队$phpartisanmq:dead从去世信行列步队中输出的:去世信行列步队测试-被谢绝去世信行列步队从去世信行列步队中输出的:去世信行列步队测试-被谢绝去世信行列步队

B.过期

利用过期可以实现延迟行列步队的效果

1、修正 LogMq.php 正常消费者代码

$callback=function($msg) { // 就寝 23 秒, sleep(23); $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);};

2、效果

# 生产者发布$curlhttp://la10test.test/mq/dead# 正常消费者$phpartisanmq:log# 去世信行列步队$phpartisanmq:dead从去世信行列步队中输出的:去世信行列步队测试-被谢绝去世信行列步队从去世信行列步队中输出的:去世信行列步队测试-被谢绝去世信行列步队从去世信行列步队中输出的:去世信行列步队测试-被谢绝去世信行列步队从去世信行列步队中输出的:去世信行列步队测试-被谢绝去世信行列步队

C. 行列步队长度达到最大长度

1、修正生产者代码配置(deadMq 方法)

$args=newAMQPTable([ 'x-max-length' =>5, 'x-overflow' =>'reject-publish-dlx', 'x-dead-letter-exchange' =>'dead-exc', 'x-dead-letter-routing-key'=>'dead-key']);

------------------------------------------------------------------

(Go版本)

https://www.jianshu.com/p/6255b17fed68

Golang处理去世信行列步队

1.定义去世信交流机

当成普通交流机定义就行。

// 声明交流机 err = ch.ExchangeDeclare( "tizi365.dead", // 交流机名字 "topic", // 交流机类型 true, // 是否持久化 false, false, false, nil, )

2.定义去世信行列步队

当成普通行列步队定义就行

// 声明行列步队 q, err := ch.QueueDeclare( "", // 行列步队名字,不填则随机天生一个 false, // 是否持久化行列步队 false, true, false, nil, ) // 行列步队绑定去世信交流机 err = ch.QueueBind( q.Name, // 行列步队名 "#", // 路由参数,# 井号的意思就匹配所有路由参数,意思便是吸收所有去世信 "tizi365.dead", // 去世信交流机名字 false, nil)

提示:去世信行列步队就当成普通行列步队用便是了

3.定义去世信消费者

// 创建消费者 msgs, err := ch.Consume( q.Name, // 引用前面的去世信行列步队名 "", // 消费者名字,不填自动天生一个 true, // 自动向行列步队确认已经处理 false, false, false, nil, ) // 循环消费去世信行列步队中的 for d := range msgs { log.Printf("吸收去世信=%s", d.Body) }

4.将去世信交流机绑定到指定正常行列步队

// 行列步队属性 props := make(map[string]interface{}) // 设置正常行列步队的去世信交流机 props["x-dead-letter-exchange"] = "tizi365.dead" // 可选: 设置去世信投递到去世信交流机的时候的路由参数,如果不设置,则利用原来自带的路由参数 // props["x-dead-letter-routing-key"] = "www.tizi365.com" q, err := ch.QueueDeclare( "normo.hello", // 行列步队名 true, // 是否持久化 false, false, false, props, // 设置行列步队属性 )

这样,只要tizi365.demo.hello行列步队的变成去世信的话,会被转发到tizi365.dead去世信交流机。
作者:一位师长西席_链接:https://www.jianshu.com/p/6255b17fed68来源:简书著作权归作者所有。
商业转载请联系作者得到授权,非商业转载请注明出处。

https://blog.csdn.net/weixin_43495948/article/details/129249740

实在整体的思路便是分别创建一个normal_exchange、dead_exchange、normal_queue、dead_queue,

然后将normal_exchange与normal_queue进行绑定,将dead_exchange与dead_queue进行绑定,

这里比较关键的一个点在于说如何将normal_queue与dead_exchange进行绑定,这样才能将缺点的通报过来。

--------------------------------------------------------------------------------

https://blog.csdn.net/weixin_43495948/article/details/129249740

实在整体的思路便是分别创建一个normal_exchange、dead_exchange、normal_queue、dead_queue,

然后将normal_exchange与normal_queue进行绑定,将dead_exchange与dead_queue进行绑定,

这里比较关键的一个点在于说如何将normal_queue与dead_exchange进行绑定,这样才能将缺点的通报过来。

// 声明一个normal行列步队_, err = ch.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{//"x-message-ttl": 5000, // 指定过期韶光//"x-max-length": 6,// 指定长度。
超过这个长度的会发送到dead_exchange中"x-dead-letter-exchange": constant.DeadExchange, // 指定去世信交流机"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定去世信routing-key})

消费者超出韶光未应答

produce.go

package day07import ("context"amqp "github.com/rabbitmq/amqp091-go""strconv""time""v1/utils")func Produce() {// 获取信道ch := utils.GetChannel()// 声明一个交流机err := ch.ExchangeDeclare("normal_exchange",amqp.ExchangeDirect,true,false,false,false,nil)utils.FailOnError(err, "Failed to declare a exchange")ctx, cancer := context.WithTimeout(context.Background(), 5time.Second)defer cancer()// 发送了10条for i := 0; i < 10; i++ {msg := "Info:" + strconv.Itoa(i)ch.PublishWithContext(ctx,"normal_exchange","normal_key",false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(msg),})}}

consumer1.go

package day07import (amqp "github.com/rabbitmq/amqp091-go""log""v1/utils")type Constant struct {NormalExchange stringDeadExchange stringNormalQueue stringDeadQueue stringNormalRoutingKey stringDeadRoutingKey string}func Consumer1() {// 获取连接ch := utils.GetChannel()// 创建一个变量常量constant := Constant{NormalExchange: "normal_exchange",DeadExchange: "dead_exchange",NormalQueue: "normal_queue",DeadQueue: "dead_queue",NormalRoutingKey: "normal_key",DeadRoutingKey: "dead_key",}// 声明normal交流机err := ch.ExchangeDeclare(constant.NormalExchange,amqp.ExchangeDirect,true,false,false,false,nil,)utils.FailOnError(err, "Failed to declare a normal exchange")// 声明一个dead交流机err = ch.ExchangeDeclare(constant.DeadExchange,amqp.ExchangeDirect,true,false,false,false,nil,)utils.FailOnError(err, "Failed to declare a dead exchange")// 声明一个normal行列步队_, err = ch.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{"x-message-ttl": 5000, // 指定过期韶光//"x-max-length": 6,"x-dead-letter-exchange": constant.DeadExchange, // 指定去世信交流机"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定去世信routing-key})utils.FailOnError(err, "Failed to declare a normal queue") // 声明一个dead行列步队:把稳不要给去世信行列步队设置韶光,否者去世信行列步队里面的信息会再次过期_, err = ch.QueueDeclare(constant.DeadQueue,true,false,false,false,nil)utils.FailOnError(err, "Failed to declare a dead queue")// 将normal_exchange与normal_queue进行绑定err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue") // 将dead_exchange与dead_queue进行绑定err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue")// 消费msgs, err := ch.Consume(constant.NormalQueue,"",false, // 这个地方一定要关闭自动应答false,false,false,nil)utils.FailOnError(err, "Failed to consume in Consumer1")var forever chan struct{}go func() {for d := range msgs {if err := d.Reject(false); err != nil {utils.FailOnError(err, "Failed to Reject a message")}}}()log.Printf(" [] Waiting for logs. To exit press CTRL+C")<-forever}

consumer2.go

package day07import (amqp "github.com/rabbitmq/amqp091-go""log""v1/utils")func Consumer2() {// 拿取信道ch := utils.GetChannel()// 声明一个交流机err := ch.ExchangeDeclare("dead_exchange",amqp.ExchangeDirect,true,false,false,false,nil)utils.FailOnError(err, "Failed to Declare a exchange")// 吸收的应答msgs, err := ch.Consume("dead_queue","",false,false,false,false,nil,)var forever chan struct{}go func() {for d := range msgs {log.Printf("[x] %s", d.Body)// 开启手动应答ßd.Ack(false)}}()log.Printf(" [] Waiting for logs. To exit press CTRL+C")<-forever}

限定一定的长度

只须要改变consumer1.go中的对normal_queue的声明

// 声明一个normal行列步队_, err = ch.QueueDeclare(constant.NormalQueue,true,false,false,false,amqp.Table{//"x-message-ttl": 5000, // 指定过期韶光"x-max-length": 6,"x-dead-letter-exchange": constant.DeadExchange, // 指定去世信交流机"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定去世信routing-key})

消费者谢绝的回到去世信行列步队中

这里须要完成两点事情

事情1:须要在consumer1中作出谢绝的操作

go func() {for d := range msgs {if err := d.Reject(false); err != nil {utils.FailOnError(err, "Failed to Reject a message")}}}()

事情2:如果你consume的时候开启了自动应答一定要关闭

// 消费msgs, err := ch.Consume(constant.NormalQueue,"",false, // 这个地方一定要关闭自动应答false,false,false,nil)