Kurento 实战之五:媒体播放
spring.application.name=PlayerWithRecorder
新增一个数据结构 UserSession.java,每个网页都对应一个 UserSession 实例,重点关注的是 release 方法,在停止播放时调用此方法释放播放器和 WebRTC 连接资源:
package com.bolingcavalry.playerwithrecord;
import org.kurento.client.IceCandidate;
import org.kurento.client.MediaPipeline;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.WebRtcEndpoint;
/**
 * Groups the Kurento media objects that belong to one client session: the
 * media pipeline, the WebRTC endpoint facing the browser and the player
 * endpoint that reads the media.
 *
 * <p>All three references start out as {@code null} and are populated through
 * the setters. {@link #release()} is the cleanup entry point to call when
 * playback stops.
 */
public class UserSession {

    private WebRtcEndpoint webRtcEndpoint;
    private MediaPipeline mediaPipeline;
    private PlayerEndpoint playerEndpoint;

    public UserSession() {
    }

    /** @return the WebRTC endpoint, or {@code null} if none has been set */
    public WebRtcEndpoint getWebRtcEndpoint() {
        return webRtcEndpoint;
    }

    public void setWebRtcEndpoint(WebRtcEndpoint webRtcEndpoint) {
        this.webRtcEndpoint = webRtcEndpoint;
    }

    /** @return the media pipeline, or {@code null} if none has been set */
    public MediaPipeline getMediaPipeline() {
        return mediaPipeline;
    }

    public void setMediaPipeline(MediaPipeline mediaPipeline) {
        this.mediaPipeline = mediaPipeline;
    }

    /**
     * Forwards an ICE candidate to the WebRTC endpoint.
     *
     * @param candidate the ICE candidate to add
     * @throws NullPointerException if no WebRTC endpoint has been set yet
     */
    public void addCandidate(IceCandidate candidate) {
        webRtcEndpoint.addIceCandidate(candidate);
    }

    /** @return the player endpoint, or {@code null} if none has been set */
    public PlayerEndpoint getPlayerEndpoint() {
        return playerEndpoint;
    }

    public void setPlayerEndpoint(PlayerEndpoint playerEndpoint) {
        this.playerEndpoint = playerEndpoint;
    }

    /**
     * Stops the player and releases the media pipeline.
     *
     * <p>Members that were never set are skipped. The pipeline is released in a
     * {@code finally} block so that a failure while stopping the player cannot
     * leave the pipeline allocated.
     */
    public void release() {
        try {
            if (playerEndpoint != null) {
                playerEndpoint.stop();
            }
        } finally {
            if (mediaPipeline != null) {
                mediaPipeline.release();
            }
        }
    }
}
启动类 PlayerWithRecorder.java,有两处要注意,一个是 registerWebSocketHandlers 方法用来绑定 websocket 的处理类,另一个是 kurentoClient,KurentoClient.create 方法的入参是 KMS 的服务地址:
package com.bolingcavalry.playerwithrecord;
import org.kurento.client.KurentoClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
@EnableWebSocket
@SpringBootApplication
public class PlayerWithRecorder implements WebSocketConfigurer {
@Bean
public PlayerHandler handler() {
return new PlayerHandler();
}
/**
实例化 KurentoClient,入参是 KMS 地址
@return
*/
@Bean
public KurentoClient kurentoClient() {
return KurentoClient.create("ws://192.168.91.128:8888/kurento");
}
@Bean
public ServletServerContainerFactoryBean createServletServerContainerFactoryBean() {
ServletServerContainerFactoryBean container = new ServletServerContainerFac
toryBean();
container.setMaxTextMessageBufferSize(32768);
return container;
}
/**
标准的 WebSocket 处理类绑定
@param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(handler(), "/player");
}
public static void main(String[] args) throws Exception {
SpringApplication.run(PlayerWithRecorder.class, args);
}
}
接下来就是 websocket 的处理类 PlayerHandler.java,这是本篇的核心,有几处重点稍后会提到:
package com.bolingcavalry.playerwithrecord;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ConcurrentHashMap;
import org.kurento.client.EndOfStreamEvent;
import org.kurento.client.ErrorEvent;
import org.kurento.client.EventListener;
import org.kurento.client.IceCandidate;
import org.kurento.client.IceCandidateFoundEvent;
import org.kurento.client.KurentoClient;
import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaState;
import org.kurento.client.MediaStateChangedEvent;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.ServerManager;
import org.kurento.client.VideoInfo;
import org.kurento.client.WebRtcEndpoint;
import org.kurento.commons.exception.KurentoException;
import org.kurento.jsonrpc.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
public class PlayerHandler extends TextWebSocketHandler {
@Autowired
private KurentoClient kurento;
private final Logger log = LoggerFactory.getLogger(PlayerHandler.class);
private final Gson gson = new GsonBuilder().create();
private final ConcurrentHashMap<String, UserSession> users = new ConcurrentHashMap<>();
/**
 * Parses an incoming text message as JSON and dispatches it according to
 * its {@code id} field.
 *
 * <p>Recognised ids are {@code start}, {@code stop}, {@code pause},
 * {@code resume}, {@code debugDot}, {@code doSeek}, {@code getPosition} and
 * {@code onIceCandidate}; any other id is answered with an error message.
 * Anything thrown while reading the id or running the command is logged and
 * reported back through {@code sendError}.
 *
 * @param session WebSocket session the message arrived on
 * @param message the raw text message
 */
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    JsonObject jsonMessage = gson.fromJson(message.getPayload(), JsonObject.class);
    String sessionId = session.getId();

