
ETL Algorithms

Storage strategies for data extracted into Hive.

Table structure in MySQL

CREATE TABLE `product` (
  `product_id` int(11) NOT NULL COMMENT 'Product ID',
  `product_type` int(11) NOT NULL COMMENT 'Product type',
  `product_name` varchar(1000) NOT NULL COMMENT 'Product name',
  `shop_id` int(11) NOT NULL COMMENT 'Shop ID',
  `shop_type` int(11) NOT NULL COMMENT 'Shop type',
  `shop_name` varchar(1000) NOT NULL COMMENT 'Shop name',
  `create_time` datetime DEFAULT NULL COMMENT 'Creation time',
  `update_time` datetime NOT NULL COMMENT 'Update time',
  PRIMARY KEY (`product_id`)
);

ETL process

Full snapshot

  • Use case: the table is small and a large proportion of its data changes between loads.
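A full snapshot keeps one complete copy of the table for each day. The minimal Python sketch below is an illustration, not Hive code. It models the partitioned table as a dict keyed by a trans_date string, and the names `snapshots`, `take_snapshot` and `state_on` are hypothetical. It shows the trade-off: any past day can be read back directly, but every row is stored once per day, which is why the approach suits small tables.

```python
from typing import Dict, List, Optional

Row = Dict[str, object]

# Hypothetical stand-in for a Hive table partitioned by trans_date:
# each key is one partition, each value a complete copy of the source table.
snapshots: Dict[str, List[Row]] = {}


def take_snapshot(trans_date: str, source_rows: List[Row]) -> None:
    """Replace one partition with a full copy of the source rows
    (roughly what --delete-target-dir plus a full import does)."""
    snapshots[trans_date] = [dict(row) for row in source_rows]


def state_on(trans_date: str, product_id: int) -> Optional[Row]:
    """Read a product as it looked in a given day's snapshot."""
    for row in snapshots.get(trans_date, []):
        if row["product_id"] == product_id:
            return row
    return None


source = [{"product_id": 1, "product_name": "pen"}]
take_snapshot("2019-01-01", source)
source[0]["product_name"] = "blue pen"  # the row changes in MySQL the next day
take_snapshot("2019-01-02", source)

print(state_on("2019-01-01", 1)["product_name"])  # pen
print(state_on("2019-01-02", 1)["product_name"])  # blue pen
print(sum(len(rows) for rows in snapshots.values()))  # 2
```

One product produces two stored rows after two days. Storage therefore grows with (table size × number of days), however few rows actually change.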
# Full snapshot: re-import the whole product table into the 2019-01-01 partition directory
sqoop import \
--delete-target-dir \
--connect 'jdbc:mysql://localhost:3306/mysql' \
--username 'root' \
--password 'root' \
--query \
"
select
    product_id,
    product_type,
    product_name,
    shop_id,
    shop_type,
    shop_name,
    create_time,
    update_time,
    now() as etl_time
from product
where \$CONDITIONS
" \
--split-by product_id \
-m 1 \
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--hive-drop-import-delims  \
--null-string '\\N' \
--null-non-string '\\N' \
--target-dir 'hdfs://localhost:9000/user/hive/warehouse/dw_ods.db/product/trans_date=2019-01-01'
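The formatting flags in this command determine how each row is laid out in the text file that Hive reads. The Python function below is a rough approximation of that layout. It assumes that `--hive-drop-import-delims` strips `\n`, `\r` and `\01` from string columns, as the Sqoop user guide describes. `to_hive_text` is a hypothetical helper, not part of Sqoop.

```python
from typing import Optional, Sequence

FIELD_SEP = "\x01"   # --fields-terminated-by '\001'
LINE_SEP = "\n"      # --lines-terminated-by '\n'
NULL_TOKEN = "\\N"   # --null-string / --null-non-string '\\N' (Hive's default NULL marker)


def to_hive_text(row: Sequence[Optional[object]]) -> str:
    """Approximate how one imported row is written to the Hive text file."""
    fields = []
    for value in row:
        if value is None:
            fields.append(NULL_TOKEN)
        elif isinstance(value, str):
            # --hive-drop-import-delims: remove characters Hive would treat as delimiters
            fields.append(value.replace("\n", "").replace("\r", "").replace("\x01", ""))
        else:
            fields.append(str(value))
    return FIELD_SEP.join(fields) + LINE_SEP


print(repr(to_hive_text([1, "pen\nblue", None])))  # '1\x01penblue\x01\\N\n'
```

Without the drop flag, the embedded newline in `"pen\nblue"` would split one MySQL row into two Hive rows.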

Incremental update without keeping history

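  • Use case: relatively few rows change, and historical versions of the data do not affect current business, so only the latest state of each row needs to be kept.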

  • Pull only the rows selected by their update date (update_time).
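Pulling by update date amounts to selecting the rows whose `update_time` falls inside the load window. The minimal Python sketch below assumes a daily half-open window [start, end). `changed_rows` is a hypothetical helper.

```python
from datetime import datetime
from typing import Dict, List

Row = Dict[str, object]


def changed_rows(rows: List[Row], start: datetime, end: datetime) -> List[Row]:
    """Keep rows whose update_time lies in the half-open window [start, end)."""
    return [row for row in rows if start <= row["update_time"] < end]


rows = [
    {"product_id": 1, "update_time": datetime(2018, 12, 31, 23, 59, 59)},
    {"product_id": 2, "update_time": datetime(2019, 1, 1, 8, 0, 0)},
    {"product_id": 3, "update_time": datetime(2019, 1, 2, 0, 0, 0)},
]
window = changed_rows(rows, datetime(2019, 1, 1), datetime(2019, 1, 2))
print([row["product_id"] for row in window])  # [2]
```

Because the upper bound is exclusive, a row updated exactly at midnight is picked up by exactly one day's load rather than two.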

# Incremental pull: land the extracted rows in the staging table dw_tmp.product
sqoop import \
--delete-target-dir \
--connect 'jdbc:mysql://localhost:3306/mysql' \
--username 'root' \
--password 'root' \
--query \
"
select
    product_id,
    product_type,
    product_name,
    shop_id,
    shop_type,
    shop_name,
    create_time,
    update_time,
    now() as etl_time
from product
where update_time >= '2019-01-01'
  and update_time < '2019-01-02'
  and \$CONDITIONS
" \
--split-by product_id \
-m 1 \
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--hive-drop-import-delims  \
--null-string '\\N' \
--null-non-string '\\N' \
--target-dir 'hdfs://localhost:9000/user/hive/warehouse/dw_tmp.db/product'
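-- Merge the staged increment into dw_ods.product, keeping only the newest row (by etl_time) per product_id
insert overwrite table dw_ods.product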
select
    product_id,
    product_type,
    product_name,
    shop_id,
    shop_type,
    shop_name,
    create_time,
    update_time,
    etl_time
from (
    select 
        *,
        row_number() over(partition by product_id order by etl_time desc) rn
    from (
        select * from dw_tmp.product
        union all 
        select * from dw_ods.product
    ) t1
) t2
where rn = 1;
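This statement unions the staged increment with the current table and keeps, for each `product_id`, the row with the latest `etl_time`. The same logic in plain Python is sketched below to help reason about the merge. Rows are dicts, and `merge_latest` is a hypothetical name. `etl_time` is assumed to be a `yyyy-MM-dd HH:mm:ss` string, which sorts chronologically.

```python
from typing import Dict, List

Row = Dict[str, object]


def merge_latest(increment: List[Row], existing: List[Row]) -> List[Row]:
    """Emulate: UNION ALL, then row_number() over (partition by product_id
    order by etl_time desc), keeping rn = 1."""
    latest: Dict[object, Row] = {}
    for row in increment + existing:  # union all
        key = row["product_id"]
        # Strict '>' keeps the first row seen on a tie, i.e. the increment row.
        if key not in latest or row["etl_time"] > latest[key]["etl_time"]:
            latest[key] = row
    return sorted(latest.values(), key=lambda r: r["product_id"])


existing = [
    {"product_id": 1, "product_name": "pen", "etl_time": "2019-01-01 01:00:00"},
    {"product_id": 2, "product_name": "cup", "etl_time": "2019-01-01 01:00:00"},
]
increment = [
    {"product_id": 1, "product_name": "blue pen", "etl_time": "2019-01-02 01:00:00"},
    {"product_id": 3, "product_name": "ink", "etl_time": "2019-01-02 01:00:00"},
]
result = merge_latest(increment, existing)
print([(r["product_id"], r["product_name"]) for r in result])
# [(1, 'blue pen'), (2, 'cup'), (3, 'ink')]
```

With `row_number()`, ties on `etl_time` are broken arbitrarily. The sketch instead prefers the increment row on a tie because it scans the increment first.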

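Another approach is to full-outer-join the increment with the target table. For each product_id, take the new row when one exists; otherwise keep the existing row together with its original etl_time.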

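-- Hive's FROM-first form: this subquery feeds the INSERT OVERWRITE that follows.
-- struct() names its fields col1..col8, which the final SELECT reads back.
from (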
    select 
        case when product_id1 is not null then struct1 else struct2 end as final_data,
        case when product_id1 is not null then from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') else etl_time end etl_time
    from (
        select 
            t1.product_id as product_id1,
            struct(t1.product_id,t1.product_type,t1.product_name,t1.shop_id,t1.shop_type,t1.shop_name,t1.create_time,t1.update_time) as struct1,
            t2.product_id as product_id2,
            struct(t2.product_id,t2.product_type,t2.product_name,t2.shop_id,t2.shop_type,t2.shop_name,t2.create_time,t2.update_time) as struct2,
            t2.etl_time
        from dw_tmp.product t1
        full join dw_ods.product t2 on t1.product_id = t2.product_id
    ) t1
) t2

-- Update the target table
insert overwrite table dw_ods.product 
select
    final_data.col1,
    final_data.col2,
    final_data.col3,
    final_data.col4,
    final_data.col5,
    final_data.col6,
    final_data.col7,
    final_data.col8,
    etl_time;
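The full-join variant produces the same final set of keys. A row from the increment (`product_id1` is not null) replaces the old one and receives a fresh `etl_time`, while untouched rows keep their original `etl_time`. The Python sketch below models that rule, assuming `product_id` is unique within each input because it is the MySQL primary key. `merge_full_join` and its `now` parameter are hypothetical.

```python
from typing import Dict, List

Row = Dict[str, object]


def merge_full_join(increment: List[Row], existing: List[Row], now: str) -> List[Row]:
    """Emulate the FULL JOIN on product_id plus the CASE WHEN choice of struct."""
    new_by_id = {r["product_id"]: r for r in increment}
    old_by_id = {r["product_id"]: r for r in existing}
    merged = []
    for key in sorted(new_by_id.keys() | old_by_id.keys()):  # every key from either side
        if key in new_by_id:
            # product_id1 is not null: take struct1 and stamp the current time
            row = dict(new_by_id[key])
            row["etl_time"] = now
        else:
            # only in the target table: take struct2 with its old etl_time
            row = dict(old_by_id[key])
        merged.append(row)
    return merged


existing = [
    {"product_id": 1, "product_name": "pen", "etl_time": "2019-01-01 01:00:00"},
    {"product_id": 2, "product_name": "cup", "etl_time": "2019-01-01 01:00:00"},
]
increment = [
    {"product_id": 1, "product_name": "blue pen"},
    {"product_id": 3, "product_name": "ink"},
]
for r in merge_full_join(increment, existing, "2019-01-02 02:00:00"):
    print(r["product_id"], r["product_name"], r["etl_time"])
```

Unlike the `row_number()` version, this one never compares timestamps: any row present in the increment wins.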
This article is licensed by the author under CC BY 4.0.