Doris_Hive2Doris推数组件

Doris_Hive2Doris推数组件

一、Doris常见离线数据导入方式

Broker Load

在Broker Load模式下,通过部署的Broker程序,DorisDB可读取对应数据源(如HDFS, S3、阿里云 OSS、腾讯 COS)上的数据,利用自身的计算资源对数据进行预处理和导入。这是一种异步的导入方式,用户需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。

Spark Load

Spark Load通过外部的Spark资源实现对导入数据的预处理,提高DorisDB大数据量的导入性能并且节省Doris集群的计算资源。主要用于初次迁移、大数据量导入DorisDB的场景(数据量可到TB级别)。

Stream Load

Stream Load是一种同步的导入方式,用户通过发送HTTP请求将本地文件或数据流导入到DorisDB中。Stream Load同步执行导入并返回导入结果。用户可直接通过请求的返回值判断导入是否成功。

二、Broker Load实战案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
## 提交brokerLoad任务
LOAD LABEL test_ge_acc_minute_cart_08
(
DATA INFILE("hdfs://ns7/user/mart_scr/app.db/app_jdr_ge_s03_sku_addcart_acc_minute_doris_m_a_d_d/dt=2024-12-02/*")
INTO TABLE `ge_acc_minute_cart`
COLUMNS TERMINATED BY "\t"
FORMAT AS "csv"
(skuid,spuid,mainbrandid,brandid,shopid,venderid,buid,deptid1,deptid2,deptid3,deptid4,deptid5,saleserp,buyererp,purconerp,cateerp,categoryid1,categoryid2,categoryid3,categoryid4,price,mqtype,num,channel,model,providercode,poptype,datetime,houridx,minuteidx,tenminidx,date,msgyn,pin,catebuid,catedeptid1,catedeptid2,catedeptid3,catedeptid4,cateerpglb,x,blacklist)
COLUMNS FROM PATH AS (dt)
SET(skuId=skuid,spuId=spuid,mainBrandId=mainbrandid,brandId=brandid,shopId=shopid,venderId=venderid,buId=buid,deptId1=deptid1,deptId2=deptid2,deptId3=deptid3,deptId4=deptid4,deptId5=deptid5,salesErp=saleserp,buyerErp=buyererp,purConErp=purconerp,cateErp=cateerp,categoryId1=categoryid1,categoryId2=categoryid2,categoryId3=categoryid3,categoryId4=categoryid4,price=price,mqType=mqtype,num=num,channel=channel,model=model,providerCode=providercode,popType=poptype,dateTime=datetime,hourIdx=houridx,minuteIdx=minuteidx,tenMinIdx=tenminidx,date=date,msgYn=msgyn,pin=pin,cateBuId=catebuid,cateDeptId1=catedeptid1,cateDeptId2=catedeptid2,cateDeptId3=catedeptid3,cateDeptId4=catedeptid4,cateErpGlb=cateerpglb,X=x,dt=dt,blackList=blacklist)
)
WITH BROKER hdfs_broker
;

## 添加临时分区
ALTER TABLE ge_acc_minute_cart ADD TEMPORARY PARTITION tp20241202 VALUES [("2024-12-02"), ("2024-12-03"));

## 查看临时分区
SHOW TEMPORARY PARTITIONS FROM ge_acc_minute_cart;

## 删除临时分区
ALTER TABLE ge_acc_minute_cart DROP TEMPORARY PARTITION tp20241204;

## 查询临时分区数据
select count(1) from ge_acc_minute_cart TEMPORARY PARTITION(tp20241202);

select count(1) from ge_acc_minute_cart PARTITION(p20241202);

