Spatio-Temporal Collision Series: Finale
I've decided to drop the fancy talk and get straight to the substance.
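① Business scenario:
1. We need to find pairs of records that lie within a given range of each other in both time and space. The records can come from several source types: people, vehicles, codes, and so on.
2. The computation runs on a single machine, over more than 100 million records.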
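② Current approach:
Split the join: break the three dimensions into 2+1. First filter out part of the data with `in` logic, then expand the remaining data once, then join.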
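③ Current pain points:
1. There are too many steps, and the full data set gets read repeatedly.
2. The join is keyed on spatio-temporal blocks, so every block that matches carries a large amount of data. On a cluster this hardly matters, but on a single machine the shuffle struggles once the data reaches hundreds of millions of records.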
④ Goals
1. Reduce the amount of computation.
2. Reduce the memory used by the shuffle.
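⑤ Optimization ideas
1. Look at where the data comes from. It is produced by fixed collection devices. If the source were a satellite or a mobile device, the number of distinct coordinates would be unbounded. With fixed devices, every reported latitude/longitude is simply the device's own position, so we can work out in advance which devices are within the required distance of each other.
2. Extract the core computation. Previously we ran the full computation over latitude, longitude, and time together, and expanding rows with every field attached wasted a lot of memory. Building on the previous point, I decided to run one computation over the coordinate fields only, producing a virtual device relation table that records the spatial distance information. The time dimension is then computed over the full data to obtain the time distance information, and joining the two gives the final spatio-temporal collision result.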
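To make the saving concrete, here is a minimal sketch (not the author's code) that counts how many distance evaluations are needed when the spatial check is done per event pair versus once per distinct device pair. The function name `spatial_work` and the sample camera names are hypothetical, and the sketch assumes each event carries the `location` of the fixed device that produced it.

```python
def spatial_work(events):
    """Count distance evaluations for two strategies.

    events: iterable of (record_id, type, location) tuples.
    Returns (event_pairs, device_pairs).
    """
    men = [e for e in events if e[1] == "man"]
    women = [e for e in events if e[1] == "woman"]
    # Strategy 1: compare every 'man' event with every 'woman' event.
    event_pairs = len(men) * len(women)
    # Strategy 2: compare each distinct source device with each distinct target device.
    device_pairs = len({e[2] for e in men}) * len({e[2] for e in women})
    return event_pairs, device_pairs


# Hypothetical sample: 5 'man' events on two cameras, 4 'woman' events on one camera.
events = (
    [(f"m{i}", "man", "cam_a") for i in range(3)]
    + [(f"m{i}", "man", "cam_b") for i in range(3, 5)]
    + [(f"w{i}", "woman", "cam_c") for i in range(4)]
)
print(spatial_work(events))  # (20, 2)
```

The gap widens as the event count grows, because the number of devices stays fixed while the number of events does not.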
⑥ Optimized solution
test: the source table, holding the data to be processed
Time threshold: 10 seconds
Distance threshold: 100 meters
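① Initial data processing: build the fields needed later
with event as (
  select id, type, time, time_part, lat, lon, lat_part, lon_part, location
  from (
    select
      id,
      type,
      time,
      floor(unix_timestamp(time) / 10) as time_part,  -- 10-second time bucket
      lat,
      lon,
      floor(lat / 0.001) as lat_part,                 -- 0.001-degree latitude cell
      floor(lon / 0.001) as lon_part,                 -- 0.001-degree longitude cell
      location,
      row_number() over (partition by id, time, location order by time) as rn
    from test
    where type in ('man', 'woman')
  ) total
  where rn = 1  -- keep one row per (id, time, location)
)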
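As a quick sanity check on the bucket sizes, the sketch below reproduces the `floor(x / 0.001)` cell assignment and estimates how wide a 0.001-degree cell is in meters. The helper names `cell` and `cell_size_m` are hypothetical, and the estimate assumes a spherical Earth with the same radius constant the article's SQL uses. One point the article does not raise: the east-west width of a cell shrinks with `cos(latitude)`, so above roughly 26 degrees of latitude it falls below 100 m, and a search limited to the adjacent cell may then miss some pairs. If that matters for your data, a larger longitude step is one possible adjustment.

```python
from math import cos, floor, pi, radians

EARTH_RADIUS_M = 6378138  # same constant as the article's SQL (assumed spherical Earth)
METERS_PER_DEGREE = pi * EARTH_RADIUS_M / 180  # about 111.3 km per degree of latitude


def cell(lat, lon, step=0.001):
    """Grid cell of a coordinate, mirroring floor(lat / 0.001), floor(lon / 0.001)."""
    return floor(lat / step), floor(lon / step)


def cell_size_m(lat, step=0.001):
    """Approximate (north-south, east-west) size in meters of a cell at a given latitude."""
    north_south = step * METERS_PER_DEGREE
    east_west = north_south * cos(radians(lat))
    return north_south, east_west


print(cell(39.9042, 116.4074))
for latitude in (0.0, 26.0, 40.0):
    ns, ew = cell_size_m(latitude)
    print(f"lat {latitude:>4}: north-south {ns:.1f} m, east-west {ew:.1f} m")
```

The time bucket has no such issue: its width (10 seconds) equals the time threshold, so two timestamps at most 10 seconds apart always land in the same or adjacent buckets.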
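② Take the coordinate fields and deduplicate by data source type and location (this result is used as the `locations` CTE in the final query)
select type, location, lat_part, lon_part, lat, lon
from (
  select
    type,
    location,
    lat_part,
    lon_part,
    lat,
    lon,
    row_number() over (partition by type, location order by lat) as rn
  from event
) location_original
where rn = 1  -- one row per (type, location)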
③ Spatial pairing: find the coordinate pairs whose distance is within the threshold, then build a virtual device relation table from them (each matched pair of coordinates is treated as one virtual camera, and the time distance is later computed per virtual camera)
with source_locations as (
  select
    location,
    lat_ad.lat_part as lat_part,
    lon_ad.lon_part as lon_part,
    lat,
    lon
  from (
    select location, lat_part, lon_part, lat, lon
    from locations
    where type = 'man'
  ) source_location_original
  -- expand every source location into its own cell plus the 8 neighboring cells
  lateral view explode(split(concat(source_location_original.lat_part - 1, ',', source_location_original.lat_part, ',', source_location_original.lat_part + 1), ',')) lat_ad as lat_part
  lateral view explode(split(concat(source_location_original.lon_part - 1, ',', source_location_original.lon_part, ',', source_location_original.lon_part + 1), ',')) lon_ad as lon_part
),
target_locations as (
  select
    location,
    cast(lat_part as string) as lat_part,  -- split() yields strings, so cast to match
    cast(lon_part as string) as lon_part,
    lat,
    lon
  from locations
  where type = 'woman' and lat_part is not null and lat_part != 0
),
relation as (
  select source_location, target_location
  from (
    select
      -- haversine distance in meters
      round(6378138 * 2 * asin(sqrt(
        pow(sin((source_locations.lat * pi() / 180 - target_locations.lat * pi() / 180) / 2), 2)
        + cos(source_locations.lat * pi() / 180) * cos(target_locations.lat * pi() / 180)
          * pow(sin((source_locations.lon * pi() / 180 - target_locations.lon * pi() / 180) / 2), 2)
      ))) as diff,
      source_locations.location as source_location,
      target_locations.location as target_location
    from source_locations
    join target_locations
      on source_locations.lat_part = target_locations.lat_part
     and source_locations.lon_part = target_locations.lon_part
  ) site_diff
  where diff <= 100
)
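The spatial pairing can be sketched outside SQL as follows: index the target devices by grid cell, look up the 3 x 3 block of cells around each source device, and keep the pairs whose haversine distance is within the threshold. This is an illustrative sketch under stated assumptions, not the author's implementation: `haversine_m`, `build_relation`, and the camera names and coordinates are all hypothetical, and devices are assumed to be given as a mapping from location name to `(lat, lon)`.

```python
from collections import defaultdict
from math import asin, cos, floor, radians, sin, sqrt

EARTH_RADIUS_M = 6378138  # same constant as the article's SQL


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two points given in degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    a = (sin((phi1 - phi2) / 2) ** 2
         + cos(phi1) * cos(phi2) * sin(radians(lon1 - lon2) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * asin(sqrt(a))


def build_relation(sources, targets, max_m=100, step=0.001):
    """Return {(source_location, target_location)} for devices within max_m meters.

    sources, targets: dict mapping location name -> (lat, lon).
    """
    # Index target devices by grid cell.
    grid = defaultdict(list)
    for name, (lat, lon) in targets.items():
        grid[(floor(lat / step), floor(lon / step))].append((name, lat, lon))

    relation = set()
    for name, (lat, lon) in sources.items():
        lat_cell, lon_cell = floor(lat / step), floor(lon / step)
        # Same expansion as the lateral views: own cell plus 8 neighbors.
        for d_lat in (-1, 0, 1):
            for d_lon in (-1, 0, 1):
                for t_name, t_lat, t_lon in grid.get((lat_cell + d_lat, lon_cell + d_lon), ()):
                    if round(haversine_m(lat, lon, t_lat, t_lon)) <= max_m:
                        relation.add((name, t_name))
    return relation


# Hypothetical devices: cam_b is about 80 m from cam_a, cam_c is about 1 km away.
sources = {"cam_a": (30.0005, 120.0005)}
targets = {"cam_b": (30.0009, 120.0012), "cam_c": (30.0100, 120.0005)}
print(build_relation(sources, targets))  # {('cam_a', 'cam_b')}
```

Because the relation is built from devices rather than events, it only needs to be recomputed when devices are added or moved.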
④ Time pairing: find the record pairs that fall within the time threshold and join them with the virtual device relation table. The result is the set of records within the required spatio-temporal range.
with source as (
  select
    id,
    time,
    time_ad.time_part as time_part,
    location
  from (
    select id, time, time_part, location
    from event
    where type = 'man'
  ) source_original
  -- expand every source record into its own time bucket plus the two adjacent buckets
  lateral view explode(split(concat(source_original.time_part - 1, ',', source_original.time_part, ',', source_original.time_part + 1), ',')) time_ad as time_part
),
target as (
  select
    id,
    time,
    cast(time_part as string) as time_part,  -- split() yields strings, so cast to match
    location
  from event
  where type = 'woman'
),
result as (
  select source_id, target_id
  from (
    select
      abs(unix_timestamp(source.time) - unix_timestamp(target.time)) as time_diff,
      source.id as source_id,
      target.id as target_id
    from source
    join target
      on source.time_part = target.time_part
    join relation
      on source.location = relation.source_location
     and target.location = relation.target_location
  ) diff
  where time_diff <= 10
)
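The time pairing follows the same bucket-and-expand pattern. The sketch below is a hedged illustration, not the author's code: it indexes target events by `(time bucket, location)`, probes the three buckets around each source event for every related target location, and applies the exact time check last. The function name `collide`, the event tuples, and the sample data are hypothetical, and timestamps are assumed to be integer Unix seconds.

```python
from collections import defaultdict


def collide(source_events, target_events, relation, max_s=10):
    """Return {(source_id, target_id)} for events within max_s seconds at related devices.

    source_events, target_events: iterables of (id, unix_ts, location).
    relation: set of (source_location, target_location) pairs.
    """
    # Index target events by (time bucket, location), like the join key in the SQL.
    index = defaultdict(list)
    for t_id, t_ts, t_loc in target_events:
        index[(t_ts // max_s, t_loc)].append((t_id, t_ts))

    # Map each source location to the target locations it is paired with.
    partners = defaultdict(set)
    for s_loc, t_loc in relation:
        partners[s_loc].add(t_loc)

    pairs = set()
    for s_id, s_ts, s_loc in source_events:
        bucket = s_ts // max_s
        for b in (bucket - 1, bucket, bucket + 1):  # same expansion as the lateral view
            for t_loc in partners.get(s_loc, ()):
                for t_id, t_ts in index.get((b, t_loc), ()):
                    if abs(s_ts - t_ts) <= max_s:  # exact check after the bucket match
                        pairs.add((s_id, t_id))
    return pairs


# Hypothetical sample data.
relation = {("cam_a", "cam_b")}
source_events = [("m1", 1000, "cam_a"), ("m2", 2000, "cam_a")]
target_events = [
    ("w1", 1009, "cam_b"),  # 9 s from m1, related device: match
    ("w2", 1011, "cam_b"),  # 11 s from m1: too far apart in time
    ("w3", 1005, "cam_c"),  # close in time, but cam_c is not related to cam_a
    ("w4", 2010, "cam_b"),  # exactly 10 s from m2, in the next bucket: match
]
print(sorted(collide(source_events, target_events, relation)))  # [('m1', 'w1'), ('m2', 'w4')]
```

Only small `(id, timestamp)` tuples are carried through the bucket lookup here, which mirrors the article's point about not dragging every field through the expansion.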
Final code
with event as (
  select id, type, time, time_part, lat, lon, lat_part, lon_part, location
  from (
    select
      id,
      type,
      time,
      floor(unix_timestamp(time) / 10) as time_part,  -- 10-second time bucket
      lat,
      lon,
      floor(lat / 0.001) as lat_part,                 -- 0.001-degree latitude cell
      floor(lon / 0.001) as lon_part,                 -- 0.001-degree longitude cell
      location,
      row_number() over (partition by id, time, location order by time) as rn
    from test
    where type in ('man', 'woman')
  ) total
  where rn = 1
),
locations as (
  select type, location, lat_part, lon_part, lat, lon
  from (
    select
      type,
      location,
      lat_part,
      lon_part,
      lat,
      lon,
      row_number() over (partition by type, location order by lat) as rn
    from event
  ) location_original
  where rn = 1
),
source_locations as (
  select
    location,
    lat_ad.lat_part as lat_part,
    lon_ad.lon_part as lon_part,
    lat,
    lon
  from (
    select location, lat_part, lon_part, lat, lon
    from locations
    where type = 'man'
  ) source_location_original
  -- expand every source location into its own cell plus the 8 neighboring cells
  lateral view explode(split(concat(source_location_original.lat_part - 1, ',', source_location_original.lat_part, ',', source_location_original.lat_part + 1), ',')) lat_ad as lat_part
  lateral view explode(split(concat(source_location_original.lon_part - 1, ',', source_location_original.lon_part, ',', source_location_original.lon_part + 1), ',')) lon_ad as lon_part
),
target_locations as (
  select
    location,
    cast(lat_part as string) as lat_part,
    cast(lon_part as string) as lon_part,
    lat,
    lon
  from locations
  where type = 'woman' and lat_part is not null and lat_part != 0
),
relation as (
  select source_location, target_location
  from (
    select
      -- haversine distance in meters
      round(6378138 * 2 * asin(sqrt(
        pow(sin((source_locations.lat * pi() / 180 - target_locations.lat * pi() / 180) / 2), 2)
        + cos(source_locations.lat * pi() / 180) * cos(target_locations.lat * pi() / 180)
          * pow(sin((source_locations.lon * pi() / 180 - target_locations.lon * pi() / 180) / 2), 2)
      ))) as diff,
      source_locations.location as source_location,
      target_locations.location as target_location
    from source_locations
    join target_locations
      on source_locations.lat_part = target_locations.lat_part
     and source_locations.lon_part = target_locations.lon_part
  ) site_diff
  where diff <= 100
),
source as (
  select
    id,
    time,
    time_ad.time_part as time_part,
    location
  from (
    select id, time, time_part, location
    from event
    where type = 'man'
  ) source_original
  -- expand every source record into its own time bucket plus the two adjacent buckets
  lateral view explode(split(concat(source_original.time_part - 1, ',', source_original.time_part, ',', source_original.time_part + 1), ',')) time_ad as time_part
),
target as (
  select
    id,
    time,
    cast(time_part as string) as time_part,
    location
  from event
  where type = 'woman'
)
select source_id, target_id
from (
  select
    abs(unix_timestamp(source.time) - unix_timestamp(target.time)) as time_diff,
    source.id as source_id,
    target.id as target_id
  from source
  join target
    on source.time_part = target.time_part
  join relation
    on source.location = relation.source_location
   and target.location = relation.target_location
) diff
where time_diff <= 10
Copyright notice: This is an original article by InfoQ author 【誓约·追光者】.
Original link: 【http://xie.infoq.cn/article/7fe622e5eebf5c253af5a794b】. Please contact the author before reposting.