写点什么

手把手教程 | 基于 Amazon Kinesis Video Streams 实现 IP Camera 云存项目

  • 2021 年 12 月 24 日
  • 本文字数:6514 字

    阅读完需:约 21 分钟

手把手教程 | 基于Amazon Kinesis Video Streams实现IP Camera云存项目



近年来市场上 IP Camera 产品价格持续走低,硬件利润单薄,很多厂商通过增值服务发力,增值服务比较成熟的业务形态是云端存储,其实现方式是设备检测报警事件触发一段视频上传云端并提供终端用户回看从而产生收费。Amazon Kinesis Video Streams 实现云存业务优势明显,提供 Device SDK 摄取视频并上传 Endpoint,终端用户可以基于 HLS/DASH 主流的协议播放观看。


本案例基于 Serverless 方式实现云存方案,Devices 推流过程中记录 metadata,通过 Amazon API Gateway,Amazon Lambda,Amazon DynamoDB 留存 video metadata,前端应用通过 video metadata 生成播放 URL,通过 HLS Player 播放。



📢 想要了解更多亚马逊云科技最新技术发布和实践创新,敬请关注在上海、北京、深圳三地举办的 2021 亚马逊云科技中国峰会!点击图片报名吧~


 准备工作


  • 下载 amazon-kinesis-video-streams-producer-sdk-cpp 源码并编译 libKinesisVideoProducerJNI


  • 下载 amazon-kinesis-video-streams-producer-sdk-java 源码


  • 安装 IntelliJ IDEA


架构图



创建 Amazon Kinesis Video Streams


登录 console,区域选择新加坡, 选择 Kinesis Video Streams 服务,创建视频流名称“kvs-stream”, Data retention 选择 7(7 天云存)




创建访问密钥


新建一个用户”CloudStorageUser”, 勾选“Programmatic access”



增加 2 个权限如下图,这里仅用于测试和演示,生产环境建议最小化权限。



拷贝并保存 Access Key ID 和 Secret Access Key,选择 Amazon Secrets Manager 服务添加密钥,新增 Secret key/value, 将 AK/SK 填入



指定密钥名称“ipc-cloudstorage-access-kvs-secretkey”




拷贝 Python3 的 sample code, 后续 Lambda Function 中使用。


创建 Lambda


创建 function 选择名称“save_devices_video_metadata”,Runtime 选择 Python 3.6



HttpMethod 为 PUT,接收 metadata 并存表


if event['httpMethod'] == 'PUT' :
        id = _body['clientID']
        deviceID = _body['deviceID']
        streamName = _body['streamName']
        begTime = _body['begTime']
        endTime = _body['endTime']
        duration = _body['duration']
        res = save_dynamodb_tb(id,deviceID,begTime,endTime,duration,streamName)
复制代码


HTTPMethod 为 GET,获取 HLS URL


if event['httpMethod'] == 'GET' :
            streamName = _body['streamName']
            begTime = _body['begTime']
            endTime = _body['endTime']
            duration = _body['duration']
            descStream =kvs.describe_stream(StreamName=streamName)
            Stream_ARN = descStream['StreamInfo']['StreamARN']
            get_hls_response =
 kvs.get_data_endpoint(APIName="GET_HLS_STREAMING_SESSION_URL",StreamARN=Stream_ARN)
            hls_endpoint = get_hls_response['DataEndpoint']
            kvs_client = boto3.client("kinesis-video-archived-media",
                                endpoint_url=hls_endpoint,
                                region_name=REGION_NAME,
 aws_access_key_id=AWS_ACCESS_KEY_ID,
                                aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
            # 获取hls url
            res = get_hls_url(kvs_client,streamName,begTime,endTime,duration)
复制代码


Lambda 源码:

https://github.com/beiyue/save_devices_video_metadata/blob/main/lambda_function.py


Lambda 授权


IAM 创建 Policy 选择 JSON, 编辑内容如下, 保存 Policy 名称“getSecretValueForIPC”


{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "secretsmanager:GetSecretValue",
        "Resource": "<SECRET_ARN>"
    }
}
复制代码



选择 Lambda Configuration 编辑 Role



附加策略“getSecretValueForIPC”



附件策略“AmazonKinesisVideoStreamsReadOnlyAccess”



创建 DynamoDB 表


创建 DynamoDB 表,新建表名“tb_device_metadata”




元数据包含设备 ID,Kinesis Stream 名称,视频片段开始时间和结束时间,视频时长,写入时间



创建 API


在 Amazon API Gateway 中选择 ”Create API”,选择 ”REST API” Build, 选择 API 名称 “video_record_metadata” , Endpoint Type 选择“Regional”。



完成后,选择“Actions”,选择“Create Resource”资源名称为“ipc-video-metadata”



选择“Actions”,选择“Create Method”,选择 PUT,选择 Lambda Function, 勾选 Use Lambda Proxy integration,选型 Lambda Region “ap-southeast-1”,选择 Lambda Function“save_device_video_metadata”并保存



GET 方法同上


