写点什么

Go 语言实战流媒体视频网站

用户头像
Lavender
关注
发布于: 19 小时前

简介

流媒体如今已经成为工业上的一个重要技术了,比如:直播网站、视频监控传输、APP 直播等,如何实现一个高并发的视频网站,这就涉及到语言技术的选型以及流媒体技术的使用,本节将主要介绍如何使用 Golang 来实现一个流媒体视频网站。

目录

  • 为什么选择 Go 以及 Go 的一些优势

  • GoLang 的简介以及实现一个 webserver 工具链

  • Golang 的 channel 并发模式

  • 用 Golang 完成一个流媒体网站

为什么选择 Go 以及 Go 的一些优势

为什么会选择 Go 来开发视频网站呢?这其实主要体现在 Go 语言的优势。那么 Go 有哪些优势呢?

  • 开发效率高,不管用其他语言,都需要很多其他的配置或插件,就连全家桶配套齐全的 Java 语言都会需要一个 Servlet 引擎,如:tomcat、jetty 等。但 Go 在这方面,提供了得天独厚的能力。大部分功能和内容已经集成在了 pkg。包括开发完整的开发工具链(tools、test、benchmark、builtin.etc),包括 Go 命令(go test、go install、go build)。这些都是完整的,直接下载 Go 后即可使用。包括音视频相关的插件、配置,都已经被包含在了 pkg。所以用 go 开发音视频,可以完美诠释 go 语言的优势。

  • 另一方面,部署简单,go 属于编译性语言,而且是能够编译多个平台可执行文件的语言。Compile once,run everywhere,直接编译后生成二进制文件,直接运行。

  • 良好的 native http 库、集成模板引擎,无需添加第三方框架。


GoLang 的简介以及实现一个 webserver 工具链


Go 语言是一种编译性语言,一个开源的编程语言,能够让构造简单、可靠且高效的软件变得容易。而且它的目标是兼具 Python 等动态语言的开发速度和集成 C/C++等编译语言的性能与安全性。它的主要特性是:


简洁、快速、安全、并行、内存管理、数组安全、编译迅速。


Go 中有一些常见的工具链,比如:


go build,编译 go 文件,可以跨平台编译:env GOOS=linux GOARCH=amd64 go build,在 CI/CD 中,这是一个非常有用的命令。


go install,这也是编译,但与 build 的区别是编译后将输出文件打包成库放在 pkg 下。


go get,用于获取 go 的第三方包,常见的是:go get -u git 地址,表示从 git 上获取某个资源并安装到本地。


go fmt,统一代码风格、排版,这将使得 go 代码更加易读、易理解。


go test,运行当前目录下的 tests,"go test -v" 会打印所有的信息,而"go test"只会打印测试的结果。


go 的 test 文件一般以 XXX_test.go 命名,这样,在执行"go test"的时候,程序会自动去执行那些被加了 test 的文件,这是一种约定的方式。


要点:


使用 TestMain 作为初始化 test,并且使用 Run()来调用其它 tests 可以完成一些需要初始化操作的 testing,如:音视频资源数据库、文件加载等,这些有的可能需要被多次使用,但在设计模式中,只会加载一次到内存,这样,可以减少过多的内存占用,同时可以一次性的进行清理。


func TestMain(m *testing.M) {    fmt.Println("Test begin")    m.Run()}


如果没在其中加 Run(),除了 TestMain 的其它的 tests 都不被执行。


func TestPrint(t *testing.T) {    fmt.Println("Test print")}


