在Spark中通过YDB实现比原生Spark性能高100倍的多表关联

Spark中通过YDB实现比原生Spark性能高100倍的多表关联

多表关联JoinSpark数据分析中是一个不可或缺的一部,我们以商品交易记录表(trade表)与用户信息表为例(user表)来阐述下如何实现高性能的多表关联分析。

经常会遇到这种情形,我们需要先找出【某一个省份】【工商银行】【交易金额在150~160元】的所有用户,并通过与用户信息表进行关联得到该用户的手机号,性别,年龄,职业等信息。

Spark的传统做法是对双表都进行暴力扫描,需要对trade表与user表都进行一次完整的扫描,根据扫描出来的结果在进行进一步的关联。用户信息表(User)的数据量通常来说也是非常大的,运营商,阿里,腾讯都会有达数十亿条,数万维度的用户信息表,Spark默认方式的为将User表全部读取后在过滤的方式,性能是很慢的,我们可以换个方式,将这个过程对调一下,先进行过滤,在对过滤后的结果进行扫描,经过过滤后,需要处理的数据条数,是原先总量的万分之一或者千分之一,性能自然就会得到数百倍的提升。

其实这种方式大家并不陌生,传统数据库如MySQL在做多表关联的时候,对关联的字段创建索引与不创建索引关联的效率相差很多,主要原因也是因为创建索引后,就不需要在对用户表,进行全表扫描了,仅仅读取需要的数据,关联效率自然就高了。

YDB中,针对这种大表与小表的关联,可以通过广播将小表的数据广播到每个节点上去,与原生spark不同的是,广播到每个节点后,广播数据是直接基于索引进行关联,而不是采用暴力遍历的方式,这样性能就会有较大幅度的提升。

 

 

下文为关于ydb进行join的使用示例:

示例一:

原本写法

with
shu as
(
/*ydb.pushdown('->')*/
select usernick,count(*) as scnt,avg(amtdouble) as samt from ydb_example_shu where ydbpartion='3000w'  group by usernick
/*('<-')pushdown.ydb*/
),
trade as (
/*ydb.pushdown('->')*/
select nickname,count(*) as tcnt,avg(amt) as tamt from ydb_example_trade where ydbpartion='20151011'  and nickname='*'  group by nickname
/*('<-')pushdown.ydb*/
)

select * from (
select  shu.usernick as n1,shu.scnt  as n2,shu.samt  as n3,trade.nickname  as n4,trade.tamt,trade.tcnt
from shu LEFT JOIN  trade on (shu.usernick=trade.nickname)
) tmp where tmp.n4 is not null

order by tmp.n1  limit 100;

采用广播后的写法

YBroadCastBlock@0000000001:10240:over_all@
select  distinct nickname from (
/*ydb.pushdown('->')*/
select nickname  from ydb_example_trade where ydbpartion='20151011'  and nickname='*'
/*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

with
shu as
(
/*ydb.pushdown('->')*/
select usernick,count(*) as scnt,avg(amtdouble) as samt from ydb_example_shu where ydbpartion='3000w'
and ydb_raw_query_s like 'YBroadCastQuery@term@usernick@0@{YBroadCastId:0000000001}'
group by usernick
/*('<-')pushdown.ydb*/
),
trade as (
/*ydb.pushdown('->')*/
select nickname,count(*) as tcnt,avg(amt) as tamt from ydb_example_trade where ydbpartion='20151011'  and nickname='*'
and ydb_raw_query_s like 'YBroadCastQuery@term@nickname@0@{YBroadCastId:0000000001}'
group by nickname
/*('<-')pushdown.ydb*/
)

select * from (
select  shu.usernick as n1,shu.scnt  as n2,shu.samt  as n3,trade.nickname  as n4,trade.tamt,trade.tcnt
from shu LEFT JOIN  trade on (shu.usernick=trade.nickname)
) tmp where tmp.n4 is not null

order by tmp.n1  limit 100;

 

示例二:

原本写法

with
shu as
(
/*ydb.pushdown('->')*/
select amtlong,usernick,phonenum,ydb_sex,ydb_grade from ydb_example_shu where ydbpartion='3000w'
/*('<-')pushdown.ydb*/
),
trade as (
/*ydb.pushdown('->')*/
select max(amt) as maxamt,avg(amt) as avgamt, min(amt) as minamt,nickname,count(*) as tcnt from ydb_example_trade where ydbpartion='20151011'  and nickname='*'  group by nickname
/*('<-')pushdown.ydb*/
)