创建 API 模型


左侧选择 Models,创建 Model,模型名称“IPCMetaData”



Model shema 复制如下


{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "IPCMetaData",
  "type": "object",
  "properties": {
    "clientID": { "type": "string" },
    "deviceID": { "type": "string" },
    "streamName": { "type": "string" },
    "begTime": { "type": "string" },
    "endTime": { "type": "string" },
    "duration": { "type": "string" }
    }
}
复制代码


PUT Method 增加请求模型


选择 Resources, 选择 PUT,选择 Method Request,选择 Request Body,增加 Model



部署 API


选择 Actions , 选择 Deploy API,Stage name 选择“test”, 选择 Deploy.



创建 JAVA SDK


在 test Stage Editor 选择 SDK Generation,选择 Platform,选择“Java SDK”. 输入参数如下:



点击 Generate SDK, 下载 genrate code.zip 文件,解压打开,打开 Terminal 执行 mvn install



Maven 安装本地仓库


mvn install:install-file -Dfile=./target/cloudstorage-demo-1.0.jar -DgroupId=ipc-sdk
–DartifactId=cloudstorage-demo -Dversion=1.0 -Dpackaging=jar
复制代码


pom.xml 中增加 dependency


<dependency>
    <groupId>ipc-sdk</groupId>
    <artifactId>cloudstorage-demo</artifactId>
    <version>1.0</version>
</dependency>
复制代码


创建推流端


下载 amazon-kinesis-video-streams-producer-sdk-java,修改 DemoAppMain.java


public static void main(final String[] args) {    try {        final KinesisVideoClient kinesisVideoClient = KinesisVideoJavaClientFactory                .createKinesisVideoClient(                        Regions.AP_SOUTHEAST_1, //指定新加坡区域                        AuthHelper.getSystemPropertiesCredentialsProvider());        //文件媒体源        final MediaSource mediaSource = createFileMediaSource();        kinesisVideoClient.registerMediaSource(mediaSource);        mediaSource.start();
    } catch (final KinesisVideoException e) {        throw new RuntimeException(e);    }}
复制代码


NativeKinesisVideoProducerStream.java 增加方法 putDurationMetaData,提交元数据,这里用到前面的 SDK,IPCMetaData 用于封装元数据


//记录一段视频片段的Metadataprivate void putDurationMetaData(@Nonnull final KinesisVideoFrame frame){
// 每一段视频以关键帧为起始
    if(startTime == 0L && frame.getFlags() == FrameFlags.FRAME_FLAG_KEY_FRAME){
        startTime = frame.getDecodingTs();
// 这里视频时长以20秒为单位进行记录
    }else if(frame.getFlags() == FrameFlags.FRAME_FLAG_KEY_FRAME && (frame.getDecodingTs() - startTime) > 20 * Time.HUNDREDS_OF_NANOS_IN_A_SECOND ){
        long index = frame.getIndex();
        long endTime = frame.getDecodingTs();
        long durationTime = (endTime - startTime)/Time.HUNDREDS_OF_NANOS_IN_A_SECOND;
        IPCCloudStorageSdk client = IPCCloudStorageSdk.builder().connectionConfiguration(
                new ConnectionConfiguration()
                        .maxConnections(100)
                        .connectionMaxIdleMillis(1000))
                .timeoutConfiguration(
                        new TimeoutConfiguration()
                                .httpRequestTimeout(5000)
                                .totalExecutionTimeout(10000)
                                .socketTimeout(3000))
                .build();    //API Gateway JAVA SDK 生成的Model类
        IPCMetaData metaData = new IPCMetaData();   //视频片段的唯一标示
metaData.setClientID(String.valueOf(Time.getCurrentTime())+String.format("%06d", index));    //设备唯一标示
        metaData.setDeviceID(mDeviceInfo.getName());
        //开始时间,毫秒为单位metaData.setBegTime(String.valueOf(startTime/Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND));    //流名称,对应KVS Stream Name
        metaData.setStreamName(mStreamInfo.getName());
        //结束时间, 毫秒为单位metaData.setEndTime(String.valueOf(endTime/Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND));    //时长,秒为单位
        metaData.setDuration(String.valueOf(durationTime));
        // API Gateway JAVA SDK 生成的方法请求类
        MetaDataInputRequest req = new MetaDataInputRequest().iPCMetaData(metaData);
        MetaDataInputResult result = client.metaDataInput(req);
        mLog.info("Duration Metadata : %s, %s, %s ,%s ,%s ,%s ",metaData.getClientID(),metaData.getDeviceID(),metaData.getStreamName(),metaData.getBegTime(),metaData.getEndTime(),metaData.getDuration());
        //重置开始时间,上一个视频结束时间为下一个视频的开始时间。
        startTime = frame.getDecodingTs();
    }
}
复制代码


推流源码:

https://github.com/beiyue/amazon-kinesis-video-streams-producer-sdk-java.git


推流测试



注意这里 AK/SK 方式仅用于测试和演示,实际生产环境建议使用更安全方式,比如通过 IoT 证书获取临时身份,参见https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/how-iot.html


