这个场景很熟习吧~~ 通过obs推流软件来推流。

户外直播,通过yasea手机端推流软件,利用手机摄像头推流。

电脑端效果播放喷鼻香港卫视

jspinceptorgithub项目LiveRoomDemo 着手打造属于本身的直播间 HTML

直播画面

项目总览

项目分为三个部分:

客户端

直播间视频拉流、播放和谈天室,炫酷的弹幕以及直播间信息

做事端

处理直播间、用户的数据业务,谈天室的处理

做事器支配视频做事器和web做事器技能栈

移动客户端

VUE百口桶UI层vonicaxios视频播放器: vue-video-player + videojs-contrib-hlswebsocket客户端: vue-stomp弹幕插件: vue-barrage打包工具:webpack

电脑端客户端

项目架构: Jquery + BootStrap视频播放器: video.jswebsocket客户端: stomp.js + sockjs.js弹幕插件: Jquery.danmu.js模版引擎: thymeleaf

做事端

IDE: IntelliJ IDEA项目架构: SpringBoot1.5.4 +Maven3.0主数据库: Mysql5.7辅数据库: redis3.2数据库访问层: spring-boot-starter-data-jpa + spring-boot-starter-data-rediswebsocket: spring-boot-starter-websocket中间件: RabbitMQ/3.6.10

做事器支配

视频直播模块: nginx-rtmp-moduleweb运用做事器: tomcat8.0做事器: 腾讯云centos6.5技能点讲解直播间紧张涉及到两个紧张功能:第一是视频直播、第二是谈天室。
这两个都是非常讲究实时性。
视频直播

说到直播我们先理解下几个常用的直播流协议,看了挺多的流媒体协议文章博客,但都是非常粗略,这里有个比较详细的 流媒体协议先容,如果想详细理解协议内容估计去要看看专业书本了。
这里我们用到的只是rtmp和hls,实践后创造:rtmp只能够在电脑端播放,hls只能够在手机端播放。
而且rtmp是相称快的只管没有rtsp那么快,延迟只有几秒,我测试的就差不多2-5秒,但是hls大概有10几秒。
以是如果你体验过demo,就会创造手机延迟比较多。

直播的流程:直播分为推流和拉流两个过程,那么流推向哪里,拉流又从哪里拉取呢?那当然须要视频做事器啦,千万不要以为视频直播做事器很繁芜,其实在nginx做事器中统统都变得大略。
后面我会讲解如何支配Nginx做事器并配置视频模块(nginx-rtmp-module).

首先主播通过推流软件,比如OBS Studio推流软件,这个是比较专业级别的,很多直播平台的推举主播利用这个软件来推送视频流,这里我也推举一个开源的安卓端推流工具Yasea,下载地址,文件很小,但是很强大。
直播内容推送到做事器后,就可以在做事器端利用视频编码工具进行转码了,可以转换成各种高清,标清,超清的分辨率视频,也便是为什么我们在各个视频网站都可以选择视频清晰度。
这里我们没有转码,只是通过前端视频播放器(video.js)来拉取视频.这样全体视频推流拉流过程就完成了。

谈天室

直播间里面的谈天室跟我们的群谈天差不多,只不过它变成了web端,web真个即时通信方案有很多,这里我们选择websocket协议来与做事端通信,websocket是基于http之上的传输协议,客户端向做事端发送http要求,并携带Upgrade:websocket升级头信息表示转换websocket协议,通过与做事端握手成功后就可以建立tcp通道,由此来通报,它与http最大的差别便是,做事端可以主动向客户端发送。

既然建立了通道,那我们就须要往通道里发,但是总得须要一个东西来管控该发给谁吧,要不然全乱套了,以是我们选择了中间件RabbitMQ.利用它来卖力的路由去向。

理论知识都讲完啦,实操韶光到!移动客户端实操

源码地址

工程构造

|—— build 构建做事和webpack配置 |—— congfig 项目不同环境的配置|—— dist build天生生产目录|—— static 静态资源|—— package.json 项目配置文件|—— src 开拓源代码目录 |—— api 通过axios导出的api目录 |—— components 页面和组件 |—— public 公有组件 |—— vuex 全局状态 |—— main.js 运用启动配置点功能模块拉取做事器的直播视频流(hls)并播放直播画面与做事端创建websocket连接,收发谈天室通过websocket获取消息并发送到弹幕通过websocket实时更新在线用户结合做事端获取访问历史记录问题反馈模块效果图