select * from (
select  shu.amtlong as n1,shu.phonenum  as n2,shu.ydb_sex  as n3,shu.ydb_grade,trade.maxamt  as n4,trade.tcnt,shu.usernick n5,trade.nickname
from shu LEFT JOIN  trade on ( shu.usernick=trade.nickname and shu.amtlong>=trade.minamt and shu.amtlong<=trade.maxamt)
) tmp where tmp.n4 is not null

order by tmp.n1,tmp.n5  limit 100;

 

采用广播后的写法

YBroadCastBlock@0000000001:10240:over_all@

select cast (tmp.maxamt as bigint) as c0, tmp.avgamt as c1, cast (tmp.minamt as bigint) as c2,tmp.nickname as c3,tmp.tcnt  from (
/*ydb.pushdown('->')*/
select max(amt) as maxamt,avg(amt) as avgamt, min(amt) as minamt,nickname,count(*) as tcnt from ydb_example_trade where ydbpartion='20151011'  and nickname='*'  group by nickname
/*('<-')pushdown.ydb*/

) tmp
@YBroadCastBlock

with
shu as
(
/*ydb.pushdown('->')*/
select amtlong,usernick,phonenum,ydb_sex,ydb_grade from ydb_example_shu where ydbpartion='3000w'
and ydb_raw_query_s like 'YBroadCastQuery@query@usernick:{YBroadCastparam} AND amtlong:[{YBroadCastparam} TO {YBroadCastparam}]@3,2,0@{YBroadCastId:0000000001}'

/*('<-')pushdown.ydb*/
),
trade as (
/*ydb.pushdown('->')*/
select max(amt) as maxamt,avg(amt) as avgamt, min(amt) as minamt,nickname,count(*) as tcnt from ydb_example_trade where ydbpartion='20151011'  and nickname='*'  group by nickname
/*('<-')pushdown.ydb*/
)

select * from (
select  shu.amtlong as n1,shu.phonenum  as n2,shu.ydb_sex  as n3,shu.ydb_grade,trade.maxamt  as n4,trade.tcnt,shu.usernick n5,trade.nickname
from shu LEFT JOIN  trade on ( shu.usernick=trade.nickname and shu.amtlong>=trade.minamt and shu.amtlong<=trade.maxamt)
) tmp where tmp.n4 is not null

order by tmp.n1,tmp.n5  limit 100;

示例三,巧用over_all 实现left semi join exists in 查询

----原本写法
with
shu as
(
/*ydb.pushdown('->')*/
select usernick,count(*) as scnt,avg(amtdouble) as samt from ydb_example_shu where ydbpartion='3000w'  group by usernick
/*('<-')pushdown.ydb*/
),
trade as (

select  distinct nickname from (
/*ydb.pushdown('->')*/
select nickname  from ydb_example_trade where ydbpartion='20151011'  and nickname='*'        /*('<-')pushdown.ydb*/
) tmpdist
)

select  shu.usernick as n1,shu.scnt  as n2,shu.samt  as n3 from shu LEFT SEMI JOIN  trade on (shu.usernick=trade.nickname)
order by shu.usernick  limit 100;

----使用broadcast后的写法
YBroadCastBlock@0000000001:10240:over_all@
select  distinct nickname from (
/*ydb.pushdown('->')*/
select nickname  from ydb_example_trade where ydbpartion='20151011'  and nickname='*'
/*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

with
shu as
(
/*ydb.pushdown('->')*/
select usernick,count(*) as scnt,avg(amtdouble) as samt from ydb_example_shu where ydbpartion='3000w'
and ydb_raw_query_s like 'YBroadCastQuery@term@usernick@0@{YBroadCastId:0000000001}'
group by usernick
/*('<-')pushdown.ydb*/
),
trade as (
select  distinct nickname from (
/*ydb.pushdown('->')*/
select nickname  from ydb_example_trade where ydbpartion='20151011'  and nickname='*'
and ydb_raw_query_s like 'YBroadCastQuery@term@nickname@0@{YBroadCastId:0000000001}'
/*('<-')pushdown.ydb*/
) tmpdist
)

