写点什么

OpenHarmony 3.2 Beta 多媒体系列——音视频播放 gstreamer

  • 2022-11-24
    上海
  • 本文字数:8778 字

    阅读完需:约 29 分钟

OpenHarmony 3.2 Beta多媒体系列——音视频播放gstreamer

一、 简介

多媒体播放框架主要的实现在 PlayerServer 服务中,这个服务提供了媒体播放框架所需要的实现环境,继续跟踪代码分析发现,PlayerServer 主要通过 gstreamer 适配层,对 gstreamer 进行调用。gstreamer 属于更加具体的实现,所以本篇文章主要是分析 PlayerServer 通过适配层调用到 gstreamer 的过程。


此前,我在《OpenHarmony 3.2 Beta多媒体系列-音视频播放框架》一文中,主要分析了多媒体播放的框架层代码,本地接口通过服务端的 proxy 代理类进行 IPC 调用,最终调用到 PlayerServer 服务端。本篇主要分析了多媒体 gstreamer 的调用,涉及到从 PlayerServer 到 gstreamer 的整体流程。


二、目录

 gstreamer    ├── BUILD.gn    ├── common    │   ├── BUILD.gn    │   ├── playbin_adapter    │   │   ├── i_playbin_ctrler.h    │   │   ├── playbin2_ctrler.cpp    │   │   ├── playbin2_ctrler.h    │   │   ├── playbin_ctrler_base.cpp    │   │   ├── playbin_ctrler_base.h    │   │   ├── playbin_msg_define.h    │   │   ├── playbin_sink_provider.h    │   │   ├── playbin_state.cpp    │   │   ├── playbin_state.h    │   │   ├── playbin_task_mgr.cpp    │   │   └── playbin_task_mgr.h    │   ├── state_machine    │   │   ├── state_machine.cpp    │   │   └── state_machine.h    ├── factory    │   ├── BUILD.gn    │   └── engine_factory.cpp    └── player        ├── BUILD.gn        ├── player_codec_ctrl.cpp        ├── player_codec_ctrl.h        ├── player_engine_gst_impl.cpp        ├── player_engine_gst_impl.h        ├── player_sinkprovider.cpp        ├── player_sinkprovider.h        ├── player_track_parse.cpp        └── player_track_parse.h
复制代码


目录主要是多媒体子系统中的 engine 部分,涉及到了 gstreamer 的适配层,gstreamer 具体的实现是在 third_party/gstreamer 目录中。


三 、Gstreamer 介绍


  1. 简介

Gstreamer 是一个跨平台的多媒体框架,应用程序可以通过管道(Pipeline)的方式,将多媒体处理的各个步骤串联起来,达到预期的效果。每个步骤通过元素(Element)基于 GObject 对象系统通过插件(plugins)的方式实现,方便了各项功能的扩展。


2.Gstreamer 几个重要的概念

Element

Element 是 Gstreamer 中最重要的对象类型之一。一个 element 实现一个功能(读取文件,解码,输出等),程序需要创建多个 element,并按顺序将其串联起来,构成一个完整的 Pipeline。


Pad

Pad 是一个 element 的输入/输出接口,分为 src pad(生产数据)和 sink pad(消费数据)两种。


两个 element 必须通过 pad 才能连接起来,pad 拥有当前 element 能处理数据类型的能力(capabilities),会在连接时通过比较 src pad 和 sink pad 中所支持的能力,来选择最恰当的数据类型用于传输,如果 element 不支持,程序会直接退出。在 element 通过 pad 连接成功后,数据会从上一个 element 的 src pad 传到下一个 element 的 sink pad 然后进行处理。


Bin 和 Pipeline

Bin 是一个容器,用于管理多个 element,改变 bin 的状态时,bin 会自动去修改所包含的 element 的状态,也会转发所收到的消息。如果没有 bin,我们需要依次操作我们所使用的 element。通过 bin 降低了应用的复杂度。


