前提

好久没有给博客除草了,正好最近折腾了下 logstash,记录一下。

为啥要用 logstash 呢,其实是因为在测试环境上面腾讯云 tke 的日志没有开启日志收集,所以在排查问题的时候会十分的痛苦,正好有空了就想着将日志抽出来放进 es 里面,方便以后排查问题,正好看到腾讯云的日志规则是允许将 pod 的 stdout 日志进行采集之后投递到 kafka 的,就小试了一下。

部署 logstash

logstash 我选择使用 docker-compose 来进行快速的部署。

以下是部署流程,参考自 deviantony/docker-elk 项目

创建目录

mkdir logstash/config logstash/pipeline -p

创建环境变量

路径 .env

ELASTIC_VERSION=8.7.1
LOGSTASH_INTERNAL_PASSWORD='changeme'

创建 Dockerfile

路径 logstasg/Dockerfile

ARG ELASTIC_VERSION

# https://www.docker.elastic.co/
FROM docker.elastic.co/logstash/logstash:${ELASTIC_VERSION}

配置文件

路径 logstash/config/logstash.yml

---
## Default Logstash configuration from Logstash base image.
## https://github.com/elastic/logstash/blob/main/docker/data/logstash/config/logstash-full.yml
#
http.host: 0.0.0.0

node.name: logstash

路径 logstash/pipeline/logstash.conf

input {
	beats {
		port => 5044
	}

	tcp {
		port => 50000
	}
}

## Add your filters / logstash plugins configuration here

output {
	elasticsearch {
		hosts => "elasticsearch:9200"
		user => "logstash_internal"
		password => "${LOGSTASH_INTERNAL_PASSWORD}"
		index => "logstash-%{+YYYY-MM-dd}"
	}
}

启动服务

version: '3.7'

services:
  logstash:
    build:
      context: logstash/
      args:
        ELASTIC_VERSION: ${ELASTIC_VERSION}
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro,Z
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro,Z
    ports:
      - 5044:5044
      - 50000:50000/tcp
      - 50000:50000/udp
      - 9600:9600
    environment:
      LS_JAVA_OPTS: -Xms256m -Xmx256m
      LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
    depends_on:
      - elasticsearch
    restart: unless-stopped

配置 logstash pipeline

配置 input

因为日志要从 kafka 读取,所以要在 input 块内声明一个新的数据源

input {
	beats {
		port => 5044
	}

	tcp {
		port => 50000
	}

	kafka {
		bootstrap_servers => "kafka 地址"
		client_id => "test_logstash"
		group_id => "test_logstash"
		auto_offset_reset => "latest" 
		consumer_threads => 1
		decorate_events => true 
		topics => ["tencent-tke-log"]
		type => "bhy"
	}
}

配置过滤器

由于腾讯云投递到kafka的数据不是可以直接用的,数据示例如下

{"@timestamp":1684203000.007603,"@filepath":"/var/log/tke-log-agent/log/stdout-containerd/xxxxxxx.log","log":"2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxxxxx\"}","kubernetes":{"pod_name":"xxxxx","namespace_name":"default","pod_id":"xxxxxx","labels":{"k8s-app":"xxxxx","pod-template-hash":"xxxxx","qcloud-app":"xxxxxxxx"},"annotations":{"qcloud-redeploy-timestamp":"1681975158658","tke.cloud.tencent.com/networks-status":"json"},"host":"xxxxx","container_name":"xxxx","docker_id":"xxxxx","container_hash":"xxxxxx","container_image":"xxxxxxxx"}}

这时候就要对数据进行预处理,因为有很多数据是不需要添加进入es里面的。使用了 logstags 的 filter 进行处理。

保存原始数据

这个过滤器添加了一个叫做 source 的字段,并且将 kafka 消息全部存储在里面

	mutate {
		add_field => { "source" => "%{[message]}" }
  	}

解析 json

使用 json 插件,对 message 字段的 json 字符串进行序列化,并且将序列化后的数据存储在 kafkajson 字段内,同时删除 message 字段

	json {
		source => "message"
		remove_field => [ 
			"message"
		]
		target => "kafkajson"
	}

提取日志元数据

将日志中的一些定位数据提取到外层

	mutate {
    	rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
    	rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
    	rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
    	rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
  	}

使用正则表达式处理数据

腾讯云给出的数据会在服务自己打印的日志前方添加时间以及 ”stdout F“,所以要使用正则表达式将我们不需要的东西做下替换

