Logstash 学习使用 一、概要 1.1、官网地址 https://www.elastic.co/cn/logstash/ 1.2、介绍 Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。也就是一个采集-------\> 过滤------\> 输出的过程,从哪里采集,采集后到哪里去,中间要不要过滤掉一下东西。 1.3、input输入 input 就是Logstash的入口,数据首先需要经过这一步。它支持采集很多输入选择,比如syslog、redis、file、es、exec、log4j、jdbc、kafka等。 1.4、filter过滤 filter是个负责筛选的部分,数据从源传输到输出的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式。它支持很多过滤库,如下: 过滤器库 说明 mutate 对字段的处理,比如添加字段、对字段取值和赋值、切割或者合并 json 向json里添加或者删除字段 data 对时间格式的转化等 grok 由单个简单的正则组合,对一大段文字进行切割并映射到关键字的工具 1.5、输出 Elasticsearch 是首选输出方向,但也支持很多选择(与输入支持的库差不多),可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。 1.6、应用场景 Logstash最常用于ELK (elasticsearch + logstash + kibane)中作为日志收集器使用。可同步mysql数据到es等。这里我的案例是订阅kafka到es的过称。 二、安装 2.1、下载地址 https://www.elastic.co/cn/downloads/logstash 2.2、启动 logstash是基于JDK的,所以必须先安装Jdk环境 在安装目录下执行命令./bin/logstash,但一般需要指定配置文件,例如: nohup ./bin/logstash -f conf.conf \& 参数 说明 举例 -e 使用命令行里的配置参数启动实例 ./bin/logstash -e 'input {stdin {}} output {stdout {}}' -f 指定启动实例的配置文件 ./bin/logstash -f test.conf -w 指定filter线程数量,默认线程数是5 -l 指定日志文件名称 ./bin/logstash-f test.conf -l logs/test.log 三、简单配置使用 3.1、简单配置(控制台输出,只要用于调试) 创建一个test.conf的文件 \`\`\`xml input{ stdin {} } output { stdout {} } \`\`\` 启动 \`\`\`bash ./bin/logstash -f ./test.conf \`\`\` 启动后,在控制台输入什么就会发现它message也输出内容,其他字段是它模版自带的 \`\`\`xml # 输入 sfghfghfgh # 输出 { "message" =\> "sfghfghfgh", "host" =\> "iZ2ze7b12m7993dl7fyi22Z", "@timestamp" =\> 2021-08-11T03:07:34.719Z, "@version" =\> "1" } \`\`\` 3.2、简单配置添加过滤器 在刚才的文件里加入一个过滤器,输入字母的小写转化为大写 我的心得:修改数据后不能使用新的字段接收,是重新赋值覆盖旧的数据而已。 \`\`\`xml input{ stdin {} } filter { mutate { uppercase =\> \[ "message" \] } } output { stdout { } } \`\`\` \`\`\`xml # 输入 asdf # 输出 { "message" =\> "ASDF", "host" =\> "iZ2ze7b12m7993dl7fyi22Z", "@timestamp" =\> 2021-08-11T03:07:34.719Z, "@version" =\> "1" } \`\`\` 3.3、读取文件 \`\`\`xml input{ file { # 收集的文件,多个用,隔开,用绝对路径 path =\> \[ "/data/test/\*.log"\] start_position =\> "end" } } filter { mutate { add_field =\> { "add_msg" =\> "%{\[message\]}" } } } output { stdout { } } \`\`\` start_position 代表logstash初次采集文件内容的位置,。取值:beginning或者end。end代表每次都是从文件尾部开始。add_field为添加输出字段 如果想输出到文件里,那么就是 \`\`\`xml output { file { path =\> "/data/test2/test.log" codec =\> line { format =\> "格式化后数据: %{message}"} } } \`\`\` 这是一个简单的一个输出,codec 定义输出内容格式,默认为json_lines(以json格式输出)。 3.4、从文件输出到es \`\`\`xml input{ file { # 收集的文件,多个用,隔开,用绝对路径 path =\> \[ "/data/test/\*.log"\] start_position =\> "end" } } output { elasticsearch { hosts =\> \["xxx:9200"\] index =\> "log-%{+YYYY-MM}" } } \`\`\` 3.5、与Redis、Kafka集成 \`\`\`xml input { kafka { #kafaka服务地址 bootstrap_servers =\> "xx:9092" #订阅的topic名 topics =\> \["logTopic"\] } } output { redis { host =\> "127.0.0.1" port =\> 6379 key =\> "test" # key的类型,list或者channel data_type =\> "list" } } \`\`\` 3.6、总结 前面讲的主要都是demo级别的,作为理解,然后在升级到实战,这过程需要查看官方文档 四、logstash集成es,自定义模版字段 4.1、准备工作 kafka,Elaseicsearch 和 logstash,es和logstash版本尽量保持一致。 4.2、logstash配置 这三个中,其他两个比较好处理,所以先解决logstash,直接放配置文件 \`\`\`json input { kafka { #kafaka服务地址 bootstrap_servers =\> "xxx:9092" topics =\> \["logTopic"\] } } filter { mutate{ # 将message内容用\|切割 split =\> \["message","\|"\] # 添加字段 add_field =\> {"moduleName" =\> "%{\[message\]\[0\]}"} add_field =\> {"value" =\> "%{\[message\]\[1\]}"} add_field =\> {"url" =\> "%{\[message\]\[2\]}"} add_field =\> {"param" =\> "%{\[message\]\[3\]}"} add_field =\> {"className" =\> "%{\[message\]\[4\]}"} add_field =\> {"methodName" =\> "%{\[message\]\[5\]}"} add_field =\> {"returnResult" =\> "%{\[message\]\[6\]}"} add_field =\> {"elapsedTime" =\> "%{\[message\]\[7\]}"} add_field =\> {"errorMsg" =\> "%{\[message\]\[8\]}"} remove_field =\> "message" remove_field =\> "@version" } } output { elasticsearch { hosts =\> \["xxx:9200"\] # 在es的索引 index =\> "log_record-%{+YYYY-MM}" } #打印,调试的时候用,默认为rubydebug stdout { codec =\> json } } \`\`\` 注意点就是message的内容,将内容推到fafka时就规定好格式,然后使用分隔符进行切割,赋值到其他字段上。 也可以使用其他方式,比如json解析的方式,logstash支持json解析赋值,这里我使用了分隔符切割的方式。 4.3、kafka内容 在推送到kafka上就规定格式使用 \| 来分割,因为到logstash订阅是我使用\|来分割。 4.4、elasticsearch 模版配置 是用logstash传输到es会使用默认的一套模版,具体是用哪个模版是更具索引名来确定的。 查看模版信息 \`\`\`xml #GET请求方式 es地址:9200/_template/logstash \`\`\` 添加新的模版 注意这里的索引名称和logstash在es创建的索引名要有联系才行。 \`\`\`xml # PUT请求方式 , log_record 为模版名称 http://123.56.97.130:9200/_template/log_record \`\`\`xml \`\`\`xml { "template": "log_record\*", "order": 1, "version": 1, "index_patterns": \[ "\*" \], "settings": { "number_of_shards": 1, "number_of_replicas": 0 }, "mappings": { "properties": { "@timestamp": { "type": "date", "format": "dateOptionalTime", "doc_values": true }, "className": { "index": false, "type": "text" }, "createTime": { "format": "strict_date_optional_time\|\|epoch_millis", "type": "date", "index": "false" }, "elapsedTime": { "type": "long", "index": true }, "methodName": { "index": false, "type": "text" }, "moduleName": { "index": false, "type": "text" }, "param": { "search_analyzer": "ik_smart", "analyzer": "ik_max_word", "store": true, "type": "text", "index": "true", "fields": { "keyword": { "type": "keyword" } } }, "returnResult": { "type": "text", "search_analyzer": "ik_max_word", "analyzer": "ik_max_word", "index": true }, "url": { "type": "text", "index": false }, "value": { "search_analyzer": "ik_max_word", "analyzer": "ik_max_word", "store": true, "type": "text", "index": true }, "errorMsg": { "search_analyzer": "ik_max_word", "analyzer": "ik_max_word", "store": true, "type": "text", "index": true } } } } \`\`\`