写点什么

如何透过 Serverless 与 API 的方式异步搜寻数据湖中的数据

  • 2022 年 5 月 22 日
  • 本文字数:2135 字

    阅读完需:约 7 分钟

背景介绍

为了要解决数据孤岛的问题 (Data Silos),我们需要建立一个集中式的数据湖,更进一步在上面满足各式各样的数据分析与机器学习相关的需求。而构建数据湖的通常可以分成以下几个步骤,设定 Amazon S3 作为数据湖。然后开始摄取数据到 Amazon S3,开始处理并产生数据的元数据,有了这些元数据我们就可以用 Amazon Athena 来搜寻这些资料了。


客户在建立数据平台后遇到会遇到一个挑战是如何透过 API 的方式,让使用者可以搜寻数据平台上的资料,Athena 是我们最常使用的服务。但是对于资料的使用者而言,我们可以透过异步的方式呼叫 Athena API 但是需要一直来问该次搜寻是否完成,GetQueryResults API 也有 1000 笔的限制。本文将讲述基于整合亚马逊云科技 serverless 服务的方式,实作出由使用者在发送请求时,提供一个 callback URL,然后透过 Amazon Step Functions 控制流程,Amazon Athena 完成搜寻后,把搜寻结果回传给使用端,降低使用端需要一直等待并询问的工作。

解决方案

Github aws-sample https://github.com/aws-samples 提供了本篇部落格的实作范例,可见于 asynchronously-query-data-lake-through-serverless-service。



数据湖可以透过 API Gateway 配合预先约定好的 json 请求格式,包含以下资讯:


{  "Query": "<your_sql_query>",  "CallbackUrl": "<your_api_endpoint>",}
复制代码

左滑查看更多


命名上采用与 Athena API 一致的大写开头,中间用大写分隔的做法。在 API Gateway 中我们会去 Mapping object 成以下格式,并呼叫 Step Function 开始执行工作流程服务:


{  "ApiRequest": {    "Query": "<your_sql_query>",    "CallbackUrl": "<your_api_endpoint>"  }}
复制代码

左滑查看更多


首先我们需要设定 InputPath: $.ApiRequest,过滤出 ApiRequest 内的 json 物件,然后我们可以开始构建所传送给 Athena Start Query Execution API,的 json 物件如下:


{  "QueryString.$": "$.Query",  "WorkGroup": "<your_work_group>",  "ResultConfiguration": {    "OutputLocation.$": "<your_output_location>"  }}
复制代码

左滑查看更多


Athena Start Query Execution API 會回傳 QueryExecutionId,我們會透過 ResultPath: $.AthenaRequest 合併回傳的結果,到 AthenaRequest 的欄位,此時我們 output 結果如下:


{  "ApiRequest": {    "Query": "<your_sql_query>",    "CallbackUrl": "<your_api_endpoint>"  },  "AthenaRequest": {    "QueryExecutionId": "<execution_id>"  }}
复制代码

左滑查看更多


接下来我们会等候 15 秒,然后用 InputPath: $.AthenaRequest,开始构建呼叫 Athena Get Query Execution API 所需参数如下:


{  "QueryExecutionId.$": "$.QueryExecutionId"}
复制代码

左滑查看更多


回传结果,我们透过 ResultPath: $.AthenaResult 合并回传的结果,到 AthenaResult 的栏位,此时我们 output 结果如下:


{  "ApiRequest": {    "Query": "<your_sql_query>",    "CallbackUrl": "<your_api_endpoint>"  },  "AthenaRequest": {    "QueryExecutionId": "<execution_id>"  },  "AthenaResult": {    "QueryExecution": {       "Status": {          "State": "<QUEUED | RUNNING | SUCCEEDED | FAILED | CANCELLED>",          ...       },       ...    }  } }
复制代码

左滑查看更多


下一步我们透过 Choice 流程,判断 AthenaResult.QueryExecution.Status.State 栏位如结果为 SUCCEEDED | FAILED | CANCELLED 则把结果送入 Lambda 作为 payload,利用 Lambda 产生 S3 Presigned URL,并呼叫使用者提供的 Callback URL 回传下列资讯。


{  "State": {    "CompletionDateTime": 1645062487295,    "State": "SUCCEEDED | FAILED | CANCELLED",    "SubmissionDateTime": 1645062485193  }  "QueryResult": {    "PresignedUrl": "https://xxxx/query-results-path/yyy.csv",    "ExpiredIn": 3600  }}
复制代码

左滑查看更多

总结

API Gateway 部分,我们简化了前端使用者需要传入的参数,并开始建构所需要的独立物件为 Step Function 的执行做准备。Step Function 中间传递参数过程,我们尽量用独立的 object 来储存我们关注的资讯,避免参数彼此互相覆盖而遗失后面的工作流程所关注的参数。最后 Amazon Lambda 来处理完成的资料,这边是使用 Presigned URL 的方式而非 API 直接回传结果的原因是,我们需要回传的资料可能超过 Payload 上限,而使用者 Presigned URL 的方式会有 ExpiredIn 所以我们也会告诉前端使用者,需要去做这一块的确认。


透过上述方式,我们可以透过亚马逊云科技服务整合透过 serverless 的方式,实作出由使用者在发送请求时,提供一个 callback URL 然后,透过 Step Function 控制流程,Amazon Athena 完成搜寻后,把搜寻结果回传给使用端,降低使用端需要一直等待并询问的工作。


本篇作者



王祥瑞


亚马逊云科技解决方案架构师


主要基于亚马逊云科技的云计算方案架构的咨询和设计,研究在云上构建现代解决方案与微服务架构设计,尤其是在大数据和机器学习方面。通过全部 12 张的亚马逊云科技认证,扩展技术的深度与广度。



李定远


亚马逊云科技解决方案架构师


负责亚马逊云科技无服务器计算相关的服务。对 Amazon Lambda, Amazon API Gateway 等相关产品有深入了解。



用户头像

还未添加个人签名 2019.09.17 加入

还未添加个人简介

评论

发布
暂无评论
如何透过 Serverless 与 API 的方式异步搜寻数据湖中的数据_Serverless_亚马逊云科技 (Amazon Web Services)_InfoQ写作社区