jae-park-7GX5aICb5i4-unsplash

本日小黑哥来跟大家先容一下 Redis 发布/订阅功能。

大概有的小伙伴对这个功能比较陌生,不太清楚这个功能是干什么的,没紧要小黑哥先来举个例子。

phpredis订阅的作用Redis 宣布订阅小功效年夜用途真没那么废材 AJAX

假设我们有这么一个业务场景,在网站下单支付往后,须要关照库存做事进行发货处理。

上面业务实现不难,我们只要让库存做事供应给干系的给口,下单支付之后只要调用库存做事即可。

后面如果又有新的业务,比如说积分做事,他须要获取下单支付的结果,然后增加用户的积分。

这个实现也不难,让积分做事同样供应一个接口,下单支付之后只要调用库存做事即可。

如果就两个业务须要获取下单支付的结果,那也还好,程序改造也快。
可是随着业务不断的发展,越来越多的新业务说是要下单支付的结果。

这时我们会创造上面这样的系统架构存在很多问题:

第一,下单支付业务与其他业务重度耦合,每当有个新业务须要支付结果,就须要改动下单支付的业务。

第二,如果调用业务过多,会导致下单支付接口相应韶光变长。
其余,如果有任一下贱接口相应变慢,就会同步导致下单支付接口相应也变长。

第三,如果任一下贱接口失落败,可能导致数据不一致的情形。
比如说下图,先调用 A,成功之后再调用 B,末了再调用 C。

如果在调用 B 接口的发生非常,此时可能就导致下单支付接口返回失落败,但是此时 A 接口实在已经调用成功,这就代表它内部已经处理下单支付成功的结果。

这样就会导致 A,B,C 三个下贱接口,A 获取成功获取支付结果,但是 B,C 没有拿到,导致三者系统数据不一致的情形。

实在我们仔细想一下,对付下单支付业务来讲,它实在不须要关心下贱调用结果,只要有某种机制关照能关照到他们就可以了。

讲到这里,这就须要引入本日须要先容发布订阅机制。

Redis 发布与订阅

Redis 供应了基于「发布/订阅」模式的机制,在这种模式下,发布者与订阅者不须要进行直接通信。

如上图所示,发布者只须要想指定的频道发布,订阅该频道的每个客户端都可以接管到到这个。

利用 Redis 发布订阅这种机制,对付上面业务,下单支付业务只须要向支付结果这个频道发送,其他下贱业务订阅支付结果这个频道,就能收相应,然后做出业务处理即可。

这样就可以解耦系统高下游之间调用关系。

接下来我们来看下,我们来看下如何利用 Redis 发布订阅功能。

Redis 中供应了一组命令,可以用于发布,订阅频道,取消订阅以及按照模式订阅。

首先我们来看下如何发布一条,实在很大略只要利用 publish 指令:

publishchannelmessage

上图中,我们利用 publish 指令向 pay_result 这个频道发送了一条。
我们可以看到 redis 向我们返回 0 ,这实在代表当前订阅者个数,由于此时没有订阅,以是返回结果为 0 。

接下来我们利用 subscribe 订阅一个或多个频道

subscribechannel[channel...]

如上图所示,我们订阅 pay_result 这个频道,当有其他客户端往这个频道发送,

当前订阅者就会收到。

我们子在利用订阅命令,须要紧张几点:

第一,客户端实行订阅指令之后,就会进入订阅状态,之后就只能吸收 subscribe、psubscribe、unsubscribe、punsubscribe 这四个命令。

第二,新订阅的客户端,是无法收到这个频道之前的,这是由于 Redis 并不会对发布的持久化的。

比较于很多专业 MQ,比如 kafka、rocketmq 来说, redis 发布订阅功能就显得有点简陋了。
不过 redis 发布订阅功能胜在大略,如果当前场景可以容忍这些缺陷,还是可以选择利用的。

除了上面的功能以外的,Redis 还支持模式匹配的订阅办法。
大略来说,客户端可以订阅一个带 号的模式,如果某些频道的名字与这个模式匹配,那么当其他客户端发送给给这些频道时,订阅这个模式的客户端也将会到收到。

利用 Redis 订阅模式,我们须要利用一个新的指令 psubscribe。

我们实行下面这个指令:

psubscribepay.

那么一旦有其他客户端往 pay 开头的频道,比如 pay_result、pay_xxx,我们都可以收到。

如果须要取消订阅模式,我们须要利用相应punsubscribe 指令,比如取消上面订阅的模式:

punsubscribepay.Redis 客户端发布订阅利用办法基于 Jedis 开拓发布/订阅

聊完 Redis 发布订阅指令,我们来看下 Java Redis 客户端如何利用发布订阅。

下面的例子紧张基于 Jedis,maven 版本为:

<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version></dependency>

其他 Redis 客户端大同小异。

jedis 发布代码比较大略,只须要调用 Jedis 类的 publish 方法。

//生产环境千万不要这么利用哦,推举利用JedisPool线程池的办法Jedisjedis=newJedis("localhost",6379);jedis.auth("xxxxx");jedis.publish("pay_result","helloworld");

订阅的代码就相对繁芜了,我们须要继续 JedisPubSub实现里面的干系方法,一旦有其他客户端往订阅的频道上发送,将会调用 JedisPubSub 相应的方法。

privatestaticclassMyListenerextendsJedisPubSub{@OverridepublicvoidonMessage(Stringchannel,Stringmessage){System.out.println("收到订阅频道:"+channel+":"+message);}@OverridepublicvoidonPMessage(Stringpattern,Stringchannel,Stringmessage){System.out.println("收到详细订阅频道:"+channel+"订阅模式:"+pattern+":"+message);}}

其次我们须要调用 Jedis 类的 subscribe 方法:

Jedisjedis=newJedis("localhost",6379);jedis.auth("xxx");jedis.subscribe(newMyListener(),"pay_result");

当有其他客户端往 pay_result频道发送时,订阅将会收到。

不过须要把稳的是,jedis#subscribe 是一个壅塞方法,调用之后将会壅塞主线程的,以是如果须要在正式项目利用须要利用异步线程运行,这里就不演示详细的代码了。

基于 Spring-Data-Redis 开拓发布订阅

原生 jedis 发布订阅操作,相对来说还是有点繁芜。
现在我们很多运用已经基于 SpringBoot 开拓,利用 spring-boot-starter-data-redis ,可以简化发布订阅开拓。

首先我们须要引入相应的 startter 依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><artifactId>lettuce-core</artifactId><groupId>io.lettuce</groupId></exclusion></exclusions></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency>

这里我们利用 Jedis 当做底层连接客户端,以是须要打消 lettuce,然后引入 Jedis 依赖。

然后我们须要创建一个吸收类,里面须要有方法消费:

@Slf4jpublicclassReceiver{privateAtomicIntegercounter=newAtomicInteger();publicvoidreceiveMessage(Stringmessage){log.info("Received<"+message+">");counter.incrementAndGet();}publicintgetCount(){returncounter.get();}}

接着我们只须要注入 Spring- Redis 干系 Bean,比如:

StringRedisTemplate,用来操作 Redis 命令MessageListenerAdapter ,监听器,可以在这个类注入我们上面创建接管类ReceiverRedisConnectionFactory, 创建 Redis 底层连接

