写点什么

使用 Go 语言开发流媒体视频网站

用户头像
Lavender
关注
发布于: 2021 年 05 月 18 日

简介

流媒体如今已经成为工业上的一个重要技术了,比如:直播网站、视频监控传输、APP 直播等,如何实现一个高并发的视频网站,这就涉及到语言技术的选型以及流媒体技术的使用,本节将主要介绍如何使用 Golang 来实现一个流媒体视频网站。

目录

  • 为什么选择 Go 以及 Go 的一些优势

  • GoLang 的简介以及实现一个 webserver 工具链

  • Golang 的 channel 并发模式

  • 用 Golang 完成一个流媒体网站

  • 网站部署

为什么选择 Go 以及 Go 的一些优势

为什么会选择 Go 来开发视频网站呢?这其实主要体现在 Go 语言的优势。那么 Go 有哪些优势呢?

  • 开发效率高,不管用其他语言,都需要很多其他的配置或插件,就连全家桶配套齐全的 Java 语言都会需要一个 Servlet 引擎,如:tomcat、jetty 等。但 Go 在这方面,提供了得天独厚的能力。大部分功能和内容已经集成在了 pkg。包括开发完整的开发工具链(tools、test、benchmark、builtin.etc),包括 Go 命令(go test、go install、go build)。这些都是完整的,直接下载 Go 后即可使用。包括音视频相关的插件、配置,都已经被包含在了 pkg。所以用 go 开发音视频,可以完美诠释 go 语言的优势。

  • 另一方面,部署简单,go 属于编译性语言,而且是能够编译多个平台可执行文件的语言。Compile once,run everywhere,直接编译后生成二进制文件,直接运行。

  • 良好的 native http 库、集成模板引擎,无需添加第三方框架。

GoLang 的简介以及实现一个 webserver 工具链

Go 语言是一种编译性语言,一个开源的编程语言,能够让构造简单、可靠且高效的软件变得容易。而且它的目标是兼具 Python 等动态语言的开发速度和集成 C/C++等编译语言的性能与安全性。它的主要特性是:

简洁、快速、安全、并行、内存管理、数组安全、编译迅速。


Go 中有一些常见的工具链,比如:

  • go build,编译 go 文件,可以跨平台编译:env GOOS=linux GOARCH=amd64 go build,在 CI/CD 中,这是一个非常有用的命令。

  • go install,这也是编译,但与 build 的区别是编译后将输出文件打包成库放在 pkg 下。

  • go get,用于获取 go 的第三方包,常见的是:go get -u git 地址,表示从 git 上获取某个资源并安装到本地。

  • go fmt,统一代码风格、排版,这将使得 go 代码更加易读、易理解。

  • go test,运行当前目录下的 tests,"go test -v" 会打印所有的信息,而"go test"只会打印测试的结果。

  • go 的 test 文件一般以 XXX_test.go 命名,这样,在执行"go test"的时候,程序会自动去执行那些被加了 test 的文件,这是一种约定的方式。

要点:

  • 使用 TestMain 作为初始化 test,并且使用 Run()来调用其它 tests 可以完成一些需要初始化操作的 testing,如:音视频资源数据库、文件加载等,这些有的可能需要被多次使用,但在设计模式中,只会加载一次到内存,这样,可以减少过多的内存占用,同时可以一次性的进行清理。

func TestMain(m *testing.M) {    fmt.Println("Test begin")    m.Run()}
复制代码
  • 如果没在其中加 Run(),除了 TestMain 的其它的 tests 都不被执行。