select  shu.usernick as n1,shu.scnt  as n2,shu.samt  as n3
from shu LEFT SEMI JOIN  trade on (shu.usernick=trade.nickname)

order by shu.usernick  limit 100;

---在来个复杂点的例子
with
shu as
(
/*ydb.pushdown('->')*/
select usernick,count(*) as scnt,avg(amtdouble) as samt from ydb_example_shu where ydbpartion='3000w' and amtlong >='10' and amtlong<='3000'  group by usernick
/*('<-')pushdown.ydb*/
),
trade as (

select  distinct nickname from (
/*ydb.pushdown('->')*/
select nickname  from ydb_example_trade where ydbpartion='20151011'  and nickname='*'        /*('<-')pushdown.ydb*/
) tmpdist
)

select  shu.usernick as n1,shu.scnt  as n2,shu.samt  as n3 from shu LEFT SEMI JOIN  trade on (shu.usernick=trade.nickname)
order by shu.usernick  limit 100;

新写法

YBroadCastBlock@0000000001:10240:over_all@
select  distinct nickname from (
/*ydb.pushdown('->')*/
select nickname  from ydb_example_trade where ydbpartion='20151011'  and nickname='*'
/*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

YBroadCastBlock@0000000002:10240:over_all@
select  distinct usernick from (
/*ydb.pushdown('->')*/
select usernick  from ydb_example_shu where ydbpartion='3000w'  and amtlong >='10' and amtlong<='3000'
/*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

with
shu as
(
/*ydb.pushdown('->')*/
select usernick,count(*) as scnt,avg(amtdouble) as samt from ydb_example_shu where ydbpartion='3000w' and amtlong >='10' and amtlong<='3000'
and ydb_raw_query_s like 'YBroadCastQuery@term@usernick@0@{YBroadCastId:0000000001}'
and ydb_raw_query_s like 'YBroadCastQuery@term@usernick@0@{YBroadCastId:0000000002}'

group by usernick
/*('<-')pushdown.ydb*/
),
trade as (
select  distinct nickname from (
/*ydb.pushdown('->')*/
select nickname  from ydb_example_trade where ydbpartion='20151011'  and nickname='*'
and ydb_raw_query_s like 'YBroadCastQuery@term@nickname@0@{YBroadCastId:0000000001}'
and ydb_raw_query_s like 'YBroadCastQuery@term@usernick@0@{YBroadCastId:0000000002}'

/*('<-')pushdown.ydb*/
) tmpdist
)

select  shu.usernick as n1,shu.scnt  as n2,shu.samt  as n3
from shu LEFT SEMI JOIN  trade on (shu.usernick=trade.nickname)

order by shu.usernick  limit 100;

示例四 巧用over_rand 实现车辆伴随,旅店同宿 查询

对于有严重数据倾斜的人员,进行抽取,通常场景汽车伴随与旅店同住超过千次同住不应该属于常规情况,随机取N个即可
YBroadCastBlock@0000000001:1024:over_rand@
select KKBH ,cast((JGSJ-600)  as bigint) as JGSJ_BEGIN,cast((JGSJ+600) as bigint) as JGSJ_END from (
/*ydb.pushdown('->')*/ select KKBH,JGSJ from ydb_oribit where ydbpartion = '20160619' and  HPHM='COW832'    /*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

select tmp.HPHM,count(*) as rows,size(collect_set(tmp.KKBH)) as dist_kkbh,concat_ws('#', sort_array(collect_set(concat_ws(',',tmp.JGSJ,tmp.KKBH)))) as detail
from (
/*ydb.pushdown('->')*/
select HPHM,JGSJ,KKBH from ydb_oribit where ydbpartion = '20160619' and
ydb_raw_query_s like 'YBroadCastQuery@query@KKBH:{YBroadCastparam} AND JGSJ:{{YBroadCastparam} TO {YBroadCastparam}}@0,1,2@{YBroadCastId:0000000001}'
/*('<-')pushdown.ydb*/
) tmp group by tmp.HPHM order by dist_kkbh desc  limit 10

示例五:基本使用方法

