写点什么

Kurento 实战之五:媒体播放,mysql 高级教程 ppt

用户头像
极客good
关注
发布于: 刚刚

spring.application.name=PlayerWithRecorder


  • 新增一个数据结构 UserSession.java,每个网页都对应一个 UserSession 实例,重点关注的是 release 方法,在停止播放时调用此方法释放播放器和 WebRTC 连接资源:


package com.bolingcavalry.playerwithrecord;


import org.kurento.client.IceCandidate;


import org.kurento.client.MediaPipeline;


import org.kurento.client.PlayerEndpoint;


import org.kurento.client.WebRtcEndpoint;


/**
 * Bundles the Kurento objects that belong to one browser page: the media pipeline,
 * the WebRTC endpoint and the player endpoint.
 *
 * <p>The important method is {@link #release()}, which is called when playback stops
 * to free the player and the pipeline.
 */
public class UserSession {

    private WebRtcEndpoint webRtcEndpoint;

    private MediaPipeline mediaPipeline;

    private PlayerEndpoint playerEndpoint;

    public UserSession() {
    }

    public WebRtcEndpoint getWebRtcEndpoint() {
        return webRtcEndpoint;
    }

    public void setWebRtcEndpoint(WebRtcEndpoint webRtcEndpoint) {
        this.webRtcEndpoint = webRtcEndpoint;
    }

    public MediaPipeline getMediaPipeline() {
        return mediaPipeline;
    }

    public void setMediaPipeline(MediaPipeline mediaPipeline) {
        this.mediaPipeline = mediaPipeline;
    }

    /**
     * Forwards an ICE candidate to the WebRTC endpoint of this session.
     *
     * @param candidate the candidate to add
     */
    public void addCandidate(IceCandidate candidate) {
        webRtcEndpoint.addIceCandidate(candidate);
    }

    public PlayerEndpoint getPlayerEndpoint() {
        return playerEndpoint;
    }

    public void setPlayerEndpoint(PlayerEndpoint playerEndpoint) {
        this.playerEndpoint = playerEndpoint;
    }

    /**
     * Stops the player and releases the media pipeline.
     *
     * <p>Fields that were never set are skipped, and the pipeline is released even when
     * stopping the player throws, so a failure in the first step cannot leave the
     * pipeline allocated.
     */
    public void release() {
        try {
            if (this.playerEndpoint != null) {
                this.playerEndpoint.stop();
            }
        } finally {
            // NOTE(review): releasing the pipeline presumably also disposes of the
            // endpoints created on it - confirm against the Kurento client docs.
            if (this.mediaPipeline != null) {
                this.mediaPipeline.release();
            }
        }
    }
}


  • 启动类 PlayerWithRecorder.java,有两处要注意,一个是 registerWebSocketHandlers 方法用来绑定 websocket 的处理类,另一个是 kurentoClient,KurentoClient.create 方法的入参是 KMS 的服务地址:


package com.bolingcavalry.playerwithrecord;


import org.kurento.client.KurentoClient;


import org.springframework.boot.SpringApplication;


import org.springframework.boot.autoconfigure.SpringBootApplication;


import org.springframework.context.annotation.Bean;


import org.springframework.web.socket.config.annotation.EnableWebSocket;


import org.springframework.web.socket.config.annotation.WebSocketConfigurer;


import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;


import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;


@EnableWebSocket


@SpringBootApplication


public class PlayerWithRecorder implements WebSocketConfigurer {


@Bean


public PlayerHandler handler() {


return new PlayerHandler();


}


/**


  • 实例化 KurentoClient,入参是 KMS 地址

  • @return


*/


@Bean


public KurentoClient kurentoClient() {


return KurentoClient.create("ws://192.168.91.128:8888/kurento");


}


@Bean


public ServletServerContainerFactoryBean createServletServerContainerFactoryBean() {


ServletServerContainerFactoryBean container = new ServletServerContainerFac


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


toryBean();


container.setMaxTextMessageBufferSize(32768);


return container;


}


/**


  • 标准的 WebSocket 处理类绑定

  • @param registry


*/


@Override


public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {


registry.addHandler(handler(), "/player");


}


public static void main(String[] args) throws Exception {


SpringApplication.run(PlayerWithRecorder.class, args);


}


}


  • 接下来就是 websocket 的处理类 PlayerHandler.java,这是本篇的核心,有几处重点稍后会提到:


package com.bolingcavalry.playerwithrecord;


import java.io.IOException;


import java.io.PrintWriter;


import java.util.concurrent.ConcurrentHashMap;


import org.kurento.client.EndOfStreamEvent;


import org.kurento.client.ErrorEvent;


