Kafka + ELK 연동

모니터링 및 데이터를 수집하기위해 많이 사용하고있는 조합인 Kafka, ELK를 간단하게 사용해보면서
이해하기 위해 어플리케이션 로그를 수집해보겠습니다.

Kafka, ELK 간단한 개념정리

Kafka
확장성과 가용성이 높고, Pub/Sub 모델 지원하는 메시지 플랫폼

  • producer : 메시지 생산자(발행)
  • consumer : 메시지 소비자(구독)
  • broker : 카프카의 서버, 동일한 노드내에 여러개의 broker을 띄울수있다.
  • Topic : 메시지 분류 및 저장된곳

Elasticsearch 분산형 오픈 소스 검색 및 분석 엔진

  • index : 서로 관련되어 있는 문서들의 모음

Logstash 데이터를 집계하고 처리를 담당

  • pipeline : 데이터를 어디서 수집할지, 어디로 저장할지 설정
  • filter : pipeline의 input으로 집계한 데이터를 분석하고 변환

Kibana Elasticsearch에 저장된 데이터를 시각화해주는 UI 제공

Elasticsearch + Logstash + Kibana의 앞글자를 따서 ELK stack라고 칭합니다.

kafka_elk_1.webp

Docker를 이용한 설치

참고
Kafka, ELK를 각각 Docker compose로 구성하면 Kafka와 Logstash연동에 문제가 있을수 있습니다.
Docker network는 default bridge이며, 기본적으로 같은 네트워크로 묶인 컨테이너끼리 통신이 가능합니다.
docker network connect, 공용 외부 네트워크 생성으로 해결할수도 있으나, 여기서는 한번에 compose로 구성하였습니다.

  1. git clone https://github.com/deviantony/docker-elk.git
  2. cd docker-elk
  3. X-Pack 설정 제거(유료이기에 제거해 줍니다.)
    1. X-Pack… 하단의 설정값을 제거 또는 주석
      1. vi docker-elk/elasticsearch/config/elasticsearch.yml
      2. vi kibana/config/kibana.yml
      3. vi logstash/config/logstash.yml
  4. vi docker-compose-with-kafka.yml
  5. docker-compose-with-kafka.yaml 파일을 다음과 같이 수정
version: '3.2'

services:
  elasticsearch:
    build:
      context: elasticsearch/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./elasticsearch/config/elasticsearch.yml
        target: /usr/share/elasticsearch/config/elasticsearch.yml
        read_only: true
      - type: volume
        source: elasticsearch
        target: /usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      ES_JAVA_OPTS: "-Xmx256m -Xms256m"
      ELASTIC_PASSWORD: changeme
      # Use single node discovery in order to disable production mode and avoid bootstrap checks.
      # see: https://www.elastic.co/guide/en/elasticsearch/reference/current/bootstrap-checks.html
      discovery.type: single-node
    networks:
      - kafka-elk

  logstash:
    build:
      context: logstash/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./logstash/config/logstash.yml
        target: /usr/share/logstash/config/logstash.yml
        read_only: true
      - type: bind
        source: ./logstash/pipeline
        target: /usr/share/logstash/pipeline
        read_only: true
    ports:
      - "5044:5044"
      - "5000:5000/tcp"
      - "5000:5000/udp"
      - "9600:9600"
environment:
      LS_JAVA_OPTS: "-Xmx256m -Xms256m"
    networks:
      - kafka-elk
    depends_on:
      - elasticsearch

  kibana:
    build:
      context: kibana/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./kibana/config/kibana.yml
        target: /usr/share/kibana/config/kibana.yml
        read_only: true
    ports:
      - "5601:5601"
    networks:
      - kafka-elk
    depends_on:
      - elasticsearch

  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "9900:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka-elk

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "test-topic:1:1"Springboot
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka-elk

networks:
  kafka-elk:
    driver: bridge

volumes:
  elasticsearch:
  1. docker-compose build && docker-compose up -d
  2. 접속확인
    1. Elasticsearch : localhost:9200
    2. Logstash : localhost:5000/9600
    3. Kibana : localhost:5601
  3. docker network ls 명령어를 입력하여 네트워크 목록확인
  4. docker network inspect {network name} 명령어로 해당 네트워크에 컨테이너가 모두 포함되었는지 확인 kafka_elk_2.webp

만약 네트워크에 컨테이너가 포함이 안되어있다면, docker-compose-with-kafka.yml설정 확인후
docker-compose build && docker-compose up -d 재수행

Docker kafka 설정에서
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 이처럼 설정하여 컨테이너 내부에선 kafka:29092 외부에선 localhost:9092접속하도록 설정

Logstash pipeline 추가

  1. cd docker-elk/logstash/pipeline
  2. vi kafka-consumer.conf
    • input으로 kafka의 test-topic에서 메시지를 읽어오고, output으로 elasticsearch로 보내면서 index를 설정해주었습니다.
input {
  kafka {
    client_id => "logstash-test-topic"
    group_id => "logstash-test-topic"
    topics => ["test-topic"]
    codec => plain
    bootstrap_servers => "kafka:29092"
  }
}
## Add your filters / logstash plugins configuration here
output {
        elasticsearch {
                hosts => "elasticsearch:9200"
                index => "kafka-app-log-%{+YYYY.MM.dd}"
        }
}
  1. Logstash 컨테이너 재시

Springboot Logback 로그 수집

Springboot의 설정은 Logback만 진행하도록 하겠습니다.

1.build.gradle에 라이브러리 추가

# Logback은 kafka appender을 기본적으로 지원하지않아 별도 라이브러리가 필요합니다.
implementation 'com.github.danielwegener:logback-kafka-appender:0.1.0'
implementation 'net.logstash.logback:logstash-logback-encoder:6.2'

2.logback.xml 설정

<configuration>
    <appender name="LOG-KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%date - %-5p %t %-25logger{5} %F:%L %m%n</pattern>
            </layout>
        </encoder>
        <topic>test-topic</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/>
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <producerConfig>retries=1</producerConfig>
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
        <producerConfig>compression.type=snappy</producerConfig>
        <producerConfig>max.block.ms=1000</producerConfig>
    </appender>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%date - %-5p %t %-25logger{5} %F:%L %m%n</pattern>
        </encoder>
    </appender>

    <logger name="kafka-logger" level="INFO" additivity="false">
        <appender-ref ref="LOG-KAFKA"/>
        <appender-ref ref="STDOUT"/>
    </logger>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

3.로그 남기기

// @Slf4j의 topic 옵션으로 logger을 지정해줍니다.
@Slf4j(topic = "kafka-logger")
@RestController
@RequiredArgsConstructor
public class HelloController {
    @GetMapping({"", "/hello"})
    public String hello() throws IOException {
        // kafka logger에 로그 남기기
        log.info("hello~!@");
        return "hello";
    }
}

Kibana index pattern 생성

  1. http://localhost:5601/app/management/kibana/indexPatterns 이동
  2. kibana pipeline에 설정한 index를 검색이 되는지 확인 (kafka-app-log-%{+YYYY.MM.dd})
  3. kafka-app-log-* 패턴을 만들고 Time filed를 @timestamp로 설정후 생
  4. http://localhost:5601/app/discover에서 데이터 확인