    log.debug("用户[{}]收到 websocket 命令: {} from sessionId", sessionId, jsonMessage);

    try {
        final String command = jsonMessage.get("id").getAsString();

        if ("start".equals(command)) {
            // begin playback
            start(session, jsonMessage);
        } else if ("stop".equals(command)) {
            // stop playback
            stop(sessionId);
        } else if ("pause".equals(command)) {
            // pause playback
            pause(sessionId);
        } else if ("resume".equals(command)) {
            // resume after a pause
            resume(session);
        } else if ("debugDot".equals(command)) {
            // dump diagnostic information
            debugDot(session);
        } else if ("doSeek".equals(command)) {
            // jump forwards or backwards
            doSeek(session, jsonMessage);
        } else if ("getPosition".equals(command)) {
            // report the current position
            getPosition(session);
        } else if ("onIceCandidate".equals(command)) {
            // ICE candidate coming from the browser
            onIceCandidate(sessionId, jsonMessage);
        } else {
            sendError(session, "Invalid message with id " + command);
        }
    } catch (Throwable t) {
        log.error("Exception handling message {} in sessionId {}", jsonMessage, sessionId, t);
        sendError(session, t.getMessage());
    }
}
/**
 * Handles the {@code start} command: builds a pipeline with a player
 * endpoint connected to a WebRTC endpoint, answers the browser's SDP offer,
 * starts playback and registers the resulting {@link UserSession}.
 *
 * <p>If any step fails after the pipeline has been created, the pipeline is
 * released before the exception is rethrown, so a bad request does not leave
 * media objects behind. If the session already had a registered
 * {@link UserSession}, that older one is released once the new one has
 * replaced it.
 *
 * @param session     WebSocket session that sent the command
 * @param jsonMessage command payload; {@code videourl} and {@code sdpOffer}
 *                    are read from it
 */
