目录
1. 使用场景
- 1. 使用场景
- 2. 导入基本原理
- 3. 导入基本语法
- 4. 导入操作
- 5. 导入结果
- 6. 取消导入
- 7. fe.conf参数配置
Broker load是一种异步导入方式,目前能从HDFS导入数据。导入数据量级别为百GB
2. 导入基本原理用户在提交导入任务后,FE 会生成对应的Plan并根据目前BE的个数和文件的大小,将Plan分给多个BE执行,每个BE执行一部分导入数据。
BE在执行的过程中会从Broker拉取数据,在对数据transform之后将数据导入系统。所有BE均完成导入,由FE最终决定导入是否成功
其中Broker封装了文件系统接口,提供Doris读取远端存储系统中文件的能力
3. 导入基本语法语法如下,更多语法帮助请通过help broker load;
进行查看:
LOAD LABEL [database_name.]your_label
(
data_desc1[, data_desc2, ...]
)
WITH BROKER broker_name
[broker_properties]
[PROPERTIES ("key"="value", ...)];
- your_label:当前导入的标签,在一个database内需要唯一
- data_desc:可以通过多个data_desc向多个表导入数据,多个表导入要么全部成功,要么全部失败
- PROPERTIES:用于指定一些特殊的参数,如下所示:
- timeout: 导入任务的超时时间,在该时间内未完成,则状态变成CANCELLED。使用例子
"timeout" = "14400"
,默认4小时,单位为秒。导入任务的大概时间计算方式为:(总文件大小(MB) * 待导入的表及相关Rollup表的个数) / (单个BE导入速度(MB/S) * 导入并发数) - max_filter_ratio 导入数据的容错率,默认为0,范围为0-1,计算方式为:max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL))
- exec_mem_limit:导入的内存限制。默认为 2GB,单位为字节
- timeout: 导入任务的超时时间,在该时间内未完成,则状态变成CANCELLED。使用例子
data_desc用于描述一批导入数据,语法如下:
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(col_name1, col_name2, ...)]
[PRECEDING FILTER predicate]
[SET (k1 = func(k2), k3 = func(k4))]
[WHERE predicate]
[DELETE ON label=true]
[ORDER BY source_sequence]
- merge_type:有三种类型append、delete、merge,默认是append。append表示这批数据全部追加到现有数据中;delete表示删除与这批数据key相同的所有行;merge需要与delete on条件联合使用,表示满足delete条件的数据按照delete语义处理,其余的按照append语义处理
- file_path1:指定一个文件的路径。可以使用通配符
*
匹配目录下多个文件 - column_separator:用于指定一行数据的分隔符,默认为
\t
- file_type:用于指定导入文件的类型,例如:parquet、orc、csv。默认值通过文件后缀名判断
- (col_name1, col_name2, …):可选,默认文件中的列和doris表的列一一对应。用于指定文件中的列名,通常和set语法配合使用。如果一列没有映射到doris表的一列,则忽略该列
- SET (k1 = func(k2), k3 = func(k4)):该操作主要有两个作用:
- 文件中的列和doris表的列顺序不一致,可以统一列顺序。如文件中为各列为:(col1, col2),doris表的两列为(col2_A, col1_A),则可以使用
set (col2_A = col2, col1_A = col1)
- 对文件中的列进行计算,生成doris表的列。如文件中有一个时间列time_col,doris表中有一个年份列year_col,则可以使用
set (year_col = year(time_col))
- 文件中的列和doris表的列顺序不一致,可以统一列顺序。如文件中为各列为:(col1, col2),doris表的两列为(col2_A, col1_A),则可以使用
- PRECEDING FILTER predicate:对set操作前,文件中的原始数据进行过滤
- WHERE predicate:对set操作后的数据进行过滤。被过滤的数据不会计入容忍率的计算
示例如下:
mysql> load label test_db.label_20220309135100
-> (
-> data infile
-> (
-> "hdfs://nnha/orc_test/part-*.snappy.orc"
-> )
-> into table broker_load_test
-> format as "orc"
-> (user_id, age, user_name)
-> set
-> (
-> user_id = user_id,
-> user_name = upper(user_name),
-> age = age
-> )
-> where age > 100
-> )
-> with broker apache_hdfs_broker
-> (
-> "dfs.nameservices" = "nnha",
-> "dfs.ha.namenodes.nnha" = "nn1, nn2, nn3",
-> "dfs.namenode.rpc-address.nnha.nn1" = "bigdata001:8020",
-> "dfs.namenode.rpc-address.nnha.nn2" = "bigdata002:8020",
-> "dfs.namenode.rpc-address.nnha.nn3" = "bigdata003:8020",
-> "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
-> );
Query OK, 0 rows affected (0.07 sec)
mysql>
- broker_properties部分指定了HDFS的HA访问
导入结果如下:
mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
JobId: 1828972
Label: label_20220309135100
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=18877739
TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2022-03-09 13:51:40
EtlStartTime: 2022-03-09 13:51:48
EtlFinishTime: 2022-03-09 13:51:48
LoadStartTime: 2022-03-09 13:51:48
LoadFinishTime: 2022-03-09 13:59:34
URL: NULL
JobDetails: {"Unfinished backends":{"79fb2b3749a741ec-915c915c716d7242":[]},"ScannedRows":18877739,"TaskNumber":1,"All backends":{"79fb2b3749a741ec-915c915c716d7242":[10002,10003,10004]},"FileNumber":15,"FileSize":2820632812}
1 row in set (0.37 sec)
mysql>
各返回参数说明如下:
- JobId:系统自动生成的唯一ID,即使多次导入的label一样,JobId也会不一样
- State:导入任务的状态
- PENDING:导入任务正在等待被执行
- LOADING:导入任务正在被执行
- CANCELLED:导入失败
- FINISHED:导入成功
- Progress:导入任务的进度描述。LOAD进度 = 当前完成导入的表个数 / 本次导入任务设计的总表个数
- EtlInfo:unselected.rows表示where过滤了多少行数据
- TaskInfo:导入任务的参数
- ErrorMsg:当导入任务状态为CANCELLED,会显示失败的原因,显示分两部分:type msg,type的取值意义:
- USER_CANCEL: 用户取消的任务
- ETL_RUN_FAIL:在ETL阶段失败的导入任务
- ETL_QUALITY_UNSATISFIED:数据质量不合格,也就是错误数据率超过了 max_filter_ratio
- LOAD_RUN_FAIL:在LOADING阶段失败的导入任务
- TIMEOUT:导入任务没在超时时间内完成
- UNKNOWN:未知的导入错误
- CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime:导入任务长时间停留在CreateTime,而LoadStartTime为N/A则说明目前导入任务堆积严重。用户可减少导入提交的频率
- URL:导入任务失败的详细信息URL
- JobDetails:包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的多个BE节点 Id,未完成的多个BE节点Id。其中已处理的原始行数,每5秒更新一次。该行数仅用于展示当前的进度,不代表最终实际的处理行数。实际处理行数以EtlInfo中显示的为准
当导入未完成时,用户可以取消导入。取消导入的命令请查看help cancel load
- min_bytes_per_broker_scanner:单个BE处理的数据量的最小值,默认64MB,单位bytes
- max_bytes_per_broker_scanner:单个BE处理的数据量的最大值,默认3G,单位bytes
- max_broker_concurrency:一个作业的最大的导入并发数,默认10
- 上面3各参数可以计算出如下值:
本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数)
本次导入单个BE的处理量 = 源文件大小/本次导入的并发数
- desired_max_waiting_jobs:当运行的broker load任务操作该值,则拒绝新提交的broker load任务。默认值为100
- async_pending_load_task_pool_size:一个导入任务只有一个pending task,pending task负责获取导入文件的信息。该参数用于限制同时运行的pending task的任务数量,默认为10
- async_loading_load_task_pool_size:一个导入任务有多个loading task(等于 LOAD语句中DATA INFILE子句的个数),loading task会发送给BE执行具体的导入任务。该参数用于限制同时运行的loading task的任务数量