Centralized log collection and management with Filebeat + Kafka + Logstash

0. Environment and architecture

192.168.162.111                      logstash kafka
192.168.162.112                      filebeat

1. Configure the yum repository (all servers)

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

cat > /etc/yum.repos.d/elasticsearch.repo << EOF
[elasticsearch-7.x] 
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum 
gpgcheck=1 
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch 
enabled=1 
autorefresh=1 
type=rpm-md
EOF
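
Optional sanity check that yum now sees the Elastic repo before installing anything from it:

yum repolist enabled | grep elasticsearch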


2. Install the JDK (all servers)

yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
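
Logstash and Kafka both run on the JVM, so confirm the JDK is actually on the PATH:

java -version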


3. Install and configure Filebeat

yum install filebeat -y

cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
    - /var/log/*.log

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 1

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

tags: ["test1"]
output.kafka:
  hosts: ["192.168.162.111:9092"]
  topic: "get_logs"
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
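
Step 6 below only enables the broker-side services; Filebeat has to be started separately on 192.168.162.112 once Kafka is reachable. Filebeat's built-in self-tests are a quick sanity check first (test output probes the configured output, where the output type supports testing):

filebeat test config
filebeat test output
systemctl restart filebeat
systemctl enable filebeat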



4. Install and configure Logstash

yum install logstash -y

mkdir -p /data/logs/
chown -R logstash:logstash /data/logs

cat /etc/logstash/conf.d/nginx.conf
input {
  kafka {
    bootstrap_servers => "192.168.162.111:9092"
    group_id => "test"
    client_id => "test"
    auto_offset_reset => "latest"
    topics => ["get_logs"]
    codec => json { charset => "UTF-8" }
  }
}

filter {
  # extract the bare file name from [log][file][path] into a new "filename" field
  ruby {
    code => 'event.set("filename", event.get("[log][file][path]").split("/")[-1])'
  }
}

output {
  stdout { codec => rubydebug }    # debug output; remove once the pipeline works
  if "nginx" in [log][file][path] and "access" in [log][file][path] {
    file {
      path => "/data/logs/live-test-nginx/live-test_%{+YYYYMMdd}_%{filename}"
      flush_interval => 3
      codec => line { format => "%{[tags][0]} %{message}"}
    }
  }

  if "nginx" in [log][file][path] and "error" in [log][file][path] {
    file {
      path => "/data/logs/live-test-nginx/live-test_%{+YYYYMMdd}_%{filename}"
      flush_interval => 3
      codec => line { format => "%{[tags][0]} %{message}"}
    }
  }
}
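
Before restarting the service, validate the pipeline syntax (paths below assume the default RPM layout):

/usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit -f /etc/logstash/conf.d/nginx.conf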

5. Install and configure Kafka and ZooKeeper

wget -c -P /tmp "http://apache.mirrors.hoobly.com/kafka/2.6.0/kafka_2.13-2.6.0.tgz"
(if the mirror no longer carries 2.6.0, https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz keeps every release)

cd /opt/ ; tar xvf /tmp/kafka_2.13-2.6.0.tgz
mv kafka_2.13-2.6.0/ kafka
vim kafka/config/server.properties  (only change the two lines below, replacing the IP with this host's internal IP)
listeners=PLAINTEXT://192.168.162.111:9092
advertised.listeners=PLAINTEXT://192.168.162.111:9092

advertised.listeners is the address the broker hands back to clients, so it must be reachable from the Filebeat host.

Configure the Kafka systemd unit file
cat /usr/lib/systemd/system/kafka.service
[Unit]
Description=Kafka server daemon
# make sure ZooKeeper (zkp.service, step 6) is started first
After=network.target zkp.service

[Service]
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
KillMode=process
Restart=on-failure
RestartSec=42s

[Install]
WantedBy=multi-user.target

6. Configure the ZooKeeper systemd unit file

cat /usr/lib/systemd/system/zkp.service
[Unit]
Description=ZooKeeper server daemon
After=network.target

[Service]
Type=simple
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
KillMode=process
Restart=on-failure
RestartSec=42s

[Install]
WantedBy=multi-user.target

systemctl daemon-reload
systemctl restart zkp kafka logstash
systemctl enable zkp kafka logstash
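
For an end-to-end check, list the topics and attach a console consumer on the Kafka host; with Kafka's default auto.create.topics.enable=true, the get_logs topic appears on Filebeat's first write, and the consumer should print the JSON events Filebeat ships:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.162.111:9092 --list
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.162.111:9092 --topic get_logs --from-beginning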
