您当前的位置: 首页 >  ar

cuiyaonan2000

暂无认证

  • 2浏览

    0关注

    248博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Elasticsearch: Pipeline

cuiyaonan2000 发布时间:2022-08-12 10:00:09 ,浏览量:2

序言

Elasticsearch有采集管道直说.其实我们在Kibana中就可以看到它已经提供了2个.所有的文档(Document)都是先通过管道在入库的cuiyaonan2000@163.com

默认提供的管道如下所示:

管道的定义如下所示

 

 Ingest Node

Ingest Node表示:预处理节点,是 ES 用于功能上命名的一种节点类型,可以通过 elasticsearch.xml 进行如下配置来标识出集群中的某个节点是否是 Ingest Node.

node.ingest: ture

上述将 node.ingest 设置成 true,则表明当前节点是 Ingest Node,具有预处理能力, Elasticsearch 默认所有节点都是 Ingest Node,即集群中所有的节点都具有预处理能力.

预处理

用过 Logstash 对日志进行处理的用户都知道,一般情况下我们并不会直接将原始的日志进行加载到 Elasticsearch 集群,而是对原始日志信息进行(深)加工后保存到 Elasticsearch 集群中。比如 Logstash 支持多种解析器比如 json,kv,date 等,比较经典的是 grok.这里我们不会对 Logstash 的解析器进行详细说明,只是为了描述一个问题,有些时候我们需要 Logstash 来对加载到 Elasticsearch 中的数据进行处理,这个处理,从概念上而言,我们也能称之为"预处理。而这里我们所说的预处理也其实就是类似的概念。

Elasticsearch 5.x 版本开始,官方在内部集成了部分 Logstash 的功能,这就是 Ingest,而具有 Ingest 能力的节点称之为 Ingest Node.

如果要脱离 Logstash 来对在 Elasticsearch 写入文档之前对文档进行加工处理,比如为文档某个字段设置默认值,重命名某个字段,设置通过脚本来完成更加复杂的加工逻辑,我们则必须要了解两个基本概念: Pipeline 和 Processors.

Pipeline与Processor

这里就以Pipeline和java中的Stream进行类比,两者从功能和概念上很类似,我们经常会对Stream中的数据进行处理,比如map操作,peek操作,reduce操作,count操作等,这些操作从行为上说,就是对数据的加工,而Pipeline也是如此,Pipeline也会对通过该Pipeline的数据(一般来说是文档)进行加工,比如上面说到的,修改文档的某个字段值,修改文档某个字段的类型等等.而Elasticsearch对该加工行为进行抽象包装,并称之为Processors.

Elasticsearch命名了多种类型的Processors来规范对文档的操作,比如set,append,date,join,json,kv等等.这些不同类型的Processors,我们会在后文进行说明

如此这般,Pipeline相当于java中的stream.而process相当于是一些类似于map,flatmap的算子cuiyaonan2000@163.com

定义一个 Pipeline 是件很简单的事情,官方给出了参考(pipeline中包含了processors):

PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}

上面的例子,表明通过指定的 URL 请求"_ingest/pipeline“定义了一个 ID 为”my-pipeline-id"的 pipeline,其中请求体中的存在两个必须要的元素:

  • description 描述该 pipeline 是做什么的
  • processors 定义了一系列的 processors,这里只是简单的定义了一个赋值操作,即将字段名为"foo“的字段值都设置为”bar"

如果需要了解更多关于 Pipeline 定义的信息,可以参考: Ingest APIs

Simulate Pipeline API

该API即用于我们测试自己的Pipeline的工具,可以帮助我们去测试自己的管道.

#如下测试pipeline的时候不指定pipeline,而是在测试内容中声明一个
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
    //因为没有在路径中设置pipeline的id,所以这里我们要声明一个
  },
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}
 
#这里直接在路径中指明一个pipeline来测试
POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}

simulate的瞬态字段

执行如下的测试访问

POST _ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
 

返回的内容

{
   "docs": [
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "type",
            "_source": {
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "type",
            "_source": {
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

从具体的响应结果中看到,在文档通过 pipeline 时(或理解为被 pipeline 中的 processors 加工后),新的文档与原有的文档产生了差异,这些差异体现为:

  • 文档都新增了 field2 字段,这点我们可以通过对比响应前后的定义在"docs"中文档的_source 内容中看出
  • 额外增加了一些临时(官方称之为瞬态)的字段,比如 timestamp,他们都在_ingest 节点下,(这些)字段都是临时与源文档存在一起,在被 pipeline 中的 processors 加工后后返回给对应的批量操作或索引操作之后。这些信息就不会携带返回。----额外增加的字段都在_ingest节点下cuiyaonan2000@163.com

Processor中访问文档元数据

每个文档都会有一些元数据字段信息(metadata filed),比如_id_index,_type 等,我们在 processors 中也可以直接访问这些信息的,比如下面的例子:

{
  "set": {
    "field": "_id"
    "value": "1"
  }
}

Processor访问瞬态字段

只是引用方式的区别,瞬态字段需要使用{{}}来访问,没有文档元数据的字段访问方便cuiyaonan2000@163.com

{
  "set": {
    "field": "received"
    "value": "{{_ingest.timestamp}}"
  }
}

Processors 类型

系统提供的默认的类型如下所示.

  • Append Processor 追加处理器
  • Convert Processor 转换处理器
  • Date Processor 日期转换器
  • Date Index Name Processor 日期索引名称处理器
  • Fail Processor 失败处理器
  • Foreach Processor 循环处理器
  • Grok Processor Grok 处理器
  • Gsub Processor
  • Join Processor
  • JSON Processor
  • KV Processor
  • Lowercase Processor
  • Remove Processor
  • Rename Processor
  • Script Processor
  • Split Processor
  • Sort Processor
  • Trim Processor
  • Uppercase Processor
  • Dot Expander Processor

实例参考文档Elasticsearch Pipeline 详解_自知自省的博客-CSDN博客_es的pipeline

关注
打赏
1638267374
查看更多评论
立即登录/注册

微信扫码登录

0.0429s