Kafka의 3개의 토픽에서 데이터를 가져와 특정 key 별로 count를 집계해 db에 적재하는 Flink 애플리케이션이 있다.

해당 애플리케이션은 원래 토픽 별로 job을 실행하고 있었는데, 같은 파이프라인을 여러 job을 띄워 처리해야 하는 불편함이 있었기에

이 3개의 스트림 집계 파이프라인을 하나로 합쳐 적재하는 로직으로 변경하였고,

동일 집계 결과를 하나의 job만 실행해 얻을 수 있게 되었다.

 

그러나...

애플리케이션 변경 후 저녁 시간대 마다 consumer lag이 증가하는 이슈가 발생했고 이에 원인 파악 및 조치를 진행하였다.

 

 

 

랙 증가 원인 파악 및 조치

절대적인 트래픽 양이 많아지는 저녁부터 랙이 발생하기 시작하였으나 애플리케이션 변경 전과 후의 데이터 트래픽 차이는 크지 않았다.

Flink 클러스터 모니터링 수치가 다소 차이는 있으나 원인으로 바로 짚긴 어려웠고 gc로그 분석 또한 진행했으나 큰 특이사항은 없었다.

 

 

1. 네트워크 메모리 옵션 변경

stream 라인이 합쳐지며 네트워크 병목이 커진 것일까 하여 우선 taskmanager의 network memory.network.fraction 옵션을 변경해 네트워크 메모리 비율을 증가시켰으나 해당 이슈는 계속 지속되었다.

 

 

2. Parallelism 조정

비록 애플리케이션 변경 전과 후 비교 시 데이터 트래픽의 차이가 크지 않았으나 한달 전 데이터보다 트래픽이 꽤 증가한 상황이었다.

병렬 처리 프레임워크인 Flink 애플리케이션을 운영하며 거의 대부분의 경우 Parallelism의 수를 늘리면 전체적인 성능 향상이 크게 나타나는 것을 확인했었기에 1.5배의 Parallelism을 적용했으나 큰 차이 없이 여전히 이슈는 발생하였다. 

 

 

3. Key Skew 현상 발견

이슈 발생 시간 대의 모니터링 그래프를 다시 면밀히 확인 한 결과 stream 통합 시 taskmanager 간 bytes, records sent 수치의 편차가 이전에 비해 더 발생하는 것을 확인하였다.

해당 편차는 Key Skew가 발생하며 일부 taskmanager에게 데이터가 몰렸고, 띠라서 해당 taskmanager의 리소스 부족으로 랙이 증가하는 것으로 해석할 수 있었다.

 

왜 Key Skew가 발생했을까?

먼저 각 kafka source 데이터의 cardinality를 분석해보았다.

현재 애플리케이션에선 여러 필드의 조합으로 이루어진 key를 사용해 KeyBy 연산으로 그룹화하여 집계 데이터를 생성하는데, 이 과정에서 3개의 스트림이 합쳐지며 특정 필드에 의한 key들의 교집합이 증가하는 현상을 발견했다.  

 

조치 및 테스트

1. field key 추가

기존 key 조합에 포함되지 않았던 필드를 추가하여 skew 현상이 완화되는지 확인하였다.

신규 추가한 필드 b는 전체 데이터 중 교집합에 크게 영향을 미치는 a 특정 필드와의 비율이 비슷하게 분포되어 동일 a필드를 가진 레코드 중에서도 해당 값이 비교적 고르게 분포되어 있었다.

하지만 key의 분산 효과는 미비했다.

key를 구성하는 필드 중 하나인 c 필드의 가장 많은 비중을 차지하는(약 97.63%) 레코드들 중 b 필드가 포함되지않은 레코드들이 많았기 때문에 큰 효과가 없었던 것이다.

 

2. random key 추가

위의 b 필드의 가장 많은 비중을 차지하는 레코드들을 대상으로 random key(0~99)을 추가하여 skew 현상이 완화되는지 확인해보았다.