import org.kurento.client.EventListener;


import org.kurento.client.IceCandidate;


import org.kurento.client.IceCandidateFoundEvent;


import org.kurento.client.KurentoClient;


import org.kurento.client.MediaPipeline;


import org.kurento.client.MediaState;


import org.kurento.client.MediaStateChangedEvent;


import org.kurento.client.PlayerEndpoint;


import org.kurento.client.ServerManager;


import org.kurento.client.VideoInfo;


import org.kurento.client.WebRtcEndpoint;


import org.kurento.commons.exception.KurentoException;


import org.kurento.jsonrpc.JsonUtils;


import org.slf4j.Logger;


import org.slf4j.LoggerFactory;


import org.springframework.beans.factory.annotation.Autowired;


import org.springframework.web.socket.CloseStatus;


import org.springframework.web.socket.TextMessage;


import org.springframework.web.socket.WebSocketSession;


import org.springframework.web.socket.handler.TextWebSocketHandler;


import com.google.gson.Gson;


import com.google.gson.GsonBuilder;


import com.google.gson.JsonObject;


public class PlayerHandler extends TextWebSocketHandler {


// Injected Kurento client; used below to create media pipelines and to query the server manager.
@Autowired


private KurentoClient kurento;


// Logger for this handler.
private final Logger log = LoggerFactory.getLogger(PlayerHandler.class);


// Parses the JSON payload of incoming WebSocket text messages.
private final Gson gson = new GsonBuilder().create();


// Active sessions, keyed by WebSocket session id.
private final ConcurrentHashMap<String, UserSession> users = new ConcurrentHashMap<>();


/**
 * Dispatches one WebSocket text message to the matching command method, based on the
 * {@code "id"} field of its JSON payload.
 *
 * <p>A message without a usable {@code "id"} is answered with an explicit error.
 * Any failure raised while handling a command is logged and reported to the client.
 *
 * @param session the WebSocket session the message arrived on
 * @param message the text message whose payload is parsed as a JSON object
 * @throws Exception if the payload cannot be parsed or the error reply itself fails
 */
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    JsonObject jsonMessage = gson.fromJson(message.getPayload(), JsonObject.class);
    String sessionId = session.getId();

    log.debug("用户[{}]收到 websocket 命令: {} from sessionId", sessionId, jsonMessage);

    try {
        // Without this check a missing "id" surfaced as a NullPointerException,
        // and its null message was forwarded to the client.
        if (jsonMessage == null || !jsonMessage.has("id") || jsonMessage.get("id").isJsonNull()) {
            sendError(session, "Invalid message: missing id");
            return;
        }

        String command = jsonMessage.get("id").getAsString();

        switch (command) {
            // start playing
            case "start":
                start(session, jsonMessage);
                break;
            // stop playing
            case "stop":
                stop(sessionId);
                break;
            // pause
            case "pause":
                pause(sessionId);
                break;
            // resume from pause
            case "resume":
                resume(session);
                break;
            // dump debugging/monitoring information
            case "debugDot":
                debugDot(session);
                break;
            // seek forwards or backwards
            case "doSeek":
                doSeek(session, jsonMessage);
                break;
            // report the current position
            case "getPosition":
                getPosition(session);
                break;
            // pass the browser's ICE candidate on
            case "onIceCandidate":
                onIceCandidate(sessionId, jsonMessage);
                break;
            default:
                sendError(session, "Invalid message with id " + command);
                break;
        }
    } catch (Throwable t) {
        log.error("Exception handling message {} in sessionId {}", jsonMessage, sessionId, t);
        // Some exceptions carry no message; fall back to the exception's own
        // description so the client never receives a null error text.
        String description = t.getMessage() != null ? t.getMessage() : t.toString();
        sendError(session, description);
    }
}


/**
 * Handles the "start" command: builds a pipeline with a player endpoint connected to a
 * WebRTC endpoint, answers the browser's SDP offer, registers the event listeners,
 * starts playback and stores everything in a {@link UserSession} keyed by session id.
 *
 * <p>If any step after the pipeline is created fails, the pipeline is released before
 * the exception is rethrown. If the session already had a stored {@link UserSession}
 * (a repeated "start"), the earlier one is released after being replaced.
 *
 * @param session     the WebSocket session that asked to start playing
 * @param jsonMessage the command; {@code "videourl"} and {@code "sdpOffer"} are read from it
 */