----示例一 超过1024条,会返回错误-----
YBroadCastBlock@0000000001:1024:over_error@
select distinct tradeid from (
/*ydb.pushdown('->')*/ select tradeid from ydb_trade_demo where ydbpartion='20151011' and (nickname='*' or nickname='*' or nickname='*' or nickname='*')    /*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

select nickname,count(*) from
(
/*ydb.pushdown('->')*/
select nickname from ydb_trade_demo where ydbpartion='20151011' and ydb_raw_query_s like 'YBroadCastQuery@term@tradeid@0@{YBroadCastId:0000000001}'
/*('<-')pushdown.ydb*/

) tmp group by nickname

----示例二 超过2048条,随机抽取其中的2048-----
YBroadCastBlock@0000000001:2048:over_rand@
select  distinct tradeid from (
/*ydb.pushdown('->')*/ select tradeid from ydb_trade_demo where ydbpartion='20151011' and (nickname='*' or nickname='*' or nickname='*' or nickname='*')    /*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

/*ydb.pushdown('->')*/
select count(*) from ydb_trade_demo where ydbpartion='20151011' and ydb_raw_query_s like 'YBroadCastQuery@term@tradeid@0@{YBroadCastId:0000000001}'
/*('<-')pushdown.ydb*/

----示例三 超过4096条,则匹配所有(一般用来优化join,对于小表匹配使用broadcast性能很好,对于大表则依然采用原先的spark默认逻辑)-----
YBroadCastBlock@0000000001:4096:over_all@
select distinct tradeid from (
/*ydb.pushdown('->')*/ select tradeid from ydb_trade_demo where ydbpartion='20151011' and (nickname='*' or nickname='*' or nickname='*' or nickname='*')    /*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

/*ydb.pushdown('->')*/
select count(*) from ydb_trade_demo where ydbpartion='20151011' and ydb_raw_query_s like 'YBroadCastQuery@term@tradeid@0@{YBroadCastId:0000000001}' limit 10
/*('<-')pushdown.ydb*/

----示例四 字符串类型-----
YBroadCastBlock@0000000001:1024:over_error@
select distinct nickname  from (
/*ydb.pushdown('->')*/ select nickname from ydb_trade_demo where ydbpartion='20151011' and nickname='*'    /*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

select nickname,count(*) from
(
/*ydb.pushdown('->')*/
select nickname from ydb_trade_demo where ydbpartion='20151011' and ydb_raw_query_s like 'YBroadCastQuery@term@nickname@0@{YBroadCastId:0000000001}'
/*('<-')pushdown.ydb*/

) tmp group by nickname  limit 20

----示例五:通过lucene query ,可以实现范围匹配----

YBroadCastBlock@0000000002:1024:over_error@
select cast((amt-5)  as bigint) as amtmin,cast((amt+5) as bigint) as amtmax from (
/*ydb.pushdown('->')*/ select nickname,avg(amt) as amt from ydb_trade_demo where ydbpartion='20151011' and  nickname='*'  and amt>='9' and amt <='10' group by nickname   /*('<-')pushdown.ydb*/
) tmp

@YBroadCastBlock

/*ydb.pushdown('->')*/
select * from ydb_trade_demo where ydbpartion='20151011' and ydb_raw_query_s like 'YBroadCastQuery@query@amt:{{YBroadCastparam} TO {YBroadCastparam}} @0,1@{YBroadCastId:0000000002}'
limit 20
/*('<-')pushdown.ydb*/

----示例六:lucene queryAND OR  多个逻辑支持----

YBroadCastBlock@0000000001:1024:over_error@
select nickname as nickname,cast((amt-5)  as bigint) as amtmin,cast((amt+5) as bigint) as amtmax from (
/*ydb.pushdown('->')*/ select nickname,avg(amt) as amt from ydb_trade_demo where ydbpartion='20151011' and  nickname='*'  and amt>='9' and amt <='10' group by nickname   /*('<-')pushdown.ydb*/
) tmp
@YBroadCastBlock

/*ydb.pushdown('->')*/
select count(*) from ydb_trade_demo where ydbpartion='20151011' and ydb_raw_query_s like 'YBroadCastQuery@query@nickname:{YBroadCastparam} AND amt:{{YBroadCastparam} TO {YBroadCastparam}}@0,1,2@{YBroadCastId:0000000001}'
limit 20
/*('<-')pushdown.ydb*/ ;