테스트 결과 taskmanager 별 분산 효과가 확인되었다!

기존 taskmanager 별 numRecordsInPerSecond 수치가 최대 약 900c/s에서 100c/s 로 감소하였으며, 랙 감소 속도 또한 기존보다 약 2배 증가하였다.

 

하지만...

random key 추가 후 저녁에 일부 taskmanager가 restart되는 이슈가 확인되었는데, random key의 범위를 너무 크게 잡아 key의 과다 생성이 원인으로 추측되었다.

 

따라서 random key 0~99 범위를 0~9로 변경 후 다시 실행해보았다.

02/01 17:35 경 적용 이후 key의 분산 범위는 다소 높아졌으나 random key 적용 전 기존 최대 bytes in 수치의 약 1/5이어서 처리 시 크게 문제되지 않는 수치였기에 더이상 consumer lag 및 taskmanager restart가 없는 평화를 되찾을 수 있었다.

 

 

 

간단 회고

데이터 파이프라인 구성 시 단순히 데이터를 전송, 적재의 성능을 높이는 것도 중요하지만 

데이터의 내용과 특성을 알고 있어야 더 효율적인 파이프라인을 구성할 수 있다는 걸 경험할 수 있었다.

 

'Flink' 카테고리의 다른 글

Flink cluster migration (Mesos -> Kubernetes)  (0) 2024.03.10

기존 Mesos 위에서 운영되던 대규모 Flink 클러스터를 Kubernetes로 이전하기 위해 했던 삽질기.

사내의 Mesos 클러스터는 매우 Flink 의존적이었고(Flink가 홀로 사용 중), 그 Flink 마저 1.13 버전부터 Mesos가 deprecated될 예정이었다.

게다가 Mesos 아키텍처도 복잡한 편이라 쉽게 이해하고 운영하기 쉽지 않았다....

 

이에 Flink 클러스터를 Kubernetes로 이전하기로 결정하고 migration 프로젝트를 진행했다.

 

 

Flink Kubernetes 클러스터 설치

프로젝트 처음 시작 시에는 Flink 도커 이미지를 이용하여 직접 Flink 클러스터를 구성해보려 하기도 했었으나 이내 GoogleCloudPlatform에서 릴리즈된 비공식 버전 flink operator(flink-on-k8s-operator)를 발견해 해당 operator를 사용하여 클러스터 설치를 진행했다. 

(operator를 이용해 설치하는게 클러스터의 설정과 구성을 효율적으로 관리하고 적용하기 쉬우며 배포 및 스케일링 프로세스 또한 간편하고 효과적으로 처리할 수 있어 운영 리소스를 크게 절감할 수 있었다)

 

이후 프로젝트 진행 중 2022. 04에 처음으로 apache 공식 flink operator가 릴리즈되었고, 이에 공식 operator를 이용하여 Flink 클러스터를 재구성했다.

GoogleCloudPlatform 버전과 상이한 부분(옵션 등)이 있긴 하지만 클러스터 구성 방법과 배포 과정 자체는 거의 비슷하여 operator 변경 시 큰 이슈는 없었으며, 현재 해당 비공식 버전 operator는 deprecated되었다.

 

 

Apache Flink에서 제공하는 공식 kubernetes용 Flink Operator
https://github.com/apache/flink-kubernetes-operator

 

GitHub - apache/flink-kubernetes-operator: Apache Flink Kubernetes Operator

Apache Flink Kubernetes Operator. Contribute to apache/flink-kubernetes-operator development by creating an account on GitHub.

github.com

Java로 구현되었으며, 사용자는 kubectl과 같은 kubernetes 도구를 통해 Flink 애플리케이션과 lifecycle 관리가 가능하다.

 

 

설치 방법

도커, Kubernetes와 더불어 helm이 설치되어 있는 환경이라면 operator를 사용할 수 있다.

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

helm 명령어를 이용해서 repo를 추가한 뒤 flink-kubernetes-operator를 설치해주면 된다.