项目解释

请参考源码 https://github.com/jack-hoo/LiveRoomDemo_Server.git

做事端实操

源码地址 https://github.com/jack-hoo/LiveRoomDemo_Server.git

由于个人比较喜好打仗新的东西,所往后端选择了springboot,前端选择了Vue.js年轻人嘛总得跟上潮流。
SpringBoot实践过后创造真的太省心了,不用再理会各种配置文件,全自动化妆配。
这里贴一下pom.xml

&lt;?xml version="1.0" encoding="UTF-8"?&gt;<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hushangjie</groupId> <artifactId>rtmp-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rtmp-demo</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-actuator-docs</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!--<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> </dependency>--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <!--非严格模式解析HTML5--> <dependency> <groupId>net.sourceforge.nekohtml</groupId> <artifactId>nekohtml</artifactId> <version>1.9.22</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <!-- 打包成war时可以移除嵌入式tomcat插件 --> <!--<exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions>--> </dependency> <!--<dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency>--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.webjars</groupId> <artifactId>vue</artifactId> <version>2.1.3</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.9.2</version> </dependency> <!-- RabbitMQ干系配置--> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>2.0.8.RELEASE</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-net</artifactId> <version>2.0.8.RELEASE</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <fork>true</fork> </configuration> </plugin> </plugins> </build></project>

application.properties文件

spring.datasource.url=jdbc:mysql://host:3306/database?characterEncoding=utf8&useSSL=falsespring.datasource.username=usernamespring.datasource.password=passwordspring.datasource.driver-class-name=com.mysql.jdbc.Driverspring.thymeleaf.mode=LEGACYHTML5server.port=8085# REDIS (RedisProperties)# Redis数据库索引(默认为0)spring.redis.database=0 # Redis做事器地址spring.redis.host=127.0.0.1# Redis做事器连接端口spring.redis.port=6379 # Redis做事器连接密码(默认为空)spring.redis.password=# 连接池最大连接数(利用负值表示没有限定)spring.redis.pool.max-active=8 # 连接池最大壅塞等待韶光(利用负值表示没有限定)spring.redis.pool.max-wait=-1 # 连接池中的最大空闲连接spring.redis.pool.max-idle=8 # 连接池中的最小空闲连接spring.redis.pool.min-idle=0 # 连接超时时间(毫秒)spring.redis.timeout=0 websocket配置