@ConfigurationpublicclassMessageConfiguration{@BeanRedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory,MessageListenerAdapterlistenerAdapter){RedisMessageListenerContainercontainer=newRedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//订阅指定频道利用ChannelTopic//订阅模式利用PatternTopiccontainer.addMessageListener(listenerAdapter,newChannelTopic("pay_result"));returncontainer;}@BeanMessageListenerAdapterlistenerAdapter(Receiverreceiver){//注入Receiver,指定类中的接管方法returnnewMessageListenerAdapter(receiver,"receiveMessage");}@BeanReceiverreceiver(){returnnewReceiver();}@BeanStringRedisTemplatetemplate(RedisConnectionFactoryconnectionFactory){returnnewStringRedisTemplate(connectionFactory);}}

末了我们利用 StringRedisTemplate#convertAndSend 发送,同时 Receiver 将会收到一条。

@SpringBootApplicationpublicclassMessagingRedisApplication{publicstaticvoidmain(String[]args)throwsInterruptedException{ApplicationContextctx=SpringApplication.run(MessagingRedisApplication.class,args);StringRedisTemplatetemplate=ctx.getBean(StringRedisTemplate.class);Receiverreceiver=ctx.getBean(Receiver.class);while(receiver.getCount()==0){template.convertAndSend("pay_result","HellofromRedis!");Thread.sleep(500L);}System.exit(0);}}

Redis 发布订阅实际运用Redis Sentinel 节点创造

Redis Sentinel 是 Redis 一套高可用方案,可以在主节点故障的时候,自动将从节点提升为主节点,从而转移故障。

本日这里我们不详细阐明 Redis Sentinel 详细事理,紧张来看下 Redis Sentinel 如何利用发布订阅机制。

Redis Sentinel 节点紧张利用发布订阅机制,实现新节点的创造,以及交流主节点的之间的状态。

如下所示,每一个 Sentinel 节点将会定时向 _sentinel_:hello 频道发送,并且每个 Sentinel 都会订阅这个节点。

这样一旦有节点往这个频道发送,其他节点就可以急速收到。

这样一旦有的新节点加入,它往这个频道发送,其他节点收到之后,判断本地列表并没有这个节点,于是就可以当做新的节点加入本地节点列表。

除此之外,每次往这个频道发送内容可以包含节点的状态信息,这样可以作为后面 Sentinel 领导者选举的依据。

以上都是对付 Redis 做事端来讲,对付客户端来讲,我们也可以用到发布订阅机制。

当 Redis Sentinel 进行主节点故障转移,这个过程各个阶段会通过发布订阅对外供应。

对付我们客户端来讲,比较关心切换之后的主节点,这样我们及时切换主节点的连接(旧节点此时已故障,不能再接管操作指令),

客户端可以订阅 +switch-master频道,一旦 Redis Sentinel 结束了对主节点的故障转移就会发布主节点的的。

redission 分布式锁

redission 开源框架供应一些便捷操作 Redis 的方法,个中比较出名的 redission 基于 Redis 的实现分布式锁。

本日我们来看下 Redis 的实现分布式锁中如何利用 Redis 发布订阅机制,提高加锁的性能。

PS:redission 分布式锁实现事理,可以参考之前写过的文章:

可重入分布式锁的实现办法

Redis 分布式锁,看似大略,实在真不大略

首先我们来看下 redission 加锁的方法:

Redissonredisson=....RLockredissonLock=redisson.getLock("xxxx");redissonLock.lock();

RLock 继续自 Java 标准的 Lock 接口,调用 lock 方法,如果当前锁已被其他客户端获取,那么当前加锁的线程将会被壅塞,直到其他客户端开释这把锁。

这里实在有个问题,当前壅塞的线程如何感知分布式锁已被开释呢?

这里实在有两种实现方法:

第一钟,定时查询分布时锁的状态,一旦查到锁已被开释(Redis 中不存在这个键值),那么就去加锁。

实现伪码如下:

while(true){booleanresult=lock();if(!result){Thread.sleep(N);}}

这种办法实现起来起来大略,不过缺陷也比较多。

如果定时任务韶光过短,将会导致查询次数过多,实在这些都是无效查询。

如果定时任务休眠韶光过长,那又会导致加锁韶光过长,导致加锁性能不好。

那么第二种实现方案,便是采取做事关照的机制,当分布式锁被开释之后,客户端可以收到锁开释的,然后第一韶光再去加锁。

这个做事关照的机制我们可以利用 Redis 发布订阅模式。

当线程加锁失落败之后,线程将会订阅 redisson_lock__channel_xxx(xx 代表锁的名称) 频道,利用异步线程监听,然后利用 Java 中 Semaphore 使当前哨程进入壅塞。

一旦其他客户端进行解锁,redission 就会往这个redisson_lock__channel_xxx 发送解锁。

等异步线程收到,将会调用 Semaphore 开释旗子暗记量,从而让当前被壅塞的线程唤醒去加锁。

ps:这里只是大略描述了 redission 加锁部分事理,出于篇幅,这里就不再解析源码。

感兴趣的小伙伴可以自己看下 redission 加锁的源码。

通过发布订阅机制,被壅塞的线程可以及时被唤醒,减少无效的空转的查询,有效的提高的加锁的效率。

ps: 这种办法,性能确实提高,但是实现起来的繁芜度也很高,这部分源码有点东西,快看晕了。

总结

本日我们紧张先容 Redis 发布订阅功能,紧张对应的 Redis 命令为:

subscribe channel [channel ...] 订阅一个或多个频道unsubscribe channel 退订指定频道publish channel message 发送psubscribe pattern 订阅指定模式punsubscribe pattern 退订指定模式

我们可以利用 Redis 发布订阅功能,实现的大略 MQ 功能,实现高下游的解耦。

不过须要把稳了,由于 Redis 发布的不会被持久化,这就会导致新订阅的客户端将不会收到历史。

以是,如果当前的业务场景不能容忍这些缺陷,那还是用专业 MQ 吧。

末了先容了两个利用 Redis 发布订阅功能利用场景供大家参考。