func TestMain(m *testing.M) {    fmt.Println("Test begin")    //m.Run()}


按照上面说的,如果没有执行 Run()方法,则 TestPrint 函数不会被执行。

Golang 的 channel 并发模式


在 Go 中,既然有了协程,那么这些协程之间如何通信呢?Go 提供了一个 channel(通道) 来解决。


声明一个 channel


在 Go 语言中,声明一个 channel 非常简单,使用内置的 make 函数即可,如下:


ch:=make(chan string)


其中 chan 是一个关键字,表示是 channel 类型。后面的 string 表示 channel 里的数据是 string 类型。通过 channel 的声明也可以看到,chan 是一个集合类型。


定义好 chan 后就可以使用了,一个 chan 的操作只有两种:发送和接收:


发送:向 chan 发送值,把值放在 chan 中,操作符为 chan <-


接收:获取 chan 中的值,操作符为 <- chan


示例:

package main
import "fmt"
func main() {
ch := make(chan string)
go func() {
fmt.Println("La")
ch <- "发送数据者:La"
}()
fmt.Println("I am main goroutine")
v := <- ch
fmt.Println("接收到的chan中的值为:",v)
}

复制代码

我们先来执行看看打印结果:


I am main goroutine
La
复制代码


接收到的 chan 中的值为:送数据者:La


从运行结果可以看出:达到了使用 time.Sleep 函数的效果。


相信应该明白为什么程序不会在新的 goroutine 完成之前退出了,因为通过 make 创建的 chan 中没有值,而 main goroutine 又想从 chan 中获取值,获取不到就一直等待,等到另一个 goroutine 向 chan 发送值为止。


无缓冲 channel


上面的示例中,使用 make 创建的 chan 就是一个无缓冲 channel,它的容量是 0,不能存储任何数据。所以无缓冲 channel 只起到传输数据的作用,数据并不会在 channel 中做任何停留。这也意味着,无缓冲 channel 的发送和接收操作是同时进行的,它也被称为同步 channel。


有缓冲 channel


有缓冲 channel 类似一个可阻塞的队列,内部的元素先进先出。通过 make 函数的第二个参数可以指定 channel 容量的大小,进而创建一个有缓冲 channel,如:


cacheCh := make(chan int,5)


定义了一个容量为 5 的元素为 int 类型的 chan。


一个有缓冲 channel 具备以下特点:


有缓冲 channel 的内部有一个缓冲队列


发送操作是向队列的尾部插入元素,如果队列已满,则阻塞等待,直到另一个 goroutine 执行,接收操作释放队列的空间


接收操作是从队列的头部获取元素并把它从队列中删除,如果队列为空,则阻塞等待,直到另一个 goroutine 执行,发送操作插入新的元素


cache := make(chan int,5)

cache <- 2

cache <- 3

fmt.Println("容量:",cap(cache),",元素个数:",len(cache))


无缓冲 channel 其实就是一个容量大小为 0 的 channel。比如 make(chan int,0)


关闭 channel


通过内置函数 close 即可关闭 channel。如果一个 channel 被关闭了,就不能向里面发送数据了,如果发送的话,会引起 painc 异常。但是还可以接收 channel 里的数据,如果 channel 里没有数据的话,接收的数据是元素类型的零值。


单向 channel


所谓单向,即可要不发送,要么只能接收。所以单向 channel 的声明也很简单,只需要在声明的时候带上 <- 操作符即可,如下:


send := make(chan <- int)receive := make(<- chan int)

用 Golang 完成一个流媒体网站


首先,我们写个启动类:

package main
import ( "net/http" "github.com/julienschmidt/httprouter")
type middleWareHandler struct { r *httprouter.Router}
func NewMiddleWareHandler(r *httprouter.Router) http.Handler { m := middleWareHandler{} m.r = r return m}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { //check session validateUserSession(r)
m.r.ServeHTTP(w, r)}
func RegisterHandlers() *httprouter.Router { router := httprouter.New()
router.POST("/user", CreateUser)
router.POST("/user/:user_name", Login)
return router}
func main() { r := RegisterHandlers() mh := NewMiddleWareHandler(r) http.ListenAndServe(":1000", mh)}
复制代码


在这里我们实现了注册、登录以及一些初始化监听端口。接下来,我们需要看看对于后端视频处理时,主要关心的是 session:


package session
import ( "time" "sync" "github.com/avenssi/video_server/api/defs" "github.com/avenssi/video_server/api/dbops" "github.com/avenssi/video_server/api/utils")
var sessionMap *sync.Map
func init() { sessionMap = &sync.Map{}}
func nowInMilli() int64{ return time.Now().UnixNano()/1000000}
func deleteExpiredSession(sid string) { sessionMap.Delete(sid) dbops.DeleteSession(sid)}
func LoadSessionsFromDB() { r, err := dbops.RetrieveAllSessions() if err != nil {  return }
r.Range(func(k, v interface{}) bool{  ss := v.(*defs.SimpleSession)  sessionMap.Store(k, ss)  return true })}
func GenerateNewSessionId(un string) string { id, _ := utils.NewUUID() ct := nowInMilli() ttl := ct + 30 * 60 * 1000// Severside session valid time: 30 min
ss := &defs.SimpleSession{Username: un, TTL: ttl} sessionMap.Store(id, ss) dbops.InsertSession(id, ttl, un)
return id}
func IsSessionExpired(sid string) (string, bool) { ss, ok := sessionMap.Load(sid) if ok {  ct := nowInMilli()  if ss.(*defs.SimpleSession).TTL < ct {   deleteExpiredSession(sid)   return "", true  }
return ss.(*defs.SimpleSession).Username, false }
return "", true}
复制代码


从上面的代码中,可以看到,Go 主要引用了相关的视频插件库:avenssi/video_server 等来处理缓存 session。这也是为什么选择 go 开发后端的一个原因。


同时,我们还定义了一个错误码信息:


package defs
type Err struct { Error string json:"error" ErrorCode string json:"error_code"  }
type ErrResponse struct { HttpSC int Error Err}
var ( ErrorRequestBodyParseFailed = ErrResponse{HttpSC: 400, Error: Err{Error: "Request body is not correct", ErrorCode: "001"}} ErrorNotAuthUser = ErrResponse{HttpSC: 401, Error: Err{Error: "User authentication failed.", ErrorCode: "002"}} ErrorDBError = ErrResponse{HttpSC: 500, Error: Err{Error: "DB ops failed", ErrorCode: "003"}} ErrorInternalFaults = ErrResponse{HttpSC: 500, Error: Err{Error: "Internal service error", ErrorCode: "004"}})
复制代码


以上对于业务层中处理主要逻辑就是这些了,下面主要讲推流。


推流


通过 RTMP 协议推送视频流,这里在推流时,需要 cache 缓存,这样可以避免服务端的不断通信导致压力增加而被 crash,因为由于每个进程都是保持长链接,当每个客户端进程不断的发起请求,那么对于服务端的进程是会堵塞的,所以,在传输协议中,大部分音视频这种,流媒体处理方式,是需要加上 cache 来缓冲对于服务器的压力,避免整个服务的瘫痪,导致客户端一直无响应,从而导致客户端进入卡死状态:


func (cache *Cache) Write(p av.Packet) {if p.IsMetadata {cache.metadata.Write(&p)return} else {if !p.IsVideo {ah, ok := p.Header.(av.AudioPacketHeader)if ok {if ah.SoundFormat() == av.SOUND_AAC &&ah.AACPacketType() == av.AAC_SEQHDR {cache.audioSeq.Write(&p)return} else {return}}} else {    vh, ok := p.Header.(av.VideoPacketHeader)    if ok {      if vh.IsSeq() {        cache.videoSeq.Write(&p)        return      }    } else {      return    }
}}cache.gop.Write(&p)

func (cache *Cache) Send(w av.WriteCloser) error {if err := cache.metadata.Send(w); err != nil {return err}if err := cache.videoSeq.Send(w); err != nil { return err}
if err := cache.audioSeq.Send(w); err != nil { return err}
if err := cache.gop.Send(w); err != nil { return err}
return nil}
复制代码


在推流时,主要还是先进行选择:

const (maxQueueNum           = 1024SAVE_STATICS_INTERVAL = 5000)
var (readTimeout = configure.Config.GetInt("read_timeout")writeTimeout = configure.Config.GetInt("write_timeout"))
type Client struct {handler av.Handlergetter av.GetWriter}
func NewRtmpClient(h av.Handler, getter av.GetWriter) *Client {return &Client{handler: h,getter: getter,}}
for { var netconn net.Conn netconn, err = listener.Accept() if err != nil { return } conn := core.NewConn(netconn, 4*1024) log.Debug("new client, connect remote: ", conn.RemoteAddr().String(), "local:", conn.LocalAddr().String()) go s.handleConn(conn)}
复制代码


发布于: 19 小时前阅读数: 21
用户头像

Lavender

关注

还未添加个人签名 2020.04.07 加入

还未添加个人简介

评论

发布
暂无评论
Go语言实战流媒体视频网站