2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxx\"}
	mutate {
		gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
	}

提取服务打印的日志

这回终于可以将日志提取出来了

	json {
		source => "[kafkajson][log]"
		remove_field => [ 
			"kafkajson"
		]
		target => "data"
	}

提取日志等级

最后一步,将日志的等级字段提取到外层,方便使用

	mutate {
    	rename => { "[data][level]" => "level" }
  	}

完整 pipeline

input {
	beats {
		port => 5044
	}

	tcp {
		port => 50000
	}

	kafka{
		bootstrap_servers => "kafka 地址"
		client_id => "test_logstash"
		group_id => "test_logstash"
		auto_offset_reset => "latest" 
		consumer_threads => 1
		decorate_events => true 
		topics => ["tencent-tke-log"]
		type => "bhy"
	}
}

filter {
	mutate {
		add_field => { "source" => "%{[message]}" }
  	}
	json {
		source => "message"
		remove_field => [ 
			"message"
		]
		target => "kafkajson"
	}
	mutate {
    	rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
    	rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
    	rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
    	rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
  	}
	mutate {
		gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
	}
	json {
		source => "[kafkajson][log]"
		remove_field => [ 
			"kafkajson"
		]
		target => "data"
	}
	mutate {
    	rename => { "[data][level]" => "level" }
  	}
}

## Add your filters / logstash plugins configuration here

output {
	elasticsearch {
		hosts => "elasticsearch:9200"
		user => "logstash_internal"
		password => "${LOGSTASH_INTERNAL_PASSWORD}"
		index => "logstash-%{+YYYY-MM-dd}"
	}
}

特别鸣谢

在本次配置 logstash 的过程中,我自己搭建的、使用 openAPI API 的、基于 GPT3.5 语言模型的 telegram bot 给予了我很大的帮助。

以下是我与它的对话(对话内容经过处理)

johnpoint, [5/15/23 2:18PM]
我想从 2023-05-15T13:51:00.004687384+08:00 stdout F {"level":"debug","time":"2023-05-15T13:51:00.002+0800","msg":"xxxxxxxx"} 这些数据里面使用logstash 提取出json数据logstash pipeline该怎么写

openAI, [5/15/23 2:18PM]
假设你的日志文件是以JSON格式输出的,你需要配置一个JSON解析器来解析日志中的JSON数据,并将其转换为可用的事件。以下是一个简单的Logstash配置文件

```
input {
  file {
    path => "/path/to/your/logfile"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
      codec => rubydebug
  }
}
```

logstash的input部分,我们指定读取的文件的路径,以及从文件的开始位置开始读入数据。
filter部分,我们使用`json`插件从日志中提取json对象
最后,我们在output部分使用`rubydebug`进行输出。

运行这个Logstash的配置命令

```
bin/logstash -f /path/to/your/config/file
```

这样,logstash就会从你指定的文件中读取数据,并把提取的JSON对象输出到控制台上

johnpoint, [5/15/23 2:19PM]
这个数据有json格式的数据,但是在json格式的数据前面还有一些脏数据,是否可以去除?

openAI, [5/15/23 2:19PM]
可以,在使用 logstash 分析数据的时候,可以使用 grok 这个插件,通过正则表达式匹配出需要的数据。

以下是一个可能符合你提供的数据的 grok 表达式:

```
%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}
```

这个正则表达式的含义是匹配以 ISO8601 格式表示的时间戳(如 "2023-05-15T13:51:00.004687384+08:00")和以字符串 "stdout F " 结尾的内容,然后跟着一个 Json 格式的字符串。"%{GREEDYDATA:json_data}" 表示将这个字符串保存到一个名为 "json_data" 的字段中。

上述 grok 表达式可以在 Logstash 的配置文件中使用,例如:

```
input {
  # 配置 logstash 接收数据的源
}

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}"
    }
  }
  # 其他过滤器
}

output {
  # 配置 logstash 输出数据的目标
}
```

这个配置文件中,通过 filter 下的 grok 插件匹配出了 timestamp  json_data 两个字段。从而可以使用 logstash 利用这些数据进行更进一步的操作,如输出到 Elasticsearch 或者 stdout。达到你想要的结果。

其中 `%{TIMESTAMP_ISO8601:timestamp}` 表示匹配 ISO8601 格式的时间戳,并将其保存到一个名为 "timestamp" 的字段中。`%{GREEDYDATA:json_data}` 表示匹配任意字符,可以匹配行末附加的不符合 Json 格式的内容。因为 grok 会一直匹配到行尾,所以匹配过程中会包括最后一个字段(即 json_data)中包含的所有内容。

