logstash
It is a tool written in JRuby that runs on the Java virtual machine and collects, parses, and forwards data streams.
- Centralizes processing of all kinds of data
- Normalizes data arriving in different schemas and formats
- Extends quickly to custom log formats
- Makes it easy to add plugins for custom data
Installing logstash
- Install the JDK
- Install from the RPM package
logstash runtime options
- -f  Specify a config file, directory, or wildcard from which to load the configuration
- -e  Specify the configuration as a string on the command line
- -w  Set the number of filter workers, i.e. logstash worker threads
- -l  Write logstash's own log to the given file (the default is console output)
- --quiet  Quiet mode, only error-level log output
- --verbose  Info-level log output
- --debug  Debug-level log output
- -V, --version  Show the logstash version
- -p  Load your own plugins from the given path
- -t  Test whether the loaded configuration parses correctly
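For example, the -e and -t flags can be exercised as follows (a minimal sketch; the binary and config paths are assumptions that depend on how logstash was installed):
logstash -e 'input { stdin {} } output { stdout {} }'
logstash -f /etc/logstash/conf.d/ -t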
Configuration syntax
- input
- filter
- output
Syntax format
- Sections
- A section is delimited with {}
- A section can contain multiple plugins
- Data types
- Boolean: a => true
- Bytes: a => "10MiB"
- String: a => "hello world"
- Number: a => 1024
- Array: match => ["datetime", "UNIX", "ISO8601"]
- Hash: options => { key1 => "value1", key2 => "value2" }
- Codec: codec => "json"
- Password: my_passwd => "password"
- Path: my_path => "/tmp/logstash"
- Comment: #
- Conditionals
- ==, !=, <, >, <=, >=
- =~
- in, not in
- and, or, nand, xor
- (), !()
- if expression {
  ...
} else if expression {
  ...
} else {
  ...
}
- Field reference
- %{[response][status]}
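As a hedged sketch tying conditionals and field references together (the field, tag, and pattern names below are illustrative assumptions), a filter section could look like this:
filter {
  # [response][status] is a nested field reference; the %{[response][status]} form
  # is used for string interpolation inside quoted values
  if [response][status] == 404 and "nginx" in [tags] {
    mutate { add_tag => ["not_found"] }
  } else if [message] =~ /error/ {
    mutate { add_tag => ["has_error"] }
  } else {
    mutate { add_tag => ["other"] }
  }
}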
logstash plugins
- inputs: data input
- codecs: data encoding and decoding
- filters: data filtering and transformation
- outputs: data output
logstash inputs configuration
- stdin
- file
- tcp/udp
- rsyslog
- redis
- kafka
- beats
input {
stdin {
}
}
output {
stdout {
}
}
stdin
stdin {
add_field => { "a" => "b" }
codec => "json"
tags => ["a","b"]
type => "my_type"
}
file
- close_older (number, optional)
- delimiter (string, optional)
- discover_interval (number, optional)
- exclude (array, optional)
- ignore_older (number, optional)
- max_open_files (number, optional)
- path (array, required)
- sincedb_path (string, optional)
- sincedb_write_interval (number, optional)
- start_position (string, one of ["beginning", "end"], optional)
- stat_interval (number, optional)
input {
file {
path => ["/var/log/nginx/access.log"]
type => "nginx-log"
start_position => 'beginning'
}
}
output {
stdout {}
}
tcp/udp
input {
tcp {
port => 9090
mode => "server"
ssl_enable => false
}
}
output {
stdout {}
}
nc 127.0.0.1 9090 < data
input {
udp {
host => "127.0.0.1"
port => 5050
}
}
output {
stdout {}
}
# Python 2 UDP client: send one line of input to the udp input above
import socket

port = 5050
host = "127.0.0.1"
file_input = raw_input("\033[32;1mPlease input: \033[0m")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.sendto(file_input, (host, port))
rsyslog
input {
syslog {
host => "127.0.0.1"
type => "syslog"
port => 518
}
}
output {
stdout { }
}
###
vim /etc/rsyslog.conf
*.* @@127.0.0.1:518
###
Use the logger command to simulate sending log messages.
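For example, a single test entry can be pushed through rsyslog to logstash (the tag is arbitrary):
logger -t test "hello logstash"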
編碼
# plain
input {
stdin {
codec => 'plain'
}
}
output {
stdout { }
}
# json
input {
stdin {}
}
output {
stdout {
codec => "json"
}
}
# json_lines
input {
tcp {
port => 12345
host => '127.0.0.1'
codec => json_lines
}
}
output {
stdout { }
}
# rubydebug
input {
stdin {
codec => json
}
}
output {
stdout {
codec => rubydebug
}
}
multiline
- charset: character encoding of the input
- max_bytes (number): maximum number of bytes an event may accumulate
- max_lines (number): maximum number of lines per event, default 500
- multiline_tag: tag added to multiline events, default "multiline"
- pattern (string): the pattern to match against each line
- patterns_dir (array): directories containing additional pattern files
- negate (boolean, default false): whether the pattern match is negated
- what ("previous" or "next"): whether matching lines are merged into the previous event or the next one
input {
stdin {
codec => multiline {
pattern => "^\["
negate => true
what => previous
}
}
}
output {
stdout {
codec => rubydebug
}
}
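In this example, negate => true inverts the "^\[" pattern, so every line that does not start with "[" is merged into the preceding "["-prefixed line (what => previous); a multi-line entry under a "[timestamp]" style header therefore collapses into a single event.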
logstash filter configuration
- json filter
- grok filter
- kv filter
grok filter
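The notes stop at the heading here, so the following is only a minimal sketch, assuming the message field holds combined-format access log lines (that pattern choice is an assumption):
filter {
  grok {
    # parse a combined-format access log line into named fields such as clientip, verb, response
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}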
kv filter
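Likewise, a minimal kv sketch, assuming the events carry key=value pairs such as a URL query string (the separators are assumptions about the data):
filter {
  kv {
    # turn "a=1&b=2" style text into the fields a and b
    field_split => "&"
    value_split => "="
  }
}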
logstash output configuration
- file output
- tcp/udp output
- elasticsearch
- redis
- kafka
- hdfs
output file
output {
file {
path => "/root/access_result"
#message_format => "%{ip}"
#path => "/root/access_%{+YYYY.MM.dd}_%{host}.txt"
#gzip => true
}
stdout {
codec => rubydebug
}
}
output {
tcp {
codec => json_lines
host => "127.0.0.1"
port => "4050"
mode => "server"
}
}
output {
udp {
host => "127.0.0.1"
port => 4050
}
}
output {
elasticsearch {
host => "127.0.0.1"
protocol => "http"
index => "test_output-%{type}-%{+YYYY.MM.dd}"
document_type => "nginx"
workers => 5
}
}