Pipeline 继承自 bin,为程序提供一个 bus 用于传输消息,并且对所有子 element 进行同步。当将 Pipeline 的状态设置为 PLAYING 时,Pipeline 会在一个/多个新的线程中通过 element 处理数据。


四、调用流程


五、源码分析


  1. PrepareAsync 分析

首先,在 PlayerServer 的 PrepareAsync 中会调用 OnPrepare(false),具体是在 OnPrepare(false)中实现,参数传入 false,表明调用的是异步方法。

int32_t PlayerServer::PrepareAsync(){    std::lock_guard<std::mutex> lock(mutex_);    MEDIA_LOGW("KPI-TRACE: PlayerServer PrepareAsync in");
if (lastOpStatus_ == PLAYER_INITIALIZED || lastOpStatus_ == PLAYER_STOPPED) { return OnPrepare(false); } else { MEDIA_LOGE("Can not Prepare, currentState is %{public}s", GetStatusDescription(lastOpStatus_).c_str()); return MSERR_INVALID_OPERATION; }}
复制代码


OnPrepare 方法中,先通过 playerEngine_调用 SerVideoSurface 的方法,将 surface_设置到 PlayerEngineGstImpl 中(producerSurface_),接着启动一个任务,调用目前状态的 Prepare()方法。


int32_t PlayerServer::OnPrepare(bool sync){    CHECK_AND_RETURN_RET_LOG(playerEngine_ != nullptr, MSERR_NO_MEMORY, "playerEngine_ is nullptr");    int32_t ret = MSERR_OK;
#ifdef SUPPORT_VIDEO if (surface_ != nullptr) { ret = playerEngine_->SetVideoSurface(surface_); CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, MSERR_INVALID_OPERATION, "Engine SetVideoSurface Failed!"); }#endif
lastOpStatus_ = PLAYER_PREPARED;
auto preparedTask = std::make_shared<TaskHandler<int32_t>>([this]() { MediaTrace::TraceBegin("PlayerServer::PrepareAsync", FAKE_POINTER(this)); auto currState = std::static_pointer_cast<BaseState>(GetCurrState()); return currState->Prepare(); });
ret = taskMgr_.LaunchTask(preparedTask, PlayerServerTaskType::STATE_CHANGE); CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "Prepare launch task failed");
if (sync) { (void)preparedTask->GetResult(); // wait HandlePrpare } return MSERR_OK;}

复制代码


进入 Preparing 状态后,会触发 PlayerServer 的 HandlePrepare()方法被调用,在这个方法里会通过 playerEngine_调用 PrepareAsync 方法,这个方法调用的是 PlayerEngineGstImpl 对应的 PrepareAsync 方法。

int32_t PlayerServer::HandlePrepare(){    int32_t ret = playerEngine_->PrepareAsync();    CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, MSERR_INVALID_OPERATION, "Server Prepare Failed!");    if (config_.leftVolume <= 1.0f || config_.rightVolume <= 1.0f) {        ret = playerEngine_->SetVolume(config_.leftVolume, config_.rightVolume);        MEDIA_LOGD("Prepared SetVolume leftVolume:%{public}f rightVolume:%{public}f, ret:%{public}d", \                   config_.leftVolume, config_.rightVolume, ret);    }    (void)playerEngine_->SetLooping(config_.looping);
{ auto rateTask = std::make_shared<TaskHandler<void>>([this]() { auto currState = std::static_pointer_cast<BaseState>(GetCurrState()); (void)currState->SetPlaybackSpeed(config_.speedMode); });
(void)taskMgr_.LaunchTask(rateTask, PlayerServerTaskType::RATE_CHANGE); } return MSERR_OK;}

复制代码


首先初始化 playBinCtrler_,后续的操作都是通过 PlayBinCtrlerBase 对象来操作的,所以 PlayBinCtrlerInit()方法会创建 PlayBinCtrlerBase 对象(playBinCtrler_),创建好以后通过 playBinCtrler_进行 SetSource 和 SetXXXListener 的设置。


