Kafka의 3개의 토픽에서 데이터를 가져와 특정 key 별로 count를 집계해 db에 적재하는 Flink 애플리케이션이 있다.

해당 애플리케이션은 원래 토픽 별로 job을 실행하고 있었는데, 같은 파이프라인을 여러 job을 띄워 처리해야 하는 불편함이 있었기에

이 3개의 스트림 집계 파이프라인을 하나로 합쳐 적재하는 로직으로 변경하였고,

동일 집계 결과를 하나의 job만 실행해 얻을 수 있게 되었다.

 

그러나...

애플리케이션 변경 후 저녁 시간대 마다 consumer lag이 증가하는 이슈가 발생했고 이에 원인 파악 및 조치를 진행하였다.

 

 

 

랙 증가 원인 파악 및 조치

절대적인 트래픽 양이 많아지는 저녁부터 랙이 발생하기 시작하였으나 애플리케이션 변경 전과 후의 데이터 트래픽 차이는 크지 않았다.

Flink 클러스터 모니터링 수치가 다소 차이는 있으나 원인으로 바로 짚긴 어려웠고 gc로그 분석 또한 진행했으나 큰 특이사항은 없었다.

 

 

1. 네트워크 메모리 옵션 변경

stream 라인이 합쳐지며 네트워크 병목이 커진 것일까 하여 우선 taskmanager의 network memory.network.fraction 옵션을 변경해 네트워크 메모리 비율을 증가시켰으나 해당 이슈는 계속 지속되었다.

 

 

2. Parallelism 조정

비록 애플리케이션 변경 전과 후 비교 시 데이터 트래픽의 차이가 크지 않았으나 한달 전 데이터보다 트래픽이 꽤 증가한 상황이었다.

병렬 처리 프레임워크인 Flink 애플리케이션을 운영하며 거의 대부분의 경우 Parallelism의 수를 늘리면 전체적인 성능 향상이 크게 나타나는 것을 확인했었기에 1.5배의 Parallelism을 적용했으나 큰 차이 없이 여전히 이슈는 발생하였다. 

 

 

3. Key Skew 현상 발견

이슈 발생 시간 대의 모니터링 그래프를 다시 면밀히 확인 한 결과 stream 통합 시 taskmanager 간 bytes, records sent 수치의 편차가 이전에 비해 더 발생하는 것을 확인하였다.

해당 편차는 Key Skew가 발생하며 일부 taskmanager에게 데이터가 몰렸고, 띠라서 해당 taskmanager의 리소스 부족으로 랙이 증가하는 것으로 해석할 수 있었다.

 

왜 Key Skew가 발생했을까?

먼저 각 kafka source 데이터의 cardinality를 분석해보았다.

현재 애플리케이션에선 여러 필드의 조합으로 이루어진 key를 사용해 KeyBy 연산으로 그룹화하여 집계 데이터를 생성하는데, 이 과정에서 3개의 스트림이 합쳐지며 특정 필드에 의한 key들의 교집합이 증가하는 현상을 발견했다.  

 

조치 및 테스트

1. field key 추가

기존 key 조합에 포함되지 않았던 필드를 추가하여 skew 현상이 완화되는지 확인하였다.

신규 추가한 필드 b는 전체 데이터 중 교집합에 크게 영향을 미치는 a 특정 필드와의 비율이 비슷하게 분포되어 동일 a필드를 가진 레코드 중에서도 해당 값이 비교적 고르게 분포되어 있었다.

하지만 key의 분산 효과는 미비했다.

key를 구성하는 필드 중 하나인 c 필드의 가장 많은 비중을 차지하는(약 97.63%) 레코드들 중 b 필드가 포함되지않은 레코드들이 많았기 때문에 큰 효과가 없었던 것이다.

 

2. random key 추가

위의 b 필드의 가장 많은 비중을 차지하는 레코드들을 대상으로 random key(0~99)을 추가하여 skew 현상이 완화되는지 확인해보았다.

테스트 결과 taskmanager 별 분산 효과가 확인되었다!

기존 taskmanager 별 numRecordsInPerSecond 수치가 최대 약 900c/s에서 100c/s 로 감소하였으며, 랙 감소 속도 또한 기존보다 약 2배 증가하였다.

 

하지만...

random key 추가 후 저녁에 일부 taskmanager가 restart되는 이슈가 확인되었는데, random key의 범위를 너무 크게 잡아 key의 과다 생성이 원인으로 추측되었다.

 

따라서 random key 0~99 범위를 0~9로 변경 후 다시 실행해보았다.

02/01 17:35 경 적용 이후 key의 분산 범위는 다소 높아졌으나 random key 적용 전 기존 최대 bytes in 수치의 약 1/5이어서 처리 시 크게 문제되지 않는 수치였기에 더이상 consumer lag 및 taskmanager restart가 없는 평화를 되찾을 수 있었다.

 

 

 

간단 회고

데이터 파이프라인 구성 시 단순히 데이터를 전송, 적재의 성능을 높이는 것도 중요하지만 

데이터의 내용과 특성을 알고 있어야 더 효율적인 파이프라인을 구성할 수 있다는 걸 경험할 수 있었다.

 

'Flink' 카테고리의 다른 글

Flink cluster migration (Mesos -> Kubernetes)  (0) 2024.03.10

+ Recent posts