Logstash configuration gets its own post, because there is a lot to cover around data transformation, and that transformation is one of the main reasons Logstash exists.
Codec Plugin

Logstash is not just an input|filter|output data flow; it is really an input|decode|filter|encode|output data flow. A codec is what decodes and encodes events, so codecs are used in inputs and outputs. Common codecs are:
- plain: reads the raw content as-is (if no codec is set on an input or output, plain is the default)
- dots: reduces each event to a dot on output
- rubydebug: prints Logstash events in Ruby debug format, convenient for debugging
- line: handles content with newlines
- json: handles JSON-formatted content
- multiline: handles multi-line content
If an input or output does not set a codec, the default is plain, so whatever arrives, on input or output, is simply placed into the message field.
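For context, here is a minimal sketch of the kind of config behind the events shown below; the file paths are the test files used throughout this post, and nothing else is assumed:

```conf
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    # no codec set, so the default plain codec applies
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
```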
The resulting structure looks like this:
{ "message": "{\"name\": \"hello\"}", "@timestamp": "2022-08-15T02:31:41.487Z", "host": "localhost.localdomain", "@version": "1", "path": "/soft/logstash-7.14.0/bin/mytestfile" } { "message": "test input", "@timestamp": "2022-08-15T02:32:11.683Z", "host": "localhost.localdomain", "@version": "1", "path": "/soft/logstash-7.14.0/bin/mytestfile" }json插件
```conf
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => json
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
```
As above, with the input codec set to json: if the input content is valid JSON it is parsed into event fields instead of going into message; if it is not valid JSON it still lands in message, and a JSON-parse-failure tag is added. Let's look at two examples, one with invalid JSON and one with valid JSON:
```
# input was not valid JSON
{
  "tags": ["_jsonparsefailure"],
  "@timestamp": "2022-08-15T02:43:33.616Z",
  "@version": "1",
  "message": "test input json",
  "host": "localhost.localdomain",
  "path": "/soft/logstash-7.14.0/bin/mytestfile"
}
# input was valid JSON
{
  "@timestamp": "2022-08-15T02:46:27.375Z",
  "name": "hello",
  "host": "localhost.localdomain",
  "@version": "1",
  "path": "/soft/logstash-7.14.0/bin/mytestfile"
}
```

multiline plugin
Used to merge multi-line data into one event. Application debug logs are often very rich, printing many lines for a single event, and logs like that are hard to analyze with line-by-line parsing. The multiline plugin exists to solve this. multiline has the following options:
- pattern: the concrete match pattern; regular expressions are allowed
- what: whether a matching line is merged into the previous line or the next one; previous merges it upward, next merges it downward

In addition, some other commonly used options:

- negate: whether to operate on the lines that match pattern or on the lines that do not; the default false means matching lines are operated on, true means non-matching lines
- max_bytes: the maximum number of bytes a merged match may hold, default 10 MB; setting this too high risks an OOM
- max_lines: like max_bytes, but limits the maximum number of matched lines, default 500
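A classic application, as a minimal sketch that is not from this post's own example (the log path is hypothetical), is folding Java stack-trace lines into the log line that precedes them:

```conf
input {
  file {
    path => "/var/log/app/app.log"  # hypothetical path
    start_position => "end"
    codec => multiline {
      # stack-trace frames are indented, so they start with whitespace
      pattern => "^\s"
      # merge each matching line into the previous line's event
      what => "previous"
    }
  }
}
```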
If multiline is configured, note that a pending event only reaches the output once a terminating line is appended to the file (here the content "the end"); otherwise multiline holds it until max_lines or max_bytes is satisfied. Keep that in mind.
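As a sketch of a softer workaround, the multiline codec also takes an auto_flush_interval option (seconds of inactivity before a buffered event is flushed); I'm assuming a reasonably recent 7.x codec here, and the example below does not use it:

```conf
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => multiline {
      pattern => "cuiyaonan2000@163.com \("
      what => "previous"
      negate => false
      # flush the pending event if no new line arrives for 5 seconds
      auto_flush_interval => 5
    }
  }
}
```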
The example config:

```conf
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => multiline {
      pattern => "cuiyaonan2000@163.com \("
      what => "previous"
      negate => false
    }
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
```
We append the following content to the log file:

```
cuiyaonan2000@163.com (你今天快了么
cuiyaonan2000@163.com (我很快乐
cuiyaonan2000@163.com (大家都快乐
123132 (good luck
i am luck
```
The output is:

```
{
  "@timestamp": "2022-08-15T05:47:06.537Z",
  "@version": "1",
  "tags": ["multiline"],
  "path": "/soft/logstash-7.14.0/bin/mytestfile",
  "host": "localhost.localdomain",
  "message": "the end\ncuiyaonan2000@163.com (你今天快了么\ncuiyaonan2000@163.com (我很快乐\ncuiyaonan2000@163.com (大家都快乐"
}
{
  "@timestamp": "2022-08-15T05:47:06.537Z",
  "host": "localhost.localdomain",
  "message": "123132 (good luck",
  "@version": "1",
  "path": "/soft/logstash-7.14.0/bin/mytestfile"
}
```
Appending again:

```sh
echo "the end" >> mytestfile
```

shows the last event:

```
{
  "@timestamp": "2022-08-15T05:48:58.017Z",
  "host": "localhost.localdomain",
  "message": "the end",
  "@version": "1",
  "path": "/soft/logstash-7.14.0/bin/mytestfile"
}
```
Input

Standard input. Format:

```conf
input {
  stdin { }
}
```
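For quick experiments it can be handy to pass the pipeline straight on the command line with -e; a sketch combining stdin with the rubydebug output covered in the Output section below:

```sh
bin/logstash -e 'input { stdin {} } output { stdout { codec => rubydebug } }'
```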
HTTP input format:

```conf
input {
  http {
    port => PORT  # fill in your port number
  }
}
```

TCP input:

```conf
input {
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => PORT  # fill in your port number
    codec => json_lines
  }
}
```

Receiving Beats input
All you need is to listen on a port:

```conf
input {
  beats {
    port => 5044
  }
}
```
File input:

```conf
input {
  file {
    path => "/var/log/test.log"
    start_position => "end"
  }
}
```
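The file input accepts more options than the two shown above; a sketch with a few common ones (the values here are illustrative, not from this post's setup):

```conf
input {
  file {
    # multiple paths and glob patterns are allowed
    path => ["/var/log/*.log", "/var/log/app/app.log"]
    # skip files matching this glob
    exclude => "*.gz"
    # "beginning" reads existing content on first run; "end" only tails new lines
    start_position => "beginning"
    # where Logstash records how far into each file it has read
    sincedb_path => "/var/lib/logstash/sincedb"
  }
}
```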
Output

Output to the console, mostly used for debugging:

```conf
output {
  stdout {
    codec => rubydebug
  }
}
```

Output to a file
This covers the need to gather files scattered across many machines into one place, for example collecting the web logs of every web server into a single file so the information is easy to review.

```conf
output {
  file {
    path => "FILE_PATH"
    codec => line { format => "%{message}" }
  }
}
```

Output to Elasticsearch

```conf
output {
  elasticsearch {
    # Elasticsearch addresses; multiple hosts can be comma-separated
    hosts => ["http://192.168.3.12:9200"]
    # name of the index to create; the yyyy.MM.dd parts are taken from @timestamp
    index => "logstash-%{+YYYY.MM.dd}"
    # the type for the index. Generally logs of the same kind should share a type,
    # e.g. debug logs and error logs in different types; defaults to "logs" if unset
    document_type => "_doc"
    user => "USERNAME"
    password => "PASSWORD"
  }
}
```
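Outputs can also be wrapped in conditionals. A sketch, where the loglevel field is a hypothetical one produced by your filters, that routes errors to their own index:

```conf
output {
  if [loglevel] == "ERROR" {
    elasticsearch {
      hosts => ["http://192.168.3.12:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://192.168.3.12:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
    }
  }
}
```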
Filter

grok

grok turns unstructured information into structured information. It works through regular expressions: the matched pieces become keys and values in the event's JSON, and the expression embeds grok's own syntax to drive the conversion.

The grok syntax is:

%{SYNTAX:SEMANTIC}

where SYNTAX is the name of a matching pattern and SEMANTIC is the key the matched value is stored under. An example:

```conf
filter {
  grok {
    match => { "message" => "%{IPV4:ip}\ \[%{HTTPDATE:timestamp}\]" }
  }
}
```
In the pattern above:

- the first `\ ` matches a space
- `\[` is the escaped character `[`
- `\]` is the escaped character `]`
- %{IPV4:ip}: IPV4 is a predefined regex that matches an IP address; the trailing ip is the key the value gets in the structured data
- %{HTTPDATE:timestamp}: same idea; timestamp is the key in the structured data
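To make this concrete, here is a sketch of what that filter produces; the input line and its values are invented for illustration:

```
# input line:
55.3.244.1 [07/Mar/2004:16:45:56 -0800]

# fields added to the event:
ip        => "55.3.244.1"
timestamp => "07/Mar/2004:16:45:56 -0800"
```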
The predefined patterns that ship with Logstash:

```
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# URN, allowing use of RFC 2141 section 2.3 reserved characters
URN urn:[0-9A-Za-z][0-9A-Za-z-]{0,31}:(?:%[0-9a-fA-F]{2}|[0-9A-Za-z()+,.:=@;$_!*'/?#-])+

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
IPORHOST (?:%{IP}|%{HOSTNAME})
HOSTPORT %{IPORHOST}:%{POSINT}

# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (/([\w_%!$@:.,+~-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[APMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG [\x21-\x5a\x5c\x5e-\x7e]+
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:

# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
```
Since the syntax is regex, you can naturally define patterns of your own; for details, and for the most complete description of grok's options, see the official docs: Grok filter plugin | Logstash Reference [8.3] | Elastic. The predefined patterns above are the common ones the official distribution provides. From this we can see that grok can add new field keys, and it can also overwrite the content of existing keys.
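A sketch of registering a custom pattern inline via the grok filter's pattern_definitions option (this option is not used in the examples that follow, which embed the regex directly instead):

```conf
filter {
  grok {
    # define an ad-hoc pattern named MYTIME and use it like a built-in one
    pattern_definitions => {
      "MYTIME" => "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}"
    }
    match => { "message" => "^%{MYTIME:mytime}" }
  }
}
```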
As shown below, we examine the existing log line and pull out date and time:

```conf
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => json
  }
}
filter {
  grok {
    match => [ "message", "^%{DATA:date}\ %{DATA:time}\ " ]
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
```
The result:

```
{
  "@timestamp": "2022-08-15T08:00:32.015Z",
  "time": "10:03:38.255",
  "@version": "1",
  "tags": ["_jsonparsefailure"],
  "path": "/soft/logstash-7.14.0/bin/mytestfile",
  "host": "localhost.localdomain",
  "message": "2022-08-11 10:03:38.255 [] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [335] - Bean 'spring.cloud.alibaba.seata-com.alibaba.cloud.seata.SeataProperties' of type [com.alibaba.cloud.seata.SeataProperties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
  "date": "2022-08-11"
}
```

Custom regex matching
```conf
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => json
  }
}
filter {
  grok {
    match => [ "message", "(?<mytime>^\d{4}\-\d{2}\-\d{2}\ \d{2}\:\d{2}\:\d{2}\.\d{3})" ]
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
```
The result:

```
{
  "@timestamp": "2022-08-15T08:20:15.404Z",
  "@version": "1",
  "tags": ["_jsonparsefailure"],
  "path": "/soft/logstash-7.14.0/bin/mytestfile",
  "host": "localhost.localdomain",
  "message": "2022-08-11 10:03:38.255 [] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [335] - Bean 'spring.cloud.alibaba.seata-com.alibaba.cloud.seata.SeataProperties' of type [com.alibaba.cloud.seata.SeataProperties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
  "mytime": "2022-08-11 10:03:38.255"
}
```

Date
As the name suggests, this is the plugin for date processing.
```conf
input {
  file {
    path => "/soft/logstash-7.14.0/bin/mytestfile"
    start_position => "end"
    codec => json
  }
}
filter {
  grok {
    match => [ "message", "(?<mytime>^\d{4}\-\d{2}\-\d{2}\ \d{2}\:\d{2}\:\d{2}\.\d{3})" ]
  }
  date {
    match => ["mytime", "yyyy-MM-dd HH:mm:ss.SSS"]
  }
}
output {
  file {
    path => "/soft/logstash-7.14.0/bin/outputfile"
  }
}
```
As above, date's match works on the mytime field: a value that satisfies the yyyy-MM-dd HH:mm:ss.SSS format is parsed, and its value overwrites @timestamp. Note that overwriting @timestamp is just the default; you can direct the result at another field by adding a target, as shown here:

```conf
date {
  match => ["mytime", "yyyy-MM-dd HH:mm:ss.SSS"]
  target => "@timestamp"
}
```
The result:

```
{
  "@timestamp": "2022-08-11T02:03:38.255Z",
  "@version": "1",
  "tags": ["_jsonparsefailure"],
  "path": "/soft/logstash-7.14.0/bin/mytestfile",
  "host": "localhost.localdomain",
  "message": "2022-08-11 10:03:38.255 [] INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [335] - Bean 'spring.cloud.alibaba.seata-com.alibaba.cloud.seata.SeataProperties' of type [com.alibaba.cloud.seata.SeataProperties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
  "mytime": "2022-08-11 10:03:38.255"
}
```
mutate

When Logstash reads data from an external source, every value it reads is a string by default. When we then need to change a field's type, say from string to integer, or delete a field, rename it, give it a default value, and so on, the mutate filter is what we reach for.
```conf
filter {
  mutate {
    # give a field a default value when it has none
    coerce => { "default_value" => "field had no value, so set a default" }
    # rename fields
    rename => {
      "user_real_name" => "[user][real_name]"
      "user_english_name" => "[user][english_name]"
      "age" => "年龄"
    }
    # update a field's value (only applies if the field already exists)
    update => { "user_address" => "the user's address is: %{address}" }
    # replace a field's value (creates the field if it does not exist)
    replace => { "user_address" => "the user's address is: %{address}" }
    # convert the data type
    convert => { "age" => "string" }
    # substitute within a field's content; the second element may be a regex.
    # only works on string fields or arrays of strings
    gsub => [ "address", ";", "--" ]
    # trim leading and trailing whitespace
    strip => ["strip_blank"]
    # remove fields; if the event's username is zhangsan, the field named
    # foo_zhangsan is removed
    remove_field => ["user_real_name", "foo_%{username}"]
  }
}
```
For more usage, see: https://www.jianshu.com/p/266352af1f81