1. elk 설치
- Version Check
OS - CentOS Linux release 7.8.2003
JAVA - OpenJDK Runtime Environment (build 1.8.0_292-b10)
Elasticsearch - 7.13
Filebeat - 7.13
Logstash - 7.13
kibana - 7.13(elasticsearch와 버전 동일하게 다운)
모두 yum으로 설치 가능. 그리고 그렇게 설치하기를 권장함.
2. filebeat - logstash - elasticsearch - kibana 연동
Elasticsearch 설정파일 - /etc/elasticsearch/elasticsearch.yml
Elasticsearch 실행
- sudo systemctl start/restart/stop elasticsearch
Logstash 설정
- /etc/logstash/conf.d/logstash.conf
# /etc/logstash/conf.d/logstash.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats{
# host 안넣으면 에러 발생..
host => "127.0.0.1"
port => 5044
}
}
output {
stdout {}
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}
- /etc/logstash/pipeline.yml (다수의 파이프라인 설정 가능)
# /etc/logstash/pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
- pipeline.id: main
path.config: "/etc/logstash/conf.d/logstash.conf"
Logstash 서비스 실행
- systemctl start/restart/stop logstash
Logstash 설치(설정 적용)
- sudo /usr/share/logstash/bin/system-install
Logstash log 확인하기
- sudo tail -f /var/log/logstash/logstash-plain.log
Filebeat 설정 파일 위치
- /etc/filebeat/filebeat.yml
- 설정에 따라 output 주석처리 및 변경
Filebeat 서비스 실행
- sudo systemctl start/restart/stop filebeat
Filebeat 로그 확인
- sudo tail -f /var/log/messages
인덱스 목록 조회 -> filebeat 제대로 동작되는지 확인하기
$ curl localhost:9200/_cat/indices
Kibana 설정
- /etc/kibana/kibana.yml - host, port, es host등 설정 변경 가능
- sudo systemctl restart kibana
kibana에서 확인하기
localhost:5601 접속
좌측 메뉴에서 스크롤 내려서 Management > StackManagement 클릭
다시 좌측메뉴에 Kibana > Index Patterns 클릭
오른쪽 상단에 파란색 버튼의 Create Index Pattern 클릭
filebeat 골라서 생성
좌측 메뉴에서 Analytics > Discover 클릭
원하는 Index를 선택하면 로그가 들어와있는것을 확인가능함
3. kafka 설치
- Version Check
Kafka - 2.13-2.8.0 (Scala Version : 2.13)
http://mirror.navercorp.com/apache/kafka/
해당 링크로 가면 Kafka를 다운받을 수 있음.
다운 받은 파일을 압축해제하기
$ tar -xvzf kafka_2.13-2.4.1.tgz
$ ln -s kafka_2.13-2.4.1 kafka
Kafka를 실행하기 위해 zookeeper가 먼저 실행되어 있어야 함
$ nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > zookeeper.log 2>&1 &
Kafka 실행
$ nohup ./bin/kafka-server-start.sh ./config/server.properties > kafka.log 2>&1 &
만약 에러 발생?
[2021-06-02 10:22:04,775] WARN Session 0x0 for server localhost/0:0:0:0:0:0:0:1:2181, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.lang.NoSuchMethodError: org.slf4j.helpers.Util.safeGetSystemProperty(Ljava/lang/String;)Ljava/lang/String;
at org.slf4j.impl.VersionUtil.getJavaMajorVersion(VersionUtil.java:11)
at org.slf4j.impl.Log4jMDCAdapter.<clinit>(Log4jMDCAdapter.java:37)
at org.slf4j.impl.StaticMDCBinder.getMDCA(StaticMDCBinder.java:59)
at org.slf4j.MDC.<clinit>(MDC.java:73)
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1080)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1139)
[2021-06-02 10:22:05,879] WARN Session 0x0 for server localhost/0:0:0:0:0:0:0:1:2181, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.lang.NoClassDefFoundError: Could not initialize class org.slf4j.MDC
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1080)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1139)
[2021-06-02 10:22:06,980] WARN Session 0x0 for server localhost/0:0:0:0:0:0:0:1:2181, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.lang.NoClassDefFoundError: Could not initialize class org.slf4j.MDC
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1080)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1139)
.......
- slf4j 관련 에러 => kafka/libs 밑에 slf4j-log4j12-1.7.30.jar 지우고 slf4j-simple-1.7.30.jar 넣어줌 => 해결!!
kafka 포트 : 2181
Kafka 테스트 하기 - Topic 생성
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic
Kafka 테스트 하기 - Topic 리스트 확인
./kafka-topics.sh --list --zookeeper localhost:2181
./kafka-console-producer.sh --broker-list localhost:9092 -topic test_1 >실행 후 메세지를 입력하고 종료(ctrl + c)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_1 --from-beginning > 메세지 확인
4. filebeat - kafka- logstash - elasticsearch - kibana 연동
filebeat 설정에서 kafka로 데이터를 보내도록 변경해줘야함
#filebeat.yml
...
output.kafka:
hosts: "localhost:9092"
topic: "msg_log"
...
logstash 설정에서 input을 kafka로 부터 받는걸로 변경해줘야함. logstash에서 자동으로 kafka를 subscribe한것을 받아오게됨. 새파일을 만들어 진행
# /etc/logstash/conf.d/kafka.conf
...
input {
kafka{
bootstrap_servers => "localhost:9092"
group_id => "logstash"
topics => ["msg_log"] # filebeat에서 밀어넣은 topic으로
consumer_threads => 1
decorate_events => true
}
}
output {
elasticsearch {
action => "index"
hosts => ["127.0.0.1:9200"]
index => "msg_log" # es에 넣고자하는대로 설정
}
}
...
pipeline 파일에는 따로 추가해주기
설정 변경한 것들을 재시작해주기
5. spark 설치
- Version Check
Spark - 3.1.2-bin-hadoop2.7 (Hadoop Version : 2.7)
https://mirror.navercorp.com/apache/spark/spark-3.1.2/
다운로드 및 압축 해제
$ tar -xvzf spark-3.1.2-bin-hadoop2.7.tgz
$ ln -s spark-3.1.2-bin-hadoop2.7 spark
~/.bash_profile 에 환경변수 등록
export SPARK_HOME=/home/cbs210601/downloads/spark
export PATH=$PATH:$SPARK_HOME/bin
~/.bash_profile 수정사항 적용
source ~/.bashrc
$SPARK_HOME/conf 밑에 spark-env.sh.template 파일을 복사해 spark-env.sh 파일 생성
해당 파일에 JAVA_HOME과 SPARK_HOME을 추가해준다.
export JAVA_HOME=/usr/local/java
export SPARK_HOME=/home/cbs210601/downloads/spark
pyspark를 쓰려면 $SPARK_HOME/jars 밑에 추가해줘야함(maven repository(https://mvnrepository.com)에서 다운)
spark-submit 파일 : 배포할때 쓰임.
EX) {SPARK_HOME}/bin/spark-submit --master spark://host:7077 --executor-memory 10g myscript.py
6. filebeat - kafka- logstash - kafka - spark - elasticsearch - kibana 연동
pyspark를 통해 연결할수 있음, pyspark는 대화형 shell이므로 실행결과를 바로 확인가능함.
나중에 py파일을 만들어놓고 작업하는것도 만들어봐야함.
$ sudo ./pyspark
실행하게되면 쉘이 뜨게됨.
아래 이미지의 url로 접속하면 모니터링도 가능
아래는 kafka에서 spark로, 그리고 spark에서 es로 보내는 과정!
structured stream인지 뭔지.. 이론은 하나도 기억이 나지 않음...ㅠ
from pyspark.sql.functions import decode
from pyspark.sql.functions import from_unixtime,unix_timestamp
from pyspark.sql.functions import to_utc_timestamp
#kafka 데이터 받아오기
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "127.0.0.1:9092").option("subscribe", "msg_log_1").load()
df_decode = df.withColumn("value",decode(df.value,'utf-8')) #### 디코딩
df_decode = df.withColumn("@timestamp",\
from_unixtime(unix_timestamp(to_utc_timestamp(df.timestamp,'Asia/Seoul')),\
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
# console에 결과 띄우기
#query = df_decode.writeStream.format("console").trigger(processingTime='10 seconds').start()
#query.stop()
query = df_decode.writeStream.outputMode("append")\
.format("org.elasticsearch.spark.sql")\
.option("es.nodes","192.168.0.7:9200")\
.option("checkpointLocation", "file:///home/cbs210601/downloads/spark/chkpt")\
.start("msg_log_1_4/").awaitTermination(200)
# es에 넣을 인덱스
# ctrl + c 로 종료가능
spark에서 kafka를 subscribe하게되면
readStream으로 받아오게되는데 여기서 key, value가 binary타입으로 받게됨...
이걸 해결하기 위해 decode를 해줌.
timestamp도 long 타입으로 받기때문에 이걸 kibana에서 인식할수 있는 string으로 바꿔줘야함
그리고 UTC로 바꿔줘야 kibana에서 이걸 받아서 타임존을 다시 수정해서 보여줌.
7. 마무리
3일 교육간 이론은 하나도 모르겠는데 실습은 잘 따라함ㅎㅎ
일단 막 올린거라 나중에 이쁘게 수정하기..
참고
pyspark api doc : https://home.apache.org/~pwendell/spark-releases/latest/api/python/index.html
spark + kafka 연동 : https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
pyspark 예제코드 : https://www.bmc.com/blogs/spark-elasticsearch-hadoop/
참고 블로그(스파크) : https://moons08.github.io/programming/sparkStreaming/#source-read
kafka 참고 자료(서비스 등록 내용) : https://twofootdog.tistory.com/90
오픈소스 지원센터 : https://www.oss.kr/solution_guide
'Bigdata, AI' 카테고리의 다른 글
Tensorflow 2.0 Tensorboard 사용법 (2) | 2021.05.26 |
---|---|
머신러닝 모델 성능평가 (url) (0) | 2020.12.01 |
CNN 구조에 대한 설명 (url) (0) | 2020.11.30 |
딥러닝 활성화 함수 정리 (0) | 2020.11.23 |
공학에서 사용하는 문자, 기호 (0) | 2020.07.15 |
댓글