int32_t PlayerEngineGstImpl::PrepareAsync(){    std::unique_lock<std::mutex> lock(mutex_);    MEDIA_LOGD("Prepare in");
int32_t ret = PlayBinCtrlerInit(); CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, MSERR_INVALID_VAL, "PlayBinCtrlerInit failed");
CHECK_AND_RETURN_RET_LOG(playBinCtrler_ != nullptr, MSERR_INVALID_VAL, "playBinCtrler_ is nullptr"); ret = playBinCtrler_->PrepareAsync(); CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "PrepareAsync failed");
// The duration of some resources without header information cannot be obtained. MEDIA_LOGD("Prepared ok out"); return MSERR_OK;}
复制代码


初始化完成以后,接下来进行 playBinCtrler_的 PrepareAsync 的调用,PlayBinCtrlerBase 中的 PrepareAsync 的方法间接地调用了 PrepareAsyncInternal。

int32_t PlayBinCtrlerBase::PrepareAsync(){    MEDIA_LOGD("enter");
std::unique_lock<std::mutex> lock(mutex_); return PrepareAsyncInternal();}
复制代码


PrepareAsyncInternal 首先判断当前的状态,如果是 preparingState 或 preparedState,那么就直接返回成功,否则继续向下调用。接下来会调用 EnterInitializedState(),这个方法中会创建 playbin,设置 signal 的回调以及 gstreamer 参数的设置。最后调用目前状态的 Prepare 方法,此时的状态是 InitializedState。


int32_t PlayBinCtrlerBase::PrepareAsyncInternal(){    if ((GetCurrState() == preparingState_) || (GetCurrState() == preparedState_)) {        MEDIA_LOGI("already at preparing state, skip");        return MSERR_OK;    }
CHECK_AND_RETURN_RET_LOG((!uri_.empty() || appsrcWrap_), MSERR_INVALID_OPERATION, "Set uri firsty!");
int32_t ret = EnterInitializedState(); CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
auto currState = std::static_pointer_cast<BaseState>(GetCurrState()); ret = currState->Prepare(); CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "PrepareAsyncInternal failed");
return MSERR_OK;}

复制代码


InitializedState 的 Prepare 方法又通过 ctrler_调回到 PlayBinCtrlerBase 的 ChangeState 方法,这个方法是在 PlayBinCtrlerBase 的父类 StateMachine 中,它是一个状态机,管理着各种状态的切换。

int32_t PlayBinCtrlerBase::InitializedState::Prepare(){    ctrler_.ChangeState(ctrler_.preparingState_);    return MSERR_OK;}
复制代码


很多表示状态的类在 PlayBinCtrlerBase 中进行声明,这些子类的具体实现功能在 playbin_state.cpp 中。

private:    class BaseState;    class IdleState;    class InitializedState;    class PreparingState;    class PreparedState;    class PlayingState;    class PausedState;    class StoppedState;    class StoppingState;    class PlaybackCompletedState;
复制代码


接下来看一下状态机的 ChangeState 方法,可以看出切换状态的时候,先调用切换前状态的 StateExit()方法,再调用切换后状态的 StateEnter()。如果需要一些操作,我们可以在状态的 StateEnter 和 StateExit 中进行。

void StateMachine::ChangeState(const std::shared_ptr<State> &state){    ......    if (currState_ != nullptr && currState_->GetStateName() == "stopping_state" && state->GetStateName() != "stopped_state") {        return;    }    if (currState_) {        currState_->StateExit();    }    currState_ = state;    state->StateEnter();}
复制代码


因为上面切换状态调用的是 ctrler_.ChangeState(ctrler_.preparingState_),所以接下来看一下 PreparingState 状态的 StateEnter 方法。这个方法中首先是调用了 ctrler_.ReportMessage(msg),字面上看是用来上报 msg 信息的。