@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { //拦截器注入service失落败办理办法 @Bean public MyChannelInterceptor myChannelInterceptor(){ return new MyChannelInterceptor(); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { //添加访问域名限定可以防止跨域socket连接 //setAllowedOrigins("http://localhost:8085") registry.addEndpoint("/live").setAllowedOrigins("").addInterceptors(new HandShkeInceptor()).withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { /.enableSimpleBroker("/topic","/queue");/ //如果须要第三方代理,比如rabitMQ,activeMq,在这里配置 registry.setApplicationDestinationPrefixes("/demo") .enableStompBrokerRelay("/topic","/queue") .setRelayHost("127.0.0.1") .setRelayPort(61613) .setClientLogin("guest") .setClientPasscode("guest") .setSystemLogin("guest") .setSystemPasscode("guest") .setSystemHeartbeatSendInterval(5000) .setSystemHeartbeatReceiveInterval(4000); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { ChannelRegistration channelRegistration = registration.setInterceptors(myChannelInterceptor()); super.configureClientInboundChannel(registration); } @Override public void configureClientOutboundChannel(ChannelRegistration registration) { super.configureClientOutboundChannel(registration); }}

配置类继续了代理配置类,意味着我们将利用代理rabbitmq.利用registerStompEndpoints方法注册一个websocket终端连接。
这里我们须要理解两个东西,第一个是stomp和sockjs,sockjs是啥呢,实在它是对付websocket的封装,由于如果纯挚利用websocket的话效率会非常低,我们须要的编码量也会增多,而且如果浏览器不支持websocket,sockjs会自动降级为轮询策略,并仿照websocket,担保客户端和做事端可以通信。
stomp有是什么看这里

stomp是一种大略(流)文本定向协议,它供应了一个可互操作的连接格式,许可STOMP客户端与任意STOMP代理(Broker)进行交互,也便是我们上面的RabbbitMQ,它便是一个代理。
我们可以通过configureMessageBroker来配置代理,须要把稳的是我们将要支配的做事器也该当要有RabbitMQ,由于它是一个中间件,安装非常随意马虎,这里就不解释了。
这里我们配置了“/topic,/queue”两个代理转播策略,便是说客户端订阅了前缀为“/topic,/queue”频道都会通过代理(RabbitMQ)来转发。
跟spring没啥关系啦,完备解耦。

websocke如何担保安全

一开始打仗 stomp的时候一贯有个问题困扰我,客户端只要与做事端通过websocket建立了连接,那么他就可以订阅任何内容,意味着可以接管任何,这样岂不是乱了套啦,于是我翻阅了大量博客文章,很多都是官方的例子并没有办理实际问题。
经由琢磨,实在websocket是要考虑安全性的。
详细在以下几个方面

跨域websocket连接协议升级前握手拦截器信道拦截器

对付跨域问题,我们可以通过setAllowedOrigins方法来设置可连接的域名,防止跨站连接。

对付站内用户是否许可连接我们可以如下配置

public class HandShkeInceptor extends HttpSessionHandshakeInterceptor { private static final Set<UserEntity> ONLINE_USERS = new HashSet<>(); @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { System.out.println("握手前"+request.getURI()); //http协议转换websoket协议进行前,常日这个拦截器可以用来判断用户合法性等 //鉴别用户 if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; //这句话很主要如果getSession(true)会导致移动端无法握手成功 //request.getSession(true):若存在会话则返回该会话,否则新建一个会话。
//request.getSession(false):若存在会话则返回该会话,否则返回NULL //HttpSession session = servletRequest.getServletRequest().getSession(false); HttpSession session = servletRequest.getServletRequest().getSession(); UserEntity user = (UserEntity) session.getAttribute("user"); if (user != null) { //这里只利用大略的session来存储用户,如果利用了springsecurity可以直策应用principal return super.beforeHandshake(request, response, wsHandler, attributes); }else { System.out.println("用户未登录,握手失落败!
"); return false; } } return false; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { //握手成功后,常日用来注册用户信息 System.out.println("握手后"); super.afterHandshake(request, response, wsHandler, ex); }}

HttpSessionHandshakeInterceptor 这个拦截器用来管理握手和握手后的事情,我们可以通过要求信息,比如token、或者session判用户是否可以连接,这样就能够戒备造孽用户。

那如何限定用户只能订阅指定内容呢?我们接着往下看

public class MyChannelInterceptor extends ChannelInterceptorAdapter { @Autowired private StatDao statDao; @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { System.out.println("preReceive"); return super.preReceive(channel); } @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); //检测用户订阅内容(防止用户订阅不合法频道) if (StompCommand.SUBSCRIBE.equals(command)) { //从数据库获取用户订阅频道进行比拟(这里为了演示直策应用set凑集代替) Set<String> subedChannelInDB = new HashSet<>(); subedChannelInDB.add("/topic/group"); subedChannelInDB.add("/topic/online_user"); if (subedChannelInDB.contains(accessor.getDestination())) { //该用户订阅的频道合法 return super.preSend(message, channel); } else { //该用户订阅的频道不合法直接返回null前端用户就接管不到该频道信息。
return null; } } else { return super.preSend(message, channel); } } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { //System.out.println("afterSendCompletion"); //检测用户是否连接成功,搜集在线的用户信息如果数据量过大我们可以选择利用缓存数据库比如redis, //这里由于须要频繁的删除和增加凑集内容,我们选择set凑集来存储在线用户 StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)){ Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.add(map.get("user")); UserEntity user = map.get("user"); if(user != null){ statDao.pushOnlineUser(user); Guest guest = new Guest(); guest.setUserEntity(user); guest.setAccessTime(Calendar.getInstance().getTimeInMillis()); statDao.pushGuestHistory(guest); //通过websocket实时返回在线人数 this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } //如果用户断开连接,删除用户信息 if (StompCommand.DISCONNECT.equals(command)){ Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.remove(map.get("user")); UserEntity user = map.get("user"); if (user != null){ statDao.popOnlineUser(user); simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } super.afterSendCompletion(message, channel, sent, ex); }}

在stomp里面,Channel信道便是传送的通道,客户端与做事端建立了连接就相称于建立了通道,往后的信息便是通过这个通道来传输。
所有的都有头,被封装在了spring 的messag接口中,比如建立连接时候头就含有CONNECT,当然还有一些其他的信息。
客户端订阅的时候也有订阅头信息SUBSCRIBE,那么我是不是可以在这个拦截器ChannelInterceptorAdapter 中拦截每个人的订阅信息,然后与数据库的信息作比对,末了决定这个用户是否可以订阅这个频道的信息呢,对的,这是我的想法,按照这样的思路,做单聊不是迎刃而解了吗。
那客户端通过websocket发送的如何到达订阅者手中呢,按照rabbitmq的规则,订阅者属于消费者,发送的一方属于生产者,生产者通过websocket把发送到做事端,做事端通过转发给代理(rabbitmq),代理卖力存储,管理发送规则,推送给订阅者,看下面的代码

@MessageMapping(value = "/chat") @SendTo("/topic/group") public MsgEntity testWst(String message , @Header(value = "simpSessionAttributes") Map<String,Object> session){ UserEntity user = (UserEntity) session.get("user"); String username = user.getRandomName(); MsgEntity msg = new MsgEntity(); msg.setCreator(username); msg.setsTime(Calendar.getInstance()); msg.setMsgBody(message); return msg; }

@MessageMapping看起来跟springmvc方法特殊像,它即可以用在类级别上也可以用在方法级别上当发送者往‘/chat’发送后,做事端接管到,再发送给“/topic/group”的订阅者,@SendTo便是发送给谁,这里须要把稳的有,如果我们没有配置代理,只利用了enableSimpleBroker("/topic","/queue")大略代理,那么便是直接发送到订阅者,如果配置了代理,那还要通过代理,由它来转发。

如果我们想在做事端随时发送,而不是在客户端发送(这样的场景很常见,比如发送全局关照),可以利用SimpMessagingTemplate类,通过注入该bean,在得当的业务场景中发送。

Redis统计数据

直播间常常须要统计数据,比如实时在线人数,访问量,贡献排行榜,订阅量。
我选择的方案是利用redis来计数,只管这个demo可能不会太多人访问,但是我的目的是学习如何利用redis先看springboot中redis的配置

@Configurationpublic class RedisConfig extends CachingConfigurerSupport{ / 天生key的策略 @return / @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { sb.append(obj.toString()); } return sb.toString(); } }; } / 管理缓存 @param redisTemplate @return / @SuppressWarnings("rawtypes") @Bean public CacheManager cacheManager(RedisTemplate redisTemplate) { RedisCacheManager rcm = new RedisCacheManager(redisTemplate); //设置缓存过期韶光 // rcm.setDefaultExpiration(60);//秒 //设置value的过期韶光 Map<String,Long> map=new HashMap(); map.put("test",60L); rcm.setExpires(map); return rcm; } / RedisTemplate配置 @param factory @return / @Bean public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer);//如果key是String 须要配置一下StringSerializer,不然key会乱码 /XX/XX template.afterPropertiesSet(); //template.setStringSerializer(); return template; }}

redis数据统计Dao的实现

@Repositorypublic class StatDao { @Autowired RedisTemplate redisTemplate; public void pushOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().add("OnlineUser",userEntity); } public void popOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().remove("OnlineUser" ,userEntity); } public Set getAllUserOnline(){ return redisTemplate.opsForSet().members("OnlineUser"); } public void pushGuestHistory(Guest guest){ //最多存储指定个数的访客 if (redisTemplate.opsForList().size("Guest") == 200l){ redisTemplate.opsForList().rightPop("Guest"); } redisTemplate.opsForList().leftPush("Guest",guest); } public List getGuestHistory(){ return redisTemplate.opsForList().range("Guest",0,-1); }}

Dao层非常大略,由于我们只须要统计在线人数和访客。
但是在线人数是实时更新的,既然我们利用了websocket实时数据更新就非常随意马虎了,前面我们讲过,通过信道拦截器可以拦截连接,订阅,断开连接等等事宜信息,以是我们就可以当用户连接时存储在线用户,通过websocket返回在线用户信息。

public class MyChannelInterceptor extends ChannelInterceptorAdapter { @Autowired private StatDao statDao; @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { System.out.println("preReceive"); return super.preReceive(channel); } @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); //检测用户订阅内容(防止用户订阅不合法频道) if (StompCommand.SUBSCRIBE.equals(command)) { //从数据库获取用户订阅频道进行比拟(这里为了演示直策应用set凑集代替) Set<String> subedChannelInDB = new HashSet<>(); subedChannelInDB.add("/topic/group"); subedChannelInDB.add("/topic/online_user"); if (subedChannelInDB.contains(accessor.getDestination())) { //该用户订阅的频道合法 return super.preSend(message, channel); } else { //该用户订阅的频道不合法直接返回null前端用户就接管不到该频道信息。
return null; } } else { return super.preSend(message, channel); } } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { //System.out.println("afterSendCompletion"); //检测用户是否连接成功,搜集在线的用户信息如果数据量过大我们可以选择利用缓存数据库比如redis, //这里由于须要频繁的删除和增加凑集内容,我们选择set凑集来存储在线用户 StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)){ Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.add(map.get("user")); UserEntity user = map.get("user"); if(user != null){ statDao.pushOnlineUser(user); Guest guest = new Guest(); guest.setUserEntity(user); guest.setAccessTime(Calendar.getInstance().getTimeInMillis()); statDao.pushGuestHistory(guest); //通过websocket实时返回在线人数 this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } //如果用户断开连接,删除用户信息 if (StompCommand.DISCONNECT.equals(command)){ Map<String,UserEntity> map = (Map<String, UserEntity>) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.remove(map.get("user")); UserEntity user = map.get("user"); if (user != null){ statDao.popOnlineUser(user); simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } super.afterSendCompletion(message, channel, sent, ex); }}

由于这个项目有移动端和电脑端,以是须要根据要求代理UserAgent来判断客户端属于哪一种类型。
这个工具类在源码上有。
我就不贴了。

做事器支配

说了这么多即时通信,却没创造视频直播。
不要焦急我们立时进入视频环节。
文章开头就解释了几种媒体流协议,这里不讲解详细的协议流程,只须要知道,我们是通过推流软件采集视频信息,如何采集也不是我们关注的。
采集到信息后通过软件来推送到指定的做事器,如下图

obs推流设置yasea手机端推流设置

赤色部分是做事器开放的获取流接口。

Nginx-rtmp-module配置

视频做事器有很多,也支持很多媒体流协议。
这里我们选择nginx-rtmp-module来做视频做事,接下来我们须要在linux下安装nginx,并安装rtmp模块。
本人也是linux初学者,一步步摸索着把做事器搭建好,听说tomcat和nginx很配哦,以是作为免费开源确当然首选这两个。
接下来须要在linux安装一下软件和做事。

Nginx以及Nginx-rtmp-moduleTomcatMysqlRedisRabbitMQ

安装步骤我就不说了,大家搜索一下啦,这里贴一下nginx.conf文件配置

rtmp { server { listen 1935; chunk_size 4096; application video { play /yjdata/www/www/video; } application live { live on; hls on; hls_path /yjdata/www/www/live/hls/; hls_fragment 5s; } }}

上面代码是配置rtmp模块, play /yjdata/www/www/video 指的是配置点播模块,可以直接播放/yjdata/www/www/video路径下的视频。
hls_path制订hls分块存放路径,由于hls是通过获取到推送的视频流信息,分块存储在做事器。
以是它的延时比rtmp要更高。

server { listen 80; server_name localhost; #charset koi8-r; index index.jsp index.html; root /yjdata/www/www; #access_log logs/host.access.log main; location / { proxy_pass http://127.0.0.1:8080; } location ~ .\.(gif|jpg|jpeg|png|bmp|swf|js|css|docx|pdf|doc|ppt|html|properties)$ { expires 30d; root /yjdata/www/www/static/; } location /hls { types { application/vnd.apple.mpegurl m3u8; #application/x-mpegURL; video/mp2t ts; } alias /yjdata/www/www/live/hls/; expires -1; add_header Cache-Control no-cache; } location /stat { rtmp_stat all; rtmp_stat_stylesheet stat.xsl; } location /stat.xsl { root /soft/nginx/nginx-rtmp-module/; }

上面配置了location 指向/hls,别名是/yjdata/www/www/live/hls/,以是可以在前端直接通过域名+/hls/+文件名.m3u8获取直播视频。
关于nginx的配置还有很多,我也在学习当中。
总而言之nginx非常强大。

总结

通过从前端=>后台=>做事器,全体流程走下来还是须要花很多心思。
但是收成也是很