Docker Kafka + ELK 연동
by Glenn
Kafka + ELK 연동
모니터링 및 데이터를 수집하기위해 많이 사용하고있는 조합인 Kafka, ELK를 간단하게 사용해보면서
이해하기 위해 어플리케이션 로그를 수집해보겠습니다.
Kafka, ELK 간단한 개념정리
Kafka
확장성과 가용성이 높고, Pub/Sub 모델 지원하는 메시지 플랫폼
- producer : 메시지 생산자(발행)
- consumer : 메시지 소비자(구독)
- broker : 카프카의 서버, 동일한 노드내에 여러개의 broker을 띄울수있다.
- Topic : 메시지 분류 및 저장된곳
Elasticsearch 분산형 오픈 소스 검색 및 분석 엔진
- index : 서로 관련되어 있는 문서들의 모음
Logstash 데이터를 집계하고 처리를 담당
- pipeline : 데이터를 어디서 수집할지, 어디로 저장할지 설정
- filter : pipeline의 input으로 집계한 데이터를 분석하고 변환
Kibana Elasticsearch에 저장된 데이터를 시각화해주는 UI 제공
Elasticsearch + Logstash + Kibana의 앞글자를 따서 ELK stack라고 칭합니다.
Docker를 이용한 설치
참고
Kafka, ELK를 각각 Docker compose로 구성하면 Kafka와 Logstash연동에 문제가 있을수 있습니다.
Docker network는 default bridge이며, 기본적으로 같은 네트워크로 묶인 컨테이너끼리 통신이 가능합니다.
docker network connect, 공용 외부 네트워크 생성으로 해결할수도 있으나, 여기서는 한번에 compose로 구성하였습니다.
git clone https://github.com/deviantony/docker-elk.git
cd docker-elk
- X-Pack 설정 제거(유료이기에 제거해 줍니다.)
- X-Pack… 하단의 설정값을 제거 또는 주석
- vi docker-elk/elasticsearch/config/elasticsearch.yml
- vi kibana/config/kibana.yml
- vi logstash/config/logstash.yml
- X-Pack… 하단의 설정값을 제거 또는 주석
vi docker-compose-with-kafka.yml
- docker-compose-with-kafka.yaml 파일을 다음과 같이 수정
version: '3.2'
services:
elasticsearch:
build:
context: elasticsearch/
args:
ELK_VERSION: $ELK_VERSION
volumes:
- type: bind
source: ./elasticsearch/config/elasticsearch.yml
target: /usr/share/elasticsearch/config/elasticsearch.yml
read_only: true
- type: volume
source: elasticsearch
target: /usr/share/elasticsearch/data
ports:
- "9200:9200"
- "9300:9300"
environment:
ES_JAVA_OPTS: "-Xmx256m -Xms256m"
ELASTIC_PASSWORD: changeme
# Use single node discovery in order to disable production mode and avoid bootstrap checks.
# see: https://www.elastic.co/guide/en/elasticsearch/reference/current/bootstrap-checks.html
discovery.type: single-node
networks:
- kafka-elk
logstash:
build:
context: logstash/
args:
ELK_VERSION: $ELK_VERSION
volumes:
- type: bind
source: ./logstash/config/logstash.yml
target: /usr/share/logstash/config/logstash.yml
read_only: true
- type: bind
source: ./logstash/pipeline
target: /usr/share/logstash/pipeline
read_only: true
ports:
- "5044:5044"
- "5000:5000/tcp"
- "5000:5000/udp"
- "9600:9600"
environment:
LS_JAVA_OPTS: "-Xmx256m -Xms256m"
networks:
- kafka-elk
depends_on:
- elasticsearch
kibana:
build:
context: kibana/
args:
ELK_VERSION: $ELK_VERSION
volumes:
- type: bind
source: ./kibana/config/kibana.yml
target: /usr/share/kibana/config/kibana.yml
read_only: true
ports:
- "5601:5601"
networks:
- kafka-elk
depends_on:
- elasticsearch
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:latest
ports:
- "9900:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- kafka-elk
kafka:
container_name: kafka
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "test-topic:1:1"Springboot
volumes:
- /var/run/docker.sock:/var/run/docker.sock
networks:
- kafka-elk
networks:
kafka-elk:
driver: bridge
volumes:
elasticsearch:
docker-compose build && docker-compose up -d
- 접속확인
- Elasticsearch : localhost:9200
- Logstash : localhost:5000/9600
- Kibana : localhost:5601
docker network ls
명령어를 입력하여 네트워크 목록확인docker network inspect {network name}
명령어로 해당 네트워크에 컨테이너가 모두 포함되었는지 확인
만약 네트워크에 컨테이너가 포함이 안되어있다면, docker-compose-with-kafka.yml
설정 확인후
docker-compose build && docker-compose up -d
재수행
Docker kafka 설정에서
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
이처럼 설정하여 컨테이너 내부에선 kafka:29092
외부에선 localhost:9092
접속하도록 설정
Logstash pipeline 추가
- cd docker-elk/logstash/pipeline
- vi kafka-consumer.conf
- input으로 kafka의 test-topic에서 메시지를 읽어오고, output으로 elasticsearch로 보내면서 index를 설정해주었습니다.
input {
kafka {
client_id => "logstash-test-topic"
group_id => "logstash-test-topic"
topics => ["test-topic"]
codec => plain
bootstrap_servers => "kafka:29092"
}
}
## Add your filters / logstash plugins configuration here
output {
elasticsearch {
hosts => "elasticsearch:9200"
index => "kafka-app-log-%{+YYYY.MM.dd}"
}
}
- Logstash 컨테이너 재시
Springboot Logback 로그 수집
Springboot의 설정은 Logback만 진행하도록 하겠습니다.
1.build.gradle에 라이브러리 추가
# Logback은 kafka appender을 기본적으로 지원하지않아 별도 라이브러리가 필요합니다.
implementation 'com.github.danielwegener:logback-kafka-appender:0.1.0'
implementation 'net.logstash.logback:logstash-logback-encoder:6.2'
2.logback.xml 설정
<configuration>
<appender name="LOG-KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%date - %-5p %t %-25logger{5} %F:%L %m%n</pattern>
</layout>
</encoder>
<topic>test-topic</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/>
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
<producerConfig>retries=1</producerConfig>
<producerConfig>bootstrap.servers=localhost:9092</producerConfig>
<producerConfig>compression.type=snappy</producerConfig>
<producerConfig>max.block.ms=1000</producerConfig>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date - %-5p %t %-25logger{5} %F:%L %m%n</pattern>
</encoder>
</appender>
<logger name="kafka-logger" level="INFO" additivity="false">
<appender-ref ref="LOG-KAFKA"/>
<appender-ref ref="STDOUT"/>
</logger>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
3.로그 남기기
// @Slf4j의 topic 옵션으로 logger을 지정해줍니다.
@Slf4j(topic = "kafka-logger")
@RestController
@RequiredArgsConstructor
public class HelloController {
@GetMapping({"", "/hello"})
public String hello() throws IOException {
// kafka logger에 로그 남기기
log.info("hello~!@");
return "hello";
}
}
Kibana index pattern 생성
- http://localhost:5601/app/management/kibana/indexPatterns 이동
- kibana pipeline에 설정한 index를 검색이 되는지 확인 (kafka-app-log-%{+YYYY.MM.dd})
kafka-app-log-*
패턴을 만들고 Time filed를 @timestamp로 설정후 생- http://localhost:5601/app/discover에서 데이터 확인
Subscribe via RSS