func TestPrint(t *testing.T) {    fmt.Println("Test print")}
func TestMain(m *testing.M) {    fmt.Println("Test begin")    //m.Run()}
复制代码

按照上面说的,如果没有执行 Run()方法,则 TestPrint 函数不会被执行。

Golang 的 channel 并发模式

在 Go 中,既然有了协程,那么这些协程之间如何通信呢?Go 提供了一个 channel(通道) 来解决。

声明一个 channel

在 Go 语言中,声明一个 channel 非常简单,使用内置的 make 函数即可,如下:

ch:=make(chan string)
复制代码

其中 chan 是一个关键字,表示是 channel 类型。后面的 string 表示 channel 里的数据是 string 类型。通过 channel 的声明也可以看到,chan 是一个集合类型。

定义好 chan 后就可以使用了,一个 chan 的操作只有两种:发送和接收:

  • 发送:向 chan 发送值,把值放在 chan 中,操作符为 chan <-

  • 接收:获取 chan 中的值,操作符为 <- chan

示例:

package main
import "fmt"
func main() {
 ch := make(chan string)
 go func() {
  fmt.Println("La")
  ch <- "发送数据者:La"
 }()
 fmt.Println("I am main goroutine")
 v := <- ch
 fmt.Println("接收到的chan中的值为:",v)
}
复制代码

我们先来执行看看打印结果:

I am main goroutine
La
接收到的chan中的值为:送数据者:La
复制代码

从运行结果可以看出:达到了使用 time.Sleep 函数的效果。

相信应该明白为什么程序不会在新的 goroutine 完成之前退出了,因为通过 make 创建的 chan 中没有值,而 main goroutine 又想从 chan 中获取值,获取不到就一直等待,等到另一个 goroutine 向 chan 发送值为止。

无缓冲 channel

上面的示例中,使用 make 创建的 chan 就是一个无缓冲 channel,它的容量是 0,不能存储任何数据。所以无缓冲 channel 只起到传输数据的作用,数据并不会在 channel 中做任何停留。这也意味着,无缓冲 channel 的发送和接收操作是同时进行的,它也被称为同步 channel。

有缓冲 channel

有缓冲 channel 类似一个可阻塞的队列,内部的元素先进先出。通过 make 函数的第二个参数可以指定 channel 容量的大小,进而创建一个有缓冲 channel,如:

cacheCh := make(chan int,5)
复制代码

定义了一个容量为 5 的元素为 int 类型的 chan。

一个有缓冲 channel 具备以下特点:

  • 有缓冲 channel 的内部有一个缓冲队列

  • 发送操作是向队列的尾部插入元素,如果队列已满,则阻塞等待,直到另一个 goroutine 执行,接收操作释放队列的空间

  • 接收操作是从队列的头部获取元素并把它从队列中删除,如果队列为空,则阻塞等待,直到另一个 goroutine 执行,发送操作插入新的元素

cache := make(chan int,5)
cache <- 2
cache <- 3
fmt.Println("容量:",cap(cache),",元素个数:",len(cache))
复制代码

无缓冲 channel 其实就是一个容量大小为 0 的 channel。比如 make(chan int,0)

关闭 channel

通过内置函数 close 即可关闭 channel。如果一个 channel 被关闭了,就不能向里面发送数据了,如果发送的话,会引起 painc 异常。但是还可以接收 channel 里的数据,如果 channel 里没有数据的话,接收的数据是元素类型的零值。

单向 channel

所谓单向,即可要不发送,要么只能接收。所以单向 channel 的声明也很简单,只需要在声明的时候带上 <- 操作符即可,如下:

send := make(chan <- int)receive := make(<- chan int)
复制代码

用 Golang 完成一个流媒体网站

业务模块

API 接口设计

  • 分层

  • Restful 风格设计

  • CRUD 区分资源操作

  • 返回码规范

首先,我们写个启动类:

package main 
import ( "net/http" "github.com/julienschmidt/httprouter")
type middleWareHandler struct { r *httprouter.Router}
func NewMiddleWareHandler(r *httprouter.Router) http.Handler { m := middleWareHandler{} m.r = r return m}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { //check session validateUserSession(r)
 m.r.ServeHTTP(w, r)}
func RegisterHandlers() *httprouter.Router { router := httprouter.New()
 router.POST("/user", CreateUser)
 router.POST("/user/:user_name", Login)
 return router}
func main() { r := RegisterHandlers() mh := NewMiddleWareHandler(r) http.ListenAndServe(":1000", mh)}
复制代码

在这里我们实现了注册、登录以及一些初始化监听端口。接下来,我们需要看看对于后端视频处理时,主要关心的是 session:

package session
import ( "time" "sync" "github.com/avenssi/video_server/api/defs" "github.com/avenssi/video_server/api/dbops" "github.com/avenssi/video_server/api/utils")
var sessionMap *sync.Map 
func init() { sessionMap = &sync.Map{}}
func nowInMilli() int64{ return time.Now().UnixNano()/1000000}
func deleteExpiredSession(sid string) { sessionMap.Delete(sid) dbops.DeleteSession(sid)}
func LoadSessionsFromDB() { r, err := dbops.RetrieveAllSessions() if err != nil {  return }
 r.Range(func(k, v interface{}) bool{  ss := v.(*defs.SimpleSession)  sessionMap.Store(k, ss)  return true })}
func GenerateNewSessionId(un string) string { id, _ := utils.NewUUID() ct := nowInMilli() ttl := ct + 30 * 60 * 1000// Severside session valid time: 30 min
 ss := &defs.SimpleSession{Username: un, TTL: ttl} sessionMap.Store(id, ss) dbops.InsertSession(id, ttl, un)
 return id}
func IsSessionExpired(sid string) (string, bool) { ss, ok := sessionMap.Load(sid) if ok {  ct := nowInMilli()  if ss.(*defs.SimpleSession).TTL < ct {   deleteExpiredSession(sid)   return "", true  }
  return ss.(*defs.SimpleSession).Username, false }
 return "", true}
复制代码

从上面的代码中,可以看到,Go 主要引用了相关的视频插件库:avenssi/video_server 等来处理缓存 session。这也是为什么选择 go 开发后端的一个原因。

同时,我们还定义了一个错误码信息:

package defs
type Err struct { Error string `json:"error"` ErrorCode string `json:"error_code"`  }
type ErrResponse struct { HttpSC int Error Err}
var ( ErrorRequestBodyParseFailed = ErrResponse{HttpSC: 400, Error: Err{Error: "Request body is not correct", ErrorCode: "001"}} ErrorNotAuthUser = ErrResponse{HttpSC: 401, Error: Err{Error: "User authentication failed.", ErrorCode: "002"}} ErrorDBError = ErrResponse{HttpSC: 500, Error: Err{Error: "DB ops failed", ErrorCode: "003"}} ErrorInternalFaults = ErrResponse{HttpSC: 500, Error: Err{Error: "Internal service error", ErrorCode: "004"}})
复制代码

以上对于业务层中处理主要逻辑就是这些了,下面主要讲推流和播放。

推流

通过 RTMP 协议推送视频流,这里在推流时,需要 cache 缓存,这样可以避免服务端的不断通信导致压力增加而被 crash,因为由于每个进程都是保持长链接,当每个客户端进程不断的发起请求,那么对于服务端的进程是会堵塞的,所以,在传输协议中,大部分音视频这种,流媒体处理方式,是需要加上 cache 来缓冲对于服务器的压力,避免整个服务的瘫痪,导致客户端一直无响应,从而导致客户端进入卡死状态:

type Cache struct {	gop      *GopCache	videoSeq *SpecialCache	audioSeq *SpecialCache	metadata *SpecialCache}
func NewCache() *Cache { return &Cache{ gop: NewGopCache(configure.Config.GetInt("gop_num")), videoSeq: NewSpecialCache(), audioSeq: NewSpecialCache(), metadata: NewSpecialCache(), }}
func (cache *Cache) Write(p av.Packet) { if p.IsMetadata { cache.metadata.Write(&p) return } else { if !p.IsVideo { ah, ok := p.Header.(av.AudioPacketHeader) if ok { if ah.SoundFormat() == av.SOUND_AAC && ah.AACPacketType() == av.AAC_SEQHDR { cache.audioSeq.Write(&p) return } else { return } }
} else { vh, ok := p.Header.(av.VideoPacketHeader) if ok { if vh.IsSeq() { cache.videoSeq.Write(&p) return } } else { return }
} } cache.gop.Write(&p)}
func (cache *Cache) Send(w av.WriteCloser) error { if err := cache.metadata.Send(w); err != nil { return err }
if err := cache.videoSeq.Send(w); err != nil { return err }
if err := cache.audioSeq.Send(w); err != nil { return err }
if err := cache.gop.Send(w); err != nil { return err }
return nil}
复制代码

在推流时,主要还是先进行选择:

const (	maxQueueNum           = 1024	SAVE_STATICS_INTERVAL = 5000)
var ( readTimeout = configure.Config.GetInt("read_timeout") writeTimeout = configure.Config.GetInt("write_timeout"))
type Client struct { handler av.Handler getter av.GetWriter}
func NewRtmpClient(h av.Handler, getter av.GetWriter) *Client { return &Client{ handler: h, getter: getter, }}
func (c *Client) Dial(url string, method string) error { connClient := core.NewConnClient() if err := connClient.Start(url, method); err != nil { return err } if method == av.PUBLISH { writer := NewVirWriter(connClient) log.Debugf("client Dial call NewVirWriter url=%s, method=%s", url, method) c.handler.HandleWriter(writer) } else if method == av.PLAY { reader := NewVirReader(connClient) log.Debugf("client Dial call NewVirReader url=%s, method=%s", url, method) c.handler.HandleReader(reader) if c.getter != nil { writer := c.getter.GetWriter(reader.Info()) c.handler.HandleWriter(writer) } } return nil}
func (c *Client) GetHandle() av.Handler { return c.handler}
type Server struct { handler av.Handler getter av.GetWriter}
func NewRtmpServer(h av.Handler, getter av.GetWriter) *Server { return &Server{ handler: h, getter: getter, }}
func (s *Server) Serve(listener net.Listener) (err error) { defer func() { if r := recover(); r != nil { log.Error("rtmp serve panic: ", r) } }()
for { var netconn net.Conn netconn, err = listener.Accept() if err != nil { return } conn := core.NewConn(netconn, 4*1024) log.Debug("new client, connect remote: ", conn.RemoteAddr().String(), "local:", conn.LocalAddr().String()) go s.handleConn(conn) }}
func (s *Server) handleConn(conn *core.Conn) error { if err := conn.HandshakeServer(); err != nil { conn.Close() log.Error("handleConn HandshakeServer err: ", err) return err } connServer := core.NewConnServer(conn)
if err := connServer.ReadMsg(); err != nil { conn.Close() log.Error("handleConn read msg err: ", err) return err }
appname, name, _ := connServer.GetInfo()
if ret := configure.CheckAppName(appname); !ret { err := fmt.Errorf("application name=%s is not configured", appname) conn.Close() log.Error("CheckAppName err: ", err) return err }
log.Debugf("handleConn: IsPublisher=%v", connServer.IsPublisher()) if connServer.IsPublisher() { if configure.Config.GetBool("rtmp_noauth") { key, err := configure.RoomKeys.GetKey(name) if err != nil { err := fmt.Errorf("Cannot create key err=%s", err.Error()) conn.Close() log.Error("GetKey err: ", err) return err } name = key } channel, err := configure.RoomKeys.GetChannel(name) if err != nil { err := fmt.Errorf("invalid key err=%s", err.Error()) conn.Close() log.Error("CheckKey err: ", err) return err } connServer.PublishInfo.Name = channel if pushlist, ret := configure.GetStaticPushUrlList(appname); ret && (pushlist != nil) { log.Debugf("GetStaticPushUrlList: %v", pushlist) } reader := NewVirReader(connServer) s.handler.HandleReader(reader) log.Debugf("new publisher: %+v", reader.Info())
if s.getter != nil { writeType := reflect.TypeOf(s.getter) log.Debugf("handleConn:writeType=%v", writeType) writer := s.getter.GetWriter(reader.Info()) s.handler.HandleWriter(writer) } if configure.Config.GetBool("flv_archive") { flvWriter := new(flv.FlvDvr) s.handler.HandleWriter(flvWriter.GetWriter(reader.Info())) } } else { writer := NewVirWriter(connServer) log.Debugf("new player: %+v", writer.Info()) s.handler.HandleWriter(writer) }
return nil}
type GetInFo interface { GetInfo() (string, string, string)}
type StreamReadWriteCloser interface { GetInFo Close(error) Write(core.ChunkStream) error Read(c *core.ChunkStream) error}
type StaticsBW struct { StreamId uint32 VideoDatainBytes uint64 LastVideoDatainBytes uint64 VideoSpeedInBytesperMS uint64
AudioDatainBytes uint64 LastAudioDatainBytes uint64 AudioSpeedInBytesperMS uint64
LastTimestamp int64}
type VirWriter struct { Uid string closed bool av.RWBaser conn StreamReadWriteCloser packetQueue chan *av.Packet WriteBWInfo StaticsBW}
func NewVirWriter(conn StreamReadWriteCloser) *VirWriter { ret := &VirWriter{ Uid: uid.NewId(), conn: conn, RWBaser: av.NewRWBaser(time.Second * time.Duration(writeTimeout)), packetQueue: make(chan *av.Packet, maxQueueNum), WriteBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0}, }
go ret.Check() go func() { err := ret.SendPacket() if err != nil { log.Warning(err) } }() return ret}
func (v *VirWriter) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) { nowInMS := int64(time.Now().UnixNano() / 1e6)
v.WriteBWInfo.StreamId = streamid if isVideoFlag { v.WriteBWInfo.VideoDatainBytes = v.WriteBWInfo.VideoDatainBytes + length } else { v.WriteBWInfo.AudioDatainBytes = v.WriteBWInfo.AudioDatainBytes + length }
if v.WriteBWInfo.LastTimestamp == 0 { v.WriteBWInfo.LastTimestamp = nowInMS } else if (nowInMS - v.WriteBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL { diffTimestamp := (nowInMS - v.WriteBWInfo.LastTimestamp) / 1000
v.WriteBWInfo.VideoSpeedInBytesperMS = (v.WriteBWInfo.VideoDatainBytes - v.WriteBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000 v.WriteBWInfo.AudioSpeedInBytesperMS = (v.WriteBWInfo.AudioDatainBytes - v.WriteBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.WriteBWInfo.LastVideoDatainBytes = v.WriteBWInfo.VideoDatainBytes v.WriteBWInfo.LastAudioDatainBytes = v.WriteBWInfo.AudioDatainBytes v.WriteBWInfo.LastTimestamp = nowInMS }}
func (v *VirWriter) Check() { var c core.ChunkStream for { if err := v.conn.Read(&c); err != nil { v.Close(err) return } }}
func (v *VirWriter) DropPacket(pktQue chan *av.Packet, info av.Info) { log.Warningf("[%v] packet queue max!!!", info) for i := 0; i < maxQueueNum-84; i++ { tmpPkt, ok := <-pktQue // try to don't drop audio if ok && tmpPkt.IsAudio { if len(pktQue) > maxQueueNum-2 { log.Debug("drop audio pkt") <-pktQue } else { pktQue <- tmpPkt }
}
if ok && tmpPkt.IsVideo { videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader) // dont't drop sps config and dont't drop key frame if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) { pktQue <- tmpPkt } if len(pktQue) > maxQueueNum-10 { log.Debug("drop video pkt") <-pktQue } }
} log.Debug("packet queue len: ", len(pktQue))}
//func (v *VirWriter) Write(p *av.Packet) (err error) { err = nil
if v.closed { err = fmt.Errorf("VirWriter closed") return } defer func() { if e := recover(); e != nil { err = fmt.Errorf("VirWriter has already been closed:%v", e) } }() if len(v.packetQueue) >= maxQueueNum-24 { v.DropPacket(v.packetQueue, v.Info()) } else { v.packetQueue <- p }
return}
func (v *VirWriter) SendPacket() error { Flush := reflect.ValueOf(v.conn).MethodByName("Flush") var cs core.ChunkStream for { p, ok := <-v.packetQueue if ok { cs.Data = p.Data cs.Length = uint32(len(p.Data)) cs.StreamID = p.StreamID cs.Timestamp = p.TimeStamp cs.Timestamp += v.BaseTimeStamp()
if p.IsVideo { cs.TypeID = av.TAG_VIDEO } else { if p.IsMetadata { cs.TypeID = av.TAG_SCRIPTDATAAMF0 } else { cs.TypeID = av.TAG_AUDIO } }
v.SaveStatics(p.StreamID, uint64(cs.Length), p.IsVideo) v.SetPreTime() v.RecTimeStamp(cs.Timestamp, cs.TypeID) err := v.conn.Write(cs) if err != nil { v.closed = true return err } Flush.Call(nil) } else { return fmt.Errorf("closed") }
}}
func (v *VirWriter) Info() (ret av.Info) { ret.UID = v.Uid _, _, URL := v.conn.GetInfo() ret.URL = URL _url, err := url.Parse(URL) if err != nil { log.Warning(err) } ret.Key = strings.TrimLeft(_url.Path, "/") ret.Inter = true return}
func (v *VirWriter) Close(err error) { log.Warning("player ", v.Info(), "closed: "+err.Error()) if !v.closed { close(v.packetQueue) } v.closed = true v.conn.Close(err)}
type VirReader struct { Uid string av.RWBaser demuxer *flv.Demuxer conn StreamReadWriteCloser ReadBWInfo StaticsBW}
func NewVirReader(conn StreamReadWriteCloser) *VirReader { return &VirReader{ Uid: uid.NewId(), conn: conn, RWBaser: av.NewRWBaser(time.Second * time.Duration(writeTimeout)), demuxer: flv.NewDemuxer(), ReadBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0}, }}
func (v *VirReader) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) { nowInMS := int64(time.Now().UnixNano() / 1e6)
v.ReadBWInfo.StreamId = streamid if isVideoFlag { v.ReadBWInfo.VideoDatainBytes = v.ReadBWInfo.VideoDatainBytes + length } else { v.ReadBWInfo.AudioDatainBytes = v.ReadBWInfo.AudioDatainBytes + length }
if v.ReadBWInfo.LastTimestamp == 0 { v.ReadBWInfo.LastTimestamp = nowInMS } else if (nowInMS - v.ReadBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL { diffTimestamp := (nowInMS - v.ReadBWInfo.LastTimestamp) / 1000
//log.Printf("now=%d, last=%d, diff=%d", nowInMS, v.ReadBWInfo.LastTimestamp, diffTimestamp) v.ReadBWInfo.VideoSpeedInBytesperMS = (v.ReadBWInfo.VideoDatainBytes - v.ReadBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000 v.ReadBWInfo.AudioSpeedInBytesperMS = (v.ReadBWInfo.AudioDatainBytes - v.ReadBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.ReadBWInfo.LastVideoDatainBytes = v.ReadBWInfo.VideoDatainBytes v.ReadBWInfo.LastAudioDatainBytes = v.ReadBWInfo.AudioDatainBytes v.ReadBWInfo.LastTimestamp = nowInMS }}
func (v *VirReader) Read(p *av.Packet) (err error) { defer func() { if r := recover(); r != nil { log.Warning("rtmp read packet panic: ", r) } }()
v.SetPreTime() var cs core.ChunkStream for { err = v.conn.Read(&cs) if err != nil { return err } if cs.TypeID == av.TAG_AUDIO || cs.TypeID == av.TAG_VIDEO || cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3 { break } }
p.IsAudio = cs.TypeID == av.TAG_AUDIO p.IsVideo = cs.TypeID == av.TAG_VIDEO p.IsMetadata = cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3 p.StreamID = cs.StreamID p.Data = cs.Data p.TimeStamp = cs.Timestamp
v.SaveStatics(p.StreamID, uint64(len(p.Data)), p.IsVideo) v.demuxer.DemuxH(p) return err}
func (v *VirReader) Info() (ret av.Info) { ret.UID = v.Uid _, _, URL := v.conn.GetInfo() ret.URL = URL _url, err := url.Parse(URL) if err != nil { log.Warning(err) } ret.Key = strings.TrimLeft(_url.Path, "/") return}
func (v *VirReader) Close(err error) { log.Debug("publisher ", v.Info(), "closed: "+err.Error()) v.conn.Close(err)}
复制代码

播放

视频流媒体播放,支持多种协议:rtmp、flv、hls,我们先看看 rtmp:

var (	STOP_CTRL = "RTMPRELAY_STOP")
type RtmpRelay struct { PlayUrl string PublishUrl string cs_chan chan core.ChunkStream sndctrl_chan chan string connectPlayClient *core.ConnClient connectPublishClient *core.ConnClient startflag bool}
func NewRtmpRelay(playurl *string, publishurl *string) *RtmpRelay { return &RtmpRelay{ PlayUrl: *playurl, PublishUrl: *publishurl, cs_chan: make(chan core.ChunkStream, 500), sndctrl_chan: make(chan string), connectPlayClient: nil, connectPublishClient: nil, startflag: false, }}
func (self *RtmpRelay) rcvPlayChunkStream() { log.Debug("rcvPlayRtmpMediaPacket connectClient.Read...") for { var rc core.ChunkStream
if self.startflag == false { self.connectPlayClient.Close(nil) log.Debugf("rcvPlayChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl) break } err := self.connectPlayClient.Read(&rc)
if err != nil && err == io.EOF { break } //log.Debugf("connectPlayClient.Read return rc.TypeID=%v length=%d, err=%v", rc.TypeID, len(rc.Data), err) switch rc.TypeID { case 20, 17: r := bytes.NewReader(rc.Data) vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0)
log.Debugf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err) case 18: log.Debug("rcvPlayRtmpMediaPacket: metadata....") case 8, 9: self.cs_chan <- rc } }}
func (self *RtmpRelay) sendPublishChunkStream() { for { select { case rc := <-self.cs_chan: //log.Debugf("sendPublishChunkStream: rc.TypeID=%v length=%d", rc.TypeID, len(rc.Data)) self.connectPublishClient.Write(rc) case ctrlcmd := <-self.sndctrl_chan: if ctrlcmd == STOP_CTRL { self.connectPublishClient.Close(nil) log.Debugf("sendPublishChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl) return } } }}
func (self *RtmpRelay) Start() error { if self.startflag { return fmt.Errorf("The rtmprelay already started, playurl=%s, publishurl=%s\n", self.PlayUrl, self.PublishUrl) }
self.connectPlayClient = core.NewConnClient() self.connectPublishClient = core.NewConnClient()
log.Debugf("play server addr:%v starting....", self.PlayUrl) err := self.connectPlayClient.Start(self.PlayUrl, av.PLAY) if err != nil { log.Debugf("connectPlayClient.Start url=%v error", self.PlayUrl) return err }
log.Debugf("publish server addr:%v starting....", self.PublishUrl) err = self.connectPublishClient.Start(self.PublishUrl, av.PUBLISH) if err != nil { log.Debugf("connectPublishClient.Start url=%v error", self.PublishUrl) self.connectPlayClient.Close(nil) return err }
self.startflag = true go self.rcvPlayChunkStream() go self.sendPublishChunkStream()
return nil}
func (self *RtmpRelay) Stop() { if !self.startflag { log.Debugf("The rtmprelay already stoped, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl) return }
self.startflag = false self.sndctrl_chan <- STOP_CTRL}
复制代码

rtmp 协议中,对于流可以进行 chan,这样对于服务端的 cpu 利用率可以很好地提升,同时,避免了 cpu 利用不上去,而 memory 的溢出,最终会导致服务被 crash。我们接下来再看看 hls 协议,hls 协议其实也是基于 http 协议的一种自适应码率的传输协议,我们可以先看看其对缓存的 flush:

const (	videoHZ      = 90000	aacSampleLen = 1024	maxQueueNum  = 512
h264_default_hz uint64 = 90)
type Source struct { av.RWBaser seq int info av.Info bwriter *bytes.Buffer btswriter *bytes.Buffer demuxer *flv.Demuxer muxer *ts.Muxer pts, dts uint64 stat *status align *align cache *audioCache tsCache *TSCacheItem tsparser *parser.CodecParser closed bool packetQueue chan *av.Packet}
func NewSource(info av.Info) *Source { info.Inter = true s := &Source{ info: info, align: &align{}, stat: newStatus(), RWBaser: av.NewRWBaser(time.Second * 10), cache: newAudioCache(), demuxer: flv.NewDemuxer(), muxer: ts.NewMuxer(), tsCache: NewTSCacheItem(info.Key), tsparser: parser.NewCodecParser(), bwriter: bytes.NewBuffer(make([]byte, 100*1024)), packetQueue: make(chan *av.Packet, maxQueueNum), } go func() { err := s.SendPacket() if err != nil { log.Debug("send packet error: ", err) s.closed = true } }() return s}
func (source *Source) GetCacheInc() *TSCacheItem { return source.tsCache}
func (source *Source) DropPacket(pktQue chan *av.Packet, info av.Info) { log.Warningf("[%v] packet queue max!!!", info) for i := 0; i < maxQueueNum-84; i++ { tmpPkt, ok := <-pktQue // try to don't drop audio if ok && tmpPkt.IsAudio { if len(pktQue) > maxQueueNum-2 { <-pktQue } else { pktQue <- tmpPkt } }
if ok && tmpPkt.IsVideo { videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader) // dont't drop sps config and dont't drop key frame if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) { pktQue <- tmpPkt } if len(pktQue) > maxQueueNum-10 { <-pktQue } }
} log.Debug("packet queue len: ", len(pktQue))}
func (source *Source) Write(p *av.Packet) (err error) { err = nil if source.closed { err = fmt.Errorf("hls source closed") return } source.SetPreTime() defer func() { if e := recover(); e != nil { err = fmt.Errorf("hls source has already been closed:%v", e) } }() if len(source.packetQueue) >= maxQueueNum-24 { source.DropPacket(source.packetQueue, source.info) } else { if !source.closed { source.packetQueue <- p } } return}
func (source *Source) SendPacket() error { defer func() { log.Debugf("[%v] hls sender stop", source.info) if r := recover(); r != nil { log.Warning("hls SendPacket panic: ", r) } }()
log.Debugf("[%v] hls sender start", source.info) for { if source.closed { return fmt.Errorf("closed") }
p, ok := <-source.packetQueue if ok { if p.IsMetadata { continue }
err := source.demuxer.Demux(p) if err == flv.ErrAvcEndSEQ { log.Warning(err) continue } else { if err != nil { log.Warning(err) return err } } compositionTime, isSeq, err := source.parse(p) if err != nil { log.Warning(err) } if err != nil || isSeq { continue } if source.btswriter != nil { source.stat.update(p.IsVideo, p.TimeStamp) source.calcPtsDts(p.IsVideo, p.TimeStamp, uint32(compositionTime)) source.tsMux(p) } } else { return fmt.Errorf("closed") } }}
func (source *Source) Info() (ret av.Info) { return source.info}
func (source *Source) cleanup() { close(source.packetQueue) source.bwriter = nil source.btswriter = nil source.cache = nil source.tsCache = nil}
func (source *Source) Close(err error) { log.Debug("hls source closed: ", source.info) if !source.closed && !configure.Config.GetBool("hls_keep_after_end") { source.cleanup() } source.closed = true}
func (source *Source) cut() { newf := true if source.btswriter == nil { source.btswriter = bytes.NewBuffer(nil) } else if source.btswriter != nil && source.stat.durationMs() >= duration { source.flushAudio()
source.seq++ filename := fmt.Sprintf("/%s/%d.ts", source.info.Key, time.Now().Unix()) item := NewTSItem(filename, int(source.stat.durationMs()), source.seq, source.btswriter.Bytes()) source.tsCache.SetItem(filename, item)
source.btswriter.Reset() source.stat.resetAndNew() } else { newf = false } if newf { source.btswriter.Write(source.muxer.PAT()) source.btswriter.Write(source.muxer.PMT(av.SOUND_AAC, true)) }}
func (source *Source) parse(p *av.Packet) (int32, bool, error) { var compositionTime int32 var ah av.AudioPacketHeader var vh av.VideoPacketHeader if p.IsVideo { vh = p.Header.(av.VideoPacketHeader) if vh.CodecID() != av.VIDEO_H264 { return compositionTime, false, ErrNoSupportVideoCodec } compositionTime = vh.CompositionTime() if vh.IsKeyFrame() && vh.IsSeq() { return compositionTime, true, source.tsparser.Parse(p, source.bwriter) } } else { ah = p.Header.(av.AudioPacketHeader) if ah.SoundFormat() != av.SOUND_AAC { return compositionTime, false, ErrNoSupportAudioCodec } if ah.AACPacketType() == av.AAC_SEQHDR { return compositionTime, true, source.tsparser.Parse(p, source.bwriter) } } source.bwriter.Reset() if err := source.tsparser.Parse(p, source.bwriter); err != nil { return compositionTime, false, err } p.Data = source.bwriter.Bytes()
if p.IsVideo && vh.IsKeyFrame() { source.cut() } return compositionTime, false, nil}
func (source *Source) calcPtsDts(isVideo bool, ts, compositionTs uint32) { source.dts = uint64(ts) * h264_default_hz if isVideo { source.pts = source.dts + uint64(compositionTs)*h264_default_hz } else { sampleRate, _ := source.tsparser.SampleRate() source.align.align(&source.dts, uint32(videoHZ*aacSampleLen/sampleRate)) source.pts = source.dts }}func (source *Source) flushAudio() error { return source.muxAudio(1)}
func (source *Source) muxAudio(limit byte) error { if source.cache.CacheNum() < limit { return nil } var p av.Packet _, pts, buf := source.cache.GetFrame() p.Data = buf p.TimeStamp = uint32(pts / h264_default_hz) return source.muxer.Mux(&p, source.btswriter)}
func (source *Source) tsMux(p *av.Packet) error { if p.IsVideo { return source.muxer.Mux(p, source.btswriter) } else { source.cache.Cache(p.Data, source.pts) return source.muxAudio(cache_max_frames) }}
复制代码

对于 hls 本身的流传输,还是遵循 http 协议:

const (	duration = 3000)
var ( ErrNoPublisher = fmt.Errorf("no publisher") ErrInvalidReq = fmt.Errorf("invalid req url path") ErrNoSupportVideoCodec = fmt.Errorf("no support video codec") ErrNoSupportAudioCodec = fmt.Errorf("no support audio codec"))
var crossdomainxml = []byte(`<?xml version="1.0" ?><cross-domain-policy> <allow-access-from domain="*" /> <allow-http-request-headers-from domain="*" headers="*"/></cross-domain-policy>`)
type Server struct { listener net.Listener conns *sync.Map}
func NewServer() *Server { ret := &Server{ conns: &sync.Map{}, } go ret.checkStop() return ret}
func (server *Server) Serve(listener net.Listener) error { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { server.handle(w, r) }) server.listener = listener http.Serve(listener, mux) return nil}
func (server *Server) GetWriter(info av.Info) av.WriteCloser { var s *Source v, ok := server.conns.Load(info.Key) if !ok { log.Debug("new hls source") s = NewSource(info) server.conns.Store(info.Key, s) } else { s = v.(*Source) } return s}
func (server *Server) getConn(key string) *Source { v, ok := server.conns.Load(key) if !ok { return nil } return v.(*Source)}
func (server *Server) checkStop() { for { <-time.After(5 * time.Second)
server.conns.Range(func(key, val interface{}) bool { v := val.(*Source) if !v.Alive() && !configure.Config.GetBool("hls_keep_after_end") { log.Debug("check stop and remove: ", v.Info()) server.conns.Delete(key) } return true }) }}
func (server *Server) handle(w http.ResponseWriter, r *http.Request) { if path.Base(r.URL.Path) == "crossdomain.xml" { w.Header().Set("Content-Type", "application/xml") w.Write(crossdomainxml) return } switch path.Ext(r.URL.Path) { case ".m3u8": key, _ := server.parseM3u8(r.URL.Path) conn := server.getConn(key) if conn == nil { http.Error(w, ErrNoPublisher.Error(), http.StatusForbidden) return } tsCache := conn.GetCacheInc() if tsCache == nil { http.Error(w, ErrNoPublisher.Error(), http.StatusForbidden) return } body, err := tsCache.GenM3U8PlayList() if err != nil { log.Debug("GenM3U8PlayList error: ", err) http.Error(w, err.Error(), http.StatusBadRequest) return }
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Content-Type", "application/x-mpegURL") w.Header().Set("Content-Length", strconv.Itoa(len(body))) w.Write(body) case ".ts": key, _ := server.parseTs(r.URL.Path) conn := server.getConn(key) if conn == nil { http.Error(w, ErrNoPublisher.Error(), http.StatusForbidden) return } tsCache := conn.GetCacheInc() item, err := tsCache.GetItem(r.URL.Path) if err != nil { log.Debug("GetItem error: ", err) http.Error(w, err.Error(), http.StatusBadRequest) return } w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Content-Type", "video/mp2ts") w.Header().Set("Content-Length", strconv.Itoa(len(item.Data))) w.Write(item.Data) }}
func (server *Server) parseM3u8(pathstr string) (key string, err error) { pathstr = strings.TrimLeft(pathstr, "/") key = strings.Split(pathstr, path.Ext(pathstr))[0] return}
func (server *Server) parseTs(pathstr string) (key string, err error) { pathstr = strings.TrimLeft(pathstr, "/") paths := strings.SplitN(pathstr, "/", 3) if len(paths) != 3 { err = fmt.Errorf("invalid path=%s", pathstr) return } key = paths[0] + "/" + paths[1]
return}
复制代码

最后来看看 http-flv 协议,它其实是一种结合了 RTMP 的低延时,以及可以复用现有 HTTP 分发资源的流式协议。

FLV 文件头和文件体,还有其 AudioTag、VideoTag,其本身透露的特点是:

  • 可以在一定程度上避免防火墙的干扰

  • 可以使用 HTTPS 做加密通道

  • 可以很好的兼容 HTTP 302 跳转,做到灵活调度

  • 最后能很好的支持 ios、安卓

func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *streams {	rtmpStream := server.handler.(*rtmp.RtmpStream)	if rtmpStream == nil {		return nil	}	msgs := new(streams)
rtmpStream.GetStreams().Range(func(key, val interface{}) bool { if s, ok := val.(*rtmp.Stream); ok { if s.GetReader() != nil { msg := stream{key.(string), s.GetReader().Info().UID} msgs.Publishers = append(msgs.Publishers, msg) } } return true })
rtmpStream.GetStreams().Range(func(key, val interface{}) bool { ws := val.(*rtmp.Stream).GetWs()
ws.Range(func(k, v interface{}) bool { if pw, ok := v.(*rtmp.PackWriterCloser); ok { if pw.GetWriter() != nil { msg := stream{key.(string), pw.GetWriter().Info().UID} msgs.Players = append(msgs.Players, msg) } } return true }) return true })
return msgs}
func (server *Server) getStream(w http.ResponseWriter, r *http.Request) { msgs := server.getStreams(w, r) if msgs == nil { return } resp, _ := json.Marshal(msgs) w.Header().Set("Content-Type", "application/json") w.Write(resp)}
func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) { defer func() { if r := recover(); r != nil { log.Error("http flv handleConn panic: ", r) } }()
url := r.URL.String() u := r.URL.Path if pos := strings.LastIndex(u, "."); pos < 0 || u[pos:] != ".flv" { http.Error(w, "invalid path", http.StatusBadRequest) return } path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv") paths := strings.SplitN(path, "/", 2) log.Debug("url:", u, "path:", path, "paths:", paths)
if len(paths) != 2 { http.Error(w, "invalid path", http.StatusBadRequest) return }
// 判断流是否发布,如果没有发布,直接返回404 msgs := server.getStreams(w, r) if msgs == nil || len(msgs.Publishers) == 0 { http.Error(w, "invalid path", http.StatusNotFound) return } else { include := false for _, item := range msgs.Publishers { if item.Key == path { include = true break } } if include == false { http.Error(w, "invalid path", http.StatusNotFound) return } }
w.Header().Set("Access-Control-Allow-Origin", "*") writer := NewFLVWriter(paths[0], paths[1], url, w)
server.handler.HandleWriter(writer) writer.Wait()}
复制代码

流媒体视频容器格式

所谓容器,就是把编码器生成的多媒体内容(视频,音频,字幕,章节信息等)混合封装在一起的标准。容器使得不同多媒体内容同步播放变得很简单。

视频格式是视频播放软件为了能够播放视频文件而赋予视频文件的一种识别符号。简之,视频格式规定了和播放器的通信协议。


对于容器格式,我们支持 flv、ts 格式,flv 格式是一种比较常见的格式,比如:爱奇艺、短视频等。当然,flv 格式相对而言还是比较简单的,主要是由两部分组成:FLV header、FLV body。所以 header、body 都需要进行 parse:

const (	headerLen = 11)
type FLVWriter struct { Uid string av.RWBaser app, title, url string buf []byte closed chan struct{} ctx *os.File closedWriter bool}
func NewFLVWriter(app, title, url string, ctx *os.File) *FLVWriter { ret := &FLVWriter{ Uid: uid.NewId(), app: app, title: title, url: url, ctx: ctx, RWBaser: av.NewRWBaser(time.Second * 10), closed: make(chan struct{}), buf: make([]byte, headerLen), }
ret.ctx.Write(flvHeader) pio.PutI32BE(ret.buf[:4], 0) ret.ctx.Write(ret.buf[:4])
return ret}
func (writer *FLVWriter) Write(p *av.Packet) error { writer.RWBaser.SetPreTime() h := writer.buf[:headerLen] typeID := av.TAG_VIDEO if !p.IsVideo { if p.IsMetadata { var err error typeID = av.TAG_SCRIPTDATAAMF0 p.Data, err = amf.MetaDataReform(p.Data, amf.DEL) if err != nil { return err } } else { typeID = av.TAG_AUDIO } } dataLen := len(p.Data) timestamp := p.TimeStamp timestamp += writer.BaseTimeStamp() writer.RWBaser.RecTimeStamp(timestamp, uint32(typeID))
preDataLen := dataLen + headerLen timestampbase := timestamp & 0xffffff timestampExt := timestamp >> 24 & 0xff
pio.PutU8(h[0:1], uint8(typeID)) pio.PutI24BE(h[1:4], int32(dataLen)) pio.PutI24BE(h[4:7], int32(timestampbase)) pio.PutU8(h[7:8], uint8(timestampExt))
if _, err := writer.ctx.Write(h); err != nil { return err }
if _, err := writer.ctx.Write(p.Data); err != nil { return err }
pio.PutI32BE(h[:4], int32(preDataLen)) if _, err := writer.ctx.Write(h[:4]); err != nil { return err }
return nil}
func (writer *FLVWriter) Wait() { select { case <-writer.closed: return }}
func (writer *FLVWriter) Close(error) { if writer.closedWriter { return } writer.closedWriter = true writer.ctx.Close() close(writer.closed)}
func (writer *FLVWriter) Info() (ret av.Info) { ret.UID = writer.Uid ret.URL = writer.url ret.Key = writer.app + "/" + writer.title return}
type FlvDvr struct{}
func (f *FlvDvr) GetWriter(info av.Info) av.WriteCloser { paths := strings.SplitN(info.Key, "/", 2) if len(paths) != 2 { log.Warning("invalid info") return nil }
flvDir := configure.Config.GetString("flv_dir")
err := os.MkdirAll(path.Join(flvDir, paths[0]), 0755) if err != nil { log.Error("mkdir error: ", err) return nil }
fileName := fmt.Sprintf("%s_%d.%s", path.Join(flvDir, info.Key), time.Now().Unix(), "flv") log.Debug("flv dvr save stream to: ", fileName) w, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0755) if err != nil { log.Error("open file error: ", err) return nil }
writer := NewFLVWriter(paths[0], paths[1], info.URL, w) log.Debug("new flv dvr: ", writer.Info()) return writer}
复制代码

每个 FLV tag 包含音频、视频、脚本、可选的加密元数据以及负载等信息,其实说的就是 tag 的三种类型:音频流、视频流、脚本流,那么对于其 parse,我们来看看:

type Tag struct {	flvt   flvTag	mediat mediaTag}
func (tag *Tag) SoundFormat() uint8 { return tag.mediat.soundFormat}
func (tag *Tag) AACPacketType() uint8 { return tag.mediat.aacPacketType}
func (tag *Tag) IsKeyFrame() bool { return tag.mediat.frameType == av.FRAME_KEY}
func (tag *Tag) IsSeq() bool { return tag.mediat.frameType == av.FRAME_KEY && tag.mediat.avcPacketType == av.AVC_SEQHDR}
func (tag *Tag) CodecID() uint8 { return tag.mediat.codecID}
func (tag *Tag) CompositionTime() int32 { return tag.mediat.compositionTime}
// ParseMediaTagHeader, parse video, audio, tag headerfunc (tag *Tag) ParseMediaTagHeader(b []byte, isVideo bool) (n int, err error) { switch isVideo { case false: n, err = tag.parseAudioHeader(b) case true: n, err = tag.parseVideoHeader(b) } return}
func (tag *Tag) parseAudioHeader(b []byte) (n int, err error) { if len(b) < n+1 { err = fmt.Errorf("invalid audiodata len=%d", len(b)) return } flags := b[0] tag.mediat.soundFormat = flags >> 4 tag.mediat.soundRate = (flags >> 2) & 0x3 tag.mediat.soundSize = (flags >> 1) & 0x1 tag.mediat.soundType = flags & 0x1 n++ switch tag.mediat.soundFormat { case av.SOUND_AAC: tag.mediat.aacPacketType = b[1] n++ } return}
func (tag *Tag) parseVideoHeader(b []byte) (n int, err error) { if len(b) < n+5 { err = fmt.Errorf("invalid videodata len=%d", len(b)) return } flags := b[0] tag.mediat.frameType = flags >> 4 tag.mediat.codecID = flags & 0xf n++ if tag.mediat.frameType == av.FRAME_INTER || tag.mediat.frameType == av.FRAME_KEY { tag.mediat.avcPacketType = b[1] for i := 2; i < 5; i++ { tag.mediat.compositionTime = tag.mediat.compositionTime<<8 + int32(b[i]) } n += 4 } return}
复制代码

大家知道在数据传输、数据存储时,为了保证数据的正确性,需要采用检错的手段来处理,crc 在很多检错手段中是最出名的一种,其检错能力极强,开销小,易于用编码器及检测电路实现。所以这里对于 ts 格式的多路调制,采用了 crc32 法校验:

func (muxer *Muxer) Mux(p *av.Packet, w io.Writer) error {	first := true	wBytes := 0	pesIndex := 0	tmpLen := byte(0)	dataLen := byte(0)
var pes pesHeader dts := int64(p.TimeStamp) * int64(h264DefaultHZ) pts := dts pid := audioPID var videoH av.VideoPacketHeader if p.IsVideo { pid = videoPID videoH, _ = p.Header.(av.VideoPacketHeader) pts = dts + int64(videoH.CompositionTime())*int64(h264DefaultHZ) } err := pes.packet(p, pts, dts) if err != nil { return err } pesHeaderLen := pes.len packetBytesLen := len(p.Data) + int(pesHeaderLen)
for { if packetBytesLen <= 0 { break } if p.IsVideo { muxer.videoCc++ if muxer.videoCc > 0xf { muxer.videoCc = 0 } } else { muxer.audioCc++ if muxer.audioCc > 0xf { muxer.audioCc = 0 } }
i := byte(0)
//sync byte muxer.tsPacket[i] = 0x47 i++
//error indicator, unit start indicator,ts priority,pid muxer.tsPacket[i] = byte(pid >> 8) //pid high 5 bits if first { muxer.tsPacket[i] = muxer.tsPacket[i] | 0x40 //unit start indicator } i++
//pid low 8 bits muxer.tsPacket[i] = byte(pid) i++
//scram control, adaptation control, counter if p.IsVideo { muxer.tsPacket[i] = 0x10 | byte(muxer.videoCc&0x0f) } else { muxer.tsPacket[i] = 0x10 | byte(muxer.audioCc&0x0f) } i++
//关键帧需要加pcr if first && p.IsVideo && videoH.IsKeyFrame() { muxer.tsPacket[3] |= 0x20 muxer.tsPacket[i] = 7 i++ muxer.tsPacket[i] = 0x50 i++ muxer.writePcr(muxer.tsPacket[0:], i, dts) i += 6 }
//frame data if packetBytesLen >= tsDefaultDataLen { dataLen = tsDefaultDataLen if first { dataLen -= (i - 4) } } else { muxer.tsPacket[3] |= 0x20 //have adaptation remainBytes := byte(0) dataLen = byte(packetBytesLen) if first { remainBytes = tsDefaultDataLen - dataLen - (i - 4) } else { remainBytes = tsDefaultDataLen - dataLen } muxer.adaptationBufInit(muxer.tsPacket[i:], byte(remainBytes)) i += remainBytes } if first && i < tsPacketLen && pesHeaderLen > 0 { tmpLen = tsPacketLen - i if pesHeaderLen <= tmpLen { tmpLen = pesHeaderLen } copy(muxer.tsPacket[i:], pes.data[pesIndex:pesIndex+int(tmpLen)]) i += tmpLen packetBytesLen -= int(tmpLen) dataLen -= tmpLen pesHeaderLen -= tmpLen pesIndex += int(tmpLen) }
if i < tsPacketLen { tmpLen = tsPacketLen - i if tmpLen <= dataLen { dataLen = tmpLen } copy(muxer.tsPacket[i:], p.Data[wBytes:wBytes+int(dataLen)]) wBytes += int(dataLen) packetBytesLen -= int(dataLen) } if w != nil { if _, err := w.Write(muxer.tsPacket[0:]); err != nil { return err } } first = false }
return nil}
//PAT return pat datafunc (muxer *Muxer) PAT() []byte { i := 0 remainByte := 0 tsHeader := []byte{0x47, 0x40, 0x00, 0x10, 0x00} patHeader := []byte{0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00, 0x00, 0x01, 0xf0, 0x01}
if muxer.patCc > 0xf { muxer.patCc = 0 } tsHeader[3] |= muxer.patCc & 0x0f muxer.patCc++
copy(muxer.pat[i:], tsHeader) i += len(tsHeader)
copy(muxer.pat[i:], patHeader) i += len(patHeader)
crc32Value := GenCrc32(patHeader) muxer.pat[i] = byte(crc32Value >> 24) i++ muxer.pat[i] = byte(crc32Value >> 16) i++ muxer.pat[i] = byte(crc32Value >> 8) i++ muxer.pat[i] = byte(crc32Value) i++
remainByte = int(tsPacketLen - i) for j := 0; j < remainByte; j++ { muxer.pat[i+j] = 0xff }
return muxer.pat[0:]}
// PMT return pmt datafunc (muxer *Muxer) PMT(soundFormat byte, hasVideo bool) []byte { i := int(0) j := int(0) var progInfo []byte remainBytes := int(0) tsHeader := []byte{0x47, 0x50, 0x01, 0x10, 0x00} pmtHeader := []byte{0x02, 0xb0, 0xff, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x00} if !hasVideo { pmtHeader[9] = 0x01 progInfo = []byte{0x0f, 0xe1, 0x01, 0xf0, 0x00} } else { progInfo = []byte{0x1b, 0xe1, 0x00, 0xf0, 0x00, //h264 or h265* 0x0f, 0xe1, 0x01, 0xf0, 0x00, //mp3 or aac } } pmtHeader[2] = byte(len(progInfo) + 9 + 4)
if muxer.pmtCc > 0xf { muxer.pmtCc = 0 } tsHeader[3] |= muxer.pmtCc & 0x0f muxer.pmtCc++
if soundFormat == 2 || soundFormat == 14 { if hasVideo { progInfo[5] = 0x4 } else { progInfo[0] = 0x4 } }
copy(muxer.pmt[i:], tsHeader) i += len(tsHeader)
copy(muxer.pmt[i:], pmtHeader) i += len(pmtHeader)
copy(muxer.pmt[i:], progInfo[0:]) i += len(progInfo)
crc32Value := GenCrc32(muxer.pmt[5 : 5+len(pmtHeader)+len(progInfo)]) muxer.pmt[i] = byte(crc32Value >> 24) i++ muxer.pmt[i] = byte(crc32Value >> 16) i++ muxer.pmt[i] = byte(crc32Value >> 8) i++ muxer.pmt[i] = byte(crc32Value) i++
remainBytes = int(tsPacketLen - i) for j = 0; j < remainBytes; j++ { muxer.pmt[i+j] = 0xff }
return muxer.pmt[0:]}
func (muxer *Muxer) adaptationBufInit(src []byte, remainBytes byte) { src[0] = byte(remainBytes - 1) if remainBytes == 1 { } else { src[1] = 0x00 for i := 2; i < len(src); i++ { src[i] = 0xff } } return}
func (muxer *Muxer) writePcr(b []byte, i byte, pcr int64) error { b[i] = byte(pcr >> 25) i++ b[i] = byte((pcr >> 17) & 0xff) i++ b[i] = byte((pcr >> 9) & 0xff) i++ b[i] = byte((pcr >> 1) & 0xff) i++ b[i] = byte(((pcr & 0x1) << 7) | 0x7e) i++ b[i] = 0x00
return nil}
type pesHeader struct { len byte data [tsPacketLen]byte}
//pesPacket return pes packetfunc (header *pesHeader) packet(p *av.Packet, pts, dts int64) error { //PES header i := 0 header.data[i] = 0x00 i++ header.data[i] = 0x00 i++ header.data[i] = 0x01 i++
sid := audioSID if p.IsVideo { sid = videoSID } header.data[i] = byte(sid) i++
flag := 0x80 ptslen := 5 dtslen := ptslen headerSize := ptslen if p.IsVideo && pts != dts { flag |= 0x40 headerSize += 5 //add dts } size := len(p.Data) + headerSize + 3 if size > 0xffff { size = 0 } header.data[i] = byte(size >> 8) i++ header.data[i] = byte(size) i++
header.data[i] = 0x80 i++ header.data[i] = byte(flag) i++ header.data[i] = byte(headerSize) i++
header.writeTs(header.data[0:], i, flag>>6, pts) i += ptslen if p.IsVideo && pts != dts { header.writeTs(header.data[0:], i, 1, dts) i += dtslen }
header.len = byte(i)
return nil}
func (header *pesHeader) writeTs(src []byte, i int, fb int, ts int64) { val := uint32(0) if ts > 0x1ffffffff { ts -= 0x1ffffffff } val = uint32(fb<<4) | ((uint32(ts>>30) & 0x07) << 1) | 1 src[i] = byte(val) i++
val = ((uint32(ts>>15) & 0x7fff) << 1) | 1 src[i] = byte(val >> 8) i++ src[i] = byte(val) i++
val = (uint32(ts&0x7fff) << 1) | 1 src[i] = byte(val >> 8) i++ src[i] = byte(val)}
复制代码

scheduler

scheduler 主要是来调度任务,那么主要是哪些任务呢?主要是那些普通 api 无法立即给结果的任务。比如:我们视频网站需要一些视频审核、数据恢复的需求。这时候,我们需要做一些 short delay,用户看不到,但后台还是存在的。这就需要 scheduler 异步处理。还比如有些周期性的任务。在 Scheduler 中,还存在 Timer,定时器主要用来作定时处理 task 的。


所以,我们的架构图:


在本小节中,我们采用 runner 的生产、消费者模式实现。具体代码如下:

package taskrunner
import ()
type Runner struct { Controller controlChan Error controlChan Data dataChan dataSize int longLived bool Dispatcher fn  Executor fn}
func NewRunner(size int, longlived bool, d fn, e fn) *Runner { return &Runner {  Controller: make(chan string, 1),  Error: make(chan string, 1),  Data: make(chan interface{}, size),  longLived: longlived,  dataSize: size,  Dispatcher: d,  Executor: e, }}
func (r *Runner) startDispatch() { defer func() {  if !r.longLived {   close(r.Controller)   close(r.Data)   close(r.Error)  } }()
 for {  select {  case c :=<- r.Controller:   if c == READY_TO_DISPATCH {    err := r.Dispatcher(r.Data)    if err != nil {     r.Error <- CLOSE    } else {     r.Controller <- READY_TO_EXECUTE    }   }
   if c == READY_TO_EXECUTE {    err := r.Executor(r.Data)    if err != nil {     r.Error <- CLOSE    } else {     r.Controller <- READY_TO_DISPATCH    }   }  case e :=<- r.Error:   if e == CLOSE {    return   }  default:
  } }}
func (r *Runner) StartAll() { r.Controller <- READY_TO_DISPATCH r.startDispatch()}
复制代码

Runner 是可以复用的,而接下来介绍的 Task 是定制 Runner 的。比如:我们延迟删除视频。

我们先拿到数据,看看:

package dbops
import ( "log" _ "github.com/go-sql-driver/mysql")
func ReadVideoDeletionRecord(count int) ([]string, error) { stmtOut, err := dbConn.Prepare("SELECT video_id FROM video_del_rec LIMIT ?")
 var ids []string
 if err != nil {  return ids, err }
 rows, err := stmtOut.Query(count) if err != nil {  log.Printf("Query VideoDeletionRecord error: %v", err)  return ids, err }
 for rows.Next() {  var id string  if err := rows.Scan(&id); err != nil {   return ids, err  }
  ids = append(ids, id) }
 defer stmtOut.Close() return ids, nil}
func DelVideoDeletionRecord(vid string) error { stmtDel, err := dbConn.Prepare("DELETE FROM video_del_rec WHERE video_id=?") if err != nil {  return err }
 _, err = stmtDel.Exec(vid) if err != nil {  log.Printf("Deleting VideoDeletionRecord error: %v", err)  return err }
 defer stmtDel.Close() return nil}
复制代码

拿到数据后,需要处理,这时需要 task:

package taskrunner
import ( "os" "errors" "log" "sync" "github.com/avenssi/video_server/scheduler/dbops")
func deleteVideo(vid string) error { err := os.Remove(VIDEO_PATH + vid)
 if err != nil && !os.IsNotExist(err) {  log.Printf("Deleting video error: %v", err)  return err }
 return nil}
func VideoClearDispatcher(dc dataChan) error { res, err := dbops.ReadVideoDeletionRecord(3) if err != nil {  log.Printf("Video clear dispatcher error: %v", err)  return err }
 if len(res) == 0 {  return errors.New("All tasks finished") }
 for _, id := range res {  dc <- id }
 return nil}
func VideoClearExecutor(dc dataChan) error { errMap := &sync.Map{} var err error
 forloop:  for {   select {   case vid :=<- dc:    go func(id interface{}) {     if err := deleteVideo(id.(string)); err != nil {      errMap.Store(id, err)      return     }     if err := dbops.DelVideoDeletionRecord(id.(string)); err != nil {      errMap.Store(id, err)      return      }    }(vid)   default:    break forloop   }  }
 errMap.Range(func(k, v interface{}) bool {  err = v.(error)  if err != nil {   return false  }  return true })
 return err}
复制代码

以上就是关于异步、定时处理视频流信息过程。

stream server

  • Streaming

  • Upload files


Streaming 主要区别于普通的链接,它需要保持长链接,与短链接是不一样的,当发送一个 request 过来,会不断与客户端输出数据流,而且会很长。所以在多路长链接同时保持的时候,出现一个问题,如果不断的发起链接、打开网页,最终会把我们的服务给 crash 掉,所以,我们需要进行流控:limit,这里的流控可能只在 connect 时候进行限制。

package main 
import ( "log")
type ConnLimiter struct { concurrentConn int bucket chan int}
func NewConnLimiter(cc int) *ConnLimiter { return &ConnLimiter {  concurrentConn: cc,  bucket: make(chan int, cc), }}
func (cl *ConnLimiter) GetConn() bool { if len(cl.bucket) >= cl.concurrentConn {  log.Printf("Reached the rate limitation.")  return false }
 cl.bucket <- 1 return true}
func (cl *ConnLimiter) ReleaseConn() { c :=<- cl.bucket log.Printf("New connction coming: %d", c)}
复制代码

加了流控后,我们需要在 http middleware 中嵌入流控,同样,我们在启动时,都需要注册 router 以及 http server,所以代码如下:

package main 
import ( "net/http" "github.com/julienschmidt/httprouter")
type middleWareHandler struct { r *httprouter.Router l *ConnLimiter}
func NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler { m := middleWareHandler{} m.r = r m.l = NewConnLimiter(cc) return m}
func RegisterHandlers() *httprouter.Router { router := httprouter.New()
 router.GET("/videos/:vid-id", streamHandler)
 router.POST("/upload/:vid-id", uploadHandler)
 router.GET("/testpage", testPageHandler)
 return router}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !m.l.GetConn() {  sendErrorResponse(w, http.StatusTooManyRequests, "Too many requests")  return }
 m.r.ServeHTTP(w, r) defer m.l.ReleaseConn()}
func main() { r := RegisterHandlers() mh := NewMiddleWareHandler(r, 2) http.ListenAndServe(":2000", mh)}
复制代码

最后,我们来看看 streamHandler 如何来处理:

func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) { vid := p.ByName("vid-id") vl := VIDEO_DIR + vid
 video, err := os.Open(vl) if err != nil {  log.Printf("Error when try to open file: %v", err)  sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")  return }
 w.Header().Set("Content-Type", "video/mp4") http.ServeContent(w, r, "", time.Now(), video)
 defer video.Close()}
复制代码

我们这里采用比较通用的做法:在拿到流唯一信息后,直接处理。Upload files 时,我们需要做静态检查,然后把数据从中读取:

func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) { r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE) if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil {  sendErrorResponse(w, http.StatusBadRequest, "File is too big")  return  }
 file, _, err := r.FormFile("file") if err != nil {  log.Printf("Error when try to get file: %v", err)  sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")  return  }
 data, err := ioutil.ReadAll(file) if err != nil {  log.Printf("Read file error: %v", err)  sendErrorResponse(w, http.StatusInternalServerError, "Internal Error") }
 fn := p.ByName("vid-id") err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666) if err != nil {  log.Printf("Write file error: %v", err)  sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")  return }
 w.WriteHeader(http.StatusCreated) io.WriteString(w, "Uploaded successfully")}