void PlayBinCtrlerBase::PreparingState::StateEnter(){    PlayBinMessage msg = { PLAYBIN_MSG_SUBTYPE, PLAYBIN_SUB_MSG_BUFFERING_START, 0, {} };    ctrler_.ReportMessage(msg);
GstStateChangeReturn ret; (void)ChangePlayBinState(GST_STATE_PAUSED, ret);
MEDIA_LOGD("PreparingState::StateEnter finished");}
复制代码


ctrler_是 PlayBinCtrlerBase 类型的变量,直接看 PlayBinCtrlerBase 的 ReportMessage 方法,这个方法的核心,是创建一个任务后,将任务放入消息队列中,等待消息被处理,这里我们最想知道的是这个消息会在什么地方被处理。msgReportHandler 创建了 TaskHandler,这个里面会调用 notifier_(msg),这里的 notifier_比较重要,我们可以顺着这个变量向上分析。

void PlayBinCtrlerBase::ReportMessage(const PlayBinMessage &msg){    ......    auto msgReportHandler = std::make_shared<TaskHandler<void>>([this, msg]() { notifier_(msg); });    int32_t ret = msgQueue_->EnqueueTask(msgReportHandler);    if (ret != MSERR_OK) {        MEDIA_LOGE("async report msg failed, type: %{public}d, subType: %{public}d, code: %{public}d",                   msg.type, msg.subType, msg.code);    };
if (msg.type == PlayBinMsgType::PLAYBIN_MSG_EOS) { ProcessEndOfStream(); }}
复制代码


notifier_是在 PlayBinCtrlerBase 被创建的时候赋值的。