private void start(final WebSocketSession session, JsonObject jsonMessage) {
    // 1. Create the MediaPipeline.
    final MediaPipeline pipeline = kurento.createMediaPipeline();
    final UserSession user = new UserSession();

    try {
        // 2. Create the WebRtcEndpoint that talks to the browser.
        final WebRtcEndpoint webRtcEndpoint = new WebRtcEndpoint.Builder(pipeline).build();

        // 3.1 Read the address of the media to play.
        String videourl = jsonMessage.get("videourl").getAsString();

        // 3.2 Create the PlayerEndpoint responsible for playback.
        final PlayerEndpoint playerEndpoint = new PlayerEndpoint.Builder(pipeline, videourl).build();

        // 4. Connect playerEndpoint to webRtcEndpoint so the player's output
        //    reaches the browser through the WebRTC endpoint.
        playerEndpoint.connect(webRtcEndpoint);

        // 5. WebRTC negotiation.
        // 5.1 Forward every candidate found by KMS to the front end right away.
        webRtcEndpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
            @Override
            public void onEvent(IceCandidateFoundEvent event) {
                JsonObject response = new JsonObject();
                response.addProperty("id", "iceCandidate");
                response.add("candidate", JsonUtils.toJsonObject(event.getCandidate()));
                try {
                    synchronized (session) {
                        session.sendMessage(new TextMessage(response.toString()));
                    }
                } catch (IOException e) {
                    log.debug(e.getMessage());
                }
            }
        });

        // The SDP offer is supplied by the front end.
        String sdpOffer = jsonMessage.get("sdpOffer").getAsString();
        // Produce the SDP answer for the front end.
        String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer);

        log.info("[Handler::start] SDP Offer from browser to KMS:\n{}", sdpOffer);
        log.info("[Handler::start] SDP Answer from KMS to browser:\n{}", sdpAnswer);

        JsonObject response = new JsonObject();
        response.addProperty("id", "startResponse");
        response.addProperty("sdpAnswer", sdpAnswer);
        sendMessage(session, response.toString());

        // 6. Media playback.
        // 6.1 Once the media state becomes CONNECTED, send the video
        //     information to the front end.
        webRtcEndpoint.addMediaStateChangedListener(new EventListener<MediaStateChangedEvent>() {
            @Override
            public void onEvent(MediaStateChangedEvent event) {
                if (event.getNewState() == MediaState.CONNECTED) {
                    // Media details are obtained through getVideoInfo.
                    VideoInfo videoInfo = playerEndpoint.getVideoInfo();
                    JsonObject response = new JsonObject();
                    response.addProperty("id", "videoInfo");
                    response.addProperty("isSeekable", videoInfo.getIsSeekable());
                    response.addProperty("initSeekable", videoInfo.getSeekableInit());
                    response.addProperty("endSeekable", videoInfo.getSeekableEnd());
                    response.addProperty("videoDuration", videoInfo.getDuration());
                    // Hand the media information to the front end.
                    sendMessage(session, response.toString());
                }
            }
        });

        // Ask KMS to gather its ICE candidates (delivered to the listener above).
        webRtcEndpoint.gatherCandidates();

        // 7.1 Playback listener: errors.
        playerEndpoint.addErrorListener(new EventListener<ErrorEvent>() {
            @Override
            public void onEvent(ErrorEvent event) {
                log.info("ErrorEvent: {}", event.getDescription());
                // Tell the front end to stop playing.
                sendPlayEnd(session);
            }
        });

        // 7.2 Playback listener: end of stream.
        playerEndpoint.addEndOfStreamListener(new EventListener<EndOfStreamEvent>() {
            @Override
            public void onEvent(EndOfStreamEvent event) {
                log.info("EndOfStreamEvent: {}", event.getTimestamp());
                // Tell the front end to stop playing.
                sendPlayEnd(session);
            }
        });

        // Start playing the media through KMS.
        playerEndpoint.play();

        // Keep pipeline, webRtcEndpoint and playerEndpoint together in the
        // UserSession so later commands from the front end can reach them.
        user.setMediaPipeline(pipeline);
        user.setWebRtcEndpoint(webRtcEndpoint);
        user.setPlayerEndpoint(playerEndpoint);
    } catch (RuntimeException e) {
        // Setup failed part-way: release the pipeline instead of leaking it.
        try {
            pipeline.release();
        } catch (RuntimeException releaseFailure) {
            e.addSuppressed(releaseFailure);
        }
        throw e;
    }

    // Register the new session; release whatever an earlier "start" on the
    // same WebSocket session left behind.
    UserSession previous = users.put(session.getId(), user);
    if (previous != null) {
        previous.release();
    }
}
/**
 * Pauses playback for a session. Does nothing when no {@link UserSession}
 * is registered under the given id.
 *
 * @param sessionId id of the WebSocket session whose player is paused
 */
private void pause(String sessionId) {
    final UserSession current = users.get(sessionId);
    if (current == null) {
        return;
    }
    current.getPlayerEndpoint().pause();
}
/**
 * Resumes playback after a pause and sends the player's video information
 * to the front end as a {@code videoInfo} message. Does nothing when the
 * session has no registered {@link UserSession}.
 *
 * @param session WebSocket session that asked to resume
 */