复制代码

网站部署

云原生是一个抽象又具体的存在。它不是一个具体的产品,而是一套技术体系和一套方法论,随着围绕着云原生架构的各类开源技术的进一步发展,云原生技术体系必将成为主流,进而影响到每一个技术人员、每一个企业。所以我们跟随潮流,迈入云原生时代,今天主要是使用 k8s 来部署我们的网站服务。


先想给之前的代码进行编译打包:

FROM ubuntu:16.04 as build
ENV TZ=Asia/ShanghaiRUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get update && apt-get install -y --no-install-recommends \        g++ \        ca-certificates \        wget && \    rm -rf /var/lib/apt/lists/*
ENV GOLANG_VERSION 1.15.1RUN wget -nv -O - https://studygolang.com/dl/golang/go1.15.1.linux-amd64.tar.gz \     | tar -C /usr/local -xz

ENV GOPROXY=https://goproxy.cn,directENV GO111MODULE=onENV GOPATH /goENV PATH $GOPATH/bin:/usr/local/go/bin:$PATH
WORKDIR /go/srcCOPY . .WORKDIR /go/src/video-serviceRUN  sed -i "/runmode/crunmode=pro" /go/src/video-service/conf/app.confRUN export CGO_LDFLAGS_ALLOW='-Wl,--unresolved-symbols=ignore-in-object-files' && \    go install -ldflags="-s -w" -v /go/src/video-service
FROM ubuntu:16.04WORKDIR /video-service
RUN mkdir -p logCOPY --from=build /go/bin/video-service /video-serviceCMD ["./video-service"]
复制代码

k8s 部署服务的脚本很简单,通过简单的 yml 或 json 格式的数据调用 k8s 本身的 api 服务,即可完成 k8s 对服务的部署。接下来,补充部署脚本:

---apiVersion: apps/v1kind: DaemonSetmetadata:  labels:    app: video-service  name: video-service  namespace: system-serverspec:  replicas: 1  selector:    matchLabels:      app: video-service  template:    metadata:      labels:        app: video-service    spec:      containers:        - image: {{ cluster_cfg['cluster']['docker-registry']['prefix'] }}video-service          imagePullPolicy: Always          name: video-service          ports:            - containerPort: 1000          #livenessProbe:            #httpGet:              #path: /api/v1/healthz              #port: 1000              #scheme: HTTP            #initialDelaySeconds: 15            #periodSeconds: 10            #timeoutSeconds: 3            #failureThreshold: 5          volumeMounts:            - name: video-service-config              mountPath: /video-service/conf      volumes:        - name: video-service-config          configMap:            name: video-service-config      nodeSelector:        video-service: "true"      restartPolicy: Always
复制代码

执行编译打包后,部署脚本:

sh build/build.shkubectl create -f deploy.yml
复制代码

这里使用 K8s 部署,k8s 的主要好处就是,在执行命令后,会自动给我们创建底层容器 pod,并且管理这些容器,我们来看看服务启动的情况:

tess@cm001:~$ kubectl describe po video-service-646ccc4c5c-qrpgp -n system-server Name:           video-service-646ccc4c5c-qrpgpNamespace:      system-serverPriority:       0Node:           cm001/10.11.32.21Start Time:     Wed, 21 Apr 2021 15:38:48 +0800Labels:         app=video-service                pod-template-hash=646ccc4c5cAnnotations:    cni.projectcalico.org/podIP: 20.247.87.138/32Status:         RunningIP:             20.247.87.138Controlled By:  ReplicaSet/video-service-646ccc4c5cContainers:  video-service:    Container ID:   docker://f284ceac649f4e1a29ac77cdd425ccc852caf67cc9c133144a7d1c8747e32aea    Image:          minicub/video-service    Image ID:       docker-pullable://minicub/video-service@sha256:3036927b55c6be9efc4cf34d6ad04aba093e04b49807c18ce0f7a418b2577bf4    Port:           1000/TCP    Host Port:      0/TCP    State:          Running      Started:      Wed, 28 Apr 2021 16:01:15 +0800    Ready:          True    Restart Count:  1    Environment:    <none>    Mounts:      /video-service/conf from video-service-config (rw)      /var/run/secrets/kubernetes.io/serviceaccount from default-token-62wgr (ro)Conditions:  Type              Status  Initialized       True   Ready             True   ContainersReady   True   PodScheduled      True Volumes:  video-service-config:    Type:      ConfigMap (a volume populated by a ConfigMap)    Name:      video-service-config    Optional:  false  default-token-62wgr:    Type:        Secret (a volume populated by a Secret)    SecretName:  default-token-62wgr    Optional:    falseQoS Class:       BestEffortNode-Selectors:  video-service=trueTolerations:     node.kubernetes.io/not-ready:NoExecute for 300s                 node.kubernetes.io/unreachable:NoExecute for 300sEvents:          <none>
复制代码

这里使用 K8s 部署到机器上。部署后的服务访问地址:http://10.11.32.21:1000


最后,我们打开网址,访问视频,我们可以看到一些界面效果:


发布于: 2021 年 05 月 18 日阅读数: 1353
用户头像

Lavender

关注

还未添加个人签名 2020.04.07 加入

还未添加个人简介

评论

发布
暂无评论
使用Go语言开发流媒体视频网站