5、大表join大表优化

      如果Hive优化实战2中mapjoin中小表dim_seller很大呢?比如超过了1GB大小?这种就是大表join大表的问题。首先引入一个具体的问题场景,然后基于此介绍各自优化方案。

   5.1、问题场景

      问题场景如下:

      A表为一个汇总表,汇总的是卖家买家最近N天交易汇总信息,即对于每个卖家最近N天,其每个买家共成交了多少单,总金额是多少,假设N取90天,汇总值仅取成交单数。

      A表的字段有:buyer_id、seller_id、pay_cnt_90day。

      B表为卖家基本信息表,其字段有seller_id、sale_level,其中sale_levels是卖家的一个分层评级信息,比如吧卖家分为6个级别:S0、S1、S2、S3、S4和S5。

      要获得的结果是每个买家在各个级别的卖家的成交比例信息,比如:

      某买家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S5:10%。

      正如mapjoin中的例子一样,第一反应是直接join两表并统计:

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a

        join

               (select seller_id,  sale_level  from table_B)  b

        on  a.seller_id  = b.seller_id

        )  m

      group by m.buyer_id

      但是此SQL会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上千万的买家,但是大部分的卖家90天内买家的数目并不多,join table_A和table_B的时候,

    ODPS会按照seller_id进行分发,table_A的大卖家引起了数据倾斜。

      但是数据本身无法用mapjoin table_B解决,因为卖家超过千万条,文件大小有几个GB,超过了1GB的限制。

   5.2、优化方案1

      一个很正常的想法是,尽管B表无法直接mapjoin, 但是是否可以间接mapjoin它呢?

      实际上此思路有两种途径:限制行和限制列。

      限制行的思路是不需要join B全表,而只需要join其在A表中存在的,对于本问题场景,就是过滤掉90天内没有成交的卖家。

      限制列的思路是只取需要的字段。

      加上如上的限制后,检查过滤后的B表是否满足了Hive  mapjoin的条件,如果能满足,那么添加过滤条件生成一个临时B表,然后mapjoin该表即可。采用此思路的语句如下:

      

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  /*+mapjoin(b)*/

          a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a

        join

               (

           select seller_id,  sale_level  from table_B b0

           join

           (select seller_id from table_A group by seller_id) a0

             on b0.seller_id = a0.selller_id

          )  b

        on  a.seller_id  = b.seller_id

        )  m

      group by m.buyer_id

      此方案在一些情况可以起作用,但是很多时候还是无法解决上述问题,因为大部分卖家尽管90天内买家不多,但还是有一些的,过滤后的B表仍然很多。

  5.3、优化方案2

      此种解决方案应用场景是:倾斜的值是明确的而且数量很少,比如null值引起的倾斜。其核心是将这些引起倾斜的值随机分发到Reduce,其主要核心逻辑在于join时对这些特殊值concat随机数,

    从而达到随机分发的目的。此方案的核心逻辑如下:

       select a.user_id, a.order_id, b.user_id

      from table_a a join table_b b

      on (case when a.user_is is null then concat('hive', rand()) else a.user_id end) = b.user_id

      Hive 已对此进行了优化,只需要设置参数skewinfo和skewjoin参数,不修改SQL代码,例如,由于table_B的值“0” 和“1”引起了倾斜,值需要做如下设置:

      set hive.optimize.skewinfo=table_B:(selleer_id) [ ( "0") ("1") ) ]

      set hive.optimize.skewjoin = true;

      但是方案2因为无法解决本问题场景的倾斜问题,因为倾斜的卖家大量存在而且动态变化。

  

  5.4 、优化方案3:倍数B表,在取模join

     1、通用方案

      此方案的思路是建立一个numbers表,其值只有一列int 行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模join。代码如下:

      

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a

        join

               (

          select  /*+mapjoin(members)*/

            seller_id,  sale_level ,member

          from table_B

         join members

          )  b

        on  a.seller_id  = b.seller_id

          and mod(a.pay_cnt_90day,10)+1 = b.number

        )  m

      group by m.buyer_id

         此思路的核心在于,既然按照seller_id分发会倾斜,那么再人工增加一列进行分发,这样之前倾斜的值的倾斜程度会减少到原来的1/10,可以通过配置numbers表改放大倍数来降低倾斜程度,

      但这样做的一个弊端是B表也会膨胀N倍。

    2、专用方案

        通用方案的思路把B表的每条数据都放大了相同的倍数,实际上这是不需要的,只需要把大卖家放大倍数即可:需要首先知道大卖家的名单,即先建立一个临时表动态存放每天最新的大卖家(

      比如dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(1000倍)。

        在A表和B表分别新建一个join列,其逻辑为:如果是大卖家,那么concat一个随机分配正整数(0到预定义的倍数之间,本例为0~1000);如果不是,保持不变。具体代码如下:

      

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (

          select  /*+mapjoin(big)*/

             buyer_id,  seller_id,  pay_cnt_90day,

             if(big.seller_id is not null, concat(  table_A.seller_id,  'rnd',  cast(  rand() * 1000 as bigint ), table_A.seller_id)  as seller_id_joinkey

              from table_A

               left outer join

             --big表seller_id有重复,请注意一定要group by 后再join,保证table_A的行数保持不变

             (select seller_id  from dim_big_seller  group by seller_id)big

             on table_A.seller_id = big.seller_id

        )  a

        join

               (

          select  /*+mapjoin(big)*/

            seller_id,  sale_level ,

            --big表的seller_id_joinkey生成逻辑和上面的生成逻辑一样

            coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey

          from table_B

         left out join

          --table_B表join大卖家表后大卖家行数扩大1000倍,其它卖家行数保持不变

          (select seller_id, seller_id_joinkey from dim_big_seller) big

          on table_B.seller_id= big.seller_id

          )  b

        on  a.seller_id_joinkey= b.seller_id_joinkey

          and mod(a.pay_cnt_90day,10)+1 = b.number

        )  m

      group by m.buyer_id

      相比通用方案,专用方案的运行效率明细好了许多,因为只是将B表中大卖家的行数放大了1000倍,其它卖家的行数保持不变,但同时代码复杂了很多,而且必须首先建立大数据表。

   5.5 、动态一分为二

      实际上方案2和3都用了一分为二的思想,但是都不彻底,对于mapjoin不能解决的问题,终极解决方案是动态一分为二,即对倾斜的键值和不倾斜的键值分开处理,不倾斜的正常join即可,
    倾斜的把他们找出来做mapjoin,最后union all其结果即可。

      但是此种解决方案比较麻烦,代码复杂而且需要一个临时表存放倾斜的键值。代码如下:

      --由于数据倾斜,先找出90天买家超过10000的卖家

      insert overwrite table  temp_table_B

      select

        m.seller_id,  n.sale_level

      from (

        select   seller_id

        from (

          select seller_id,count(buyer_id) as byr_cnt

          from table_A

          group by seller_id

          ) a

        where a.byr_cnt >10000

        ) m

      left join

      (

       select seller_id, sale_level  from table_B

      ) n

        on m.seller_id = n.seller_id;

      

      --对于90天买家超过10000的卖家直接mapjoin,对其它卖家直接正常join即可。

      

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a

        join

               (

          select seller_id,  a.sale_level

           from table_A  a

           left join temp_table_B b

          on a.seller_id = b.seller_id

          where b.seller_id is not null

          )  b

        on  a.seller_id  = b.seller_id

       union all

       

       select  /*+mapjoin(b)*/

          a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (

           select buyer_id,  seller_id,  pay_cnt_90day

          from table_A

          )  a

        join

               (

           select seller_id,  sale_level  from table_B

          )  b

        on  a.seller_id  = b.seller_id

     )  m  group by m.buyer_id

     ) m

     group by m.buyer_id

    总结:方案1、2以及方案3中的同用方案不能保证解决大表join大表问题,因为它们都存在种种不同的限制和特定使用场景。而方案3的专用方案和方案4是推荐的优化方案,但是它们都需要新建一个临时表

       来存储每日动态变化的大卖家。相对方案4来说,方案3的专用方案不需要对代码框架进行修改,但是B表会被放大,所以一定要是是维度表,不然统计结果会是错误的。方案4最通用,自由度最高,

       但是对代码的更改也最大,甚至修改更难代码框架,可以作为终极方案使用。

    

    参考资料:《离线和实时大数据开发实战》

