Developing a Streaming Video Site with Go
Introduction
Streaming media has become an important technology in industry: live-streaming sites, video surveillance, in-app live broadcasts, and so on. Building a video site that handles high concurrency involves both choosing the right language and using streaming technology well. This article explains how to implement a streaming video site with Golang.
Contents
Why Go, and some of Go's strengths
A brief introduction to GoLang and the toolchain for building a web server
Golang's channel concurrency model
Building a streaming site with Golang
Deploying the site
Why Go, and some of Go's strengths
Why choose Go to build a video site? It mostly comes down to Go's strengths. So what are they?
High development efficiency. Most other languages need extra configuration or plugins; even Java, with its batteries-included ecosystem, still needs a servlet engine such as Tomcat or Jetty. Go ships most of what you need out of the box: a complete toolchain (tools, test, benchmark, builtin, etc.) and the go commands (go test, go install, go build) are ready to use as soon as you download Go. The standard library covers HTTP serving, templates and testing, and the wider Go ecosystem provides mature packages for audio/video and streaming, so this kind of service plays to Go's strengths.
Simple deployment. Go is a compiled language and can cross-compile executables for multiple platforms; a single build produces a self-contained binary that you copy to the target machine and run directly.
A solid native HTTP library and a built-in template engine, so no third-party web framework is needed.
A brief introduction to GoLang and the toolchain for building a web server
Go is a compiled, open-source programming language that makes it easy to build simple, reliable and efficient software. It aims to combine the development speed of dynamic languages such as Python with the performance and safety of compiled languages such as C/C++. Its main characteristics are:
simplicity, speed, safety, concurrency, memory management, array safety and fast compilation.
Go ships with some commonly used tools, for example:
go build compiles Go files and can cross-compile: env GOOS=linux GOARCH=amd64 go build. In CI/CD this is a very useful command.
go install also compiles, but unlike build it installs the output: binaries go under bin and (in GOPATH mode) compiled package archives go under pkg.
go get fetches third-party packages; the common form is go get -u <git URL>, which downloads a repository and installs it locally.
go fmt enforces a uniform code style and layout, which makes Go code easier to read and understand.
go test runs the tests in the current directory; "go test -v" prints all output, while "go test" prints only the results.
Go test files are named XXX_test.go by convention; when "go test" runs, files following this convention are picked up and executed automatically.
Key point:
Use TestMain as the test entry point and call Run() from it to execute the other tests. This allows one-time initialization, for example loading an audio/video test database or fixture files: resources that many tests need are loaded into memory only once, which keeps memory usage down, and they can be cleaned up in one place afterwards.
func TestMain(m *testing.M) {
fmt.Println("Test begin")
m.Run()
}
If Run() is not called there, none of the tests other than TestMain are executed.
func TestPrint(t *testing.T) {
fmt.Println("Test print")
}
func TestMain(m *testing.M) {
fmt.Println("Test begin")
//m.Run()
}
As noted above, because Run() is not called, the TestPrint function will not be executed.
Golang's channel concurrency model
Go has goroutines, so how do goroutines communicate with one another? Go provides channels for this.
Declaring a channel
Declaring a channel in Go is simple: just use the built-in make function, as follows:
ch := make(chan string)
Here chan is a keyword that marks the channel type, and the trailing string says the channel carries string values. As the declaration shows, a chan is a collection-like type.
Once a chan is defined it can be used. Only two operations exist on a chan: send and receive:
Send: put a value into the chan, using the operator chan <-
Receive: take a value out of the chan, using the operator <- chan
Example:
package main
import "fmt"
func main() {
ch := make(chan string)
go func() {
fmt.Println("La")
ch <- "发送数据者:La"
}()
fmt.Println("I am main goroutine")
v := <- ch
fmt.Println("接收到的chan中的值为:",v)
}
Let's run it and look at the output:
I am main goroutine
La
value received from chan: from sender goroutine: La
The output shows that the receive on the channel blocks the main goroutine until the value arrives, which achieves the same effect as waiting with time.Sleep.
It should now be clear why the program does not exit before the new goroutine finishes: the chan created with make is empty, and the main goroutine blocks waiting to receive from it until the other goroutine sends a value.
Unbuffered channels
The chan created with make in the example above is an unbuffered channel: its capacity is 0, so it cannot store any data. An unbuffered channel only transfers data; values never rest inside it, which means the send and the receive complete at the same time. It is therefore also called a synchronous channel.
Buffered channels
A buffered channel is like a blocking FIFO queue. Passing a second argument to make sets the channel's capacity and creates a buffered channel, for example:
cacheCh := make(chan int,5)
This defines a chan of int values with a capacity of 5.
A buffered channel has the following properties:
it has an internal buffer queue
a send appends an element to the tail of the queue; if the queue is full, the sender blocks until a receive in another goroutine frees a slot
a receive takes the element at the head of the queue and removes it; if the queue is empty, the receiver blocks until a send in another goroutine adds a new element
cache := make(chan int,5)
cache <- 2
cache <- 3
fmt.Println("容量:",cap(cache),",元素个数:",len(cache))
An unbuffered channel is really just a channel with capacity 0, e.g. make(chan int, 0).
Closing a channel
A channel is closed with the built-in close function. Once a channel is closed, no more values can be sent on it; doing so causes a panic. Values can still be received from it, and once it is empty each receive yields the zero value of the element type.
Single-direction channels
A single-direction channel can either only send or only receive. Its declaration simply adds the <- operator to the type, as follows:
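A small self-contained sketch of these rules:
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)

	// Receiving from a closed channel drains the buffered values first...
	fmt.Println(<-ch) // 1
	fmt.Println(<-ch) // 2

	// ...then yields the element type's zero value; the second return value
	// reports whether the value was actually sent before the close.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false

	// ch <- 3 // sending on a closed channel would panic
}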
send := make(chan<- int)
receive := make(<-chan int)
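In practice, single-direction channels are used mostly as function parameter types; a bidirectional channel converts to either direction implicitly. A minimal sketch:
package main

import "fmt"

// producer can only send on ch; receiving from it would be a compile error.
func producer(ch chan<- int) {
	for i := 0; i < 3; i++ {
		ch <- i
	}
	close(ch)
}

// consumer can only receive from ch.
func consumer(ch <-chan int) {
	for v := range ch {
		fmt.Println("got:", v)
	}
}

func main() {
	ch := make(chan int) // a normal bidirectional channel
	go producer(ch)      // implicitly converted to chan<- int
	consumer(ch)         // implicitly converted to <-chan int
}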
Building a streaming site with Golang
Business modules
API design
Layering
RESTful style
CRUD mapped onto resource operations
Return-code conventions
First, let's write the entry point:
package main
import (
"net/http"
"github.com/julienschmidt/httprouter"
)
type middleWareHandler struct {
r *httprouter.Router
}
func NewMiddleWareHandler(r *httprouter.Router) http.Handler {
m := middleWareHandler{}
m.r = r
return m
}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//check session
validateUserSession(r)
m.r.ServeHTTP(w, r)
}
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
router.POST("/user", CreateUser)
router.POST("/user/:user_name", Login)
return router
}
func main() {
r := RegisterHandlers()
mh := NewMiddleWareHandler(r)
http.ListenAndServe(":1000", mh)
}
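The router above references validateUserSession, CreateUser and Login, which are not listed in the article. A minimal hedged sketch of what they might look like (the "X-Session-Id" header name is an assumption, and imports of io plus the session package shown below are assumed):
// validateUserSession checks whether the request carries a valid session id.
func validateUserSession(r *http.Request) bool {
	sid := r.Header.Get("X-Session-Id") // assumed header name
	if sid == "" {
		return false
	}
	_, expired := session.IsSessionExpired(sid)
	return !expired
}

func CreateUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
	// Decode the user name and password from r.Body, insert the user via
	// dbops, then reply 201; simplified here.
	w.WriteHeader(http.StatusCreated)
	io.WriteString(w, "create user handler")
}

func Login(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
	// Verify credentials for p.ByName("user_name"), then issue a session id
	// with session.GenerateNewSessionId; simplified here.
	io.WriteString(w, "login handler for "+p.ByName("user_name"))
}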
This gives us user registration, login, and the listener initialisation. Next, the part of the backend video service we care most about is the session:
package session
import (
"time"
"sync"
"github.com/avenssi/video_server/api/defs"
"github.com/avenssi/video_server/api/dbops"
"github.com/avenssi/video_server/api/utils"
)
var sessionMap *sync.Map
func init() {
sessionMap = &sync.Map{}
}
func nowInMilli() int64{
return time.Now().UnixNano()/1000000
}
func deleteExpiredSession(sid string) {
sessionMap.Delete(sid)
dbops.DeleteSession(sid)
}
func LoadSessionsFromDB() {
r, err := dbops.RetrieveAllSessions()
if err != nil {
return
}
r.Range(func(k, v interface{}) bool{
ss := v.(*defs.SimpleSession)
sessionMap.Store(k, ss)
return true
})
}
func GenerateNewSessionId(un string) string {
id, _ := utils.NewUUID()
ct := nowInMilli()
ttl := ct + 30 * 60 * 1000 // Server-side session valid time: 30 min
ss := &defs.SimpleSession{Username: un, TTL: ttl}
sessionMap.Store(id, ss)
dbops.InsertSession(id, ttl, un)
return id
}
func IsSessionExpired(sid string) (string, bool) {
ss, ok := sessionMap.Load(sid)
if ok {
ct := nowInMilli()
if ss.(*defs.SimpleSession).TTL < ct {
deleteExpiredSession(sid)
return "", true
}
return ss.(*defs.SimpleSession).Username, false
}
return "", true
}
As the code shows, the session cache is built from the project's own packages (github.com/avenssi/video_server/api/defs, dbops and utils) together with sync.Map from the standard library; keeping this logic in plain Go is another reason Go is a good fit for the backend.
We also define the error-code responses:
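The session package stores *defs.SimpleSession values; the article does not list that type, but its fields can be inferred from how it is used. A minimal sketch (field meanings are assumptions drawn from the usage above):
package defs

// SimpleSession holds a logged-in user and the session's expiry time.
type SimpleSession struct {
	Username string // owner of the session
	TTL      int64  // absolute expiry time in Unix milliseconds
}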
package defs
type Err struct {
Error string `json:"error"`
ErrorCode string `json:"error_code"`
}
type ErrResponse struct {
HttpSC int
Error Err
}
var (
ErrorRequestBodyParseFailed = ErrResponse{HttpSC: 400, Error: Err{Error: "Request body is not correct", ErrorCode: "001"}}
ErrorNotAuthUser = ErrResponse{HttpSC: 401, Error: Err{Error: "User authentication failed.", ErrorCode: "002"}}
ErrorDBError = ErrResponse{HttpSC: 500, Error: Err{Error: "DB ops failed", ErrorCode: "003"}}
ErrorInternalFaults = ErrResponse{HttpSC: 500, Error: Err{Error: "Internal service error", ErrorCode: "004"}}
)
That covers the main business-layer logic; the rest of this section deals with publishing (push) and playback.
Publishing a stream
Video is pushed to the server over RTMP. On the publish path we add a cache. Every client holds a long-lived connection, and if clients keep firing requests the server-side goroutines can back up; without a buffer the mounting pressure can crash the server and leave clients hanging. For streaming media the usual fix is a cache that cushions the load on the server:
type Cache struct {
gop *GopCache
videoSeq *SpecialCache
audioSeq *SpecialCache
metadata *SpecialCache
}
func NewCache() *Cache {
return &Cache{
gop: NewGopCache(configure.Config.GetInt("gop_num")),
videoSeq: NewSpecialCache(),
audioSeq: NewSpecialCache(),
metadata: NewSpecialCache(),
}
}
func (cache *Cache) Write(p av.Packet) {
if p.IsMetadata {
cache.metadata.Write(&p)
return
} else {
if !p.IsVideo {
ah, ok := p.Header.(av.AudioPacketHeader)
if ok {
if ah.SoundFormat() == av.SOUND_AAC &&
ah.AACPacketType() == av.AAC_SEQHDR {
cache.audioSeq.Write(&p)
return
} else {
return
}
}
} else {
vh, ok := p.Header.(av.VideoPacketHeader)
if ok {
if vh.IsSeq() {
cache.videoSeq.Write(&p)
return
}
} else {
return
}
}
}
cache.gop.Write(&p)
}
func (cache *Cache) Send(w av.WriteCloser) error {
if err := cache.metadata.Send(w); err != nil {
return err
}
if err := cache.videoSeq.Send(w); err != nil {
return err
}
if err := cache.audioSeq.Send(w); err != nil {
return err
}
if err := cache.gop.Send(w); err != nil {
return err
}
return nil
}
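The Cache above relies on GopCache (which replays the most recent group of pictures to new players) and SpecialCache, neither of which is listed in the article. A simplified sketch of SpecialCache, which keeps only the latest packet of its kind; treat it as an illustration rather than the exact livego implementation:
// SpecialCache keeps the most recent packet of one kind (metadata or an
// AAC/AVC sequence header) so that a newly joined player can be primed
// before the GOP cache is replayed.
type SpecialCache struct {
	full bool
	p    *av.Packet
}

func NewSpecialCache() *SpecialCache {
	return &SpecialCache{}
}

// Write overwrites the cached packet with the latest one.
func (c *SpecialCache) Write(p *av.Packet) {
	c.p = p
	c.full = true
}

// Send pushes the cached packet (if any) to a new subscriber.
func (c *SpecialCache) Send(w av.WriteCloser) error {
	if !c.full {
		return nil
	}
	return w.Write(c.p)
}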
When a connection comes in, the first thing to do is decide whether the peer is publishing or playing:
const (
maxQueueNum = 1024
SAVE_STATICS_INTERVAL = 5000
)
var (
readTimeout = configure.Config.GetInt("read_timeout")
writeTimeout = configure.Config.GetInt("write_timeout")
)
type Client struct {
handler av.Handler
getter av.GetWriter
}
func NewRtmpClient(h av.Handler, getter av.GetWriter) *Client {
return &Client{
handler: h,
getter: getter,
}
}
func (c *Client) Dial(url string, method string) error {
connClient := core.NewConnClient()
if err := connClient.Start(url, method); err != nil {
return err
}
if method == av.PUBLISH {
writer := NewVirWriter(connClient)
log.Debugf("client Dial call NewVirWriter url=%s, method=%s", url, method)
c.handler.HandleWriter(writer)
} else if method == av.PLAY {
reader := NewVirReader(connClient)
log.Debugf("client Dial call NewVirReader url=%s, method=%s", url, method)
c.handler.HandleReader(reader)
if c.getter != nil {
writer := c.getter.GetWriter(reader.Info())
c.handler.HandleWriter(writer)
}
}
return nil
}
func (c *Client) GetHandle() av.Handler {
return c.handler
}
type Server struct {
handler av.Handler
getter av.GetWriter
}
func NewRtmpServer(h av.Handler, getter av.GetWriter) *Server {
return &Server{
handler: h,
getter: getter,
}
}
func (s *Server) Serve(listener net.Listener) (err error) {
defer func() {
if r := recover(); r != nil {
log.Error("rtmp serve panic: ", r)
}
}()
for {
var netconn net.Conn
netconn, err = listener.Accept()
if err != nil {
return
}
conn := core.NewConn(netconn, 4*1024)
log.Debug("new client, connect remote: ", conn.RemoteAddr().String(),
"local:", conn.LocalAddr().String())
go s.handleConn(conn)
}
}
func (s *Server) handleConn(conn *core.Conn) error {
if err := conn.HandshakeServer(); err != nil {
conn.Close()
log.Error("handleConn HandshakeServer err: ", err)
return err
}
connServer := core.NewConnServer(conn)
if err := connServer.ReadMsg(); err != nil {
conn.Close()
log.Error("handleConn read msg err: ", err)
return err
}
appname, name, _ := connServer.GetInfo()
if ret := configure.CheckAppName(appname); !ret {
err := fmt.Errorf("application name=%s is not configured", appname)
conn.Close()
log.Error("CheckAppName err: ", err)
return err
}
log.Debugf("handleConn: IsPublisher=%v", connServer.IsPublisher())
if connServer.IsPublisher() {
if configure.Config.GetBool("rtmp_noauth") {
key, err := configure.RoomKeys.GetKey(name)
if err != nil {
err := fmt.Errorf("Cannot create key err=%s", err.Error())
conn.Close()
log.Error("GetKey err: ", err)
return err
}
name = key
}
channel, err := configure.RoomKeys.GetChannel(name)
if err != nil {
err := fmt.Errorf("invalid key err=%s", err.Error())
conn.Close()
log.Error("CheckKey err: ", err)
return err
}
connServer.PublishInfo.Name = channel
if pushlist, ret := configure.GetStaticPushUrlList(appname); ret && (pushlist != nil) {
log.Debugf("GetStaticPushUrlList: %v", pushlist)
}
reader := NewVirReader(connServer)
s.handler.HandleReader(reader)
log.Debugf("new publisher: %+v", reader.Info())
if s.getter != nil {
writeType := reflect.TypeOf(s.getter)
log.Debugf("handleConn:writeType=%v", writeType)
writer := s.getter.GetWriter(reader.Info())
s.handler.HandleWriter(writer)
}
if configure.Config.GetBool("flv_archive") {
flvWriter := new(flv.FlvDvr)
s.handler.HandleWriter(flvWriter.GetWriter(reader.Info()))
}
} else {
writer := NewVirWriter(connServer)
log.Debugf("new player: %+v", writer.Info())
s.handler.HandleWriter(writer)
}
return nil
}
type GetInFo interface {
GetInfo() (string, string, string)
}
type StreamReadWriteCloser interface {
GetInFo
Close(error)
Write(core.ChunkStream) error
Read(c *core.ChunkStream) error
}
type StaticsBW struct {
StreamId uint32
VideoDatainBytes uint64
LastVideoDatainBytes uint64
VideoSpeedInBytesperMS uint64
AudioDatainBytes uint64
LastAudioDatainBytes uint64
AudioSpeedInBytesperMS uint64
LastTimestamp int64
}
type VirWriter struct {
Uid string
closed bool
av.RWBaser
conn StreamReadWriteCloser
packetQueue chan *av.Packet
WriteBWInfo StaticsBW
}
func NewVirWriter(conn StreamReadWriteCloser) *VirWriter {
ret := &VirWriter{
Uid: uid.NewId(),
conn: conn,
RWBaser: av.NewRWBaser(time.Second * time.Duration(writeTimeout)),
packetQueue: make(chan *av.Packet, maxQueueNum),
WriteBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0},
}
go ret.Check()
go func() {
err := ret.SendPacket()
if err != nil {
log.Warning(err)
}
}()
return ret
}
func (v *VirWriter) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) {
nowInMS := int64(time.Now().UnixNano() / 1e6)
v.WriteBWInfo.StreamId = streamid
if isVideoFlag {
v.WriteBWInfo.VideoDatainBytes = v.WriteBWInfo.VideoDatainBytes + length
} else {
v.WriteBWInfo.AudioDatainBytes = v.WriteBWInfo.AudioDatainBytes + length
}
if v.WriteBWInfo.LastTimestamp == 0 {
v.WriteBWInfo.LastTimestamp = nowInMS
} else if (nowInMS - v.WriteBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL {
diffTimestamp := (nowInMS - v.WriteBWInfo.LastTimestamp) / 1000
v.WriteBWInfo.VideoSpeedInBytesperMS = (v.WriteBWInfo.VideoDatainBytes - v.WriteBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.WriteBWInfo.AudioSpeedInBytesperMS = (v.WriteBWInfo.AudioDatainBytes - v.WriteBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.WriteBWInfo.LastVideoDatainBytes = v.WriteBWInfo.VideoDatainBytes
v.WriteBWInfo.LastAudioDatainBytes = v.WriteBWInfo.AudioDatainBytes
v.WriteBWInfo.LastTimestamp = nowInMS
}
}
func (v *VirWriter) Check() {
var c core.ChunkStream
for {
if err := v.conn.Read(&c); err != nil {
v.Close(err)
return
}
}
}
func (v *VirWriter) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Warningf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
// try not to drop audio
if ok && tmpPkt.IsAudio {
if len(pktQue) > maxQueueNum-2 {
log.Debug("drop audio pkt")
<-pktQue
} else {
pktQue <- tmpPkt
}
}
if ok && tmpPkt.IsVideo {
videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader)
// don't drop SPS config and don't drop key frames
if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) {
pktQue <- tmpPkt
}
if len(pktQue) > maxQueueNum-10 {
log.Debug("drop video pkt")
<-pktQue
}
}
}
log.Debug("packet queue len: ", len(pktQue))
}
//
func (v *VirWriter) Write(p *av.Packet) (err error) {
err = nil
if v.closed {
err = fmt.Errorf("VirWriter closed")
return
}
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("VirWriter has already been closed:%v", e)
}
}()
if len(v.packetQueue) >= maxQueueNum-24 {
v.DropPacket(v.packetQueue, v.Info())
} else {
v.packetQueue <- p
}
return
}
func (v *VirWriter) SendPacket() error {
Flush := reflect.ValueOf(v.conn).MethodByName("Flush")
var cs core.ChunkStream
for {
p, ok := <-v.packetQueue
if ok {
cs.Data = p.Data
cs.Length = uint32(len(p.Data))
cs.StreamID = p.StreamID
cs.Timestamp = p.TimeStamp
cs.Timestamp += v.BaseTimeStamp()
if p.IsVideo {
cs.TypeID = av.TAG_VIDEO
} else {
if p.IsMetadata {
cs.TypeID = av.TAG_SCRIPTDATAAMF0
} else {
cs.TypeID = av.TAG_AUDIO
}
}
v.SaveStatics(p.StreamID, uint64(cs.Length), p.IsVideo)
v.SetPreTime()
v.RecTimeStamp(cs.Timestamp, cs.TypeID)
err := v.conn.Write(cs)
if err != nil {
v.closed = true
return err
}
Flush.Call(nil)
} else {
return fmt.Errorf("closed")
}
}
}
func (v *VirWriter) Info() (ret av.Info) {
ret.UID = v.Uid
_, _, URL := v.conn.GetInfo()
ret.URL = URL
_url, err := url.Parse(URL)
if err != nil {
log.Warning(err)
}
ret.Key = strings.TrimLeft(_url.Path, "/")
ret.Inter = true
return
}
func (v *VirWriter) Close(err error) {
log.Warning("player ", v.Info(), "closed: "+err.Error())
if !v.closed {
close(v.packetQueue)
}
v.closed = true
v.conn.Close(err)
}
type VirReader struct {
Uid string
av.RWBaser
demuxer *flv.Demuxer
conn StreamReadWriteCloser
ReadBWInfo StaticsBW
}
func NewVirReader(conn StreamReadWriteCloser) *VirReader {
return &VirReader{
Uid: uid.NewId(),
conn: conn,
RWBaser: av.NewRWBaser(time.Second * time.Duration(writeTimeout)),
demuxer: flv.NewDemuxer(),
ReadBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0},
}
}
func (v *VirReader) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) {
nowInMS := int64(time.Now().UnixNano() / 1e6)
v.ReadBWInfo.StreamId = streamid
if isVideoFlag {
v.ReadBWInfo.VideoDatainBytes = v.ReadBWInfo.VideoDatainBytes + length
} else {
v.ReadBWInfo.AudioDatainBytes = v.ReadBWInfo.AudioDatainBytes + length
}
if v.ReadBWInfo.LastTimestamp == 0 {
v.ReadBWInfo.LastTimestamp = nowInMS
} else if (nowInMS - v.ReadBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL {
diffTimestamp := (nowInMS - v.ReadBWInfo.LastTimestamp) / 1000
//log.Printf("now=%d, last=%d, diff=%d", nowInMS, v.ReadBWInfo.LastTimestamp, diffTimestamp)
v.ReadBWInfo.VideoSpeedInBytesperMS = (v.ReadBWInfo.VideoDatainBytes - v.ReadBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.ReadBWInfo.AudioSpeedInBytesperMS = (v.ReadBWInfo.AudioDatainBytes - v.ReadBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.ReadBWInfo.LastVideoDatainBytes = v.ReadBWInfo.VideoDatainBytes
v.ReadBWInfo.LastAudioDatainBytes = v.ReadBWInfo.AudioDatainBytes
v.ReadBWInfo.LastTimestamp = nowInMS
}
}
func (v *VirReader) Read(p *av.Packet) (err error) {
defer func() {
if r := recover(); r != nil {
log.Warning("rtmp read packet panic: ", r)
}
}()
v.SetPreTime()
var cs core.ChunkStream
for {
err = v.conn.Read(&cs)
if err != nil {
return err
}
if cs.TypeID == av.TAG_AUDIO ||
cs.TypeID == av.TAG_VIDEO ||
cs.TypeID == av.TAG_SCRIPTDATAAMF0 ||
cs.TypeID == av.TAG_SCRIPTDATAAMF3 {
break
}
}
p.IsAudio = cs.TypeID == av.TAG_AUDIO
p.IsVideo = cs.TypeID == av.TAG_VIDEO
p.IsMetadata = cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3
p.StreamID = cs.StreamID
p.Data = cs.Data
p.TimeStamp = cs.Timestamp
v.SaveStatics(p.StreamID, uint64(len(p.Data)), p.IsVideo)
v.demuxer.DemuxH(p)
return err
}
func (v *VirReader) Info() (ret av.Info) {
ret.UID = v.Uid
_, _, URL := v.conn.GetInfo()
ret.URL = URL
_url, err := url.Parse(URL)
if err != nil {
log.Warning(err)
}
ret.Key = strings.TrimLeft(_url.Path, "/")
return
}
func (v *VirReader) Close(err error) {
log.Debug("publisher ", v.Info(), "closed: "+err.Error())
v.conn.Close(err)
}
Playback
Streaming playback supports several protocols: RTMP, FLV and HLS. Let's start with RTMP:
var (
STOP_CTRL = "RTMPRELAY_STOP"
)
type RtmpRelay struct {
PlayUrl string
PublishUrl string
cs_chan chan core.ChunkStream
sndctrl_chan chan string
connectPlayClient *core.ConnClient
connectPublishClient *core.ConnClient
startflag bool
}
func NewRtmpRelay(playurl *string, publishurl *string) *RtmpRelay {
return &RtmpRelay{
PlayUrl: *playurl,
PublishUrl: *publishurl,
cs_chan: make(chan core.ChunkStream, 500),
sndctrl_chan: make(chan string),
connectPlayClient: nil,
connectPublishClient: nil,
startflag: false,
}
}
func (self *RtmpRelay) rcvPlayChunkStream() {
log.Debug("rcvPlayRtmpMediaPacket connectClient.Read...")
for {
var rc core.ChunkStream
if self.startflag == false {
self.connectPlayClient.Close(nil)
log.Debugf("rcvPlayChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
break
}
err := self.connectPlayClient.Read(&rc)
if err != nil && err == io.EOF {
break
}
//log.Debugf("connectPlayClient.Read return rc.TypeID=%v length=%d, err=%v", rc.TypeID, len(rc.Data), err)
switch rc.TypeID {
case 20, 17:
r := bytes.NewReader(rc.Data)
vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0)
log.Debugf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err)
case 18:
log.Debug("rcvPlayRtmpMediaPacket: metadata....")
case 8, 9:
self.cs_chan <- rc
}
}
}
func (self *RtmpRelay) sendPublishChunkStream() {
for {
select {
case rc := <-self.cs_chan:
//log.Debugf("sendPublishChunkStream: rc.TypeID=%v length=%d", rc.TypeID, len(rc.Data))
self.connectPublishClient.Write(rc)
case ctrlcmd := <-self.sndctrl_chan:
if ctrlcmd == STOP_CTRL {
self.connectPublishClient.Close(nil)
log.Debugf("sendPublishChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
return
}
}
}
}
func (self *RtmpRelay) Start() error {
if self.startflag {
return fmt.Errorf("The rtmprelay already started, playurl=%s, publishurl=%s\n", self.PlayUrl, self.PublishUrl)
}
self.connectPlayClient = core.NewConnClient()
self.connectPublishClient = core.NewConnClient()
log.Debugf("play server addr:%v starting....", self.PlayUrl)
err := self.connectPlayClient.Start(self.PlayUrl, av.PLAY)
if err != nil {
log.Debugf("connectPlayClient.Start url=%v error", self.PlayUrl)
return err
}
log.Debugf("publish server addr:%v starting....", self.PublishUrl)
err = self.connectPublishClient.Start(self.PublishUrl, av.PUBLISH)
if err != nil {
log.Debugf("connectPublishClient.Start url=%v error", self.PublishUrl)
self.connectPlayClient.Close(nil)
return err
}
self.startflag = true
go self.rcvPlayChunkStream()
go self.sendPublishChunkStream()
return nil
}
func (self *RtmpRelay) Stop() {
if !self.startflag {
log.Debugf("The rtmprelay already stoped, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
return
}
self.startflag = false
self.sndctrl_chan <- STOP_CTRL
}
In the RTMP relay the chunk streams are handed between goroutines through channels, which keeps CPU utilisation healthy and avoids the pattern where the CPU sits idle while memory balloons until the service crashes. Next, HLS. HLS is an adaptive-bitrate protocol carried over HTTP; let's first look at how the HLS source caches and flushes its data:
const (
videoHZ = 90000
aacSampleLen = 1024
maxQueueNum = 512
h264_default_hz uint64 = 90
)
type Source struct {
av.RWBaser
seq int
info av.Info
bwriter *bytes.Buffer
btswriter *bytes.Buffer
demuxer *flv.Demuxer
muxer *ts.Muxer
pts, dts uint64
stat *status
align *align
cache *audioCache
tsCache *TSCacheItem
tsparser *parser.CodecParser
closed bool
packetQueue chan *av.Packet
}
func NewSource(info av.Info) *Source {
info.Inter = true
s := &Source{
info: info,
align: &align{},
stat: newStatus(),
RWBaser: av.NewRWBaser(time.Second * 10),
cache: newAudioCache(),
demuxer: flv.NewDemuxer(),
muxer: ts.NewMuxer(),
tsCache: NewTSCacheItem(info.Key),
tsparser: parser.NewCodecParser(),
bwriter: bytes.NewBuffer(make([]byte, 100*1024)),
packetQueue: make(chan *av.Packet, maxQueueNum),
}
go func() {
err := s.SendPacket()
if err != nil {
log.Debug("send packet error: ", err)
s.closed = true
}
}()
return s
}
func (source *Source) GetCacheInc() *TSCacheItem {
return source.tsCache
}
func (source *Source) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Warningf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
// try not to drop audio
if ok && tmpPkt.IsAudio {
if len(pktQue) > maxQueueNum-2 {
<-pktQue
} else {
pktQue <- tmpPkt
}
}
if ok && tmpPkt.IsVideo {
videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader)
// don't drop SPS config and don't drop key frames
if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) {
pktQue <- tmpPkt
}
if len(pktQue) > maxQueueNum-10 {
<-pktQue
}
}
}
log.Debug("packet queue len: ", len(pktQue))
}
func (source *Source) Write(p *av.Packet) (err error) {
err = nil
if source.closed {
err = fmt.Errorf("hls source closed")
return
}
source.SetPreTime()
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("hls source has already been closed:%v", e)
}
}()
if len(source.packetQueue) >= maxQueueNum-24 {
source.DropPacket(source.packetQueue, source.info)
} else {
if !source.closed {
source.packetQueue <- p
}
}
return
}
func (source *Source) SendPacket() error {
defer func() {
log.Debugf("[%v] hls sender stop", source.info)
if r := recover(); r != nil {
log.Warning("hls SendPacket panic: ", r)
}
}()
log.Debugf("[%v] hls sender start", source.info)
for {
if source.closed {
return fmt.Errorf("closed")
}
p, ok := <-source.packetQueue
if ok {
if p.IsMetadata {
continue
}
err := source.demuxer.Demux(p)
if err == flv.ErrAvcEndSEQ {
log.Warning(err)
continue
} else {
if err != nil {
log.Warning(err)
return err
}
}
compositionTime, isSeq, err := source.parse(p)
if err != nil {
log.Warning(err)
}
if err != nil || isSeq {
continue
}
if source.btswriter != nil {
source.stat.update(p.IsVideo, p.TimeStamp)
source.calcPtsDts(p.IsVideo, p.TimeStamp, uint32(compositionTime))
source.tsMux(p)
}
} else {
return fmt.Errorf("closed")
}
}
}
func (source *Source) Info() (ret av.Info) {
return source.info
}
func (source *Source) cleanup() {
close(source.packetQueue)
source.bwriter = nil
source.btswriter = nil
source.cache = nil
source.tsCache = nil
}
func (source *Source) Close(err error) {
log.Debug("hls source closed: ", source.info)
if !source.closed && !configure.Config.GetBool("hls_keep_after_end") {
source.cleanup()
}
source.closed = true
}
func (source *Source) cut() {
newf := true
if source.btswriter == nil {
source.btswriter = bytes.NewBuffer(nil)
} else if source.btswriter != nil && source.stat.durationMs() >= duration {
source.flushAudio()
source.seq++
filename := fmt.Sprintf("/%s/%d.ts", source.info.Key, time.Now().Unix())
item := NewTSItem(filename, int(source.stat.durationMs()), source.seq, source.btswriter.Bytes())
source.tsCache.SetItem(filename, item)
source.btswriter.Reset()
source.stat.resetAndNew()
} else {
newf = false
}
if newf {
source.btswriter.Write(source.muxer.PAT())
source.btswriter.Write(source.muxer.PMT(av.SOUND_AAC, true))
}
}
func (source *Source) parse(p *av.Packet) (int32, bool, error) {
var compositionTime int32
var ah av.AudioPacketHeader
var vh av.VideoPacketHeader
if p.IsVideo {
vh = p.Header.(av.VideoPacketHeader)
if vh.CodecID() != av.VIDEO_H264 {
return compositionTime, false, ErrNoSupportVideoCodec
}
compositionTime = vh.CompositionTime()
if vh.IsKeyFrame() && vh.IsSeq() {
return compositionTime, true, source.tsparser.Parse(p, source.bwriter)
}
} else {
ah = p.Header.(av.AudioPacketHeader)
if ah.SoundFormat() != av.SOUND_AAC {
return compositionTime, false, ErrNoSupportAudioCodec
}
if ah.AACPacketType() == av.AAC_SEQHDR {
return compositionTime, true, source.tsparser.Parse(p, source.bwriter)
}
}
source.bwriter.Reset()
if err := source.tsparser.Parse(p, source.bwriter); err != nil {
return compositionTime, false, err
}
p.Data = source.bwriter.Bytes()
if p.IsVideo && vh.IsKeyFrame() {
source.cut()
}
return compositionTime, false, nil
}
func (source *Source) calcPtsDts(isVideo bool, ts, compositionTs uint32) {
source.dts = uint64(ts) * h264_default_hz
if isVideo {
source.pts = source.dts + uint64(compositionTs)*h264_default_hz
} else {
sampleRate, _ := source.tsparser.SampleRate()
source.align.align(&source.dts, uint32(videoHZ*aacSampleLen/sampleRate))
source.pts = source.dts
}
}
func (source *Source) flushAudio() error {
return source.muxAudio(1)
}
func (source *Source) muxAudio(limit byte) error {
if source.cache.CacheNum() < limit {
return nil
}
var p av.Packet
_, pts, buf := source.cache.GetFrame()
p.Data = buf
p.TimeStamp = uint32(pts / h264_default_hz)
return source.muxer.Mux(&p, source.btswriter)
}
func (source *Source) tsMux(p *av.Packet) error {
if p.IsVideo {
return source.muxer.Mux(p, source.btswriter)
} else {
source.cache.Cache(p.Data, source.pts)
return source.muxAudio(cache_max_frames)
}
}
The HLS playlists and segments themselves are served over plain HTTP:
const (
duration = 3000
)
var (
ErrNoPublisher = fmt.Errorf("no publisher")
ErrInvalidReq = fmt.Errorf("invalid req url path")
ErrNoSupportVideoCodec = fmt.Errorf("no support video codec")
ErrNoSupportAudioCodec = fmt.Errorf("no support audio codec")
)
var crossdomainxml = []byte(`<?xml version="1.0" ?>
<cross-domain-policy>
<allow-access-from domain="*" />
<allow-http-request-headers-from domain="*" headers="*"/>
</cross-domain-policy>`)
type Server struct {
listener net.Listener
conns *sync.Map
}
func NewServer() *Server {
ret := &Server{
conns: &sync.Map{},
}
go ret.checkStop()
return ret
}
func (server *Server) Serve(listener net.Listener) error {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
server.handle(w, r)
})
server.listener = listener
http.Serve(listener, mux)
return nil
}
func (server *Server) GetWriter(info av.Info) av.WriteCloser {
var s *Source
v, ok := server.conns.Load(info.Key)
if !ok {
log.Debug("new hls source")
s = NewSource(info)
server.conns.Store(info.Key, s)
} else {
s = v.(*Source)
}
return s
}
func (server *Server) getConn(key string) *Source {
v, ok := server.conns.Load(key)
if !ok {
return nil
}
return v.(*Source)
}
func (server *Server) checkStop() {
for {
<-time.After(5 * time.Second)
server.conns.Range(func(key, val interface{}) bool {
v := val.(*Source)
if !v.Alive() && !configure.Config.GetBool("hls_keep_after_end") {
log.Debug("check stop and remove: ", v.Info())
server.conns.Delete(key)
}
return true
})
}
}
func (server *Server) handle(w http.ResponseWriter, r *http.Request) {
if path.Base(r.URL.Path) == "crossdomain.xml" {
w.Header().Set("Content-Type", "application/xml")
w.Write(crossdomainxml)
return
}
switch path.Ext(r.URL.Path) {
case ".m3u8":
key, _ := server.parseM3u8(r.URL.Path)
conn := server.getConn(key)
if conn == nil {
http.Error(w, ErrNoPublisher.Error(), http.StatusForbidden)
return
}
tsCache := conn.GetCacheInc()
if tsCache == nil {
http.Error(w, ErrNoPublisher.Error(), http.StatusForbidden)
return
}
body, err := tsCache.GenM3U8PlayList()
if err != nil {
log.Debug("GenM3U8PlayList error: ", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Content-Type", "application/x-mpegURL")
w.Header().Set("Content-Length", strconv.Itoa(len(body)))
w.Write(body)
case ".ts":
key, _ := server.parseTs(r.URL.Path)
conn := server.getConn(key)
if conn == nil {
http.Error(w, ErrNoPublisher.Error(), http.StatusForbidden)
return
}
tsCache := conn.GetCacheInc()
item, err := tsCache.GetItem(r.URL.Path)
if err != nil {
log.Debug("GetItem error: ", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "video/mp2ts")
w.Header().Set("Content-Length", strconv.Itoa(len(item.Data)))
w.Write(item.Data)
}
}
func (server *Server) parseM3u8(pathstr string) (key string, err error) {
pathstr = strings.TrimLeft(pathstr, "/")
key = strings.Split(pathstr, path.Ext(pathstr))[0]
return
}
func (server *Server) parseTs(pathstr string) (key string, err error) {
pathstr = strings.TrimLeft(pathstr, "/")
paths := strings.SplitN(pathstr, "/", 3)
if len(paths) != 3 {
err = fmt.Errorf("invalid path=%s", pathstr)
return
}
key = paths[0] + "/" + paths[1]
return
}
Finally, HTTP-FLV: a protocol that combines RTMP-like low latency with the ability to reuse existing HTTP delivery infrastructure.
It streams the FLV file header and body (audio tags and video tags) over HTTP, which gives it the following characteristics:
it can, to some degree, get through firewalls
it can be encrypted by running over HTTPS
it works well with HTTP 302 redirects, which allows flexible scheduling
it is well supported on iOS and Android
On the server side we first enumerate the current publishers and players, then handle the /xxx.flv requests:
func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *streams {
rtmpStream := server.handler.(*rtmp.RtmpStream)
if rtmpStream == nil {
return nil
}
msgs := new(streams)
rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
if s, ok := val.(*rtmp.Stream); ok {
if s.GetReader() != nil {
msg := stream{key.(string), s.GetReader().Info().UID}
msgs.Publishers = append(msgs.Publishers, msg)
}
}
return true
})
rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
ws := val.(*rtmp.Stream).GetWs()
ws.Range(func(k, v interface{}) bool {
if pw, ok := v.(*rtmp.PackWriterCloser); ok {
if pw.GetWriter() != nil {
msg := stream{key.(string), pw.GetWriter().Info().UID}
msgs.Players = append(msgs.Players, msg)
}
}
return true
})
return true
})
return msgs
}
func (server *Server) getStream(w http.ResponseWriter, r *http.Request) {
msgs := server.getStreams(w, r)
if msgs == nil {
return
}
resp, _ := json.Marshal(msgs)
w.Header().Set("Content-Type", "application/json")
w.Write(resp)
}
func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) {
defer func() {
if r := recover(); r != nil {
log.Error("http flv handleConn panic: ", r)
}
}()
url := r.URL.String()
u := r.URL.Path
if pos := strings.LastIndex(u, "."); pos < 0 || u[pos:] != ".flv" {
http.Error(w, "invalid path", http.StatusBadRequest)
return
}
path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv")
paths := strings.SplitN(path, "/", 2)
log.Debug("url:", u, "path:", path, "paths:", paths)
if len(paths) != 2 {
http.Error(w, "invalid path", http.StatusBadRequest)
return
}
// check whether the stream has been published; if not, return 404 directly
msgs := server.getStreams(w, r)
if msgs == nil || len(msgs.Publishers) == 0 {
http.Error(w, "invalid path", http.StatusNotFound)
return
} else {
include := false
for _, item := range msgs.Publishers {
if item.Key == path {
include = true
break
}
}
if include == false {
http.Error(w, "invalid path", http.StatusNotFound)
return
}
}
w.Header().Set("Access-Control-Allow-Origin", "*")
writer := NewFLVWriter(paths[0], paths[1], url, w)
server.handler.HandleWriter(writer)
writer.Wait()
}
Streaming container formats
A container is a standard for packaging the output of encoders (video, audio, subtitles, chapter information, etc.) together. Containers make it easy to play different kinds of media content in sync.
A video format is the identifier that playback software uses to recognise a video file; in short, it defines the contract between the file and the player.
For container formats we support FLV and TS. FLV is quite common (it is used by iQiyi, short-video apps and so on) and relatively simple: it consists of an FLV header and an FLV body, and both parts need to be parsed and generated:
const (
headerLen = 11
)
type FLVWriter struct {
Uid string
av.RWBaser
app, title, url string
buf []byte
closed chan struct{}
ctx *os.File
closedWriter bool
}
func NewFLVWriter(app, title, url string, ctx *os.File) *FLVWriter {
ret := &FLVWriter{
Uid: uid.NewId(),
app: app,
title: title,
url: url,
ctx: ctx,
RWBaser: av.NewRWBaser(time.Second * 10),
closed: make(chan struct{}),
buf: make([]byte, headerLen),
}
ret.ctx.Write(flvHeader)
pio.PutI32BE(ret.buf[:4], 0)
ret.ctx.Write(ret.buf[:4])
return ret
}
func (writer *FLVWriter) Write(p *av.Packet) error {
writer.RWBaser.SetPreTime()
h := writer.buf[:headerLen]
typeID := av.TAG_VIDEO
if !p.IsVideo {
if p.IsMetadata {
var err error
typeID = av.TAG_SCRIPTDATAAMF0
p.Data, err = amf.MetaDataReform(p.Data, amf.DEL)
if err != nil {
return err
}
} else {
typeID = av.TAG_AUDIO
}
}
dataLen := len(p.Data)
timestamp := p.TimeStamp
timestamp += writer.BaseTimeStamp()
writer.RWBaser.RecTimeStamp(timestamp, uint32(typeID))
preDataLen := dataLen + headerLen
timestampbase := timestamp & 0xffffff
timestampExt := timestamp >> 24 & 0xff
pio.PutU8(h[0:1], uint8(typeID))
pio.PutI24BE(h[1:4], int32(dataLen))
pio.PutI24BE(h[4:7], int32(timestampbase))
pio.PutU8(h[7:8], uint8(timestampExt))
if _, err := writer.ctx.Write(h); err != nil {
return err
}
if _, err := writer.ctx.Write(p.Data); err != nil {
return err
}
pio.PutI32BE(h[:4], int32(preDataLen))
if _, err := writer.ctx.Write(h[:4]); err != nil {
return err
}
return nil
}
func (writer *FLVWriter) Wait() {
select {
case <-writer.closed:
return
}
}
func (writer *FLVWriter) Close(error) {
if writer.closedWriter {
return
}
writer.closedWriter = true
writer.ctx.Close()
close(writer.closed)
}
func (writer *FLVWriter) Info() (ret av.Info) {
ret.UID = writer.Uid
ret.URL = writer.url
ret.Key = writer.app + "/" + writer.title
return
}
type FlvDvr struct{}
func (f *FlvDvr) GetWriter(info av.Info) av.WriteCloser {
paths := strings.SplitN(info.Key, "/", 2)
if len(paths) != 2 {
log.Warning("invalid info")
return nil
}
flvDir := configure.Config.GetString("flv_dir")
err := os.MkdirAll(path.Join(flvDir, paths[0]), 0755)
if err != nil {
log.Error("mkdir error: ", err)
return nil
}
fileName := fmt.Sprintf("%s_%d.%s", path.Join(flvDir, info.Key), time.Now().Unix(), "flv")
log.Debug("flv dvr save stream to: ", fileName)
w, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0755)
if err != nil {
log.Error("open file error: ", err)
return nil
}
writer := NewFLVWriter(paths[0], paths[1], info.URL, w)
log.Debug("new flv dvr: ", writer.Info())
return writer
}
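NewFLVWriter above writes a package-level flvHeader that the article does not list. Per the FLV specification it is the fixed 9-byte file header ("FLV", version 1, flags 0x05 meaning audio and video present, header length 9), so a plausible definition is:
// flvHeader is the fixed FLV file header written before any tags.
var flvHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09}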
Each FLV tag carries audio, video or script data, optional encryption metadata and the payload itself; in other words there are three tag types: audio, video and script. Here is how the tag headers are parsed:
type Tag struct {
flvt flvTag
mediat mediaTag
}
func (tag *Tag) SoundFormat() uint8 {
return tag.mediat.soundFormat
}
func (tag *Tag) AACPacketType() uint8 {
return tag.mediat.aacPacketType
}
func (tag *Tag) IsKeyFrame() bool {
return tag.mediat.frameType == av.FRAME_KEY
}
func (tag *Tag) IsSeq() bool {
return tag.mediat.frameType == av.FRAME_KEY &&
tag.mediat.avcPacketType == av.AVC_SEQHDR
}
func (tag *Tag) CodecID() uint8 {
return tag.mediat.codecID
}
func (tag *Tag) CompositionTime() int32 {
return tag.mediat.compositionTime
}
// ParseMediaTagHeader, parse video, audio, tag header
func (tag *Tag) ParseMediaTagHeader(b []byte, isVideo bool) (n int, err error) {
switch isVideo {
case false:
n, err = tag.parseAudioHeader(b)
case true:
n, err = tag.parseVideoHeader(b)
}
return
}
func (tag *Tag) parseAudioHeader(b []byte) (n int, err error) {
if len(b) < n+1 {
err = fmt.Errorf("invalid audiodata len=%d", len(b))
return
}
flags := b[0]
tag.mediat.soundFormat = flags >> 4
tag.mediat.soundRate = (flags >> 2) & 0x3
tag.mediat.soundSize = (flags >> 1) & 0x1
tag.mediat.soundType = flags & 0x1
n++
switch tag.mediat.soundFormat {
case av.SOUND_AAC:
tag.mediat.aacPacketType = b[1]
n++
}
return
}
func (tag *Tag) parseVideoHeader(b []byte) (n int, err error) {
if len(b) < n+5 {
err = fmt.Errorf("invalid videodata len=%d", len(b))
return
}
flags := b[0]
tag.mediat.frameType = flags >> 4
tag.mediat.codecID = flags & 0xf
n++
if tag.mediat.frameType == av.FRAME_INTER || tag.mediat.frameType == av.FRAME_KEY {
tag.mediat.avcPacketType = b[1]
for i := 2; i < 5; i++ {
tag.mediat.compositionTime = tag.mediat.compositionTime<<8 + int32(b[i])
}
n += 4
}
return
}
When data is transmitted or stored, an error-detection mechanism is needed to guarantee correctness. CRC is the best known such technique: its detection capability is strong, its overhead is small, and it is easy to implement in encoders and checking circuits. So when multiplexing into the TS format we use a CRC-32 checksum (a sketch of the GenCrc32 helper used here is given after the muxer listing below):
func (muxer *Muxer) Mux(p *av.Packet, w io.Writer) error {
first := true
wBytes := 0
pesIndex := 0
tmpLen := byte(0)
dataLen := byte(0)
var pes pesHeader
dts := int64(p.TimeStamp) * int64(h264DefaultHZ)
pts := dts
pid := audioPID
var videoH av.VideoPacketHeader
if p.IsVideo {
pid = videoPID
videoH, _ = p.Header.(av.VideoPacketHeader)
pts = dts + int64(videoH.CompositionTime())*int64(h264DefaultHZ)
}
err := pes.packet(p, pts, dts)
if err != nil {
return err
}
pesHeaderLen := pes.len
packetBytesLen := len(p.Data) + int(pesHeaderLen)
for {
if packetBytesLen <= 0 {
break
}
if p.IsVideo {
muxer.videoCc++
if muxer.videoCc > 0xf {
muxer.videoCc = 0
}
} else {
muxer.audioCc++
if muxer.audioCc > 0xf {
muxer.audioCc = 0
}
}
i := byte(0)
//sync byte
muxer.tsPacket[i] = 0x47
i++
//error indicator, unit start indicator,ts priority,pid
muxer.tsPacket[i] = byte(pid >> 8) //pid high 5 bits
if first {
muxer.tsPacket[i] = muxer.tsPacket[i] | 0x40 //unit start indicator
}
i++
//pid low 8 bits
muxer.tsPacket[i] = byte(pid)
i++
//scram control, adaptation control, counter
if p.IsVideo {
muxer.tsPacket[i] = 0x10 | byte(muxer.videoCc&0x0f)
} else {
muxer.tsPacket[i] = 0x10 | byte(muxer.audioCc&0x0f)
}
i++
//a key frame needs a PCR
if first && p.IsVideo && videoH.IsKeyFrame() {
muxer.tsPacket[3] |= 0x20
muxer.tsPacket[i] = 7
i++
muxer.tsPacket[i] = 0x50
i++
muxer.writePcr(muxer.tsPacket[0:], i, dts)
i += 6
}
//frame data
if packetBytesLen >= tsDefaultDataLen {
dataLen = tsDefaultDataLen
if first {
dataLen -= (i - 4)
}
} else {
muxer.tsPacket[3] |= 0x20 //have adaptation
remainBytes := byte(0)
dataLen = byte(packetBytesLen)
if first {
remainBytes = tsDefaultDataLen - dataLen - (i - 4)
} else {
remainBytes = tsDefaultDataLen - dataLen
}
muxer.adaptationBufInit(muxer.tsPacket[i:], byte(remainBytes))
i += remainBytes
}
if first && i < tsPacketLen && pesHeaderLen > 0 {
tmpLen = tsPacketLen - i
if pesHeaderLen <= tmpLen {
tmpLen = pesHeaderLen
}
copy(muxer.tsPacket[i:], pes.data[pesIndex:pesIndex+int(tmpLen)])
i += tmpLen
packetBytesLen -= int(tmpLen)
dataLen -= tmpLen
pesHeaderLen -= tmpLen
pesIndex += int(tmpLen)
}
if i < tsPacketLen {
tmpLen = tsPacketLen - i
if tmpLen <= dataLen {
dataLen = tmpLen
}
copy(muxer.tsPacket[i:], p.Data[wBytes:wBytes+int(dataLen)])
wBytes += int(dataLen)
packetBytesLen -= int(dataLen)
}
if w != nil {
if _, err := w.Write(muxer.tsPacket[0:]); err != nil {
return err
}
}
first = false
}
return nil
}
//PAT return pat data
func (muxer *Muxer) PAT() []byte {
i := 0
remainByte := 0
tsHeader := []byte{0x47, 0x40, 0x00, 0x10, 0x00}
patHeader := []byte{0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00, 0x00, 0x01, 0xf0, 0x01}
if muxer.patCc > 0xf {
muxer.patCc = 0
}
tsHeader[3] |= muxer.patCc & 0x0f
muxer.patCc++
copy(muxer.pat[i:], tsHeader)
i += len(tsHeader)
copy(muxer.pat[i:], patHeader)
i += len(patHeader)
crc32Value := GenCrc32(patHeader)
muxer.pat[i] = byte(crc32Value >> 24)
i++
muxer.pat[i] = byte(crc32Value >> 16)
i++
muxer.pat[i] = byte(crc32Value >> 8)
i++
muxer.pat[i] = byte(crc32Value)
i++
remainByte = int(tsPacketLen - i)
for j := 0; j < remainByte; j++ {
muxer.pat[i+j] = 0xff
}
return muxer.pat[0:]
}
// PMT return pmt data
func (muxer *Muxer) PMT(soundFormat byte, hasVideo bool) []byte {
i := int(0)
j := int(0)
var progInfo []byte
remainBytes := int(0)
tsHeader := []byte{0x47, 0x50, 0x01, 0x10, 0x00}
pmtHeader := []byte{0x02, 0xb0, 0xff, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x00}
if !hasVideo {
pmtHeader[9] = 0x01
progInfo = []byte{0x0f, 0xe1, 0x01, 0xf0, 0x00}
} else {
progInfo = []byte{0x1b, 0xe1, 0x00, 0xf0, 0x00, //h264 or h265*
0x0f, 0xe1, 0x01, 0xf0, 0x00, //mp3 or aac
}
}
pmtHeader[2] = byte(len(progInfo) + 9 + 4)
if muxer.pmtCc > 0xf {
muxer.pmtCc = 0
}
tsHeader[3] |= muxer.pmtCc & 0x0f
muxer.pmtCc++
if soundFormat == 2 ||
soundFormat == 14 {
if hasVideo {
progInfo[5] = 0x4
} else {
progInfo[0] = 0x4
}
}
copy(muxer.pmt[i:], tsHeader)
i += len(tsHeader)
copy(muxer.pmt[i:], pmtHeader)
i += len(pmtHeader)
copy(muxer.pmt[i:], progInfo[0:])
i += len(progInfo)
crc32Value := GenCrc32(muxer.pmt[5 : 5+len(pmtHeader)+len(progInfo)])
muxer.pmt[i] = byte(crc32Value >> 24)
i++
muxer.pmt[i] = byte(crc32Value >> 16)
i++
muxer.pmt[i] = byte(crc32Value >> 8)
i++
muxer.pmt[i] = byte(crc32Value)
i++
remainBytes = int(tsPacketLen - i)
for j = 0; j < remainBytes; j++ {
muxer.pmt[i+j] = 0xff
}
return muxer.pmt[0:]
}
func (muxer *Muxer) adaptationBufInit(src []byte, remainBytes byte) {
src[0] = byte(remainBytes - 1)
if remainBytes == 1 {
} else {
src[1] = 0x00
for i := 2; i < len(src); i++ {
src[i] = 0xff
}
}
return
}
func (muxer *Muxer) writePcr(b []byte, i byte, pcr int64) error {
b[i] = byte(pcr >> 25)
i++
b[i] = byte((pcr >> 17) & 0xff)
i++
b[i] = byte((pcr >> 9) & 0xff)
i++
b[i] = byte((pcr >> 1) & 0xff)
i++
b[i] = byte(((pcr & 0x1) << 7) | 0x7e)
i++
b[i] = 0x00
return nil
}
type pesHeader struct {
len byte
data [tsPacketLen]byte
}
//pesPacket return pes packet
func (header *pesHeader) packet(p *av.Packet, pts, dts int64) error {
//PES header
i := 0
header.data[i] = 0x00
i++
header.data[i] = 0x00
i++
header.data[i] = 0x01
i++
sid := audioSID
if p.IsVideo {
sid = videoSID
}
header.data[i] = byte(sid)
i++
flag := 0x80
ptslen := 5
dtslen := ptslen
headerSize := ptslen
if p.IsVideo && pts != dts {
flag |= 0x40
headerSize += 5 //add dts
}
size := len(p.Data) + headerSize + 3
if size > 0xffff {
size = 0
}
header.data[i] = byte(size >> 8)
i++
header.data[i] = byte(size)
i++
header.data[i] = 0x80
i++
header.data[i] = byte(flag)
i++
header.data[i] = byte(headerSize)
i++
header.writeTs(header.data[0:], i, flag>>6, pts)
i += ptslen
if p.IsVideo && pts != dts {
header.writeTs(header.data[0:], i, 1, dts)
i += dtslen
}
header.len = byte(i)
return nil
}
func (header *pesHeader) writeTs(src []byte, i int, fb int, ts int64) {
val := uint32(0)
if ts > 0x1ffffffff {
ts -= 0x1ffffffff
}
val = uint32(fb<<4) | ((uint32(ts>>30) & 0x07) << 1) | 1
src[i] = byte(val)
i++
val = ((uint32(ts>>15) & 0x7fff) << 1) | 1
src[i] = byte(val >> 8)
i++
src[i] = byte(val)
i++
val = (uint32(ts&0x7fff) << 1) | 1
src[i] = byte(val >> 8)
i++
src[i] = byte(val)
}
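PAT() and PMT() above call GenCrc32, which is not listed in the article. MPEG-TS PSI sections use the CRC-32/MPEG-2 variant (polynomial 0x04C11DB7, initial value 0xFFFFFFFF, no bit reflection, no final XOR), so the standard library's hash/crc32 (reflected IEEE) does not apply directly. A straightforward bitwise sketch:
// GenCrc32 computes the CRC-32/MPEG-2 checksum used by PSI sections.
func GenCrc32(data []byte) uint32 {
	crc := uint32(0xFFFFFFFF)
	for _, b := range data {
		crc ^= uint32(b) << 24
		for i := 0; i < 8; i++ {
			if crc&0x80000000 != 0 {
				crc = (crc << 1) ^ 0x04C11DB7
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}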
scheduler
The scheduler dispatches tasks, specifically work that an ordinary API call cannot answer right away. For example, the site needs video review and data-recovery jobs; these run after a short delay, invisible to the user but still present in the backend, so the scheduler handles them asynchronously. There are also periodic jobs. The scheduler also contains a Timer, used to run tasks on a schedule.
The overall architecture is shown in a diagram in the original article.
In this part we implement it with a producer/consumer style Runner. The code is as follows:
package taskrunner
type Runner struct {
Controller controlChan
Error controlChan
Data dataChan
dataSize int
longLived bool
Dispatcher fn
Executor fn
}
func NewRunner(size int, longlived bool, d fn, e fn) *Runner {
return &Runner {
Controller: make(chan string, 1),
Error: make(chan string, 1),
Data: make(chan interface{}, size),
longLived: longlived,
dataSize: size,
Dispatcher: d,
Executor: e,
}
}
func (r *Runner) startDispatch() {
defer func() {
if !r.longLived {
close(r.Controller)
close(r.Data)
close(r.Error)
}
}()
for {
select {
case c :=<- r.Controller:
if c == READY_TO_DISPATCH {
err := r.Dispatcher(r.Data)
if err != nil {
r.Error <- CLOSE
} else {
r.Controller <- READY_TO_EXECUTE
}
}
if c == READY_TO_EXECUTE {
err := r.Executor(r.Data)
if err != nil {
r.Error <- CLOSE
} else {
r.Controller <- READY_TO_DISPATCH
}
}
case e :=<- r.Error:
if e == CLOSE {
return
}
default:
}
}
}
func (r *Runner) StartAll() {
r.Controller <- READY_TO_DISPATCH
r.startDispatch()
}
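The Runner relies on a few package-level types and control messages that the listing above omits. A minimal sketch consistent with how they are used (the actual string values are assumptions):
type controlChan chan string
type dataChan chan interface{}

// fn is the shared signature of dispatchers and executors.
type fn func(dc dataChan) error

const (
	READY_TO_DISPATCH = "d" // ask the loop to run the Dispatcher
	READY_TO_EXECUTE  = "e" // ask the loop to run the Executor
	CLOSE             = "c" // stop the Runner
)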
The Runner is reusable; the Task described next is what customises it, for example deleting videos after a delay.
First, let's fetch the pending records:
package dbops
import (
"log"
_ "github.com/go-sql-driver/mysql"
)
func ReadVideoDeletionRecord(count int) ([]string, error) {
stmtOut, err := dbConn.Prepare("SELECT video_id FROM video_del_rec LIMIT ?")
var ids []string
if err != nil {
return ids, err
}
rows, err := stmtOut.Query(count)
if err != nil {
log.Printf("Query VideoDeletionRecord error: %v", err)
return ids, err
}
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return ids, err
}
ids = append(ids, id)
}
defer stmtOut.Close()
return ids, nil
}
func DelVideoDeletionRecord(vid string) error {
stmtDel, err := dbConn.Prepare("DELETE FROM video_del_rec WHERE video_id=?")
if err != nil {
return err
}
_, err = stmtDel.Exec(vid)
if err != nil {
log.Printf("Deleting VideoDeletionRecord error: %v", err)
return err
}
defer stmtDel.Close()
return nil
}
With the records in hand they need to be processed, which is where the task comes in:
package taskrunner
import (
"os"
"errors"
"log"
"sync"
"github.com/avenssi/video_server/scheduler/dbops"
)
func deleteVideo(vid string) error {
err := os.Remove(VIDEO_PATH + vid)
if err != nil && !os.IsNotExist(err) {
log.Printf("Deleting video error: %v", err)
return err
}
return nil
}
func VideoClearDispatcher(dc dataChan) error {
res, err := dbops.ReadVideoDeletionRecord(3)
if err != nil {
log.Printf("Video clear dispatcher error: %v", err)
return err
}
if len(res) == 0 {
return errors.New("All tasks finished")
}
for _, id := range res {
dc <- id
}
return nil
}
func VideoClearExecutor(dc dataChan) error {
errMap := &sync.Map{}
var err error
forloop:
for {
select {
case vid :=<- dc:
go func(id interface{}) {
if err := deleteVideo(id.(string)); err != nil {
errMap.Store(id, err)
return
}
if err := dbops.DelVideoDeletionRecord(id.(string)); err != nil {
errMap.Store(id, err)
return
}
}(vid)
default:
break forloop
}
}
errMap.Range(func(k, v interface{}) bool {
err = v.(error)
if err != nil {
return false
}
return true
})
return err
}
That covers the asynchronous and scheduled processing of video records.
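How these pieces are wired together is not shown in the article; one plausible entry point, assuming the package paths above, starts a short-lived Runner on a fixed interval (the interval and batch size here are illustrative, not values from the article):
package main

import (
	"time"

	"github.com/avenssi/video_server/scheduler/taskrunner"
)

func main() {
	for {
		// Each tick runs one dispatch/execute cycle that clears up to 3
		// pending video-deletion records, then the Runner shuts down.
		r := taskrunner.NewRunner(3, false,
			taskrunner.VideoClearDispatcher, taskrunner.VideoClearExecutor)
		go r.StartAll()
		time.Sleep(30 * time.Second)
	}
}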
stream server
Streaming
Upload files
Streaming differs from an ordinary request in that the connection is long-lived: once a request arrives, the server keeps writing data to the client, often for a long time. With many long-lived connections held open at once, a flood of new connections or page loads can eventually crash the service, so we add flow control: a limit, applied here only when a connection is established.
package main
import (
"log"
)
type ConnLimiter struct {
concurrentConn int
bucket chan int
}
func NewConnLimiter(cc int) *ConnLimiter {
return &ConnLimiter {
concurrentConn: cc,
bucket: make(chan int, cc),
}
}
func (cl *ConnLimiter) GetConn() bool {
if len(cl.bucket) >= cl.concurrentConn {
log.Printf("Reached the rate limitation.")
return false
}
cl.bucket <- 1
return true
}
func (cl *ConnLimiter) ReleaseConn() {
c :=<- cl.bucket
log.Printf("New connction coming: %d", c)
}
With the limiter in place, we embed it in the HTTP middleware. As before, at start-up we register the router and the HTTP server, so the code looks like this:
package main
import (
"net/http"
"github.com/julienschmidt/httprouter"
)
type middleWareHandler struct {
r *httprouter.Router
l *ConnLimiter
}
func NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler {
m := middleWareHandler{}
m.r = r
m.l = NewConnLimiter(cc)
return m
}
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
router.GET("/videos/:vid-id", streamHandler)
router.POST("/upload/:vid-id", uploadHandler)
router.GET("/testpage", testPageHandler)
return router
}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !m.l.GetConn() {
sendErrorResponse(w, http.StatusTooManyRequests, "Too many requests")
return
}
m.r.ServeHTTP(w, r)
defer m.l.ReleaseConn()
}
func main() {
r := RegisterHandlers()
mh := NewMiddleWareHandler(r, 2)
http.ListenAndServe(":2000", mh)
}
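The middleware above and the handlers below use sendErrorResponse, VIDEO_DIR and MAX_UPLOAD_SIZE, which the article does not list. Plausible definitions (the directory path and the size limit are assumptions, and an extra io import is needed):
const (
	VIDEO_DIR       = "./videos/"      // assumed location of stored videos
	MAX_UPLOAD_SIZE = 50 * 1024 * 1024 // assumed 50 MB upload limit
)

// sendErrorResponse writes the status code and a short error message.
func sendErrorResponse(w http.ResponseWriter, sc int, errMsg string) {
	w.WriteHeader(sc)
	io.WriteString(w, errMsg)
}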
Finally, let's look at how streamHandler serves the video:
func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
vid := p.ByName("vid-id")
vl := VIDEO_DIR + vid
video, err := os.Open(vl)
if err != nil {
log.Printf("Error when try to open file: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
w.Header().Set("Content-Type", "video/mp4")
http.ServeContent(w, r, "", time.Now(), video)
defer video.Close()
}
Here we take the common approach: once we have the stream's identifier we serve the file directly. For Upload files, we validate the request and then read the data out of it:
func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE)
if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil {
sendErrorResponse(w, http.StatusBadRequest, "File is too big")
return
}
file, _, err := r.FormFile("file")
if err != nil {
log.Printf("Error when try to get file: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
data, err := ioutil.ReadAll(file)
if err != nil {
log.Printf("Read file error: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
fn := p.ByName("vid-id")
err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666)
if err != nil {
log.Printf("Write file error: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
w.WriteHeader(http.StatusCreated)
io.WriteString(w, "Uploaded successfully")
}
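The router also registers /testpage with testPageHandler, which is not shown in the article; in the course code it simply renders a small HTML upload page. A minimal sketch (the template path is an assumption, and html/template plus log would need to be imported):
func testPageHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
	t, err := template.ParseFiles("./videos/upload.html")
	if err != nil {
		log.Printf("Parsing template failed: %v", err)
		return
	}
	t.Execute(w, nil)
}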
Deploying the site
Cloud native is at once abstract and concrete. It is not a single product but a technology stack plus a methodology; as the open-source technologies around cloud-native architecture keep maturing, the cloud-native stack will become mainstream and touch every engineer and every company. Following that trend, we deploy the site with Kubernetes (k8s).
First, build and package the code from the previous sections:
FROM ubuntu:16.04 as build
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get update && apt-get install -y --no-install-recommends \
g++ \
ca-certificates \
wget && \
rm -rf /var/lib/apt/lists/*
ENV GOLANG_VERSION 1.15.1
RUN wget -nv -O - https://studygolang.com/dl/golang/go1.15.1.linux-amd64.tar.gz \
| tar -C /usr/local -xz
ENV GOPROXY=https://goproxy.cn,direct
ENV GO111MODULE=on
ENV GOPATH /go
ENV PATH $GOPATH/bin:/usr/local/go/bin:$PATH
WORKDIR /go/src
COPY . .
WORKDIR /go/src/video-service
RUN sed -i "/runmode/crunmode=pro" /go/src/video-service/conf/app.conf
RUN export CGO_LDFLAGS_ALLOW='-Wl,--unresolved-symbols=ignore-in-object-files' && \
go install -ldflags="-s -w" -v /go/src/video-service
FROM ubuntu:16.04
WORKDIR /video-service
RUN mkdir -p log
COPY --from=build /go/bin/video-service /video-service
CMD ["./video-service"]
Deploying a service to k8s takes only a small script: a simple YAML (or JSON) manifest submitted to the Kubernetes API is enough. Here is the deployment manifest:
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: video-service
name: video-service
namespace: system-server
spec:
replicas: 1
selector:
matchLabels:
app: video-service
template:
metadata:
labels:
app: video-service
spec:
containers:
- image: {{ cluster_cfg['cluster']['docker-registry']['prefix'] }}video-service
imagePullPolicy: Always
name: video-service
ports:
- containerPort: 1000
#livenessProbe:
#httpGet:
#path: /api/v1/healthz
#port: 1000
#scheme: HTTP
#initialDelaySeconds: 15
#periodSeconds: 10
#timeoutSeconds: 3
#failureThreshold: 5
volumeMounts:
- name: video-service-config
mountPath: /video-service/conf
volumes:
- name: video-service-config
configMap:
name: video-service-config
nodeSelector:
video-service: "true"
restartPolicy: Always
After building the image, run the deployment:
sh build/build.sh
kubectl create -f deploy.yml
The main benefit of deploying with k8s is that, once the command is issued, it creates the underlying pods for us and manages them. Let's check how the service is running:
tess@cm001:~$ kubectl describe po video-service-646ccc4c5c-qrpgp -n system-server
Name: video-service-646ccc4c5c-qrpgp
Namespace: system-server
Priority: 0
Node: cm001/10.11.32.21
Start Time: Wed, 21 Apr 2021 15:38:48 +0800
Labels: app=video-service
pod-template-hash=646ccc4c5c
Annotations: cni.projectcalico.org/podIP: 20.247.87.138/32
Status: Running
IP: 20.247.87.138
Controlled By: ReplicaSet/video-service-646ccc4c5c
Containers:
video-service:
Container ID: docker://f284ceac649f4e1a29ac77cdd425ccc852caf67cc9c133144a7d1c8747e32aea
Image: minicub/video-service
Image ID: docker-pullable://minicub/video-service@sha256:3036927b55c6be9efc4cf34d6ad04aba093e04b49807c18ce0f7a418b2577bf4
Port: 1000/TCP
Host Port: 0/TCP
State: Running
Started: Wed, 28 Apr 2021 16:01:15 +0800
Ready: True
Restart Count: 1
Environment: <none>
Mounts:
/video-service/conf from video-service-config (rw)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-62wgr (ro)
Conditions:
Type Status
Initialized True
Ready True
ContainersReady True
PodScheduled True
Volumes:
video-service-config:
Type: ConfigMap (a volume populated by a ConfigMap)
Name: video-service-config
Optional: false
default-token-62wgr:
Type: Secret (a volume populated by a Secret)
SecretName: default-token-62wgr
Optional: false
QoS Class: BestEffort
Node-Selectors: video-service=true
Tolerations: node.kubernetes.io/not-ready:NoExecute for 300s
node.kubernetes.io/unreachable:NoExecute for 300s
Events: <none>
The service is deployed onto the node and can be reached at http://10.11.32.21:1000
Finally, open that address and play a video; the original article shows screenshots of the resulting pages.
Copyright notice: this is an original article by InfoQ author Lavender.
Original link: http://xie.infoq.cn/article/8a14983454bfaa0bf7676d44c. Please contact the author before reposting.