자바스크립트가 비활성화 되어있습니다.
자바스크립트가 활성화 되어야 콘텐츠가 깨지지 않고 보이게 됩니다.
자바스크립트를 사용할수 있도록 옵션을 변경해 주세요.
- willbsoon

본문 바로가기
Bigdata, AI

ELK + kafka + spark 설치 및 연동

by willbsoon 2021. 6. 4.

1. elk 설치

https://s-jg.tistory.com/32

 

Centos7에 ELK stack 구축 하기(Elasticsearch, Logstash, Kibana, Filebeat), 버전 7.12

웹 로그와 시스템 로그 통합을 위해 ELK 스택 도입을 생각 중이다. 정식 도입 전 테스트 서버에 간단하게 구성 해보도록 하겠다. 환경은 Centos7에 설치 하였고 테스트인 만큼 서버 1대에 Elasticsearch,

s-jg.tistory.com

 - Version Check

OS - CentOS Linux release 7.8.2003

JAVA - OpenJDK Runtime Environment (build 1.8.0_292-b10)

Elasticsearch - 7.13

Filebeat - 7.13

Logstash - 7.13

kibana - 7.13(elasticsearch와 버전 동일하게 다운) 

 

모두 yum으로 설치 가능. 그리고 그렇게 설치하기를 권장함.

 

2. filebeat - logstash - elasticsearch - kibana 연동

https://s-jg.tistory.com/33

Elasticsearch 설정파일 - /etc/elasticsearch/elasticsearch.yml

Elasticsearch 실행

 - sudo systemctl start/restart/stop elasticsearch

Logstash 설정

 - /etc/logstash/conf.d/logstash.conf

# /etc/logstash/conf.d/logstash.conf

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats{
    # host 안넣으면 에러 발생..
    host => "127.0.0.1"  
    port => 5044
  }
}

output {
  stdout {}
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
}

 - /etc/logstash/pipeline.yml (다수의 파이프라인 설정 가능)

# /etc/logstash/pipelines.yml

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/logstash.conf"

Logstash 서비스 실행

 - systemctl start/restart/stop logstash 

Logstash 설치(설정 적용)

 - sudo /usr/share/logstash/bin/system-install

Logstash log 확인하기
 - sudo tail -f /var/log/logstash/logstash-plain.log

 

Filebeat 설정 파일 위치

 - /etc/filebeat/filebeat.yml

 - 설정에 따라 output 주석처리 및 변경

Filebeat 서비스 실행

 - sudo systemctl start/restart/stop filebeat

Filebeat 로그 확인
 - sudo tail -f /var/log/messages

 

인덱스 목록 조회 -> filebeat 제대로 동작되는지 확인하기

$ curl localhost:9200/_cat/indices

 

Kibana 설정

 - /etc/kibana/kibana.yml - host, port, es host등 설정 변경 가능

 - sudo systemctl restart kibana

 

kibana에서 확인하기

localhost:5601 접속

좌측 메뉴에서 스크롤 내려서 Management > StackManagement 클릭

다시 좌측메뉴에 Kibana > Index Patterns 클릭

오른쪽 상단에 파란색 버튼의 Create Index Pattern 클릭

filebeat 골라서 생성

 

좌측 메뉴에서 Analytics > Discover 클릭

원하는 Index를 선택하면 로그가 들어와있는것을 확인가능함

 

3. kafka 설치

 - Version Check

Kafka - 2.13-2.8.0 (Scala Version : 2.13)

http://mirror.navercorp.com/apache/kafka/

 

해당 링크로 가면 Kafka를 다운받을 수 있음.

다운 받은 파일을 압축해제하기

$ tar -xvzf kafka_2.13-2.4.1.tgz

$ ln -s kafka_2.13-2.4.1 kafka

 

Kafka를 실행하기 위해 zookeeper가 먼저 실행되어 있어야 함

$ nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > zookeeper.log 2>&1 &

Kafka 실행

$ nohup ./bin/kafka-server-start.sh ./config/server.properties > kafka.log 2>&1 &

 

만약 에러 발생?

[2021-06-02 10:22:04,775] WARN Session 0x0 for server localhost/0:0:0:0:0:0:0:1:2181, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.lang.NoSuchMethodError: org.slf4j.helpers.Util.safeGetSystemProperty(Ljava/lang/String;)Ljava/lang/String;
        at org.slf4j.impl.VersionUtil.getJavaMajorVersion(VersionUtil.java:11)
        at org.slf4j.impl.Log4jMDCAdapter.<clinit>(Log4jMDCAdapter.java:37)
        at org.slf4j.impl.StaticMDCBinder.getMDCA(StaticMDCBinder.java:59)
        at org.slf4j.MDC.<clinit>(MDC.java:73)
        at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1080)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1139)
[2021-06-02 10:22:05,879] WARN Session 0x0 for server localhost/0:0:0:0:0:0:0:1:2181, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.lang.NoClassDefFoundError: Could not initialize class org.slf4j.MDC
        at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1080)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1139)
[2021-06-02 10:22:06,980] WARN Session 0x0 for server localhost/0:0:0:0:0:0:0:1:2181, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.lang.NoClassDefFoundError: Could not initialize class org.slf4j.MDC
        at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1080)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1139)
.......

 - slf4j 관련 에러 => kafka/libs 밑에 slf4j-log4j12-1.7.30.jar 지우고 slf4j-simple-1.7.30.jar 넣어줌 => 해결!!

 

kafka 포트 : 2181
Kafka 테스트 하기 - Topic 생성
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic

Kafka 테스트 하기 - Topic 리스트 확인

./kafka-topics.sh --list --zookeeper localhost:2181

 

