近年来市场上 IP Camera 产品价格持续走低,硬件利润单薄,很多厂商通过增值服务发力,增值服务比较成熟的业务形态是云端存储,其实现方式是设备检测报警事件触发一段视频上传云端并提供终端用户回看从而产生收费。Amazon Kinesis Video Streams 实现云存业务优势明显,提供 Device SDK 摄取视频并上传 Endpoint,终端用户可以基于 HLS/DASH 主流的协议播放观看。
本案例基于 Serverless 方式实现云存方案,Devices 推流过程中记录 metadata,通过 Amazon API Gateway,Amazon Lambda,Amazon DynamoDB 留存 video metadata,前端应用通过 video metadata 生成播放 URL,通过 HLS Player 播放。
📢 想要了解更多亚马逊云科技最新技术发布和实践创新,敬请关注在上海、北京、深圳三地举办的 2021 亚马逊云科技中国峰会!点击图片报名吧~
准备工作
架构图
创建 Amazon Kinesis Video Streams
登录 console,区域选择新加坡, 选择 Kinesis Video Streams 服务,创建视频流名称“kvs-stream”, Data retention 选择 7(7 天云存)
创建访问密钥
新建一个用户”CloudStorageUser”, 勾选“Programmatic access”
增加 2 个权限如下图,这里仅用于测试和演示,生产环境建议最小化权限。
拷贝并保存 Access Key ID 和 Secret Access Key,选择 Amazon Secrets Manager 服务添加密钥,新增 Secret key/value, 将 AK/SK 填入
指定密钥名称“ipc-cloudstorage-access-kvs-secretkey”
拷贝 Python3 的 sample code, 后续 Lambda Function 中使用。
创建 Lambda
创建 function 选择名称“save_devices_video_metadata”,Runtime 选择 Python 3.6
HttpMethod 为 PUT,接收 metadata 并存表
if event['httpMethod'] == 'PUT' :
id = _body['clientID']
deviceID = _body['deviceID']
streamName = _body['streamName']
begTime = _body['begTime']
endTime = _body['endTime']
duration = _body['duration']
res = save_dynamodb_tb(id,deviceID,begTime,endTime,duration,streamName)
复制代码
HTTPMethod 为 GET,获取 HLS URL
if event['httpMethod'] == 'GET' :
streamName = _body['streamName']
begTime = _body['begTime']
endTime = _body['endTime']
duration = _body['duration']
descStream =kvs.describe_stream(StreamName=streamName)
Stream_ARN = descStream['StreamInfo']['StreamARN']
get_hls_response =
kvs.get_data_endpoint(APIName="GET_HLS_STREAMING_SESSION_URL",StreamARN=Stream_ARN)
hls_endpoint = get_hls_response['DataEndpoint']
kvs_client = boto3.client("kinesis-video-archived-media",
endpoint_url=hls_endpoint,
region_name=REGION_NAME,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
# 获取hls url
res = get_hls_url(kvs_client,streamName,begTime,endTime,duration)
复制代码
Lambda 源码:
https://github.com/beiyue/save_devices_video_metadata/blob/main/lambda_function.py
Lambda 授权
IAM 创建 Policy 选择 JSON, 编辑内容如下, 保存 Policy 名称“getSecretValueForIPC”
{
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "secretsmanager:GetSecretValue",
"Resource": "<SECRET_ARN>"
}
}
复制代码
选择 Lambda Configuration 编辑 Role
附加策略“getSecretValueForIPC”
附件策略“AmazonKinesisVideoStreamsReadOnlyAccess”
创建 DynamoDB 表
创建 DynamoDB 表,新建表名“tb_device_metadata”
元数据包含设备 ID,Kinesis Stream 名称,视频片段开始时间和结束时间,视频时长,写入时间
创建 API
在 Amazon API Gateway 中选择 ”Create API”,选择 ”REST API” Build, 选择 API 名称 “video_record_metadata” , Endpoint Type 选择“Regional”。
完成后,选择“Actions”,选择“Create Resource”资源名称为“ipc-video-metadata”
选择“Actions”,选择“Create Method”,选择 PUT,选择 Lambda Function, 勾选 Use Lambda Proxy integration,选型 Lambda Region “ap-southeast-1”,选择 Lambda Function“save_device_video_metadata”并保存
GET 方法同上
创建 API 模型
左侧选择 Models,创建 Model,模型名称“IPCMetaData”
Model shema 复制如下
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "IPCMetaData",
"type": "object",
"properties": {
"clientID": { "type": "string" },
"deviceID": { "type": "string" },
"streamName": { "type": "string" },
"begTime": { "type": "string" },
"endTime": { "type": "string" },
"duration": { "type": "string" }
}
}
复制代码
PUT Method 增加请求模型
选择 Resources, 选择 PUT,选择 Method Request,选择 Request Body,增加 Model
部署 API
选择 Actions , 选择 Deploy API,Stage name 选择“test”, 选择 Deploy.
创建 JAVA SDK
在 test Stage Editor 选择 SDK Generation,选择 Platform,选择“Java SDK”. 输入参数如下:
点击 Generate SDK, 下载 genrate code.zip 文件,解压打开,打开 Terminal 执行 mvn install
Maven 安装本地仓库
mvn install:install-file -Dfile=./target/cloudstorage-demo-1.0.jar -DgroupId=ipc-sdk
–DartifactId=cloudstorage-demo -Dversion=1.0 -Dpackaging=jar
复制代码
pom.xml 中增加 dependency
<dependency>
<groupId>ipc-sdk</groupId>
<artifactId>cloudstorage-demo</artifactId>
<version>1.0</version>
</dependency>
复制代码
创建推流端
下载 amazon-kinesis-video-streams-producer-sdk-java,修改 DemoAppMain.java
public static void main(final String[] args) {
try {
final KinesisVideoClient kinesisVideoClient = KinesisVideoJavaClientFactory
.createKinesisVideoClient(
Regions.AP_SOUTHEAST_1, //指定新加坡区域
AuthHelper.getSystemPropertiesCredentialsProvider());
//文件媒体源
final MediaSource mediaSource = createFileMediaSource();
kinesisVideoClient.registerMediaSource(mediaSource);
mediaSource.start();
} catch (final KinesisVideoException e) {
throw new RuntimeException(e);
}
}
复制代码
NativeKinesisVideoProducerStream.java 增加方法 putDurationMetaData,提交元数据,这里用到前面的 SDK,IPCMetaData 用于封装元数据
//记录一段视频片段的Metadata
private void putDurationMetaData(@Nonnull final KinesisVideoFrame frame){
// 每一段视频以关键帧为起始
if(startTime == 0L && frame.getFlags() == FrameFlags.FRAME_FLAG_KEY_FRAME){
startTime = frame.getDecodingTs();
// 这里视频时长以20秒为单位进行记录
}else if(frame.getFlags() == FrameFlags.FRAME_FLAG_KEY_FRAME && (frame.getDecodingTs() - startTime) > 20 * Time.HUNDREDS_OF_NANOS_IN_A_SECOND ){
long index = frame.getIndex();
long endTime = frame.getDecodingTs();
long durationTime = (endTime - startTime)/Time.HUNDREDS_OF_NANOS_IN_A_SECOND;
IPCCloudStorageSdk client = IPCCloudStorageSdk.builder().connectionConfiguration(
new ConnectionConfiguration()
.maxConnections(100)
.connectionMaxIdleMillis(1000))
.timeoutConfiguration(
new TimeoutConfiguration()
.httpRequestTimeout(5000)
.totalExecutionTimeout(10000)
.socketTimeout(3000))
.build();
//API Gateway JAVA SDK 生成的Model类
IPCMetaData metaData = new IPCMetaData();
//视频片段的唯一标示
metaData.setClientID(String.valueOf(Time.getCurrentTime())+String.format("%06d", index));
//设备唯一标示
metaData.setDeviceID(mDeviceInfo.getName());
//开始时间,毫秒为单位
metaData.setBegTime(String.valueOf(startTime/Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND));
//流名称,对应KVS Stream Name
metaData.setStreamName(mStreamInfo.getName());
//结束时间, 毫秒为单位
metaData.setEndTime(String.valueOf(endTime/Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND));
//时长,秒为单位
metaData.setDuration(String.valueOf(durationTime));
// API Gateway JAVA SDK 生成的方法请求类
MetaDataInputRequest req = new MetaDataInputRequest().iPCMetaData(metaData);
MetaDataInputResult result = client.metaDataInput(req);
mLog.info("Duration Metadata : %s, %s, %s ,%s ,%s ,%s ",metaData.getClientID(),metaData.getDeviceID(),metaData.getStreamName(),metaData.getBegTime(),metaData.getEndTime(),metaData.getDuration());
//重置开始时间,上一个视频结束时间为下一个视频的开始时间。
startTime = frame.getDecodingTs();
}
}
复制代码
推流源码:
https://github.com/beiyue/amazon-kinesis-video-streams-producer-sdk-java.git
推流测试
注意这里 AK/SK 方式仅用于测试和演示,实际生产环境建议使用更安全方式,比如通过 IoT 证书获取临时身份,参见https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/how-iot.html
Amazon DynamoDB 中可以看到生成了视频段的 metadata
获取播放 URL
从 Amazon DynamoDB 中随机选取一个 item 的客户端 ID,流名称,开始时间,结束时间, 时长,terminal 执行:
curl -v -X GET 'https://xxxxxx.execute-api.ap-southeast-1.amazonaws.com/test/ipc-video-metadata' -d ' { "clientID": "162369215xxxxxxxx52","streamName": "kvs-stream", "begTime":"1623728297399","endTime":"1623728321043","duration":"22"}'
* Trying 52.221.1XX.1XX...
* TCP_NODELAY set
* Connected to xxxxxx.execute-api.ap-southeast-1.amazonaws.com (52.221.1XX.1XX) port 443 (#0)
* ALPN, offering h2
* ALPN, offering http/1.1
* Cipher selection: ALL:!EXPORT:!EXPORT40:!EXPORT56:!aNULL:!LOW:!RC4:@STRENGTH
* successfully set certificate verify locations:
* CAfile: /etc/ssl/cert.pem
CApath: none
* TLSv1.2 (OUT), TLS handshake, Client hello (1):
* TLSv1.2 (IN), TLS handshake, Server hello (2):
* TLSv1.2 (IN), TLS handshake, Certificate (11):
* TLSv1.2 (IN), TLS handshake, Server key exchange (12):
* TLSv1.2 (IN), TLS handshake, Server finished (14):
* TLSv1.2 (OUT), TLS handshake, Client key exchange (16):
* TLSv1.2 (OUT), TLS change cipher, Client hello (1):
* TLSv1.2 (OUT), TLS handshake, Finished (20):
* TLSv1.2 (IN), TLS change cipher, Client hello (1):
* TLSv1.2 (IN), TLS handshake, Finished (20):
* SSL connection using TLSv1.2 / ECDHE-RSA-AES128-GCM-SHA256
* ALPN, server accepted to use h2
* Server certificate:
* subject: CN=*.execute-api.ap-southeast-1.amazonaws.com
* start date: Aug 29 00:00:00 2020 GMT
* expire date: Sep 29 12:00:00 2021 GMT
* subjectAltName: host " xxxxxx.execute-api.ap-southeast-1.amazonaws.com" matched cert's "*.execute-api.ap-southeast-1.amazonaws.com"
* issuer: C=US; O=Amazon; OU=Server CA 1B; CN=Amazon
* SSL certificate verify ok.
* Using HTTP2, server supports multi-use
* Connection state changed (HTTP/2 confirmed)
* Copying HTTP/2 data in stream buffer to connection buffer after upgrade: len=0
* Using Stream ID: 1 (easy handle 0x7f80e8005600)
> GET /test/ipc-video-metadata HTTP/2
> Host: xxxxxx.execute-api.ap-southeast-1.amazonaws.com
> User-Agent: curl/7.54.0
> Accept: */*
> content-type: application/json
> day: Thursday
> Content-Length: 138
>
* Connection state changed (MAX_CONCURRENT_STREAMS updated)!
* We are completely uploaded and fine
< HTTP/2 200
< date: Tue, 15 Jun 2021 08:00:11 GMT
< content-type: application/json
< content-length: 261
< x-amzn-requestid: 9323XXX-4ff1-4dcf-ae57-365551b74636
< x-amz-apigw-id: A9Oh0FJWXXXX_Q=
< x-amzn-trace-id: Root=1-60c85e0b-01f095137c54211e74c3e1f3;Sampled=0
<
* Connection #0 to host xxxxxx.execute-api.ap-southeast-1.amazonaws.com left intact
"https://b-xxxxxx.kinesisvideo.ap-southeast-1.amazonaws.com/hls/v1/getHLSMasterPlaylist.m3u8?SessionToken=XXXXdf9Ro85h4AeC_n2yjc96_YLu1vigKX5qterUpPzxIQYibs4go4_KCqXEiWnXXXXWng92QY5HiGbEIkSa38f9d3XXXXX:~ "
复制代码
打开https://www.hlsplayer.org/填入 URL 地址
播放展示
小结
本方案展示了 IP Camera 运行 Amazon Kinesis Video Streams 实现推流功能;运行 Amazon API Gateway + Amazon Lambda + Amazon DynamoDB 留存视频的 metadata,根据 metadata 获取指定时段的视频片段的 session url, 播放器可正常播放。很多客户希望构建云存场景,结合这几个服务一起构建完整的云存解决方案。
本篇作者
周晓明
亚马逊云科技解决方案架构师
负责基于亚马逊云科技的云计算方案架构的咨询和设计,同时致力于物联网方向研究和推广,在安防监控领域有丰富实践经验。
评论