最新文章

  1. Caliburn.Micro学习笔记(四)----IHandle<T>实现多语言功能
  2. Redis执行Lua脚本的情况
  3. 在Entity Framework 中执行T-sql语句
  4. struts2学习笔记(2)——简单的输入验证以及标签库的运用
  5. 深入浅出Java并发包—锁机制(三)
  6. 关于WinRT中c++和c#相互调用的问题
  7. leetcode 第二题Add Two Numbers java
  8. 桥模式设计模式进入Bridge
  9. windows下搭建Cygwin环境
  10. css模拟Bootstrap响应式布局——栅格
  11. TextView的几个属性
  12. 从websphere6.1迁移到weblogic10.3的问题总结
  13. go 实现单链表反转
  14. 微信小程序入门(五)
  15. sql server 一直提示正在还原
  16. 7.20 Codeforces Beta Round #8
  17. VSFTP的使用
  18. unigui 1.90.0 Example
  19. python-flask-SQLAlchemy
  20. 【Web】Nginx下载与安装

热门文章

  1. [POJ1144]Network
  2. FTP传输一定要注意使用二进制模式
  3. STM32F103ZET6 用定时器级联方式输出特定数目的PWM
  4. Temporary ASP.Net Files探究
  5. Git 修复 bug 切换分支时,如何保存修改过的代码(即如何保存现场)?
  6. Linux Delay Accounting
  7. 用格式工厂将mts文件转换成其它格式flv,mpg失败
  8. iOS: 计算 UIWebView 的内容高度
  9. SQL 参考
  10. MEF and AppDomain z