./kafka-console-producer.sh --broker-list localhost:9092 -topic test_1  >실행 후 메세지를 입력하고 종료(ctrl + c)

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_1 --from-beginning   > 메세지 확인

 

 

4. filebeat - kafka- logstash - elasticsearch - kibana 연동

filebeat 설정에서 kafka로 데이터를 보내도록 변경해줘야함

#filebeat.yml

...

output.kafka:
  hosts: "localhost:9092"
  topic: "msg_log"

...

 

logstash 설정에서 input을 kafka로 부터 받는걸로 변경해줘야함. logstash에서 자동으로 kafka를 subscribe한것을 받아오게됨. 새파일을 만들어 진행

# /etc/logstash/conf.d/kafka.conf

...

input {
  kafka{
    bootstrap_servers => "localhost:9092"
    group_id => "logstash"
    topics => ["msg_log"] # filebeat에서 밀어넣은 topic으로
    consumer_threads => 1
    decorate_events => true
  }
}
output {
  elasticsearch {
    action => "index"
    hosts => ["127.0.0.1:9200"]
    index => "msg_log" # es에 넣고자하는대로 설정
  }
}

...

pipeline 파일에는 따로 추가해주기

설정 변경한 것들을 재시작해주기

 

 

5. spark 설치

 - Version Check

Spark - 3.1.2-bin-hadoop2.7 (Hadoop Version : 2.7)

https://mirror.navercorp.com/apache/spark/spark-3.1.2/

다운로드 및 압축 해제

$ tar -xvzf  spark-3.1.2-bin-hadoop2.7.tgz

$ ln -s  spark-3.1.2-bin-hadoop2.7 spark

 

~/.bash_profile 에 환경변수 등록

export SPARK_HOME=/home/cbs210601/downloads/spark

export PATH=$PATH:$SPARK_HOME/bin

 

~/.bash_profile 수정사항 적용

source ~/.bashrc

 

$SPARK_HOME/conf 밑에 spark-env.sh.template 파일을 복사해 spark-env.sh 파일 생성

해당 파일에 JAVA_HOME과 SPARK_HOME을 추가해준다.

export JAVA_HOME=/usr/local/java
export SPARK_HOME=/home/cbs210601/downloads/spark

 

pyspark를 쓰려면 $SPARK_HOME/jars 밑에 추가해줘야함(maven repository(https://mvnrepository.com)에서 다운)

 

 

spark-submit 파일 : 배포할때 쓰임.

EX) {SPARK_HOME}/bin/spark-submit --master spark://host:7077 --executor-memory 10g myscript.py

 

6. filebeat - kafka- logstash - kafka - spark - elasticsearch - kibana 연동

pyspark를 통해 연결할수 있음, pyspark는 대화형 shell이므로 실행결과를 바로 확인가능함.

나중에 py파일을 만들어놓고 작업하는것도 만들어봐야함.

 

$ sudo ./pyspark

실행하게되면 쉘이 뜨게됨.

아래 이미지의 url로 접속하면 모니터링도 가능

 

아래는 kafka에서 spark로, 그리고 spark에서 es로 보내는 과정!

structured stream인지 뭔지.. 이론은 하나도 기억이 나지 않음...ㅠ

 

from pyspark.sql.functions import decode
from pyspark.sql.functions import from_unixtime,unix_timestamp
from pyspark.sql.functions import to_utc_timestamp

#kafka 데이터 받아오기
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "127.0.0.1:9092").option("subscribe", "msg_log_1").load()
df_decode = df.withColumn("value",decode(df.value,'utf-8'))          #### 디코딩
df_decode = df.withColumn("@timestamp",\
  from_unixtime(unix_timestamp(to_utc_timestamp(df.timestamp,'Asia/Seoul')),\
  "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))

# console에 결과 띄우기
#query = df_decode.writeStream.format("console").trigger(processingTime='10 seconds').start()
#query.stop()

query = df_decode.writeStream.outputMode("append")\
  .format("org.elasticsearch.spark.sql")\
  .option("es.nodes","192.168.0.7:9200")\
  .option("checkpointLocation", "file:///home/cbs210601/downloads/spark/chkpt")\
  .start("msg_log_1_4/").awaitTermination(200)
         # es에 넣을 인덱스
         
# ctrl + c 로 종료가능

 

https://home.apache.org/~pwendell/spark-releases/spark-2.1.0-rc1-docs/structured-streaming-kafka-integration.html

spark에서 kafka를 subscribe하게되면

readStream으로 받아오게되는데 여기서 key, value가 binary타입으로 받게됨...

이걸 해결하기 위해 decode를 해줌.

 

timestamp도 long 타입으로 받기때문에 이걸 kibana에서 인식할수 있는 string으로 바꿔줘야함

그리고 UTC로 바꿔줘야 kibana에서 이걸 받아서 타임존을 다시 수정해서 보여줌.

 

 

7. 마무리

3일 교육간 이론은 하나도 모르겠는데 실습은 잘 따라함ㅎㅎ

일단 막 올린거라 나중에 이쁘게 수정하기..

 

 

참고

pyspark api doc : https://home.apache.org/~pwendell/spark-releases/latest/api/python/index.html  

spark + kafka 연동 : https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 

pyspark 예제코드 : https://www.bmc.com/blogs/spark-elasticsearch-hadoop/

참고 블로그(스파크) : https://moons08.github.io/programming/sparkStreaming/#source-read

kafka 참고 자료(서비스 등록 내용) : https://twofootdog.tistory.com/90   

오픈소스 지원센터 : https://www.oss.kr/solution_guide   

 

 

 

댓글