Data Warehouse Practice on the Hadoop Ecosystem — Advanced Techniques (4)
4. Role-Playing Dimensions
A role-playing dimension is needed when a fact table references the same dimension table more than once. For example, a sales order carries both an order date and a requested delivery date, so the fact table must reference the date dimension table twice.
This section demonstrates two ways to implement a role-playing dimension: table aliases and database views. Both rely on standard Hive functionality.
With table aliases, a single SQL statement joins the dimension table multiple times, giving it a different alias on each reference.
With database views, one view is created per role, that is, one view for each time the fact table references the dimension table.
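The two approaches can be sketched as follows. This is a minimal illustration, not the script used later in this section; the view name request_delivery_date_dim follows the naming used below, but the date_dim column list (date_sk, date, month, quarter, year) is an assumption about the date dimension's schema.

```sql
-- Approach 1: table aliases -- join date_dim twice, once per role
SELECT f.order_amount,
       od.date  order_date,
       rd.date  request_delivery_date
  FROM sales_order_fact f
  JOIN date_dim od ON f.order_date_sk = od.date_sk
  JOIN date_dim rd ON f.request_delivery_date_sk = rd.date_sk;

-- Approach 2: one view per role -- each view behaves like an independent dimension table
-- (column names here are assumed for illustration)
CREATE VIEW request_delivery_date_dim
    (request_delivery_date_sk, request_delivery_date, month, quarter, year)
AS SELECT date_sk, date, month, quarter, year FROM date_dim;
```

Aliases keep the schema small but push the role distinction into every query; views make each role visible as its own named table at the cost of extra schema objects.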
1. Modify the database schema
Use the script below to modify the database schemas: add a request_delivery_date_sk column to the sales_order_fact fact table in the data warehouse, and a request_delivery_date column to the sales_order table in the source database.
-- in hive
USE dw;
-- sales_order_fact is stored as ORC, so adding a column requires rebuilding the data
ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old;
CREATE TABLE sales_order_fact (
    order_sk INT COMMENT 'order surrogate key',
    customer_sk INT COMMENT 'customer surrogate key',
    product_sk INT COMMENT 'product surrogate key',
    order_date_sk INT COMMENT 'date surrogate key',
    request_delivery_date_sk INT COMMENT 'request delivery date surrogate key',
    order_amount DECIMAL(10,2) COMMENT 'order amount',
    order_quantity INT COMMENT 'order quantity'
)
CLUSTERED BY (order_sk) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
INSERT INTO sales_order_fact
SELECT order_sk, customer_sk, product_sk, order_date_sk, NULL, order_amount, order_quantity
  FROM sales_order_fact_old;
DROP TABLE sales_order_fact_old;
USE rds;
ALTER TABLE sales_order ADD COLUMNS (request_delivery_date DATE COMMENT 'request delivery date');
-- in mysql
USE source;
ALTER TABLE sales_order ADD request_delivery_date DATE AFTER order_date;
The modified source database schema is shown in the figure below.
Note that, unlike MySQL, Hive cannot control the position of a newly added column: columns added with ADD COLUMNS are always appended at the end of the table.
2. Rebuild the Sqoop job
Use the script below to rebuild the Sqoop job, adding the request_delivery_date column.
last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop
sqoop job \
--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
--create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
--table sales_order \
--columns "order_number,customer_number,product_code,order_date,entry_date,order_amount,order_quantity,request_delivery_date" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column order_number \
--last-value $last_value
Note that the column order in the --columns parameter value (the columns of source.sales_order in MySQL) must match the column order of rds.sales_order.
3. Modify the regular load script regular_etl.sql
The periodic-load HiveQL script must additionally handle the requested delivery date column. The modified script is shown below.
-- set variables to support transactions
set hive.support.concurrency=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;
USE dw;
-- set the SCD effective and expiry dates
SET hivevar:cur_date=CURRENT_DATE();
SET hivevar:pre_date=DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date=CAST('2200-01-01' AS DATE);
-- set the CDC upper-bound time
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;
-- load the customer dimension
-- expire the SCD2 versions of deleted records and of records whose address columns changed;
-- the <=> operator handles NULL values
UPDATE customer_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE customer_dim.customer_sk IN
(SELECT a.customer_sk
   FROM (SELECT customer_sk,
                customer_number,
                customer_street_address,
                customer_zip_code,
                customer_city,
                customer_state,
                shipping_address,
                shipping_zip_code,
                shipping_city,
                shipping_state
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                rds.customer b ON a.customer_number = b.customer_number
  WHERE b.customer_number IS NULL OR
  (  !(a.customer_street_address <=> b.customer_street_address)
  OR !(a.customer_zip_code <=> b.customer_zip_code)
  OR !(a.customer_city <=> b.customer_city)
  OR !(a.customer_state <=> b.customer_state)
  OR !(a.shipping_address <=> b.shipping_address)
  OR !(a.shipping_zip_code <=> b.shipping_zip_code)
  OR !(a.shipping_city <=> b.shipping_city)
  OR !(a.shipping_state <=> b.shipping_state)
  ));
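The comparisons above rely on Hive's NULL-safe equality operator <=>: unlike =, it returns true when both operands are NULL, so !(a <=> b) is true exactly when the two values differ, NULLs included. A quick illustration:

```sql
SELECT 1 <=> 1,        -- true
       NULL <=> NULL,  -- true (plain = would yield NULL here, filtering the row out)
       1 <=> NULL;     -- false
```

Without <=>, a row whose address changed from NULL to a real value (or vice versa) would never be flagged as changed, because NULL = anything evaluates to NULL rather than false.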
-- add new SCD2 rows for changed address columns
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.shipping_address,
    t1.shipping_zip_code,
    t1.shipping_city,
    t1.shipping_state,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.customer_number customer_number,
    t2.customer_name customer_name,
    t2.customer_street_address customer_street_address,
    t2.customer_zip_code customer_zip_code,
    t2.customer_city customer_city,
    t2.customer_state customer_state,
    t2.shipping_address shipping_address,
    t2.shipping_zip_code shipping_zip_code,
    t2.shipping_city shipping_city,
    t2.shipping_state shipping_state,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
  FROM customer_dim t1
 INNER JOIN rds.customer t2
    ON t1.customer_number = t2.customer_number
   AND t1.expiry_date = ${hivevar:pre_date}
  LEFT JOIN customer_dim t3
    ON t1.customer_number = t3.customer_number
   AND t3.expiry_date = ${hivevar:max_date}
 WHERE (  !(t1.customer_street_address <=> t2.customer_street_address)
       OR !(t1.customer_zip_code <=> t2.customer_zip_code)
       OR !(t1.customer_city <=> t2.customer_city)
       OR !(t1.customer_state <=> t2.customer_state)
       OR !(t1.shipping_address <=> t2.shipping_address)
       OR !(t1.shipping_zip_code <=> t2.shipping_zip_code)
       OR !(t1.shipping_city <=> t2.shipping_city)
       OR !(t1.shipping_state <=> t2.shipping_state)
       )
   AND t3.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;
-- handle SCD1 on the customer_name column
-- Hive's UPDATE does not yet support subqueries in the SET clause, so a temporary table
-- holds the records to be updated, and DELETE followed by INSERT replaces the UPDATE
-- SCD1 keeps no history, so every version with a changed customer_name is updated,
-- not only the current version
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT
    a.customer_sk,
    a.customer_number,
    b.customer_name,
    a.customer_street_address,
    a.customer_zip_code,
    a.customer_city,
    a.customer_state,
    a.shipping_address,
    a.shipping_zip_code,
    a.shipping_city,
    a.shipping_state,
    a.version,
    a.effective_date,
    a.expiry_date
  FROM customer_dim a, rds.customer b
 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);
INSERT INTO customer_dim SELECT * FROM tmp;
-- handle newly added customer records
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.shipping_address,
    t1.shipping_zip_code,
    t1.shipping_city,
    t1.shipping_state,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number
 WHERE t2.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;
-- reload the PA customer dimension
TRUNCATE TABLE pa_customer_dim;
INSERT INTO pa_customer_dim
SELECT
    customer_sk,
    customer_number,
    customer_name,
    customer_street_address,
    customer_zip_code,
    customer_city,
    customer_state,
    shipping_address,
    shipping_zip_code,
    shipping_city,
    shipping_state,
    version,
    effective_date,
    expiry_date
  FROM customer_dim
 WHERE customer_state = 'PA';
-- load the product dimension
-- expire the SCD2 versions of deleted records and of records whose product_name or product_category changed
UPDATE product_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE product_dim.product_sk IN
(SELECT a.product_sk
   FROM (SELECT product_sk, product_code, product_name, product_category
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                rds.product b ON a.product_code = b.product_code
  WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));
-- add new SCD2 rows for changed product_name or product_category
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.product_code product_code,
    t2.product_name product_name,
    t2.product_category product_category,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
  FROM product_dim t1
 INNER JOIN rds.product t2
    ON t1.product_code = t2.product_code
   AND t1.expiry_date = ${hivevar:pre_date}
  LEFT JOIN product_dim t3
    ON t1.product_code = t3.product_code
   AND t3.expiry_date = ${hivevar:max_date}
 WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;
-- handle newly added product records
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code
 WHERE t2.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;
-- load the order dimension
INSERT INTO order_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max,
    t1.order_number,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    order_number order_number,
    1 version,
    order_date effective_date,
    '2200-01-01' expiry_date
FROM