目录
- 1. 数据导入流程
- 2. 使用python实现Stream load
- 3. Doris HTTP headers参数说明
- 4. 导入资源设置
Stream Load是一种通过HTTP协议进行PUT操作,将数据导入到Doris。数据导入是同步操作,导入完成会返回成功或失败的response。一些具体的使用说明可以通过help stream load
获取
- 方式一,优点是每次的Coordinator可能是不同的BE,减低了压力:
- client向FE请求,FE找出一个BE作为Coordinator
- client向Coordinator发送数据,Coordinator对数据进行分发,发送到各个BE
- Coordinator向client返回发送的结果
- 方式二:
- client直接向一个BE1发送数据,BE1对数据进行分发,发送到各个BE
- BE1向client返回发送的结果
import requests
from requests.auth import HTTPBasicAuth
import uuid
import json
if __name__ == '__main__':
url = 'http://192.168.8.112:7030/api/dataQualitySystem/test/_stream_load'
json_data = {"records": [{"id": 3, "name": "zhangsan"}, {"id": 4, "name": "lisi"}]}
doris_headers = {
'label': str(uuid.uuid1()),
'format': 'json',
'expect': '100 continue',
'strip_outer_array': "true",
'json_root': '$.records',
'where': 'id=3'
}
auth = HTTPBasicAuth('root', 'Root_123')
response = requests.put(url=url,
json=json_data,
headers = doris_headers,
auth = auth
)
redict_url = response.url
redict_headers = {
'label': str(uuid.uuid1()),
'format': 'json',
'strip_outer_array': "true",
'json_root': '$.records',
'where': 'id=3'
}
redict_response = requests.put(url=redict_url,
json=json_data,
headers = redict_headers,
auth = auth
)
if json.loads(redict_response.text).get("Status") != "Success":
ex = Exception(redict_response.text)
raise ex
print(redict_response.text)
程序说明如下:
- url变量部分:
- 7030端口是fe.conf中的http_port
- dataQualitySystem/test分别指数据库和表
- doris_headers字典部分:
- 可以自己指定,也可以由doris系统生成,但label需保证在一个database唯一。除了stream load,也不能和其它数据导入方式的label相同
- expect: 100 continue表示client先向server发送消息说,我的数据超过了1024字节,server端回应说会接收数据的,之后client再向server端发送数据
- strip_outer_array表示导入多条数据,json_root表示多条数据的list对应的key
- where表示对导入的数据进行条件过滤
- 因为FE重定向到作为Coordinator的BE,python的requests虽然可以自动实现重定向到Coordinator,但是不能携带账号和密码进行认证,会报异常:no valid Basic authorization,所以自己手动实现重定向到BE(该BE相当于Coordinator)
执行结果如下:
{
"TxnId": 201601,
"Label": "c217d2f4-3dc5-11ec-862d-5cbaef5fa158",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 2,
"NumberLoadedRows": 1,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 1,
"LoadBytes": 71,
"LoadTimeMs": 28,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 4,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 10,
"CommitAndPublishTimeMs": 13
}
返回结果说明:
- Status: 如果是Publish Timeout,表示导入成功,只是数据暂时对用户不可见
- ExistingJobStatus:如果Label重复,已经存在的Lable的Status
- NumberTotalRows:导入的总数据条数,等于NumberLoadedRows + NumberFilteredRows + NumberUnselectedRows
- NumberLoadedRows:导入成功的数据条数
- NumberFilteredRows:导入失败的数据条数
- NumberUnselectedRows:被where条件过滤的数据条数
- LoadTimeMs:导入完成时间。单位毫秒
- BeginTxnTimeMs:向Fe或BE请求,开始一个事务所花费的时间
- StreamLoadPutTimeMs:向Fe或BE请求,获取导入数据执行计划所花费的时间
- ReadDataTimeMs:读取数据所花费的时间
- WriteDataTimeMs:写入数据操作所花费的时间
- CommitAndPublishTimeMs:向Fe或BE请求,提交并且发布事务所花费的时间
- max_filter_ratio 导入数据的容错率,默认为0,范围为0-1,计算方式为:NumberFilteredRows / (NumberLoadedRows + NumberFilteredRows)
- exec_mem_limit:导入的内存限制。默认为 2GB,单位为字节
- stream_load_default_timeout_second:FE配置,默认300秒,单位为秒,超过该时间的导入任务将被取消变成CANCELLED。可以通过Doris HTTP headers设置或fe.conf进行设置
- streaming_load_max_mb:BE配置,默认10G,单位为MB,用户导入数据的最大限制。只能在be.conf中设置