Logstash configuration gets its own write-up because there is a lot to say about data transformation; it is also one of the main reasons Logstash exists. cuiyaonan2000@163.com
Codec Plugin
Logstash is not just an input | filter | output pipeline; it is really input | decode | filter | encode | output. A codec decodes and encodes events, which is why codecs are configured inside input and output blocks. Common codecs:
- plain: reads the raw content (the default when input or output specifies no codec)
- dots: collapses the content to dots, one per event
- rubydebug: prints Logstash events in Ruby debug format, handy for debugging
- line: handles line-oriented (newline-delimited) content
- json: handles JSON content
- multiline: merges multi-line content into a single event
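A quick way to watch a codec at work is a stdin-to-stdout pipeline (a minimal sketch; nothing here is specific to this article's setup):
input { stdin { } }                       # plain codec by default: each line lands in message
output { stdout { codec => rubydebug } }  # pretty-print the full event for debugging
Run it with bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }' and type a line to see the resulting event structure.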
If input or output sets no codec, plain is used, so whatever arrives on the input ends up verbatim in the message field. cuiyaonan2000@163.com
The resulting events look like this:
{
"message": "{\"name\": \"hello\"}",
"@timestamp": "2022-08-15T02:31:41.487Z",
"host": "localhost.localdomain",
"@version": "1",
"path": "/soft/logstash-7.14.0/bin/mytestfile"
}
{
"message": "test input",
"@timestamp": "2022-08-15T02:32:11.683Z",
"host": "localhost.localdomain",
"@version": "1",
"path": "/soft/logstash-7.14.0/bin/mytestfile"
}
json plugin
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => json
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
With the input codec set to json as above, a line that parses as JSON has its fields promoted to top-level event fields instead of going into message; a line that is not valid JSON still goes into message, and the event gets a tag marking the failed JSON parse. cuiyaonan2000@163.com
Two examples, one invalid and one valid JSON line:
# input was not valid JSON
{
"tags": ["_jsonparsefailure"],
"@timestamp": "2022-08-15T02:43:33.616Z",
"@version": "1",
"message": "test input json",
"host": "localhost.localdomain",
"path": "/soft/logstash-7.14.0/bin/mytestfile"
}
# input was valid JSON
{
"@timestamp": "2022-08-15T02:46:27.375Z",
"name": "hello",
"host": "localhost.localdomain",
"@version": "1",
"path": "/soft/logstash-7.14.0/bin/mytestfile"
}
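The two events above can be reproduced by appending one non-JSON line and one JSON line to the watched file (sample values taken from the outputs above):
echo 'test input json' >> mytestfile       # not JSON: kept in message, tagged _jsonparsefailure
echo '{"name": "hello"}' >> mytestfile     # valid JSON: name becomes a top-level field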
multiline plugin
Merges multiple lines into a single event.
Application debug logs often print many lines of rich content for a single event, which is hard to analyze line by line from the command line. The multiline codec exists for exactly this.
multiline's main options:
- pattern: the match pattern, given as a regular expression
- what: whether a matched line is merged into the previous or the next event; previous merges upward, next merges downward
Some further common options:
- negate: whether to operate on lines that match pattern (false, the default) or on lines that do not match it (true)
- max_bytes: the maximum number of bytes one merged event may hold, 10 MB by default; setting it too high risks OOM
- max_lines: like max_bytes but counted in lines, 500 by default
Careful: with multiline configured, buffered lines only reach the output once a line arrives that starts the next event (below we append "the end" to the file to force a flush); otherwise multiline holds them until max_lines or max_bytes is hit. cuiyaonan2000@163.com
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => multiline {
      pattern => "cuiyaonan2000@163.com \("
      what => previous
      negate => false
    }
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
Append the following lines to the log file:
cuiyaonan2000@163.com (你今天快了么
cuiyaonan2000@163.com (我很快乐
cuiyaonan2000@163.com (大家都快乐
123132 (good luck
i am luck
The output is:
{
"@timestamp": "2022-08-15T05:47:06.537Z",
"@version": "1",
"tags": ["multiline"],
"path": "/soft/logstash-7.14.0/bin/mytestfile",
"host": "localhost.localdomain",
"message": "the end\ncuiyaonan2000@163.com (你今天快了么\ncuiyaonan2000@163.com (我很快乐\ncuiyaonan2000@163.com (大家都快乐"
}
{
"@timestamp": "2022-08-15T05:47:06.537Z",
"host": "localhost.localdomain",
"message": "123132 (good luck",
"@version": "1",
"path": "/soft/logstash-7.14.0/bin/mytestfile"
}
Append one more line:
echo "the end" >> mytestfile
and the last event is flushed:
{
"@timestamp": "2022-08-15T05:48:58.017Z",
"host": "localhost.localdomain",
"message": "the end",
"@version": "1",
"path": "/soft/logstash-7.14.0/bin/mytestfile"
}
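The more typical real-world use is folding Java stack traces into the log line that precedes them. A sketch (the leading-timestamp pattern is an assumption about your log format): any line that does not begin with a date is merged into the previous event:
codec => multiline {
  pattern => "^\d{4}-\d{2}-\d{2}"  # lines starting with a date open a new event
  negate => true                   # operate on lines that do NOT match the pattern...
  what => "previous"               # ...and merge them into the previous event
}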
Input
Standard input:
input { stdin { } }
HTTP input:
input { http { port => <port> } }
TCP input:
input { tcp { mode => "server" host => "0.0.0.0" port => <port> codec => json_lines } }
Beats input (just listen on a port):
input { beats { port => 5044 } }
File input:
input { file { path => "/var/log/test.log" start_position => "end" } }
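Several inputs can sit side by side in one pipeline. A common trick (a sketch; the port numbers and type values here are arbitrary) is stamping each input with a type so later filter and output blocks can tell the sources apart:
input {
  beats { port => 5044 type => "beats" }
  tcp { port => 5000 mode => "server" type => "tcp" }
}
filter {
  if [type] == "tcp" {
    # filters applied only to events from the tcp input
  }
}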
Output
To the console (mostly used for debugging):
output { stdout { codec => rubydebug } }
To a file: this covers collecting scattered files into one place, e.g. gathering the web logs of every web machine into a single file for convenient inspection:
output { file { path => "<path>" codec => line { format => "%{message}" } } }
To Elasticsearch:
output {
  elasticsearch {
    # ES addresses; multiple hosts are comma-separated
    hosts => ["http://192.168.3.12:9200"]
    # name of the index to create; yyyy.MM.dd here is taken from @timestamp
    index => "logstash-%{+YYYY.MM.dd}"
    # the type for the ES index; generally logs of the same kind should share one type
    # (e.g. debug logs and error logs in different types); defaults to logs if unset
    document_type => "_doc"
    user => "<username>"
    password => "<password>"
  }
}
Filter
grok
grok turns unstructured text into structured data. Through regular expressions it converts what it matches into JSON keys and values; the regex embeds grok's own syntax to name the captures. cuiyaonan2000@163.com
grok syntax:
%{SYNTAX:SEMANTIC}
For example:
filter {
  grok {
    match => {
      "message" => "%{IPV4:ip}\ \[%{HTTPDATE:timestamp}\]"
    }
  }
}
In the pattern above:
- the first \  matches a single space
- \[ is an escaped literal [
- \] is an escaped literal ]
- %{IPV4:ip}: IPV4 is a predefined pattern (itself a regex) that matches an IP address; ip is the key the match is stored under
- %{HTTPDATE:timestamp}: likewise, timestamp is the key for the matched date
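To make this concrete, here is a made-up log line and the fields the expression above would extract from it (values are purely illustrative):
# hypothetical input line
192.168.3.12 [15/Aug/2022:10:03:38 +0800]
# fields grok adds to the event
"ip": "192.168.3.12"
"timestamp": "15/Aug/2022:10:03:38 +0800"
The IPV4 and HTTPDATE names come from Logstash's library of predefined patterns; an excerpt from the built-in grok-patterns file: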
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# URN, allowing use of RFC 2141 section 2.3 reserved characters
URN urn:[0-9A-Za-z][0-9A-Za-z-]{0,31}:(?:%[0-9a-fA-F]{2}|[0-9A-Za-z()+,.:=@;$_!*'/?#-])+
# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
IPORHOST (?:%{IP}|%{HOSTNAME})
HOSTPORT %{IPORHOST}:%{POSINT}
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
Example: use the built-in DATA pattern to pull the date and time out of a Spring Boot log line:
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => json
  }
}
filter {
  grok {
    match => [
      "message", "^%{DATA:date}\ %{DATA:time}\ "
    ]
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
The result (DATA is non-greedy, so the two captures take the first two space-separated tokens):
{
"@timestamp": "2022-08-15T08:00:32.015Z",
"time": "10:03:38.255",
"@version": "1",
"tags": ["_jsonparsefailure"],
"path": "/soft/logstash-7.14.0/bin/mytestfile",
"host": "localhost.localdomain",
"message": "2022-08-11 10:03:38.255 [] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [335] - Bean 'spring.cloud.alibaba.seata-com.alibaba.cloud.seata.SeataProperties' of type [com.alibaba.cloud.seata.SeataProperties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
"date": "2022-08-11"
}
Custom regex capture
grok also accepts inline named captures with the (?<field>pattern) syntax, so you can match without a predefined pattern:
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => json
  }
}
filter {
  grok {
    match => [
      "message", "(?<mytime>^\d{4}\-\d{2}\-\d{2}\ \d{2}\:\d{2}\:\d{2}\.\d{3})"
    ]
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
The result:
{
"@timestamp": "2022-08-15T08:20:15.404Z",
"@version": "1",
"tags": ["_jsonparsefailure"],
"path": "/soft/logstash-7.14.0/bin/mytestfile",
"host": "localhost.localdomain",
"message": "2022-08-11 10:03:38.255 [] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [335] - Bean 'spring.cloud.alibaba.seata-com.alibaba.cloud.seata.SeataProperties' of type [com.alibaba.cloud.seata.SeataProperties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
"mytime": "2022-08-11 10:03:38.255"
}
Date
As the name suggests, the date filter is for processing dates: it parses a time out of a field and uses it as the event's timestamp.
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => json
  }
}
filter {
  grok {
    match => [
      "message", "(?<mytime>^\d{4}\-\d{2}\-\d{2}\ \d{2}\:\d{2}\:\d{2}\.\d{3})"
    ]
  }
  date {
    match => ["mytime", "yyyy-MM-dd HH:mm:ss.SSS"]
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
The date match above parses the mytime field against the yyyy-MM-dd HH:mm:ss.SSS format and writes the parsed time into @timestamp.
Note that overwriting @timestamp is just the default; you can write to another field instead by adding a target, as shown below:
date {
  match => ["mytime", "yyyy-MM-dd HH:mm:ss.SSS"]
  target => "@timestamp"
}
The result (note that @timestamp is stored in UTC, so 10:03:38 in +08:00 local time shows up as 02:03:38Z):
{
"@timestamp": "2022-08-11T02:03:38.255Z",
"@version": "1",
"tags": ["_jsonparsefailure"],
"path": "/soft/logstash-7.14.0/bin/mytestfile",
"host": "localhost.localdomain",
"message": "2022-08-11 10:03:38.255 [] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [335] - Bean 'spring.cloud.alibaba.seata-com.alibaba.cloud.seata.SeataProperties' of type [com.alibaba.cloud.seata.SeataProperties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
"mytime": "2022-08-11 10:03:38.255"
}
mutate
When Logstash reads data from an external source, every value arrives as a string by default. When you need to change a field's type (say from string to integer), delete a field, rename it, give it a default value, and so on, the mutate filter is the tool:
filter {
  mutate {
    # note: mutate applies its operations in a fixed internal order
    # (coerce, rename, update, replace, convert, gsub, strip, ...), not the order written here
    # give a field that has no value a default
    coerce => {
      "default_value" => "field had no value, so set a default"
    }
    # rename fields; the [user][real_name] form nests the field under user
    rename => {
      "user_real_name" => "[user][real_name]"
      "user_english_name" => "[user][english_name]"
      "age" => "年龄"
    }
    # update a field's value (only applies when the field already exists)
    update => {
      "user_address" => "the user's address is: %{address}"
    }
    # replace a field's value (creates the field if it does not exist)
    replace => {
      "user_address" => "the user's address is: %{address}"
    }
    # convert the data type
    convert => {
      "age" => "string"
    }
    # substitute inside a field's content; the second element may be a regular expression,
    # and the target field must be a string or an array of strings
    gsub => [
      "address", ";", "--"
    ]
    # trim leading and trailing whitespace
    strip => ["strip_blank"]
    # remove fields; if the event's username is zhangsan, the field named foo_zhangsan is removed
    remove_field => ["user_real_name", "foo_%{username}"]
  }
}
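To illustrate the effect on a made-up event (all values invented): gsub rewrites the ; separators and strip trims the whitespace, so
{ "address": "road-A;road-B", "strip_blank": "  hello  " }
becomes
{ "address": "road-A--road-B", "strip_blank": "hello" }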
More usage examples: https://www.jianshu.com/p/266352af1f81