## 往临时分区提交BrokerLoad导数任务(hive->Doirs)
LOAD LABEL test_ge_acc_minute_cart_09
(
DATA INFILE("hdfs://ns7/user/mart_scr/app.db/app_jdr_ge_s03_sku_addcart_acc_minute_doris_m_a_d_d/dt=2024-12-02/*")
INTO TABLE `ge_acc_minute_cart`
TEMPORARY PARTITION (tp20241202)
COLUMNS TERMINATED BY "\t"
FORMAT AS "csv"
(skuid,spuid,mainbrandid,brandid,shopid,venderid,buid,deptid1,deptid2,deptid3,deptid4,deptid5,saleserp,buyererp,purconerp,cateerp,categoryid1,categoryid2,categoryid3,categoryid4,price,mqtype,num,channel,model,providercode,poptype,datetime,houridx,minuteidx,tenminidx,date,msgyn,pin,catebuid,catedeptid1,catedeptid2,catedeptid3,catedeptid4,cateerpglb,x,blacklist)
COLUMNS FROM PATH AS (dt)
SET(skuId=skuid,spuId=spuid,mainBrandId=mainbrandid,brandId=brandid,shopId=shopid,venderId=venderid,buId=buid,deptId1=deptid1,deptId2=deptid2,deptId3=deptid3,deptId4=deptid4,deptId5=deptid5,salesErp=saleserp,buyerErp=buyererp,purConErp=purconerp,cateErp=cateerp,categoryId1=categoryid1,categoryId2=categoryid2,categoryId3=categoryid3,categoryId4=categoryid4,price=price,mqType=mqtype,num=num,channel=channel,model=model,providerCode=providercode,popType=poptype,dateTime=datetime,hourIdx=houridx,minuteIdx=minuteidx,tenMinIdx=tenminidx,date=date,msgYn=msgyn,pin=pin,cateBuId=catebuid,cateDeptId1=catedeptid1,cateDeptId2=catedeptid2,cateDeptId3=catedeptid3,cateDeptId4=catedeptid4,cateErpGlb=cateerpglb,X=x,dt=dt,blackList=blacklist)
)
WITH BROKER hdfs_broker
;


delete from ge_acc_minute_cart PARTITION p20241202 where skuId=100127326719;
delete from ge_acc_minute_cart PARTITION p20241202 where skuId=100109078581;

## 临时分区替换正式分区(原子性)
ALTER TABLE ge_acc_minute_cart REPLACE PARTITION (p20241202) WITH TEMPORARY PARTITION (tp20241202);
ALTER TABLE ge_acc_minute_cart REPLACE PARTITION (p20241203,p20241204) WITH TEMPORARY PARTITION (tp20241204) PROPERTIES ("strict_range" = "false");

## 删除正式分区数据
delete from ge_acc_minute_cart PARTITION p20241204 where dt='2024-12-04';

三、Stream Load实战案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
object Spark2Doris {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark: SparkSession = SparkSession.builder()
.appName("Hive to Doris with Spark Doris Connector")
.master("local[*]") // 根据需要设置为集群模式
.enableHiveSupport() // 启用 Hive 支持
.getOrCreate()

// 读取 Hive 表数据
val hiveTable = "adm.adm_zs_z1106_pre_sale_details" // Hive 表名
val hiveData: DataFrame = spark.sql(s"SELECT shop_id, dt, shop_typ, chan_cd, item_sku_id, item_sku_name, sale_ord_id, pre_sale_deposit_pay_flg, pre_sale_payment_pay_flg, user_pin, sku_count, pay_bargain_real, pay_bargain_plan, pay_balance_real, pay_balance_plan, freight, freight_discount, service_charge, item_id, item_name, pre_sale_id, data_typ FROM adm.adm_zs_z1106_pre_sale_details Where dt = '2024-10-17'")

// Doris 配置
val dorisTable = "order_deal.adm_zs_z1106_pre_sale_details"
val dorisHost = "drpub62.olap.jd.com"
val dorisPort = "2004"
val dorisUser = "******"
val dorisPassword = "******"

// 将数据写入 Doris
hiveData.write
.format("doris") // 使用 spark-doris-connector
.option("doris.table.identifier", dorisTable)
.option("doris.fenodes", s"$dorisHost:$dorisPort") // Doris 的节点
.option("user", dorisUser) // 用户名
.option("password", dorisPassword) // 密码
.option("doris.batch.size", "1024") // 可选,设置批量大小
.option("doris.max.filter.ratio", "1") // 可选,允许过滤率
.mode("append") // 选择写入模式
.save()

// 关闭 SparkSession
spark.stop()
}
}

重要事项:

  • key列为空会导致推数失败。
  • hive中的string列推doris的varchar()列超长会导致推数失败,需要裁剪数据或扩容doris列。
  • hive中的列数据类型与doris列数据类型不一致会导致推数失败,如string类型推int()类型。
  • 避免部分行推数失败导致所有行推数失败,可以使用doris.max.filter.ratio配置一个可容忍的失败率,否则任意一行推数失败则整个推数任务失败。

另外注意如下支持hiveData.write.format(“doris”)的依赖包并不是公共远程仓库中的,需要自行clone开源spark-doris-connector项目,根据spark版本本地打包上传本地或私域仓库使用。

1
2
3
4
5
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-2.4_2.11</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>

参考文献

Doris外部表及数据导入

maven项目 在IDEA 中使用 mvn install 添加本地 jar包依赖 全过程