johnpoint, [5/15/23 2:36PM]
返回 Can't get text on a START_OBJECT at 1:8 是什么错误

openAI, [5/15/23 2:36 PM]
这个错误常见于尝试将 JSON 对象作为字符串进行解析的情况下,但是实际上得到的是 JSON 对象,而不是字符串。这意味着该对象的开头是一个“{”,而不是一个“"”。可以检查代码调用的函数或方法,以确保解析的字符串确实是一个字符串。另外,可以检查 JSON 对象的源代码,以确保它是按照预期格式进行编写的。

johnpoint, [5/15/23 2:53 PM]
[2023-05-15T06:52:48,511][WARN ][logstash.filters.json    ][main][xxxxxx] Error parsing json {:source=>"json_data", :raw=>"{\\\"level\\\":\\\"debug\\\",\\\"time\\\":\\\"2023-05-15T14:52:47.664+0800\\\",\\\"msg\\\":\\\"xxxxxxxx\\\"}\", \"@timestamp\"=>0.1684133567674155e10}", :exception=>#<LogStash::Json::ParserError: Unexpected character ('\' (code 92)): was expecting double-quote to start field name
这行日志表示 logstash 有什么问题呢

openAI, [5/15/23 2:53 PM]
这行日志表示 Logstash 在解析输入数据 json_data 时遇到了编码问题,其中输入数据包含了 `\'` 等不符合 JSON 规范的字符,导致 JSON 解析器无法正确解析。具体来说,可能是输入数据中使用了反斜线对引号进行转义,而又没有正确地对反斜线本身进行转义。可以尝试对输入数据进行预处理,将不符合 JSON 规范的字符进行转义或删除,以便让 Logstash 能够正确解析数据。

johnpoint, [5/15/23 2:53PM]
可以给出相对应  pipeline 配置吗

openAI, [5/15/23 2:53PM]
好的,以下是一个简单的 Logstash Pipeline 配置,用于从输入中提取 `json_data` 字段,并过滤掉任何无效数据:

```conf
input {
  # 在这里配置输入插件
  stdin {}
}

filter {
  # 从 json_data 字段提取 JSON 数据
  json {
    source => "json_data"
    # 用于忽略不完整的 JSON 数据字符串
    skip_on_invalid_json => true
  }

  # 过滤掉没有有效 JSON 数据的事件
  if "_jsonparsefailure" in [tags] {
    drop {}
  }
}

output {
  # 在这里配置输出插件
  stdout {}
}
```

您需要根据自己的需求进行相应的修改和调整,比如将 `json_data` 改成实际的字段名,或者使用适合您的输入和输出插件。

johnpoint, [5/15/23 2:54PM]
使用grok提取的json数据会多出不必要的转义反斜杠

openAI, [5/15/23 2:54PM]
是的,这是因为 Grok 模式匹配时会将反斜杠视为转义字符,因此如果日志中原本就有反斜杠,会被额外转义。为了解决这个问题,可以在配置文件中使用 gsub 过滤器去除多余的反斜杠。示例配置如下:

```
input {
  stdin { }
}

filter {
  grok {
    match => { "message" => '^\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{WORD:log_level}\] %{GREEDYDATA:json_data}$' }
  }
  mutate {
    gsub => ["json_data", "\\\\", "\\"]
  }
  json {
    source => "json_data"
  }
}

output {
  stdout { codec => rubydebug }
}
```

这个配置中首先使用 Grok 解析出日志中的 JSON 数据,并将其存储到 `json_data` 字段中。然后使用 `mutate` 过滤器中的 `gsub` 将多余的反斜杠替换掉,最后使用 `json` 过滤器将其转换成 JSON 格式的事件。这样就可以避免多出不必要的反斜杠了。

johnpoint, [5/15/23 3:00PM]
报错了 Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \",\", \"]\" at line 36, column 34 (byte 672) after filter {\n\tjson {\n\t\tsource => \"message\"\n\t\tremove_field => [ \n\t\t\t\"message\"\n\t\t]\n\t\ttarget => \"kafkajson\"\n\t}\n\tgrok {\n\t\tmatch => {\"kafkajson\" => \"%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}\"}\n\t}\n\tmutate {\n\t\tgsub => [\"json_data\", \"\\\\\\\\\", \"", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in `block in converge_state'"]}

openAI, [5/15/23 3:00PM]
这个错误是在 Logstash 配置文件中,第36行,第34列(672个字节)处,期望看到 `[ \t\r\n]", "#", "{", ",", "]"` 中的一个字符,但是实际遇到的字符无法按预期解析。这可能是因为语法错误或表达式不正确导致的。请检查您的配置文件并确保语法正确,并解决任何表达式错误。

另外,由于错误信息无法提供足够的上下文,您还可能需要检查其他部分的配置文件以确定是否存在其他错误。

johnpoint, [5/15/23 3:15PM]
logstash 可以将一个字段的数据移动到另一个字段里面去吗

openAI, [5/15/23 3:15PM]
可以的,可以使用logstash的filter插件中的mutate插件的rename选项来实现.

例如,如果数据来源的字段名为`source_field`,想要将其复制到目标字段`target_field`,可以使用以下配置:

```
filter {
  mutate {
    rename => { "source_field" => "target_field" }
  }
}
```

这将把`source_field`的值复制到新字段`target_field`中,并删除`source_field`字段本身。

johnpoint, [5/15/23 3:19PM]
如果我要保留这个字段呢

openAI, [5/15/23 3:19PM]
可以使用 mutate 插件的 add_field 选项,用于在事件中创建新字段。

比如,如果您想要在 Logstash 事件中创建一个新字段 "new_field" 并从原始字段 "old_field" 中移动其值:

```
filter {
   mutate {
     add_field => { "new_field" => "%{[old_field]}" }
     remove_field => [ "old_field" ]
   }
}
```

这将将 "old_field" 中的值赋给新的 "new_field" 字段,并将删除 "old_field" 字段。 如果您想在保留源字段的同时移动值,可以将 "remove_field" 选项删除。

johnpoint, [5/15/23 3:21PM]
logstash 删除自动创建 keyword 字段

openAI, [5/15/23 3:21PM]
Logstash 中默认会为每个字符串类型的字段创建一个 `keyword` 类型的子字段,作为原始字段的精确值。如果你不需要这个子字段,可以在索引模板(template)中将它移除:在 `mappings` 中设置 `"include_in_all": false` 即可。但需要注意的是,使用该方法删除 `keyword` 字段会导致失去部分精确查询的能力。

如果你想要在日志流中保留 `keyword` 字段,可以使用 `remove_field` 过滤器来删除原始字段,但保留 `keyword` 字段: 

```ruby
filter {
  mutate {
    remove_field => ["my_field"]
    rename => { "my_field.keyword" => "new_field" }
  }
}
```

上面的例子中,Logstash 会删除原始的 `my_field` 字段,并将精确值保存在 `my_field.keyword` 子字段中,最后使用 `rename` 过滤器,将 `my_field.keyword` 重命名为 `new_field`。这样,你就可以在 Elasticsearch 中使用 `new_field` 进行精确查询。

johnpoint, [5/15/23 4:18PM]
logstash 为什么创建的 es index 没有每天滚动的名字

openAI, [5/15/23 4:18PM]
logstash 创建 ES index 默认采用的是 Daily Index 的模式,每天一个 Index,并且附带日期的后缀。例如,一个名为 `logstash` 的索引将被转换成 `logstash-yyyy.mm.dd` 的名字,在写入数据时,会根据 UTC 时区来决定对应日期的具体值。如果没有滚动,一种可能是配置文件中未启用,还有一个可能是 Elasticsearch 集群配置文件设置了禁止创建新 index,可以检查一下 Elasticsearch 的配置文件中的相关设置。

johnpoint, [5/15/23 4:19PM]
logstash 怎么开启 Daily Index 模式

openAI, [5/15/23 4:19PM]
要开启 Daily Index 模式,需要修改 Logstash 的配置文件。在 `output` 配置中,可以使用 `index` 配置项来指定 Elasticsearch 索引的名称。为了让索引名称按日滚动,可使用日期格式占位符,例如:

```
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "your-index-name-%{+YYYY.MM.dd}"
  }
}
```

在上述配置中,索引名称的格式为 `your-index-name-YYYY.MM.dd`,其中 `%{+YYYY.MM.dd}` 是一个日期格式占位符,表示当前日期。这样配置后,每天会自动创建一个新的索引。

还需注意的是,为了启用 Date Index 模式,需要在 Elasticsearch 的配置文件中将 `action.auto_create_index` 设置为 `true`

参考文档