前提
好久没有给博客除草了,正好最近折腾了下 logstash,记录一下。
为啥要用 logstash 呢,其实是因为在测试环境上面腾讯云 tke 的日志没有开启日志收集,所以在排查问题的时候会十分的痛苦,正好有空了就想着将日志抽出来放进 es 里面,方便以后排查问题,正好看到腾讯云的日志规则是允许将 pod 的 stdout 日志进行采集之后投递到 kafka 的,就小试了一下。
部署 logstash
logstash 我选择使用 docker-compose 来进行快速的部署。
以下是部署流程,参考自 deviantony/docker-elk 项目
创建目录
mkdir logstash/config logstash/pipeline -p
创建环境变量
路径 .env
ELASTIC_VERSION=8.7.1
LOGSTASH_INTERNAL_PASSWORD='changeme'
创建 Dockerfile
路径 logstasg/Dockerfile
ARG ELASTIC_VERSION
# https://www.docker.elastic.co/
FROM docker.elastic.co/logstash/logstash:${ELASTIC_VERSION}
配置文件
路径 logstash/config/logstash.yml
---
## Default Logstash configuration from Logstash base image.
## https://github.com/elastic/logstash/blob/main/docker/data/logstash/config/logstash-full.yml
#
http.host: 0.0.0.0
node.name: logstash
路径 logstash/pipeline/logstash.conf
input {
beats {
port => 5044
}
tcp {
port => 50000
}
}
## Add your filters / logstash plugins configuration here
output {
elasticsearch {
hosts => "elasticsearch:9200"
user => "logstash_internal"
password => "${LOGSTASH_INTERNAL_PASSWORD}"
index => "logstash-%{+YYYY-MM-dd}"
}
}
启动服务
version: '3.7'
services:
logstash:
build:
context: logstash/
args:
ELASTIC_VERSION: ${ELASTIC_VERSION}
volumes:
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro,Z
- ./logstash/pipeline:/usr/share/logstash/pipeline:ro,Z
ports:
- 5044:5044
- 50000:50000/tcp
- 50000:50000/udp
- 9600:9600
environment:
LS_JAVA_OPTS: -Xms256m -Xmx256m
LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
depends_on:
- elasticsearch
restart: unless-stopped
配置 logstash pipeline
配置 input
因为日志要从 kafka 读取,所以要在 input 块内声明一个新的数据源
input {
beats {
port => 5044
}
tcp {
port => 50000
}
kafka {
bootstrap_servers => "kafka 地址"
client_id => "test_logstash"
group_id => "test_logstash"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["tencent-tke-log"]
type => "bhy"
}
}
配置过滤器
由于腾讯云投递到kafka的数据不是可以直接用的,数据示例如下
{"@timestamp":1684203000.007603,"@filepath":"/var/log/tke-log-agent/log/stdout-containerd/xxxxxxx.log","log":"2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxxxxx\"}","kubernetes":{"pod_name":"xxxxx","namespace_name":"default","pod_id":"xxxxxx","labels":{"k8s-app":"xxxxx","pod-template-hash":"xxxxx","qcloud-app":"xxxxxxxx"},"annotations":{"qcloud-redeploy-timestamp":"1681975158658","tke.cloud.tencent.com/networks-status":"json"},"host":"xxxxx","container_name":"xxxx","docker_id":"xxxxx","container_hash":"xxxxxx","container_image":"xxxxxxxx"}}
这时候就要对数据进行预处理,因为有很多数据是不需要添加进入es里面的。使用了 logstags 的 filter 进行处理。
保存原始数据
这个过滤器添加了一个叫做 source 的字段,并且将 kafka 消息全部存储在里面
mutate {
add_field => { "source" => "%{[message]}" }
}
解析 json
使用 json 插件,对 message 字段的 json 字符串进行序列化,并且将序列化后的数据存储在 kafkajson 字段内,同时删除 message 字段
json {
source => "message"
remove_field => [
"message"
]
target => "kafkajson"
}
提取日志元数据
将日志中的一些定位数据提取到外层
mutate {
rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
}
使用正则表达式处理数据
腾讯云给出的数据会在服务自己打印的日志前方添加时间以及 ”stdout F“,所以要使用正则表达式将我们不需要的东西做下替换
2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxx\"}
mutate {
gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
}
提取服务打印的日志
这回终于可以将日志提取出来了
json {
source => "[kafkajson][log]"
remove_field => [
"kafkajson"
]
target => "data"
}
提取日志等级
最后一步,将日志的等级字段提取到外层,方便使用
mutate {
rename => { "[data][level]" => "level" }
}
完整 pipeline
input {
beats {
port => 5044
}
tcp {
port => 50000
}
kafka{
bootstrap_servers => "kafka 地址"
client_id => "test_logstash"
group_id => "test_logstash"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["tencent-tke-log"]
type => "bhy"
}
}
filter {
mutate {
add_field => { "source" => "%{[message]}" }
}
json {
source => "message"
remove_field => [
"message"
]
target => "kafkajson"
}
mutate {
rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
}
mutate {
gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
}
json {
source => "[kafkajson][log]"
remove_field => [
"kafkajson"
]
target => "data"
}
mutate {
rename => { "[data][level]" => "level" }
}
}
## Add your filters / logstash plugins configuration here
output {
elasticsearch {
hosts => "elasticsearch:9200"
user => "logstash_internal"
password => "${LOGSTASH_INTERNAL_PASSWORD}"
index => "logstash-%{+YYYY-MM-dd}"
}
}
特别鸣谢
在本次配置 logstash 的过程中,我自己搭建的、使用 openAPI API 的、基于 GPT3.5 语言模型的 telegram bot 给予了我很大的帮助。
以下是我与它的对话(对话内容经过处理)
johnpoint, [5/15/23 2:18 PM]
我想从 2023-05-15T13:51:00.004687384+08:00 stdout F {"level":"debug","time":"2023-05-15T13:51:00.002+0800","msg":"xxxxxxxx"} 这些数据里面使用logstash 提取出json数据,logstash 的pipeline该怎么写
openAI, [5/15/23 2:18 PM]
假设你的日志文件是以JSON格式输出的,你需要配置一个JSON解析器来解析日志中的JSON数据,并将其转换为可用的事件。以下是一个简单的Logstash配置文件:
```
input {
file {
path => "/path/to/your/logfile"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
json {
source => "message"
}
}
output {
stdout {
codec => rubydebug
}
}
```
在logstash的input部分,我们指定读取的文件的路径,以及从文件的开始位置开始读入数据。
在filter部分,我们使用`json`插件从日志中提取json对象。
最后,我们在output部分使用`rubydebug`进行输出。
运行这个Logstash的配置命令:
```
bin/logstash -f /path/to/your/config/file
```
这样,logstash就会从你指定的文件中读取数据,并把提取的JSON对象输出到控制台上。
johnpoint, [5/15/23 2:19 PM]
这个数据有json格式的数据,但是在json格式的数据前面还有一些脏数据,是否可以去除?
openAI, [5/15/23 2:19 PM]
可以,在使用 logstash 分析数据的时候,可以使用 grok 这个插件,通过正则表达式匹配出需要的数据。
以下是一个可能符合你提供的数据的 grok 表达式:
```
%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}
```
这个正则表达式的含义是匹配以 ISO8601 格式表示的时间戳(如 "2023-05-15T13:51:00.004687384+08:00")和以字符串 "stdout F " 结尾的内容,然后跟着一个 Json 格式的字符串。"%{GREEDYDATA:json_data}" 表示将这个字符串保存到一个名为 "json_data" 的字段中。
上述 grok 表达式可以在 Logstash 的配置文件中使用,例如:
```
input {
# 配置 logstash 接收数据的源
}
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}"
}
}
# 其他过滤器
}
output {
# 配置 logstash 输出数据的目标
}
```
这个配置文件中,通过 filter 下的 grok 插件匹配出了 timestamp 和 json_data 两个字段。从而可以使用 logstash 利用这些数据进行更进一步的操作,如输出到 Elasticsearch 或者 stdout。达到你想要的结果。
其中 `%{TIMESTAMP_ISO8601:timestamp}` 表示匹配 ISO8601 格式的时间戳,并将其保存到一个名为 "timestamp" 的字段中。`%{GREEDYDATA:json_data}` 表示匹配任意字符,可以匹配行末附加的不符合 Json 格式的内容。因为 grok 会一直匹配到行尾,所以匹配过程中会包括最后一个字段(即 json_data)中包含的所有内容。
johnpoint, [5/15/23 2:36 PM]
返回 Can't get text on a START_OBJECT at 1:8 是什么错误
openAI, [5/15/23 2:36 PM]
这个错误常见于尝试将 JSON 对象作为字符串进行解析的情况下,但是实际上得到的是 JSON 对象,而不是字符串。这意味着该对象的开头是一个“{”,而不是一个“"”。可以检查代码调用的函数或方法,以确保解析的字符串确实是一个字符串。另外,可以检查 JSON 对象的源代码,以确保它是按照预期格式进行编写的。
johnpoint, [5/15/23 2:53 PM]
[2023-05-15T06:52:48,511][WARN ][logstash.filters.json ][main][xxxxxx] Error parsing json {:source=>"json_data", :raw=>"{\\\"level\\\":\\\"debug\\\",\\\"time\\\":\\\"2023-05-15T14:52:47.664+0800\\\",\\\"msg\\\":\\\"xxxxxxxx\\\"}\", \"@timestamp\"=>0.1684133567674155e10}", :exception=>#<LogStash::Json::ParserError: Unexpected character ('\' (code 92)): was expecting double-quote to start field name
这行日志表示 logstash 有什么问题呢
openAI, [5/15/23 2:53 PM]
这行日志表示 Logstash 在解析输入数据 json_data 时遇到了编码问题,其中输入数据包含了 `\'` 等不符合 JSON 规范的字符,导致 JSON 解析器无法正确解析。具体来说,可能是输入数据中使用了反斜线对引号进行转义,而又没有正确地对反斜线本身进行转义。可以尝试对输入数据进行预处理,将不符合 JSON 规范的字符进行转义或删除,以便让 Logstash 能够正确解析数据。
johnpoint, [5/15/23 2:53 PM]
可以给出相对应 的 pipeline 配置吗
openAI, [5/15/23 2:53 PM]
好的,以下是一个简单的 Logstash Pipeline 配置,用于从输入中提取 `json_data` 字段,并过滤掉任何无效数据:
```conf
input {
# 在这里配置输入插件
stdin {}
}
filter {
# 从 json_data 字段提取 JSON 数据
json {
source => "json_data"
# 用于忽略不完整的 JSON 数据字符串
skip_on_invalid_json => true
}
# 过滤掉没有有效 JSON 数据的事件
if "_jsonparsefailure" in [tags] {
drop {}
}
}
output {
# 在这里配置输出插件
stdout {}
}
```
您需要根据自己的需求进行相应的修改和调整,比如将 `json_data` 改成实际的字段名,或者使用适合您的输入和输出插件。
johnpoint, [5/15/23 2:54 PM]
使用grok提取的json数据会多出不必要的转义反斜杠
openAI, [5/15/23 2:54 PM]
是的,这是因为 Grok 模式匹配时会将反斜杠视为转义字符,因此如果日志中原本就有反斜杠,会被额外转义。为了解决这个问题,可以在配置文件中使用 gsub 过滤器去除多余的反斜杠。示例配置如下:
```
input {
stdin { }
}
filter {
grok {
match => { "message" => '^\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{WORD:log_level}\] %{GREEDYDATA:json_data}$' }
}
mutate {
gsub => ["json_data", "\\\\", "\\"]
}
json {
source => "json_data"
}
}
output {
stdout { codec => rubydebug }
}
```
这个配置中首先使用 Grok 解析出日志中的 JSON 数据,并将其存储到 `json_data` 字段中。然后使用 `mutate` 过滤器中的 `gsub` 将多余的反斜杠替换掉,最后使用 `json` 过滤器将其转换成 JSON 格式的事件。这样就可以避免多出不必要的反斜杠了。
johnpoint, [5/15/23 3:00 PM]
报错了 Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \",\", \"]\" at line 36, column 34 (byte 672) after filter {\n\tjson {\n\t\tsource => \"message\"\n\t\tremove_field => [ \n\t\t\t\"message\"\n\t\t]\n\t\ttarget => \"kafkajson\"\n\t}\n\tgrok {\n\t\tmatch => {\"kafkajson\" => \"%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}\"}\n\t}\n\tmutate {\n\t\tgsub => [\"json_data\", \"\\\\\\\\\", \"", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in `block in converge_state'"]}
openAI, [5/15/23 3:00 PM]
这个错误是在 Logstash 配置文件中,第36行,第34列(672个字节)处,期望看到 `[ \t\r\n]", "#", "{", ",", "]"` 中的一个字符,但是实际遇到的字符无法按预期解析。这可能是因为语法错误或表达式不正确导致的。请检查您的配置文件并确保语法正确,并解决任何表达式错误。
另外,由于错误信息无法提供足够的上下文,您还可能需要检查其他部分的配置文件以确定是否存在其他错误。
johnpoint, [5/15/23 3:15 PM]
logstash 可以将一个字段的数据移动到另一个字段里面去吗
openAI, [5/15/23 3:15 PM]
可以的,可以使用logstash的filter插件中的mutate插件的rename选项来实现.
例如,如果数据来源的字段名为`source_field`,想要将其复制到目标字段`target_field`,可以使用以下配置:
```
filter {
mutate {
rename => { "source_field" => "target_field" }
}
}
```
这将把`source_field`的值复制到新字段`target_field`中,并删除`source_field`字段本身。
johnpoint, [5/15/23 3:19 PM]
如果我要保留这个字段呢
openAI, [5/15/23 3:19 PM]
可以使用 mutate 插件的 add_field 选项,用于在事件中创建新字段。
比如,如果您想要在 Logstash 事件中创建一个新字段 "new_field" 并从原始字段 "old_field" 中移动其值:
```
filter {
mutate {
add_field => { "new_field" => "%{[old_field]}" }
remove_field => [ "old_field" ]
}
}
```
这将将 "old_field" 中的值赋给新的 "new_field" 字段,并将删除 "old_field" 字段。 如果您想在保留源字段的同时移动值,可以将 "remove_field" 选项删除。
johnpoint, [5/15/23 3:21 PM]
logstash 删除自动创建 keyword 字段
openAI, [5/15/23 3:21 PM]
Logstash 中默认会为每个字符串类型的字段创建一个 `keyword` 类型的子字段,作为原始字段的精确值。如果你不需要这个子字段,可以在索引模板(template)中将它移除:在 `mappings` 中设置 `"include_in_all": false` 即可。但需要注意的是,使用该方法删除 `keyword` 字段会导致失去部分精确查询的能力。
如果你想要在日志流中保留 `keyword` 字段,可以使用 `remove_field` 过滤器来删除原始字段,但保留 `keyword` 字段:
```ruby
filter {
mutate {
remove_field => ["my_field"]
rename => { "my_field.keyword" => "new_field" }
}
}
```
上面的例子中,Logstash 会删除原始的 `my_field` 字段,并将精确值保存在 `my_field.keyword` 子字段中,最后使用 `rename` 过滤器,将 `my_field.keyword` 重命名为 `new_field`。这样,你就可以在 Elasticsearch 中使用 `new_field` 进行精确查询。
johnpoint, [5/15/23 4:18 PM]
logstash 为什么创建的 es index 没有每天滚动的名字
openAI, [5/15/23 4:18 PM]
logstash 创建 ES index 默认采用的是 Daily Index 的模式,每天一个 Index,并且附带日期的后缀。例如,一个名为 `logstash` 的索引将被转换成 `logstash-yyyy.mm.dd` 的名字,在写入数据时,会根据 UTC 时区来决定对应日期的具体值。如果没有滚动,一种可能是配置文件中未启用,还有一个可能是 Elasticsearch 集群配置文件设置了禁止创建新 index,可以检查一下 Elasticsearch 的配置文件中的相关设置。
johnpoint, [5/15/23 4:19 PM]
logstash 怎么开启 Daily Index 模式
openAI, [5/15/23 4:19 PM]
要开启 Daily Index 模式,需要修改 Logstash 的配置文件。在 `output` 配置中,可以使用 `index` 配置项来指定 Elasticsearch 索引的名称。为了让索引名称按日滚动,可使用日期格式占位符,例如:
```
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "your-index-name-%{+YYYY.MM.dd}"
}
}
```
在上述配置中,索引名称的格式为 `your-index-name-YYYY.MM.dd`,其中 `%{+YYYY.MM.dd}` 是一个日期格式占位符,表示当前日期。这样配置后,每天会自动创建一个新的索引。
还需注意的是,为了启用 Date Index 模式,需要在 Elasticsearch 的配置文件中将 `action.auto_create_index` 设置为 `true`。