private void start(final WebSocketSession session, JsonObject jsonMessage) {
    // Read both request fields before creating anything on KMS, so a malformed
    // request fails without leaving a pipeline behind.
    final String videourl = jsonMessage.get("videourl").getAsString();
    // The SDP offer comes from the browser.
    final String sdpOffer = jsonMessage.get("sdpOffer").getAsString();

    // 1. Create the MediaPipeline.
    final MediaPipeline pipeline = kurento.createMediaPipeline();
    final UserSession user = new UserSession();

    try {
        // 2. Create the WebRtcEndpoint that talks to the browser.
        WebRtcEndpoint webRtcEndpoint = new WebRtcEndpoint.Builder(pipeline).build();

        // 3. Create the PlayerEndpoint that plays the requested address.
        final PlayerEndpoint playerEndpoint = new PlayerEndpoint.Builder(pipeline, videourl).build();

        // 4. Connect player -> WebRTC so the played media reaches the browser.
        playerEndpoint.connect(webRtcEndpoint);

        // 5. WebRTC handling.
        // 5.1 Forward every candidate found by KMS to the browser as soon as it arrives.
        webRtcEndpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
            @Override
            public void onEvent(IceCandidateFoundEvent event) {
                JsonObject response = new JsonObject();
                response.addProperty("id", "iceCandidate");
                response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));

                try {
                    synchronized (session) {
                        session.sendMessage(new TextMessage(response.toString()));
                    }
                } catch (IOException e) {
                    // Keep the cause in the log instead of only its message.
                    log.debug(e.getMessage(), e);
                }
            }
        });

        // 5.2 Produce the SDP answer for the browser.
        String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer);

        log.info("[Handler::start] SDP Offer from browser to KMS:\n{}", sdpOffer);
        log.info("[Handler::start] SDP Answer from KMS to browser:\n{}", sdpAnswer);

        JsonObject response = new JsonObject();
        response.addProperty("id", "startResponse");
        response.addProperty("sdpAnswer", sdpAnswer);
        sendMessage(session, response.toString());

        // 6. Media state handling.
        // 6.1 Once the media is connected, send the video parameters to the browser.
        webRtcEndpoint.addMediaStateChangedListener(new EventListener<MediaStateChangedEvent>() {
            @Override
            public void onEvent(MediaStateChangedEvent event) {
                if (event.getNewState() == MediaState.CONNECTED) {
                    // Media information is obtained through getVideoInfo.
                    VideoInfo videoInfo = playerEndpoint.getVideoInfo();

                    JsonObject response = new JsonObject();
                    response.addProperty("id", "videoInfo");
                    response.addProperty("isSeekable", videoInfo.getIsSeekable());
                    response.addProperty("initSeekable", videoInfo.getSeekableInit());
                    response.addProperty("endSeekable", videoInfo.getSeekableEnd());
                    response.addProperty("videoDuration", videoInfo.getDuration());

                    // Hand the media information to the browser.
                    sendMessage(session, response.toString());
                }
            }
        });

        // Ask KMS to gather its ICE candidates (the listener above receives them).
        webRtcEndpoint.gatherCandidates();

        // 7.1 Player listener: errors.
        playerEndpoint.addErrorListener(new EventListener<ErrorEvent>() {
            @Override
            public void onEvent(ErrorEvent event) {
                log.info("ErrorEvent: {}", event.getDescription());
                // Tell the browser to stop playing.
                sendPlayEnd(session);
            }
        });

        // 7.2 Player listener: end of stream.
        playerEndpoint.addEndOfStreamListener(new EventListener<EndOfStreamEvent>() {
            @Override
            public void onEvent(EndOfStreamEvent event) {
                log.info("EndOfStreamEvent: {}", event.getTimestamp());
                // Tell the browser to stop playing.
                sendPlayEnd(session);
            }
        });

        // Start connecting to the remote media through KMS.
        playerEndpoint.play();

        // Keep pipeline, WebRTC endpoint and player together so later commands
        // from the browser can find them.
        user.setMediaPipeline(pipeline);
        user.setWebRtcEndpoint(webRtcEndpoint);
        user.setPlayerEndpoint(playerEndpoint);
    } catch (RuntimeException e) {
        // Nothing has been stored in the users map yet, so this is the only chance
        // to free the pipeline that was just created.
        try {
            pipeline.release();
        } catch (RuntimeException releaseFailure) {
            e.addSuppressed(releaseFailure);
        }
        throw e;
    }

    UserSession previous = users.put(session.getId(), user);
    if (previous != null) {
        // A repeated "start" on the same session would otherwise drop the only
        // reference to the earlier pipeline without releasing it.
        previous.release();
    }
}


/**
 * Pauses playback for a session; does nothing when the session is not registered.
 *
 * @param sessionId id of the WebSocket session whose player is paused
 */
private void pause(String sessionId) {
    final UserSession current = users.get(sessionId);
    if (current == null) {
        return;
    }

    current.getPlayerEndpoint().pause();
}