※ helm 차트의 템플릿이나 변수 값의 오버라이딩이 필요할 경우 위 레포지토리의  helm 디렉토리 참고

 

 

helm 차트를 설치하고 나면 flink-kubernetes-operator pod이 생성된다.

FlinkDeployment를 통해 Flink클러스터를 구성할 준비가 완료된 것이다.

 

FlinkDeployment 와 FlinkSessionJob은 operator에서 API를 사용할 수 있게 하는 핵심 CR(Custom Resource)로써, FlinkDeployment는 Flink 애플리케이션 및 클러스터 배포를 실행하고, FlinkSessionJob은 세션 클러스터의 세션 작업을 정의해 각 클러스터에서 여러 FlinkSessionJob을 실행할 수 있다.

  • FlinkDeployment에 의해 관리되는 Flink 애플리케이션
  • FlinkDeployment로 관리되는 빈 Flink 세션 + FlinkSessionJobs로 관리되는 여러 독립적인 세션 작업

현재 운영중인 Flink 애플리케이션 특성 상 운영 시 더 효율적으로 관리할 수 있는 면이 있어 FlinkDeployment로 클러스터를 구성한 뒤 Flink 애플리케이션을 띄우는 방식으로 진행했다.

 

FlinkDeployment 주요 구성

더보기
  • image: flink docker image
  • flinkVersion: flink 버전 정의
  • ingress: web ui 접속용 ingress 설정
  • serviceAccount : flink serviceAccount 적용
  • jobManager: jobamanager 설정 적용
    • resource: cpu/memory 지정
    • podTemplate : jobmanager pod 내부 옵션 설정
    • volumeMounts: 볼륨 설정 시 pod 내부의 마운트 경로 설정
    • ports: 포트 설정(ex. flink metrics 포트 지정)
    • volumes: pod의 볼륨 설정
  • taskManager
    • resource: cpu/memory 지정
    • podTemplate : taskmanager pod 내부 옵션 설정
    • volumeMounts: 볼륨 설정 시 pod 내부의 마운트 경로 설정
    • ports: 포트 설정(ex. flink metrics 포트 지정)
    • volumes: pod의 볼륨 설정
  • logConfiguration: log4j 설정(log4j-console.properties, logback-console.xml)

git의 examples 아래의 예시를 참고하여 위의 주요 설정들을 각 kubernetes 및 flink 운영 환경에 맞게 설정한 뒤 kubectl 명령어를 통해 배포해주었다.

kubectl apply -f flinkdeployment.yaml

 

기존 Mesos환경과 동일하게 jobmanager, taskmanager의 cpu/mem 리소스를 지정해주고, 옵션 또한 버전이 업그레이드 되면서 상이한 부분이 있었지만 최대한 동일한 설정으로 구성하였다.
 

Flink 클러스터 구축 시 발생 이슈

1. ingress 설정

현재 사내의 Kubernetes클러스터는 cert-manager를 통해 letsencrypt에서 인증서를 발급받은 후, 발급 받은 인증서를 ingress에 적용하여 https로 접근할 수 있도록 구축해놓았다.

FlinkDeployment에도 ingress옵션이 있어 해당 옵션을 이용해 위의 방법으로 발급받은 인증서를 적용해 Flink 클러스터의 Web UI에 접근할 수 있도록 구성하려 했으나 아쉽게도 ingress 항목에 tls 속성이 누락되어 있어 인증서 적용이 불가했다.

 

결국 별도로 ingress 설정을 구성한 뒤 FlinkDeployment yaml에 추가하여 tls 인증서를 적용하였다.

 

(※ 2024/03/15 Update: 아직 릴리즈되지 않았지만 flink-kubernetes-operator의 1.8 버전부터 ingress 속성에 tls속성이 추가되어 FlinkDeployment 에서 간단히 인증서를 적용한 ingress를 구축할 수 있을 것으로 보인다.

단, operator 1.7 버전부터 현재 운영중인 1.14 버전의 Flink는 지원하지 않기 때문에(Flink Version Support Policy Change) 위 속성을 이용하려면 적어도 1.16 버전으로 클러스터를 업그레이드 해야 한다.......)

 

 