PlayBinCtrlerBase::PlayBinCtrlerBase(const PlayBinCreateParam &createParam)    : renderMode_(createParam.renderMode),    notifier_(createParam.notifier),    sinkProvider_(createParam.sinkProvider){    MEDIA_LOGD("enter ctor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));}
复制代码


在源码分析的前期 PlayerEngineGstImpl 初始化 PlayBinCtrlerBase 的时候进行了创建 notifier = std::bind(&PlayerEngineGstImpl::OnNotifyMessage, this, std::placeholders::_1) notifier 相当于是调用了 PlayerEngineGstImpl::OnNotifyMessage 方法。所以上述中的处理函数就是 PlayerEngineGstImpl::OnNotifyMessage。


int32_t PlayerEngineGstImpl::PlayBinCtrlerPrepare(){    uint8_t renderMode = IPlayBinCtrler::PlayBinRenderMode::DEFAULT_RENDER;    auto notifier = std::bind(&PlayerEngineGstImpl::OnNotifyMessage, this, std::placeholders::_1);
{ std::unique_lock<std::mutex> lk(trackParseMutex_); sinkProvider_ = std::make_shared<PlayerSinkProvider>(producerSurface_); sinkProvider_->SetAppInfo(appuid_, apppid_); }
IPlayBinCtrler::PlayBinCreateParam createParam = { static_cast<IPlayBinCtrler::PlayBinRenderMode>(renderMode), notifier, sinkProvider_ }; playBinCtrler_ = IPlayBinCtrler::Create(IPlayBinCtrler::PlayBinKind::PLAYBIN2, createParam); ...... return MSERR_OK;}
复制代码


在 OnNotifyMessage 中指定了各种消息类型对应的执行函数,上述代码中创建的 Message 类型是 PLAYBIN_MSG_SUBTYPE,子类型为 PLAYBIN_SUB_MSG_BUFFERING_START。

void PlayerEngineGstImpl::OnNotifyMessage(const PlayBinMessage &msg){    const std::unordered_map<int32_t, MsgNotifyFunc> MSG_NOTIFY_FUNC_TABLE = {        { PLAYBIN_MSG_ERROR, std::bind(&PlayerEngineGstImpl::HandleErrorMessage, this, std::placeholders::_1) },        { PLAYBIN_MSG_SEEKDONE, std::bind(&PlayerEngineGstImpl::HandleSeekDoneMessage, this, std::placeholders::_1) },        { PLAYBIN_MSG_SPEEDDONE, std::bind(&PlayerEngineGstImpl::HandleInfoMessage, this, std::placeholders::_1) },        { PLAYBIN_MSG_BITRATEDONE, std::bind(&PlayerEngineGstImpl::HandleInfoMessage, this, std::placeholders::_1)},        { PLAYBIN_MSG_EOS, std::bind(&PlayerEngineGstImpl::HandleInfoMessage, this, std::placeholders::_1) },        { PLAYBIN_MSG_STATE_CHANGE, std::bind(&PlayerEngineGstImpl::HandleInfoMessage, this, std::placeholders::_1) },        { PLAYBIN_MSG_SUBTYPE, std::bind(&PlayerEngineGstImpl::HandleSubTypeMessage, this, std::placeholders::_1) },        { PLAYBIN_MSG_AUDIO_SINK, std::bind(&PlayerEngineGstImpl::HandleAudioMessage, this, std::placeholders::_1) },        { PLAYBIN_MSG_POSITION_UPDATE, std::bind(&PlayerEngineGstImpl::HandlePositionUpdateMessage, this,            std::placeholders::_1) },    };    if (MSG_NOTIFY_FUNC_TABLE.count(msg.type) != 0) {        MSG_NOTIFY_FUNC_TABLE.at(msg.type)(msg);    }}
复制代码


最终的流程走到了 PlayerEngineGstImpl::HandleBufferingStart(),在这个方法中,主要通过 obs_将 format 传给 IPlayerEngineObs 的 OnInfo 方法。

void PlayerEngineGstImpl::HandleBufferingStart(){    percent_ = 0;    Format format;(void)format.PutIntValue(std::string(PlayerKeys::PLAYER_BUFFERING_START), 0);    std::shared_ptr<IPlayerEngineObs> notifyObs = obs_.lock();    if (notifyObs != nullptr) {        notifyObs->OnInfo(INFO_TYPE_BUFFERING_UPDATE, 0, format);    }}
复制代码


我们重点看一下 obs_是哪里设置的,在 PlayerServer 的初始化 InitPlayEngine。shared_from_this()相当于是把 PlayerServer 自身赋值给 obs,PlayerServer 也是实现了 IPlayerEngineObs 对应的接口。


int32_t PlayerServer::InitPlayEngine(const std::string &url){    ......    int32_t ret = taskMgr_.Init();    auto engineFactory = EngineFactoryRepo::Instance().GetEngineFactory(IEngineFactory::Scene::SCENE_PLAYBACK, url);        playerEngine_ = engineFactory->CreatePlayerEngine(appUid_, appPid_);
if (dataSrc_ == nullptr) { ret = playerEngine_->SetSource(url); } else { ret = playerEngine_->SetSource(dataSrc_); }
std::shared_ptr<IPlayerEngineObs> obs = shared_from_this(); ret = playerEngine_->SetObs(obs);
lastOpStatus_ = PLAYER_INITIALIZED; ChangeState(initializedState_);
return MSERR_OK;}

复制代码


这样我们就跟踪到了 PlayerServer 的 OnInfo()方法。


void PlayerServer::OnInfo(PlayerOnInfoType type, int32_t extra, const Format &infoBody){    std::lock_guard<std::mutex> lockCb(mutexCb_);
int32_t ret = HandleMessage(type, extra, infoBody); if (playerCb_ != nullptr && ret == MSERR_OK) { playerCb_->OnInfo(type, extra, infoBody); }}
复制代码


  1. Play 分析从

PlayerServer 开始跟踪,调用到 PlayerServer 的 OnPlay()方法。

int32_t PlayerServer::Play(){    ......    if (lastOpStatus_ == PLAYER_PREPARED || lastOpStatus_ == PLAYER_PLAYBACK_COMPLETE ||        lastOpStatus_ == PLAYER_PAUSED) {        return OnPlay();    } else {        return MSERR_INVALID_OPERATION;    }}
复制代码


在 OnPlay 中会启动一个任务,在任务中获取当前的状态,然后调用当前状态的 Play()方法。


int32_t PlayerServer::OnPlay(){    ......    auto playingTask = std::make_shared<TaskHandler<void>>([this]() {        auto currState = std::static_pointer_cast<BaseState>(GetCurrState());        (void)currState->Play();    });
int ret = taskMgr_.LaunchTask(playingTask, PlayerServerTaskType::STATE_CHANGE);
lastOpStatus_ = PLAYER_STARTED; return MSERR_OK;}
复制代码


前面调用了 PrepareAsync,所以当前的状态是 Prepared,调用到了 PreparedState 的 Play()方法,这个方法还是按照之前 Prepare 的方式,调回到 PlayerServer 的 HandlePlay()。

int32_t PlayerServer::PreparedState::Play(){    return server_.HandlePlay();}
复制代码

在 PlayServer 中通过播放引擎继续向下调用。


int32_t PlayerServer::HandlePlay(){    int32_t ret = playerEngine_->Play();    CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, MSERR_INVALID_OPERATION, "Engine Play Failed!");
return MSERR_OK;}

复制代码


在 PlayerEngineGstImpl 的 Play()方法会继续调用 playBinCtrler_的 Play()方法。

int32_t PlayerEngineGstImpl::Play(){    ......    playBinCtrler_->Play();    return MSERR_OK;}
复制代码


PlayBinCtrlerBase 的 Play()方法根据当前的 State,调用 currSate->Play()。


int32_t PlayBinCtrlerBase::Play(){    ......    auto currState =    std::static_pointer_cast<BaseState>(GetCurrState());    int32_t ret = currState->Play();
return MSERR_OK;}

复制代码


在 PreparedState 的 Play()方法中改变了 PlayBin 的状态为 playing。

    int32_t PlayBinCtrlerBase::PreparedState::Play(){    GstStateChangeReturn ret;    return ChangePlayBinState(GST_STATE_PLAYING, ret);}
复制代码


ChangePlayBinState 主要是调用了 gst_element_set_state(GST_ELEMENT_CAST(ctrler_.playbin_),GST_STATE_PLAYING),这个直接调用了 gstreamer 三方库的实现,调用完这个方法以后,gstreamer 就开始进行播放了。


int32_t PlayBinCtrlerBase::BaseState::ChangePlayBinState(GstState targetState, GstStateChangeReturn &ret){    ......    ret = gst_element_set_state(GST_ELEMENT_CAST(ctrler_.playbin_), targetState);    if (ret == GST_STATE_CHANGE_FAILURE) {        MEDIA_LOGE("Failed to change playbin's state to %{public}s", gst_element_state_get_name(targetState));        return MSERR_INVALID_OPERATION;    }
return MSERR_OK;}

复制代码


六、总结

本篇文章主要从 PlayerServer 播放服务开始分析音视频播放的流程,涉及到 gstreamer 引擎的调用,相对于多媒体播放框架来说,更加底层,便于熟悉从框架到 gstreamer 的整体流程。


用户头像

OpenHarmony开发者官方账号 2021-12-15 加入

OpenHarmony是由开放原子开源基金会(OpenAtom Foundation)孵化及运营的开源项目,目标是面向全场景、全连接、全智能时代,基于开源的方式,搭建一个智能终端设备操作系统的框架和平台,促进万物互联产业的繁荣发展

评论

发布
暂无评论
OpenHarmony 3.2 Beta多媒体系列——音视频播放gstreamer_OpenHarmony_OpenHarmony开发者社区_InfoQ写作社区