jae-park-7GX5aICb5i4-unsplash
本日小黑哥来跟大家先容一下 Redis 发布/订阅功能。
大概有的小伙伴对这个功能比较陌生,不太清楚这个功能是干什么的,没紧要小黑哥先来举个例子。
假设我们有这么一个业务场景,在网站下单支付往后,须要关照库存做事进行发货处理。
上面业务实现不难,我们只要让库存做事供应给干系的给口,下单支付之后只要调用库存做事即可。
后面如果又有新的业务,比如说积分做事,他须要获取下单支付的结果,然后增加用户的积分。
这个实现也不难,让积分做事同样供应一个接口,下单支付之后只要调用库存做事即可。
如果就两个业务须要获取下单支付的结果,那也还好,程序改造也快。可是随着业务不断的发展,越来越多的新业务说是要下单支付的结果。
这时我们会创造上面这样的系统架构存在很多问题:
第一,下单支付业务与其他业务重度耦合,每当有个新业务须要支付结果,就须要改动下单支付的业务。
第二,如果调用业务过多,会导致下单支付接口相应韶光变长。其余,如果有任一下贱接口相应变慢,就会同步导致下单支付接口相应也变长。
第三,如果任一下贱接口失落败,可能导致数据不一致的情形。比如说下图,先调用 A,成功之后再调用 B,末了再调用 C。
如果在调用 B 接口的发生非常,此时可能就导致下单支付接口返回失落败,但是此时 A 接口实在已经调用成功,这就代表它内部已经处理下单支付成功的结果。
这样就会导致 A,B,C 三个下贱接口,A 获取成功获取支付结果,但是 B,C 没有拿到,导致三者系统数据不一致的情形。
实在我们仔细想一下,对付下单支付业务来讲,它实在不须要关心下贱调用结果,只要有某种机制关照能关照到他们就可以了。
讲到这里,这就须要引入本日须要先容发布订阅机制。
Redis 发布与订阅Redis 供应了基于「发布/订阅」模式的机制,在这种模式下,发布者与订阅者不须要进行直接通信。
如上图所示,发布者只须要想指定的频道发布,订阅该频道的每个客户端都可以接管到到这个。
利用 Redis 发布订阅这种机制,对付上面业务,下单支付业务只须要向支付结果这个频道发送,其他下贱业务订阅支付结果这个频道,就能收相应,然后做出业务处理即可。
这样就可以解耦系统高下游之间调用关系。
接下来我们来看下,我们来看下如何利用 Redis 发布订阅功能。
Redis 中供应了一组命令,可以用于发布,订阅频道,取消订阅以及按照模式订阅。
首先我们来看下如何发布一条,实在很大略只要利用 publish 指令:
publishchannelmessage
上图中,我们利用 publish 指令向 pay_result 这个频道发送了一条。我们可以看到 redis 向我们返回 0 ,这实在代表当前订阅者个数,由于此时没有订阅,以是返回结果为 0 。
接下来我们利用 subscribe 订阅一个或多个频道
subscribechannel[channel...]
如上图所示,我们订阅 pay_result 这个频道,当有其他客户端往这个频道发送,
当前订阅者就会收到。
我们子在利用订阅命令,须要紧张几点:
第一,客户端实行订阅指令之后,就会进入订阅状态,之后就只能吸收 subscribe、psubscribe、unsubscribe、punsubscribe 这四个命令。
第二,新订阅的客户端,是无法收到这个频道之前的,这是由于 Redis 并不会对发布的持久化的。
比较于很多专业 MQ,比如 kafka、rocketmq 来说, redis 发布订阅功能就显得有点简陋了。不过 redis 发布订阅功能胜在大略,如果当前场景可以容忍这些缺陷,还是可以选择利用的。
除了上面的功能以外的,Redis 还支持模式匹配的订阅办法。大略来说,客户端可以订阅一个带 号的模式,如果某些频道的名字与这个模式匹配,那么当其他客户端发送给给这些频道时,订阅这个模式的客户端也将会到收到。
利用 Redis 订阅模式,我们须要利用一个新的指令 psubscribe。
我们实行下面这个指令:
psubscribepay.
那么一旦有其他客户端往 pay 开头的频道,比如 pay_result、pay_xxx,我们都可以收到。
如果须要取消订阅模式,我们须要利用相应punsubscribe 指令,比如取消上面订阅的模式:
punsubscribepay.
Redis 客户端发布订阅利用办法基于 Jedis 开拓发布/订阅
聊完 Redis 发布订阅指令,我们来看下 Java Redis 客户端如何利用发布订阅。
下面的例子紧张基于 Jedis,maven 版本为:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version></dependency>
其他 Redis 客户端大同小异。
jedis 发布代码比较大略,只须要调用 Jedis 类的 publish 方法。
//生产环境千万不要这么利用哦,推举利用JedisPool线程池的办法Jedisjedis=newJedis("localhost",6379);jedis.auth("xxxxx");jedis.publish("pay_result","helloworld");
订阅的代码就相对繁芜了,我们须要继续 JedisPubSub实现里面的干系方法,一旦有其他客户端往订阅的频道上发送,将会调用 JedisPubSub 相应的方法。
privatestaticclassMyListenerextendsJedisPubSub{@OverridepublicvoidonMessage(Stringchannel,Stringmessage){System.out.println("收到订阅频道:"+channel+":"+message);}@OverridepublicvoidonPMessage(Stringpattern,Stringchannel,Stringmessage){System.out.println("收到详细订阅频道:"+channel+"订阅模式:"+pattern+":"+message);}}
其次我们须要调用 Jedis 类的 subscribe 方法:
Jedisjedis=newJedis("localhost",6379);jedis.auth("xxx");jedis.subscribe(newMyListener(),"pay_result");
当有其他客户端往 pay_result频道发送时,订阅将会收到。
不过须要把稳的是,jedis#subscribe 是一个壅塞方法,调用之后将会壅塞主线程的,以是如果须要在正式项目利用须要利用异步线程运行,这里就不演示详细的代码了。
基于 Spring-Data-Redis 开拓发布订阅原生 jedis 发布订阅操作,相对来说还是有点繁芜。现在我们很多运用已经基于 SpringBoot 开拓,利用 spring-boot-starter-data-redis ,可以简化发布订阅开拓。
首先我们须要引入相应的 startter 依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><artifactId>lettuce-core</artifactId><groupId>io.lettuce</groupId></exclusion></exclusions></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency>
这里我们利用 Jedis 当做底层连接客户端,以是须要打消 lettuce,然后引入 Jedis 依赖。
然后我们须要创建一个吸收类,里面须要有方法消费:
@Slf4jpublicclassReceiver{privateAtomicIntegercounter=newAtomicInteger();publicvoidreceiveMessage(Stringmessage){log.info("Received<"+message+">");counter.incrementAndGet();}publicintgetCount(){returncounter.get();}}
接着我们只须要注入 Spring- Redis 干系 Bean,比如:
StringRedisTemplate,用来操作 Redis 命令MessageListenerAdapter ,监听器,可以在这个类注入我们上面创建接管类ReceiverRedisConnectionFactory, 创建 Redis 底层连接@ConfigurationpublicclassMessageConfiguration{@BeanRedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory,MessageListenerAdapterlistenerAdapter){RedisMessageListenerContainercontainer=newRedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//订阅指定频道利用ChannelTopic//订阅模式利用PatternTopiccontainer.addMessageListener(listenerAdapter,newChannelTopic("pay_result"));returncontainer;}@BeanMessageListenerAdapterlistenerAdapter(Receiverreceiver){//注入Receiver,指定类中的接管方法returnnewMessageListenerAdapter(receiver,"receiveMessage");}@BeanReceiverreceiver(){returnnewReceiver();}@BeanStringRedisTemplatetemplate(RedisConnectionFactoryconnectionFactory){returnnewStringRedisTemplate(connectionFactory);}}
末了我们利用 StringRedisTemplate#convertAndSend 发送,同时 Receiver 将会收到一条。
@SpringBootApplicationpublicclassMessagingRedisApplication{publicstaticvoidmain(String[]args)throwsInterruptedException{ApplicationContextctx=SpringApplication.run(MessagingRedisApplication.class,args);StringRedisTemplatetemplate=ctx.getBean(StringRedisTemplate.class);Receiverreceiver=ctx.getBean(Receiver.class);while(receiver.getCount()==0){template.convertAndSend("pay_result","HellofromRedis!");Thread.sleep(500L);}System.exit(0);}}
Redis Sentinel 是 Redis 一套高可用方案,可以在主节点故障的时候,自动将从节点提升为主节点,从而转移故障。
本日这里我们不详细阐明 Redis Sentinel 详细事理,紧张来看下 Redis Sentinel 如何利用发布订阅机制。
Redis Sentinel 节点紧张利用发布订阅机制,实现新节点的创造,以及交流主节点的之间的状态。
如下所示,每一个 Sentinel 节点将会定时向 _sentinel_:hello 频道发送,并且每个 Sentinel 都会订阅这个节点。
这样一旦有节点往这个频道发送,其他节点就可以急速收到。
这样一旦有的新节点加入,它往这个频道发送,其他节点收到之后,判断本地列表并没有这个节点,于是就可以当做新的节点加入本地节点列表。
除此之外,每次往这个频道发送内容可以包含节点的状态信息,这样可以作为后面 Sentinel 领导者选举的依据。
以上都是对付 Redis 做事端来讲,对付客户端来讲,我们也可以用到发布订阅机制。
当 Redis Sentinel 进行主节点故障转移,这个过程各个阶段会通过发布订阅对外供应。
对付我们客户端来讲,比较关心切换之后的主节点,这样我们及时切换主节点的连接(旧节点此时已故障,不能再接管操作指令),
客户端可以订阅 +switch-master频道,一旦 Redis Sentinel 结束了对主节点的故障转移就会发布主节点的的。
redission 分布式锁redission 开源框架供应一些便捷操作 Redis 的方法,个中比较出名的 redission 基于 Redis 的实现分布式锁。
本日我们来看下 Redis 的实现分布式锁中如何利用 Redis 发布订阅机制,提高加锁的性能。
PS:redission 分布式锁实现事理,可以参考之前写过的文章:
可重入分布式锁的实现办法
Redis 分布式锁,看似大略,实在真不大略
首先我们来看下 redission 加锁的方法:
Redissonredisson=....RLockredissonLock=redisson.getLock("xxxx");redissonLock.lock();
RLock 继续自 Java 标准的 Lock 接口,调用 lock 方法,如果当前锁已被其他客户端获取,那么当前加锁的线程将会被壅塞,直到其他客户端开释这把锁。
这里实在有个问题,当前壅塞的线程如何感知分布式锁已被开释呢?
这里实在有两种实现方法:
第一钟,定时查询分布时锁的状态,一旦查到锁已被开释(Redis 中不存在这个键值),那么就去加锁。
实现伪码如下:
while(true){booleanresult=lock();if(!result){Thread.sleep(N);}}
这种办法实现起来起来大略,不过缺陷也比较多。
如果定时任务韶光过短,将会导致查询次数过多,实在这些都是无效查询。
如果定时任务休眠韶光过长,那又会导致加锁韶光过长,导致加锁性能不好。
那么第二种实现方案,便是采取做事关照的机制,当分布式锁被开释之后,客户端可以收到锁开释的,然后第一韶光再去加锁。
这个做事关照的机制我们可以利用 Redis 发布订阅模式。
当线程加锁失落败之后,线程将会订阅 redisson_lock__channel_xxx(xx 代表锁的名称) 频道,利用异步线程监听,然后利用 Java 中 Semaphore 使当前哨程进入壅塞。
一旦其他客户端进行解锁,redission 就会往这个redisson_lock__channel_xxx 发送解锁。
等异步线程收到,将会调用 Semaphore 开释旗子暗记量,从而让当前被壅塞的线程唤醒去加锁。
ps:这里只是大略描述了 redission 加锁部分事理,出于篇幅,这里就不再解析源码。
感兴趣的小伙伴可以自己看下 redission 加锁的源码。
通过发布订阅机制,被壅塞的线程可以及时被唤醒,减少无效的空转的查询,有效的提高的加锁的效率。
ps: 这种办法,性能确实提高,但是实现起来的繁芜度也很高,这部分源码有点东西,快看晕了。
总结本日我们紧张先容 Redis 发布订阅功能,紧张对应的 Redis 命令为:
subscribe channel [channel ...] 订阅一个或多个频道unsubscribe channel 退订指定频道publish channel message 发送psubscribe pattern 订阅指定模式punsubscribe pattern 退订指定模式我们可以利用 Redis 发布订阅功能,实现的大略 MQ 功能,实现高下游的解耦。
不过须要把稳了,由于 Redis 发布的不会被持久化,这就会导致新订阅的客户端将不会收到历史。
以是,如果当前的业务场景不能容忍这些缺陷,那还是用专业 MQ 吧。
末了先容了两个利用 Redis 发布订阅功能利用场景供大家参考。