2. Flink 버전 업그레이드

기존 Mesos에서 운영되었던 Flink는 1.7버전으로(매우 오래되긴 하였다.........) operator에서는 지원하지 않는 버전이었다.(최소 1.13 이상 지원)

따라서 1.7 -> 1.14 버전으로 업그레이드 및 코드 개선 작업을 진행하였다.

 

 

Flink 애플리케이션 이전 후 발생 이슈

Flink 클러스터 구축도 완료하고, 애플리케이션 버전도 업그레이드 하고, 이제 돌리기만 하면 되는구나...! 

...... 오만한 생각을 했다. 계속 이어지는 삽질기.

 

Kafka로 적재되는 실시간 로그 트래픽 건수는 일 평균 100억 건 정도.

로그는 총 6개의 토픽으로 나뉘어 적재되는데, 이 중 하나의 토픽에 적재되는 트래픽이 다른 토픽들의 트래픽을 합친 것의 2배 이상 압도적으로 많다.

각 토픽마다 적재되는 로그 타입에 따라 Flink 애플리케이션을 이용해 포맷 변환 파이프라인을 운영중이기에

트래픽 양에 따른 Kafka 토픽의 파티션 수 및 Flink job의 parallelism 수도 각각 지정해주었다.

 

기존 Mesos환경과 동일한 서버 사양에 jobmanager, taskmanager의 cpu/mem 리소스를 지정해주고, 옵션 또한 버전이 업그레이드 되면서 상이한 부분이 있었지만 구성에 큰 차이가 없게 적용한 뒤 애플리케이션을 띄웠다.

 

트래픽 양이 작은 job부터 하나씩 띄우기 시작했고, 마지막으로 가장 많은 job을 실행시키자 이슈가 발생하기 시작했다.

 

 

Kafka consumer lag 증가

Flink의 처리 과정에 부하가 생기며 Kafka consumer lag이 증가하기 시작했고, 앞단의 Kafka로 로그를 적재하는 flume 파이프라인에 백프레셔 까지 발생하기 시작했다.

일단 원복 후 모니터링 수치를 확인한 결과 기존 mesos에 비해 cpu 및 load avg 수치가 1/4로 확인되었다.

 

기존 mesos 클러스터와 동일하게 구성했는데 왜..... 처리량이 적을까.

cpu/memory 리소스를 전보다 늘려서 다시 실행해봐도 동일한 리소스 사용률을 보이며 적재 딜레이가 지속되었다. 

 

Mesos와 Kubernetes의 플랫폼 차이일까?

Mesos가 매우 큰 규모의 클러스터와 다양한 리소스 유형을 관리하는 데 적합한 플랫폼으로 알려져 해당 Flink 파이프라인과 같은 대규모 분산 시스템에서는 Kubernetes보다 성능이 더 좋을 수 있다는 생각이 들기도 했다.

하지만 Kubernetes 클러스터의 메트릭과 모니터링 수치를 아무리 확인해봐도 부하라던지 특이점을 찾을 수 없었다.

이전한 Kubernetes에는 당시 대용량 트래픽을 처리하지 않고 주로 플랫폼 운영을 위한 애플리케이션만 운영중이었기에 클러스터를 함께 사용하는 다른 애플리케이션의 영향 또한 미미하다고 생각했다.

 

그렇다면 컨테이너 환경 때문일까?

Mesos는 기본적으로 컨테이너를 지원하지 않고 프레임워크가 애플리케이션을 관리하지만 Kubernetes는 컨테이너 오케스트레이션 시스템으로서 docker와 같은 컨테이너를 실행하고 관리한다.

컨테이너 환경으로 변경된게 영향이 있을까 싶어 리서치 하던 중 컨테이너에 최적화된 Quarkus 프레임워크를 발견하였다.