Amazon DynamoDB 中可以看到生成了视频段的 metadata



获取播放 URL


从 Amazon DynamoDB 中随机选取一个 item 的客户端 ID,流名称,开始时间,结束时间, 时长,terminal 执行:


curl -v -X GET  'https://xxxxxx.execute-api.ap-southeast-1.amazonaws.com/test/ipc-video-metadata'  -d ' {  "clientID": "162369215xxxxxxxx52","streamName": "kvs-stream", "begTime":"1623728297399","endTime":"1623728321043","duration":"22"}'
*   Trying 52.221.1XX.1XX...
* TCP_NODELAY set
* Connected to xxxxxx.execute-api.ap-southeast-1.amazonaws.com (52.221.1XX.1XX) port 443 (#0)
* ALPN, offering h2
* ALPN, offering http/1.1
* Cipher selection: ALL:!EXPORT:!EXPORT40:!EXPORT56:!aNULL:!LOW:!RC4:@STRENGTH
* successfully set certificate verify locations:
*   CAfile: /etc/ssl/cert.pem
  CApath: none
* TLSv1.2 (OUT), TLS handshake, Client hello (1):
* TLSv1.2 (IN), TLS handshake, Server hello (2):
* TLSv1.2 (IN), TLS handshake, Certificate (11):
* TLSv1.2 (IN), TLS handshake, Server key exchange (12):
* TLSv1.2 (IN), TLS handshake, Server finished (14):
* TLSv1.2 (OUT), TLS handshake, Client key exchange (16):
* TLSv1.2 (OUT), TLS change cipher, Client hello (1):
* TLSv1.2 (OUT), TLS handshake, Finished (20):
* TLSv1.2 (IN), TLS change cipher, Client hello (1):
* TLSv1.2 (IN), TLS handshake, Finished (20):
* SSL connection using TLSv1.2 / ECDHE-RSA-AES128-GCM-SHA256
* ALPN, server accepted to use h2
* Server certificate:
*  subject: CN=*.execute-api.ap-southeast-1.amazonaws.com
*  start date: Aug 29 00:00:00 2020 GMT
*  expire date: Sep 29 12:00:00 2021 GMT
*  subjectAltName: host " xxxxxx.execute-api.ap-southeast-1.amazonaws.com" matched cert's "*.execute-api.ap-southeast-1.amazonaws.com"
*  issuer: C=US; O=Amazon; OU=Server CA 1B; CN=Amazon
*  SSL certificate verify ok.
* Using HTTP2, server supports multi-use
* Connection state changed (HTTP/2 confirmed)
* Copying HTTP/2 data in stream buffer to connection buffer after upgrade: len=0
* Using Stream ID: 1 (easy handle 0x7f80e8005600)
> GET /test/ipc-video-metadata HTTP/2
> Host: xxxxxx.execute-api.ap-southeast-1.amazonaws.com
> User-Agent: curl/7.54.0
> Accept: */*
> content-type: application/json
> day: Thursday
> Content-Length: 138
>
* Connection state changed (MAX_CONCURRENT_STREAMS updated)!
* We are completely uploaded and fine
< HTTP/2 200
< date: Tue, 15 Jun 2021 08:00:11 GMT
< content-type: application/json
< content-length: 261
< x-amzn-requestid: 9323XXX-4ff1-4dcf-ae57-365551b74636
< x-amz-apigw-id: A9Oh0FJWXXXX_Q=
< x-amzn-trace-id: Root=1-60c85e0b-01f095137c54211e74c3e1f3;Sampled=0
<
* Connection #0 to host xxxxxx.execute-api.ap-southeast-1.amazonaws.com left intact
"https://b-xxxxxx.kinesisvideo.ap-southeast-1.amazonaws.com/hls/v1/getHLSMasterPlaylist.m3u8?SessionToken=XXXXdf9Ro85h4AeC_n2yjc96_YLu1vigKX5qterUpPzxIQYibs4go4_KCqXEiWnXXXXWng92QY5HiGbEIkSa38f9d3XXXXX:~ "
复制代码


打开https://www.hlsplayer.org/填入 URL 地址



播放展示



小结


本方案展示了 IP Camera 运行 Amazon Kinesis Video Streams 实现推流功能;运行 Amazon API Gateway + Amazon Lambda + Amazon DynamoDB 留存视频的 metadata,根据 metadata 获取指定时段的视频片段的 session url, 播放器可正常播放。很多客户希望构建云存场景,结合这几个服务一起构建完整的云存解决方案。


本篇作者


周晓明

亚马逊云科技解决方案架构师

负责基于亚马逊云科技的云计算方案架构的咨询和设计,同时致力于物联网方向研究和推广,在安防监控领域有丰富实践经验。


用户头像

还未添加个人签名 2019.09.17 加入

还未添加个人简介

评论

发布
暂无评论
手把手教程 | 基于Amazon Kinesis Video Streams实现IP Camera云存项目