private void resume(final WebSocketSession session) {
    final UserSession current = users.get(session.getId());
    if (current == null) {
        return;
    }

    final PlayerEndpoint player = current.getPlayerEndpoint();
    player.play();

    final VideoInfo info = player.getVideoInfo();
    final JsonObject reply = new JsonObject();
    reply.addProperty("id", "videoInfo");
    reply.addProperty("isSeekable", info.getIsSeekable());
    reply.addProperty("initSeekable", info.getSeekableInit());
    reply.addProperty("endSeekable", info.getSeekableEnd());
    reply.addProperty("videoDuration", info.getDuration());

    sendMessage(session, reply.toString());
}
/**
 * Stops playback: removes the session's {@link UserSession} from the
 * registry and releases it. Does nothing when none is registered.
 *
 * @param sessionId id of the WebSocket session to stop
 */
private void stop(String sessionId) {
    final UserSession removed = users.remove(sessionId);
    if (removed == null) {
        return;
    }
    removed.release();
}
/**
 * Dumps the GStreamer dot descriptions of the session's pipeline and player
 * into {@code player.dot} and {@code player-decoder.dot} (such content can
 * be rendered as a topology graph by graphviz), then logs CPU and memory
 * figures reported by the server manager.
 *
 * <p>The files are only written when the session has a registered
 * {@link UserSession}; the server figures are logged in every case.
 *
 * @param session WebSocket session that requested the dump
 */
private void debugDot(final WebSocketSession session) {
    UserSession user = users.get(session.getId());
    if (user != null) {
        writeDotFile("player.dot", user.getMediaPipeline().getGstreamerDot());
        writeDotFile("player-decoder.dot", user.getPlayerEndpoint().getElementGstreamerDot());
    }

    ServerManager sm = kurento.getServerManager();
    log.warn("[Handler::debugDot] CPU COUNT: {}", sm.getCpuCount());
    log.warn("[Handler::debugDot] CPU USAGE: {}", sm.getUsedCpu(1000));
    log.warn("[Handler::debugDot] RAM USAGE: {}", sm.getUsedMemory());
}

/**
 * Writes one dot description to a file, logging (not propagating) failures.
 *
 * @param fileName   path of the file to create or overwrite
 * @param dotContent text written to the file, followed by a line break
 */
private void writeDotFile(String fileName, String dotContent) {
    try (PrintWriter out = new PrintWriter(fileName)) {
        out.println(dotContent);
        // PrintWriter never throws on write; failures are only visible here.
        if (out.checkError()) {
            log.error("[Handler::debugDot] Failed to write {}", fileName);
        }
    } catch (IOException ex) {
        // Passing the exception as the last argument keeps the stack trace.
        log.error("[Handler::debugDot] Exception: {}", ex.getMessage(), ex);
    }
}
/**
 * Moves playback to the position given in the message. When the player
 * rejects the seek with a {@link KurentoException}, a {@code seek} message
 * carrying "Seek failed" is sent to the front end. Does nothing when the
 * session has no registered {@link UserSession}.
 *
 * @param session     WebSocket session that requested the seek
 * @param jsonMessage command payload; {@code position} is read from it
 */
private void doSeek(final WebSocketSession session, JsonObject jsonMessage) {
    final UserSession current = users.get(session.getId());
    if (current == null) {
        return;
    }

    try {
        final PlayerEndpoint player = current.getPlayerEndpoint();
        final long target = jsonMessage.get("position").getAsLong();
        player.setPosition(target);
    } catch (KurentoException e) {
        log.debug("The seek cannot be performed");

        final JsonObject reply = new JsonObject();
        reply.addProperty("id", "seek");
        reply.addProperty("message", "Seek failed");
        sendMessage(session, reply.toString());
    }
}
/**
 * Reads the player's current position and sends it to the front end as a
 * {@code position} message. Does nothing when the session has no registered
 * {@link UserSession}.
 *
 * @param session WebSocket session that asked for the position
 */
private void getPosition(final WebSocketSession session) {
    final UserSession current = users.get(session.getId());
    if (current == null) {
        return;
    }

    final long currentPosition = current.getPlayerEndpoint().getPosition();

    final JsonObject reply = new JsonObject();
    reply.addProperty("id", "position");
    reply.addProperty("position", currentPosition);
    sendMessage(session, reply.toString());
}
/**
收到前端的 Ice candidate 后,立即发给 KMS
@param sessionId
评论