Quarkus는 JVM 및 네이티브 컴파일을 위해 만들어진 풀스택, Kubernetes 네이티브 Java 프레임워크로, 특히 컨테이너 환경에서의 Java를 최적화하여 느린 부팅 속도 및 메모리 사용량 개선 등 쿠버네티스 환경에서 효율적으로 사용할 수 있다고 한다.

따라서 이 Quarkus 프레임워크를 적용하여 Java 애플리케이션을 다시 빌드한 뒤 애플리케이션을 실행해보았다.

이 과정에서 Quarkus는 Java 11버전 부터 지원을 하기 때문에 아직 8(...)버전을 사용하던 Java 버전 또한 업그레이드 하였으며, gc 방식도 parallelGC에서 G1GC로 변경해 실행했다.

그러나... 마찬가지로 큰 효과가 없었다.

 

이 과정에서 하나의 Flink클러스터에서 여러 job을 함께 운영하다보니 위처럼 이슈가 발생했을 경우 다른 job들에게도 영향을 미쳐 별도의 Flink 클러스터를 구축해 Job을 실행하기 시작했다.

 

이 외 수많은 삽질(스케일 다운 테스트, flink 옵션 조정, 애플리케이션 로직 개선 등등)을 시도하다 점점 지치기 시작했다.

도대체 뭐가 문제야............

너무 여기에 몰두되어 원인 파악이 늦어지는 걸까 싶어 팀장님께 양해를 구해 다른 업무부터 처리하며 리프레시 후 다시 진행해보기도 했다.

나름 효과가 있어(^^) 이제 이 프로젝트도 빨리 마무리 지어야지! 라는 생각이 강해져 다시 집중하기 시작했다.

 

그리고 다시 원인 파악을 하던 중 미처 체크하지 못한 부분이 있다는 생각에 Aㅏ! 했다. (사무실에서 ah!는 금지사항이지만 어쩔 수 없이 나올 때가 있다....)

Kubernetes의 Prometheus를 이용한 메트릭 모니터링은 했지만 노드 자체의 트래픽 및 네트워크 체크는 안했던 것.

이전 시간동안의 Network 트래픽 수치 확인을 해보니 최대 1000Mbit/초 에서 눌리는 형태의 그래프를 확인했다!

기존 Mesos 클러스터의 서버 스펙과 동일한 CPU/Mem 구성이지만 달랐던 점은 NIC!

기존 서버의 NIC는 10g였지만 현재 Kubernetes의 서버는 1g... 

 

그동안의 삽질이 주마등처럼 흘러갔다ㅜㅜㅜㅜㅜㅜㅜ

SE팀에 요청해 해당 노드들의 NIC를 교체한 결과 더이상 랙이 발생하지 않았다.....

 

 

Job Fail 현상 발생

삽질 중 quarkus 프레임워크를 적용한 애플리케이션이 위의 이슈 해결에는 크게 기여하지 못했지만 Kubernetes 환경에 최적화된 프레임워크이기에 활용해도 좋을 것 같아 기존 Flink 클러스터의 java 버전을 업그레이드 하고 새로 빌드한 애플리케이션으로 변경해 job을 실행해줬다. 

그러나 기존 버전에서도 잘 돌아가던 job들에게 간헐적으로 fail이 발생하기 시작했다.

 

메트릭 수치를 확인해보니 job fail 전 일부 taskmanger의 heap 메모리 수치가 튀는 현상을 발견했다.

기존과 다른 점이라면 quarkus 적용을 위해 java11로 업그레이드를 하고 g1gc 방식으로 변경했다는 건데..

java버전에 의존성이 있는 로직이 있나? 싶어 코드를 분석하다가 원인 파악을 하게 되었다.

현재 사내 로그 포맷 변환을 위해 개발한 udf가 있는데, 이 udf 라이브러리를 hive와 flink 애플리케이션에서 공통으로 사용한다.

이 udf가 java 8버전으로 빌드되었기에 java11로 빌드한 flink 애플리케이션에서 이슈가 발생한 것으로 보였다.