/**
 * Resumes playback after a pause and sends the current video information
 * to the browser; does nothing when the session is not registered.
 *
 * @param session the WebSocket session whose player is resumed
 */
private void resume(final WebSocketSession session) {
    final UserSession current = users.get(session.getId());
    if (current == null) {
        return;
    }

    final PlayerEndpoint player = current.getPlayerEndpoint();
    player.play();

    final VideoInfo info = player.getVideoInfo();

    final JsonObject reply = new JsonObject();
    reply.addProperty("id", "videoInfo");
    reply.addProperty("isSeekable", info.getIsSeekable());
    reply.addProperty("initSeekable", info.getSeekableInit());
    reply.addProperty("endSeekable", info.getSeekableEnd());
    reply.addProperty("videoDuration", info.getDuration());

    sendMessage(session, reply.toString());
}


/**
 * Stops playback: removes the session from the registry and releases it.
 * Does nothing when the session is not registered.
 *
 * @param sessionId id of the WebSocket session to stop
 */
private void stop(String sessionId) {
    final UserSession removed = users.remove(sessionId);
    if (removed == null) {
        return;
    }

    removed.release();
}


/**
 * Dumps the GStreamer dot descriptions of the session's pipeline and player to
 * {@code player.dot} and {@code player-decoder.dot}, then logs CPU and memory figures
 * from the Kurento server manager. The dot text can be turned into a topology
 * diagram with graphviz.
 *
 * <p>The files are only written when the session is registered; the server figures
 * are logged in every case.
 *
 * @param session the WebSocket session whose pipeline is dumped
 */
private void debugDot(final WebSocketSession session) {
    UserSession user = users.get(session.getId());

    if (user != null) {
        writeDotFile("player.dot", user.getMediaPipeline().getGstreamerDot());
        writeDotFile("player-decoder.dot", user.getPlayerEndpoint().getElementGstreamerDot());
    }

    ServerManager sm = kurento.getServerManager();

    log.warn("[Handler::debugDot] CPU COUNT: {}", sm.getCpuCount());
    log.warn("[Handler::debugDot] CPU USAGE: {}", sm.getUsedCpu(1000));
    log.warn("[Handler::debugDot] RAM USAGE: {}", sm.getUsedMemory());
}

/**
 * Writes one dot description to a file, logging instead of throwing on failure.
 *
 * @param fileName path of the file to (over)write
 * @param dot      the dot text to store
 */
private void writeDotFile(String fileName, String dot) {
    // An explicit charset keeps the output independent of the platform default.
    try (PrintWriter out = new PrintWriter(fileName, "UTF-8")) {
        out.println(dot);
        // PrintWriter never throws on write errors; it has to be asked explicitly.
        if (out.checkError()) {
            log.error("[Handler::debugDot] Failed writing {}", fileName);
        }
    } catch (IOException ex) {
        // Pass the exception itself so the stack trace is not lost.
        log.error("[Handler::debugDot] Exception: {}", ex.getMessage(), ex);
    }
}


/**
 * Jumps to the position given in the command. When Kurento rejects the seek,
 * a "seek" failure message is sent to the browser. Does nothing when the session
 * is not registered.
 *
 * @param session     the WebSocket session whose player is repositioned
 * @param jsonMessage the command; its {@code "position"} field is read as a long
 */
private void doSeek(final WebSocketSession session, JsonObject jsonMessage) {
    final UserSession current = users.get(session.getId());
    if (current == null) {
        return;
    }

    try {
        final PlayerEndpoint player = current.getPlayerEndpoint();
        final long target = jsonMessage.get("position").getAsLong();
        player.setPosition(target);
    } catch (KurentoException e) {
        log.debug("The seek cannot be performed");

        final JsonObject failure = new JsonObject();
        failure.addProperty("id", "seek");
        failure.addProperty("message", "Seek failed");

        sendMessage(session, failure.toString());
    }
}


/**
 * Sends the player's current position to the browser in a "position" message;
 * does nothing when the session is not registered.
 *
 * @param session the WebSocket session whose position is reported
 */
private void getPosition(final WebSocketSession session) {
    final UserSession current = users.get(session.getId());
    if (current == null) {
        return;
    }

    final long currentPosition = current.getPlayerEndpoint().getPosition();

    final JsonObject reply = new JsonObject();
    reply.addProperty("id", "position");
    reply.addProperty("position", currentPosition);

    sendMessage(session, reply.toString());
}


/**


  • 收到前端的 Ice candidate 后,立即发给 KMS

  • @param sessionId

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
Kurento实战之五:媒体播放,mysql高级教程ppt