从 MySQL 抽取数据到 Hive 时的几种存储方式
源表在 MySQL 中的表结构：
-- Source table in MySQL that the Sqoop jobs extract into Hive.
-- product_id is the primary key; the Hive merge steps deduplicate on it.
-- create_time is declared as datetime (previously varchar(1000)) so both
-- timestamp columns share one type and are not stored as free-form strings.
CREATE TABLE `product` (
  `product_id` int(11) NOT NULL COMMENT '商品ID',
  `product_type` int(11) NOT NULL COMMENT '商品类型',
  `product_name` varchar(1000) NOT NULL COMMENT '商品名称',
  `shop_id` int(11) NOT NULL COMMENT '店铺ID',
  `shop_type` int(11) NOT NULL COMMENT '店铺类型',
  `shop_name` varchar(1000) NOT NULL COMMENT '店铺名称',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`product_id`)
);
ETL 流程
全量快照
- 适用场景：数据量小、数据变化大的情况。每次把整张表导入到 ODS 表中按抽取日期(trans_date)划分的分区目录。
# Full snapshot: import the whole MySQL `product` table into one date
# partition directory (trans_date=...) of dw_ods.product.
# TRANS_DATE defaults to the original example date; override per run, e.g.
#   TRANS_DATE=2019-01-02 sh load_product_snapshot.sh
# --delete-target-dir removes that partition directory first, so re-running
# the same date replaces its data.
# \$CONDITIONS is escaped so the shell leaves it for Sqoop to substitute.
# -m 1 runs a single mapper; --split-by only matters if that is raised.
# NULLs are written as \N, the text-format NULL marker Hive reads back.
# --hive-drop-import-delims strips \n, \r and \001 from string values so
# they cannot collide with the field/line delimiters chosen below.
# NOTE(review): the password is given on the command line; prefer
# --password-file or -P so it does not leak into `ps` / shell history.
# NOTE(review): writing files under the partition directory does not by
# itself register the partition in the metastore; presumably an
# ALTER TABLE ... ADD IF NOT EXISTS PARTITION (or MSCK REPAIR TABLE)
# runs afterwards - confirm.
TRANS_DATE="${TRANS_DATE:-2019-01-01}"
sqoop import \
  --delete-target-dir \
  --connect 'jdbc:mysql://localhost:3306/mysql' \
  --username 'root' \
  --password 'root' \
  --query \
  "
  select
    product_id,
    product_type,
    product_name,
    shop_id,
    shop_type,
    shop_name,
    create_time,
    update_time,
    now() as etl_time
  from product
  where \$CONDITIONS
  " \
  --split-by product_id \
  -m 1 \
  --fields-terminated-by '\001' \
  --lines-terminated-by '\n' \
  --hive-drop-import-delims \
  --null-string '\\N' \
  --null-non-string '\\N' \
  --target-dir "hdfs://localhost:9000/user/hive/warehouse/dw_ods.db/product/trans_date=${TRANS_DATE}"
不保留历史的增量更新
- 适用场景：变化量比较小，且历史数据对现有业务没有影响。
- 做法：根据数据的更新时间(update_time)只拉取变化的数据到临时表 dw_tmp.product，再与目标表 dw_ods.product 合并。
# Incremental extract without history.
# Pull only the rows whose update_time lies in the half-open window
# [START_DATE, END_DATE) into the staging directory of dw_tmp.product.
# The Hive merge step then folds those rows into dw_ods.product.
# Without this predicate every run re-imported the whole table.
# The half-open range (>= start, < end) counts each row exactly once across
# consecutive daily windows.
# It compares the bare column, so an index on update_time stays usable.
# START_DATE / END_DATE are spliced into the SQL text, so they must be plain
# yyyy-MM-dd values supplied by the operator, never untrusted input.
# \$CONDITIONS is escaped so the shell leaves it for Sqoop to substitute.
# NOTE(review): the password is given on the command line; prefer
# --password-file or -P.
START_DATE="${START_DATE:-2019-01-01}"
END_DATE="${END_DATE:-2019-01-02}"
sqoop import \
  --delete-target-dir \
  --connect 'jdbc:mysql://localhost:3306/mysql' \
  --username 'root' \
  --password 'root' \
  --query \
  "
  select
    product_id,
    product_type,
    product_name,
    shop_id,
    shop_type,
    shop_name,
    create_time,
    update_time,
    now() as etl_time
  from product
  where update_time >= '${START_DATE}'
    and update_time < '${END_DATE}'
    and \$CONDITIONS
  " \
  --split-by product_id \
  -m 1 \
  --fields-terminated-by '\001' \
  --lines-terminated-by '\n' \
  --hive-drop-import-delims \
  --null-string '\\N' \
  --null-non-string '\\N' \
  --target-dir 'hdfs://localhost:9000/user/hive/warehouse/dw_tmp.db/product'
将临时表合并到目标表，同一 product_id 只保留 etl_time 最新的一条记录：
insert overwrite table dw_ods.product
select
product_id,
product_type,
product_name,
shop_id,
shop_type,
shop_name,
create_time,
update_time,
etl_time
from (
select
*,
row_number() over(partition by product_id order by etl_time desc) rn
from (
select * from dw_tmp.product
union all
select * from dw_ods.product
) t1
) t2
where rn = 1
还有一种合并方式：用 full join 关联临时表和目标表，临时表中存在的记录取临时表的值，否则保留目标表原有的值：
-- Alternative merge of dw_tmp.product into dw_ods.product via FULL OUTER
-- JOIN on product_id.
-- If the staging side matched (stg_product_id is not null), its whole row
-- wins. Otherwise the existing ODS row is kept unchanged.
-- Whole rows are carried as named structs, so a legitimately NULL staging
-- column (e.g. create_time) is kept as NULL rather than back-filled from ODS.
-- A per-column COALESCE would back-fill it.
-- etl_time comes from the staging row's own extract time, matching the
-- row_number merge variant.
-- The previous from_unixtime(unix_timestamp()) is deprecated in Hive 2.0+.
-- It is also not fixed for the scope of a query, so rows in one run could
-- get different etl_time values.
from (
    select
        case when stg_product_id is not null then stg_row else ods_row end as final_data,
        case when stg_product_id is not null then stg_etl_time else ods_etl_time end as etl_time
    from (
        select
            stg.product_id as stg_product_id,
            named_struct(
                'product_id', stg.product_id,
                'product_type', stg.product_type,
                'product_name', stg.product_name,
                'shop_id', stg.shop_id,
                'shop_type', stg.shop_type,
                'shop_name', stg.shop_name,
                'create_time', stg.create_time,
                'update_time', stg.update_time
            ) as stg_row,
            stg.etl_time as stg_etl_time,
            named_struct(
                'product_id', ods.product_id,
                'product_type', ods.product_type,
                'product_name', ods.product_name,
                'shop_id', ods.shop_id,
                'shop_type', ods.shop_type,
                'shop_name', ods.shop_name,
                'create_time', ods.create_time,
                'update_time', ods.update_time
            ) as ods_row,
            ods.etl_time as ods_etl_time
        from dw_tmp.product stg
        full outer join dw_ods.product ods
            on stg.product_id = ods.product_id
    ) joined
) resolved
-- Write the resolved rows back to the target table
insert overwrite table dw_ods.product
select
    final_data.product_id,
    final_data.product_type,
    final_data.product_name,
    final_data.shop_id,
    final_data.shop_type,
    final_data.shop_name,
    final_data.create_time,
    final_data.update_time,
    etl_time;