하지만 안타깝게도 현재 사용중인 2점대 버전의 hive에는 java 8버전의 라이브러리를 사용할 수 밖에 없다.

결국 quarkus 적용은 보류.....

 

 

Timezone 이슈

Flink 클러스터의 timezone을 default로 세팅해 배포했더니 로그의 timestamp 포맷 변환시 KST가 아닌 UTC로 변환되는 이슈가 발생했다.

udf 로직을 바로 변경할 수는 없어 kubernetes의 volume 옵션을 이용해 flink container의 timezone 설정을 KST로 변경해주었다.

이후 정상 동작 확인.  

 

 

Ingress를 이용한 file 업로드 시 에러 발생

Flink 클러스터에서 실행할 jar 애플리케이션을 업로드 하는 중 "413 Request Entity Too Large" 에러 발생.

확인해보니 jar파일 업로드 요청이 ingress에서 허용하는 최대 크기를 초과했기 때문이었다.

ingress의 annotation 설정에 body 사이즈 및 timeout 설정도 추가해 문제를 해결하였다.

    nginx.ingress.kubernetes.io/proxy-body-size: 1g
    nginx.ingress.kubernetes.io/proxy-connect-timeout: '1800'
    nginx.ingress.kubernetes.io/proxy-read-timeout: 600s
    nginx.ingress.kubernetes.io/proxy-send-timeout: 600s
    nginx.ingress.kubernetes.io/session-cookie-expires: '172800'
    nginx.ingress.kubernetes.io/session-cookie-max-age: '172800'

 

 

 

이전 효과

기존의 Flink가 거의 독점으로 사용했던 Mesos 서버를 걷어냄으로써 다소 낭비로 느껴졌던 리소스를 효율적으로 운영하고 비용 절감 효과를 이뤄냈으며, 클러스터 관리 포인트 또한 크게 개선할 수 있었다.

  • Flink 전용 환경 제거, 리소스 절약
    • Flink 클러스터 의존적인 Mesos 클러스터 제거 및 Mesos HA구성을 위한 Zookeeper 제거
    • 20%의 서버 절감 효과 및 다른 서비스들과의 리소스 공유
  • 클러스터 관리 개선
    • flink 클러스터 구축 및 설정 용이
    • 장애 대응 속도 개선

 

더 해보고 싶은 것

우선 진행 못했던 java 버전 업그레이드와 더불어 quarkus 적용.. 물론 hdfs 업그레이드부터 해야 해 일이 커지긴 하지만 꼭 해야하는 일이다.

더불어 Flink의 Autoscaler 기능도 활용해보고 싶다. 시간대 별로 편차가 큰 트래픽에 대응해 리소스를 더 효율적으로 사용할 수 있을 것이고, 급증하는 트래픽에 대한 대응도 신속히 할 수 있으니 24시간 내내 실시간으로 운영되는 우리 파이프라인에 정말 안성맞춤인 기능이다.

(정말 효율적으로 운영할 수 있을지는 적용해봐야 아는거지만^-^..)

 

 

 

간단 회고

정말... 가장 많은 삽질로 인해 진땀을 뺐던 프로젝트이다.

그 삽질을 정리하기도 쉽지 않았고 이미 정리한 삽질기를 다시 블로그로 옮기며 또 한번 반성과 추억을 회상했다. 

하드웨어의 리소스나 스펙을 높여 문제점을 해결하기보다 가능한 한 소프트웨어 중심으로 이슈를 해결하려는 성향이 있는데 

이런 성향이 하드웨어적인 요소가 뒷받침해주지 않을 때 오히려 더 안좋게 작용할 수 있다는 경험을 했고 

하드웨어의 스케일업이 필요한 경우를 잘 판단하고 요청할 수 있게 더 꼼꼼히 체크하고 공부해야 겠다는 생각이 들었다.

 

삽질은 남는다........

'Flink' 카테고리의 다른 글

Flink Key Skew 이슈 해결하기  (0) 2024.03.10

+ Recent posts