Spark on Kubernetes 클러스터에서 ETL 잡을 돌리다 보면, 대부분의 Executor Pod는 멀쩡한데 딱 한두 개 Pod만 OOMKilled로 죽는 상황을 마주하게 된다. 로그를 열어보면 해당 Pod에 할당된 파티션의 데이터가 다른 파티션 대비 수십~수백 배 많다. 이것이 Data Skew 문제다.
Data Skew는 Spark만의 문제가 아니다. 데이터를 키(key) 기반으로 분산하는 모든 시스템 — Kafka, Flink, DB 샤딩, MapReduce — 에서 동일한 구조적 원인으로 발생한다. 이 글에서는 Data Skew가 왜 발생하는지 근본 원인을 파악하고, Spark에서의 구체적 해결법과 함께 분산 시스템 전반에 적용 가능한 포괄적 전략을 다룬다.
목차
- Data Skew란 무엇인가
- 왜 발생하는가 — 근본 원인
- Skew가 OOM으로 이어지는 메커니즘
- Spark에서의 진단 방법
- Spark에서의 해결법
- 분산 시스템 전반의 Skew 해결 전략
- 정리
- References
1. Data Skew란 무엇인가
분산 처리 시스템은 데이터를 여러 노드(파티션)에 나누어 병렬 처리한다. Data Skew는 이 분배가 균등하지 않아 특정 파티션에 데이터가 편중되는 현상이다.

핵심은 전체 처리 시간이 가장 느린 Task에 의해 결정된다는 것이다. 300개 Task 중 299개가 1초에 끝나도, 1개가 25초 걸리면 Stage 전체가 25초다. 그리고 그 1개 Task가 메모리 한도를 초과하면 OOM으로 죽는다.
2. 왜 발생하는가
Data Skew의 근본 원인은 단순하다: 키의 분포가 균등하지 않은데, 시스템은 키 기반으로 데이터를 분배한다.
Hash Partitioning의 구조적 한계
대부분의 분산 시스템은 Hash Partitioning을 사용한다:
partition = hash(key) % numPartitions
해시 함수 자체는 균등 분배를 보장하지만, 같은 키는 반드시 같은 파티션으로 간다. 특정 키의 레코드가 압도적으로 많으면, 해시가 아무리 균등해도 해당 파티션에 데이터가 몰린다.
현실 데이터에서 Skew가 발생하는 대표 패턴
패턴 1: Hot Key — 소수의 키가 대다수 데이터를 차지
-- 예: 전국 주행 데이터를 region_code로 GROUP BY
-- 서울(11)에 전체 데이터의 40%가 집중
SELECT region_code, COUNT(*) as trip_count
FROM trips
GROUP BY region_code
region_code | trip_count
------------|----------
11 (서울) | 4,000,000 ← 전체의 40%
41 (경기) | 2,000,000
26 (부산) | 500,000
... | ...
46 (전남) | 50,000
이 데이터를 GROUP BY region_code로 집계하면, region_code = 11을 처리하는 Task 하나가 400만 건을 혼자 처리해야 한다.
패턴 2: Null/Default 값 집중
-- user_id가 NULL인 비로그인 이벤트가 전체의 30%
SELECT user_id, COUNT(*)
FROM events
GROUP BY user_id
user_id | count
--------|--------
NULL | 3,000,000 ← hash(NULL) → 단일 파티션
user_1 | 500
user_2 | 480
...
NULL이나 빈 문자열, 기본값("unknown", "N/A") 등은 모두 같은 해시값을 가지므로 한 파티션에 몰린다.
패턴 3: Join 시 한쪽 테이블의 키 편중
-- orders 테이블: 대형 마트(store_id=1)가 전체 주문의 50%
-- stores 테이블: 균등 분포
SELECT *
FROM orders o JOIN stores s ON o.store_id = s.store_id
Shuffle Join 시 store_id = 1에 해당하는 orders 레코드가 한 파티션에 집중된다.
패턴 4: 시계열 데이터의 시간대 편중
-- 예: 내비게이션 주행 이벤트를 hour 기준으로 파티셔닝
-- 출퇴근 시간대에 데이터가 극단적으로 집중
SELECT HOUR(departure_time) as hour, COUNT(*) as trip_count
FROM trips
WHERE trip_date = '2025-03-11'
GROUP BY HOUR(departure_time)
hour | trip_count
-----|----------
0 | 12,000
1 | 5,000
...
7 | 180,000
8 | 850,000 ← 피크: 전체의 25%
9 | 420,000
...
12 | 150,000
...
17 | 380,000
18 | 780,000 ← 피크: 전체의 23%
19 | 350,000
...
23 | 30,000
HOUR(departure_time)을 파티션 키로 사용하면 24개 파티션 중 08시, 18시 파티션에 전체 데이터의 약 48%가 몰린다. 나머지 22개 파티션은 한가한데 2개 파티션만 과부하 상태가 된다.
패턴 5: 멱법칙(Power Law) 분포
-- 예: 이커머스 플랫폼의 seller_id별 주문 수
-- 소수의 대형 셀러가 주문 대부분을 차지
SELECT seller_id, COUNT(*) as order_count
FROM orders
GROUP BY seller_id
ORDER BY order_count DESC
seller_id | order_count
------------|------------
seller_001 | 2,500,000 ← 상위 1개 셀러가 전체의 25%
seller_002 | 800,000
seller_003 | 500,000
seller_004 | 300,000
seller_005 | 200,000
...
seller_500 | 1,200
...
seller_9999 | 50 ← 하위 80% 셀러는 각각 수십~수백 건
이것이 멱법칙(Zipf's Law) 분포다. 상위 1%의 키가 전체 데이터의 50% 이상을 차지하는 것이 일반적이며, 소셜 미디어의 팔로워 수, 웹사이트 방문 횟수, 상품 판매량 등 현실 데이터 대부분이 이 분포를 따른다. seller_id로 GROUP BY하면 상위 몇 개 셀러를 처리하는 Task에 데이터가 극단적으로 몰린다.
Spark가 아니어도 Skew는 발생한다
| 시스템 | Skew 발생 지점 | 증상 |
| Kafka | 특정 파티션에 메시지 집중 (키 기반 파티셔닝) | Consumer lag 편중, 특정 Consumer 과부하 |
| Flink | keyBy() 이후 특정 키 편중 | 특정 TaskManager 백프레셔/OOM |
| DB 샤딩 | 샤드 키 편중 | 특정 샤드 hot spot, 쿼리 지연 |
| MapReduce | Reducer 입력 편중 | 특정 Reducer 장시간 실행/OOM |
| Elasticsearch | 라우팅 키 편중 | 특정 샤드 디스크/메모리 과다 사용 |
공통 구조: hash(key) % N 기반 분배 + 키 분포 불균등 = Skew
3. Skew가 OOM으로 이어지는 메커니즘
Spark Executor의 메모리 구조
Executor Memory (spark.executor.memory)
├── Execution Memory (Shuffle, Join, Sort, Aggregation 버퍼)
├── Storage Memory (Cache, Broadcast)
└── User Memory (UDF 객체 등)
+ Off-Heap (spark.executor.memoryOverhead)
Shuffle Read 시, 한 Task가 처리할 파티션 데이터를 Executor 메모리에 올린다. Skew된 파티션의 데이터가 Executor의 Execution Memory를 초과하면:
- 디스크 Spill 발생 → 성능 급락
- Spill로도 감당 불가 →
java.lang.OutOfMemoryError - K8s 환경에서 Executor Pod의 메모리 limit 초과 → OOMKilled (Exit Code 137)
Spark on Kubernetes에서 특정 Executor Pod만 죽는 이유
Spark on Kubernetes는 Spark의 공식 클러스터 매니저 중 하나로, Driver와 Executor를 각각 별도의 Pod로 실행한다(Apache Spark 공식 문서 — Running Spark on Kubernetes). 각 Executor Pod에는 K8s의 resources.limits.memory로 메모리 상한이 설정된다.
Executor Pod 0: partition 0 (50MB) → 정상 완료
Executor Pod 1: partition 1 (50MB) → 정상 완료
Executor Pod 2: partition 2 (5GB) → OOMKilled ← Skew 파티션
Executor Pod 3: partition 3 (50MB) → 정상 완료
Skew된 파티션을 할당받은 Executor Pod만 컨테이너 메모리 limit을 초과하여, Linux 커널의 OOM Killer에 의해 강제 종료(OOMKilled, Exit Code 137)된다. 나머지 Executor Pod는 정상이기 때문에, 로그만 보면 "왜 이 Pod만 죽지?"라는 의문이 생긴다.
4. Spark에서의 진단 방법
Spark UI로 Skew 확인
Spark Web UI → Stages 탭 → 해당 Stage의 Task 목록에서:
| 지표 | 확인 포인트 |
| Shuffle Read Size | Task 간 편차가 10배 이상이면 Skew 의심 |
| Duration | 대부분 Task가 수 초인데 일부만 수 분이면 Skew |
| Spill (Memory/Disk) | 특정 Task만 Spill이 발생하면 Skew |
Summary Metrics의 Min / 25th / Median / 75th / Max 를 비교한다:
Shuffle Read Size:
Min: 1.2 MB
25th: 5.0 MB
Median: 8.0 MB
75th: 12.0 MB
Max: 850.0 MB ← Median 대비 100배 이상 → 심각한 Skew
코드 레벨 진단
// 파티션별 레코드 수 확인
df.groupBy(spark_partition_id()).count().orderBy(desc("count")).show()
// 특정 키의 분포 확인
df.groupBy("join_key").count().orderBy(desc("count")).show(20)
5. Spark에서의 해결법
AQE (Adaptive Query Execution) — Spark 3.x 기본 내장
Spark 3.0부터 도입되고 3.2부터 기본 활성화된 AQE는 런타임 통계를 기반으로 Skew를 자동 감지하고 처리한다.
출처: Apache Spark 공식 문서 — https://spark.apache.org/docs/latest/sql-performance-tuning.html#optimizing-skew-join
핵심 설정
# AQE 활성화 (Spark 3.2+ 기본 true)
spark.sql.adaptive.enabled=true
# Skew Join 최적화 활성화 (기본 true)
spark.sql.adaptive.skewJoin.enabled=true
# Skew 판정 기준: 파티션 크기 > median × factor AND > threshold
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
# Skew 파티션 분할 시 목표 크기
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB
# RebalancePartitions에서도 Skew 최적화 (Spark 3.2+)
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled=true
AQE Skew Join 동작 원리
[Before AQE]
Partition 0: 50MB → Task 0 (1초)
Partition 1: 50MB → Task 1 (1초)
Partition 2: 5GB → Task 2 (OOM 또는 100초)
[After AQE - Skew 파티션 자동 분할]
Partition 0: 50MB → Task 0 (1초)
Partition 1: 50MB → Task 1 (1초)
Partition 2-a: 64MB → Task 2 (1초) ← 5GB를 64MB 단위로 분할
Partition 2-b: 64MB → Task 3 (1초)
Partition 2-c: 64MB → Task 4 (1초)
...
Partition 2-n: 64MB → Task N (1초)
Sort-Merge Join에서 Skew가 감지되면, AQE는 Skew된 파티션을 advisoryPartitionSizeInBytes 크기로 분할하고, 반대쪽 테이블의 해당 파티션을 복제(replicate)하여 각 분할된 Task와 Join한다.
AQE의 한계
- Aggregation(groupBy) Skew는 자동 처리하지 않는다 — Join Skew만 대상
- Spark 3.0 미만에서는 사용 불가
- Skew 감지 기준(
factor=5.0,threshold=256MB)이 데이터 특성에 맞지 않으면 감지 실패
Salting — 수동 키 분산 기법
AQE가 커버하지 못하는 Aggregation Skew나 Spark 2.x 환경에서 사용하는 기법이다. 핵심 아이디어: Skew된 키에 랜덤 접미사를 붙여 여러 파티션으로 분산시킨 뒤, 부분 집계 → 최종 집계 2단계로 처리한다.
Aggregation Salting
import org.apache.spark.sql.functions._
val saltBuckets = 10
// Step 1: 키에 salt 추가 → 부분 집계
val salted = df
.withColumn("salt", (rand() * saltBuckets).cast("int"))
.groupBy(col("region_code"), col("salt"))
.agg(count("*").as("partial_count"), sum("distance").as("partial_distance"))
// Step 2: salt 제거 → 최종 집계
val result = salted
.groupBy("region_code")
.agg(sum("partial_count").as("total_count"), sum("partial_distance").as("total_distance"))
[Before Salting]
region_code=11 → 4,000,000건 → 1개 파티션
[After Salting]
region_code=11, salt=0 → ~400,000건 → 파티션 A
region_code=11, salt=1 → ~400,000건 → 파티션 B
...
region_code=11, salt=9 → ~400,000건 → 파티션 J
→ 10개 파티션으로 분산
Join Salting
val saltBuckets = 10
// 큰 테이블: salt 추가
val saltedOrders = orders
.withColumn("salt", (rand() * saltBuckets).cast("int"))
.withColumn("salted_key", concat(col("store_id"), lit("_"), col("salt")))
// 작은 테이블: salt 범위만큼 복제 (explode)
val saltedStores = stores
.withColumn("salt", explode(lit((0 until saltBuckets).toArray)))
.withColumn("salted_key", concat(col("store_id"), lit("_"), col("salt")))
// salted_key로 Join
val result = saltedOrders.join(saltedStores, "salted_key")
Salting의 트레이드오프
| 장점 | 단점 |
| Aggregation Skew에도 적용 가능 | 코드 복잡도 증가 |
| Spark 버전 무관 | 작은 테이블 복제로 인한 메모리/네트워크 비용 |
| salt 수로 분산 정도 제어 가능 | 2-pass 처리로 Stage 수 증가 |
Broadcast Join — 한쪽이 작을 때
Join 대상 중 한쪽 테이블이 충분히 작으면, Shuffle 자체를 제거하여 Skew 문제를 원천 차단한다.
import org.apache.spark.sql.functions.broadcast
// stores 테이블이 작으면 (기본 10MB 이하 자동, 힌트로 강제 가능)
val result = orders.join(broadcast(stores), "store_id")
# 자동 Broadcast 임계값 (기본 10MB)
spark.sql.autoBroadcastJoinThreshold=10485760
# 필요 시 늘림 (예: 100MB)
spark.sql.autoBroadcastJoinThreshold=104857600
Broadcast Join은 Join 대상 중 작은 쪽 테이블 전체를 Driver가 수집한 뒤, 모든 Executor에 복사(broadcast)하는 방식이다. 각 Executor는 로컬에 복사된 작은 테이블과 자신이 가진 큰 테이블 파티션을 직접 Join한다. 이 과정에서 Shuffle이 발생하지 않으므로 파티셔닝 자체가 불필요하고, Skew가 발생할 여지가 없다.
[일반 Shuffle Join]
큰 테이블 ──Shuffle──┐
├─→ Join (파티션 단위) → Skew 가능
작은 테이블 ─Shuffle──┘
[Broadcast Join]
작은 테이블 ──broadcast──→ 모든 Executor에 복사
큰 테이블 ──(Shuffle 없음)──→ 각 Executor가 로컬 Join → Skew 없음
단, Broadcast 대상 테이블이 Driver와 모든 Executor 메모리에 올라가야 하므로 크기 제한이 있다.
Repartition — 파티션 재분배
특정 키가 아닌 다른 키 조합이나 랜덤으로 재파티셔닝하여 Skew를 완화한다.
// 파티션 수 늘리기
df.repartition(500, col("region_code"), col("user_id"))
// 처리 전 균등 재분배
df.repartition(200)
단, repartition은 전체 Shuffle을 유발하므로 비용이 크다. Skew 해소 효과와 Shuffle 비용을 비교해야 한다.
Null/Default 값 별도 처리
// NULL 키를 분리하여 별도 처리
val nullRows = df.filter(col("user_id").isNull)
val nonNullRows = df.filter(col("user_id").isNotNull)
// nonNullRows만 Join/GroupBy 수행
val joinedNonNull = nonNullRows.join(otherDf, "user_id")
// NULL 행은 비즈니스 로직에 따라 별도 처리 후 합치기
val result = joinedNonNull.unionByName(nullRows.withColumn(...))
해결법 선택 가이드
Skew 발생
│
├─ Join에서 발생?
│ ├─ 한쪽 테이블이 작다 (수백 MB 이하)
│ │ └─ → Broadcast Join
│ ├─ Spark 3.x 사용 중
│ │ └─ → AQE 설정 확인/튜닝
│ └─ Spark 2.x 또는 AQE로 부족
│ └─ → Join Salting
│
├─ Aggregation에서 발생?
│ └─ → Aggregation Salting (2-pass)
│
├─ NULL/Default 값 집중?
│ └─ → NULL 분리 처리
│
└─ 위 방법으로 부족?
└─ → Repartition + 파티션 수 조정
6. 분산 시스템 전반의 Skew 해결 전략
Data Skew는 Spark 고유의 문제가 아니다. 데이터를 키 기반으로 분배하는 모든 분산 시스템에서 구조적으로 발생한다. Kafka, Flink, DB 샤딩, Elasticsearch, MapReduce — 어떤 시스템이든 hash(key) % N 형태의 파티셔닝을 사용하는 순간, 키 분포가 균등하지 않으면 Skew는 필연적이다.
분산 시스템이 데이터를 나누려면 결정론적 분배 규칙이 필요하다. 같은 키의 데이터는 같은 노드로 보내야 Join, Aggregation, 상태 관리가 가능하기 때문이다. 그런데 현실 세계의 데이터는 균등하지 않다. 서울에 인구가 집중되고, 소수의 상품이 매출 대부분을 차지하고, 특정 시간대에 트래픽이 몰린다. "같은 키는 같은 곳으로"라는 분산의 기본 원칙과 "현실 데이터는 편중된다"는 사실이 충돌하는 지점에서 Skew가 발생한다. 이것은 특정 프레임워크의 버그가 아니라, 분산 처리의 구조적 트레이드오프다.
따라서 해결 전략도 Spark에 한정되지 않는 범용적 접근이 필요하다.
키 설계 단계에서 Skew 예방
Skew의 근본 원인은 키 설계에 있다. 파티션/샤드 키를 선택할 때 해당 키의 카디널리티와 분포를 반드시 확인해야 한다.

원칙: 파티션 키의 카디널리티가 파티션 수보다 충분히 커야 하고, 각 키 값의 빈도가 가능한 균등해야 한다.
Composite Key (복합 키)
단일 키의 카디널리티가 낮거나 편중될 때, 여러 필드를 조합하여 분산도를 높인다.
-- Kafka Producer: 단일 키 → 복합 키
Before: key = region_code → 17개 파티션에만 분산
After: key = region_code + user_id → 수백만 개 키로 분산
Pre-Splitting / Pre-Aggregation
데이터가 시스템에 들어오기 전에 미리 분산시키거나, 중간 집계를 수행한다.

Flink에서의 예:
// keyBy 전에 pre-key를 추가하여 분산
stream
.map(event -> new Tuple2<>(event.key + "_" + random.nextInt(10), event))
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(partialReducer) // 부분 집계
.map(t -> new Tuple2<>(extractOriginalKey(t.f0), t.f1))
.keyBy(t -> t.f0)
.reduce(finalReducer) // 최종 집계
Dynamic Rebalancing
런타임에 부하를 감지하고 재분배하는 전략이다.
| 시스템 | 메커니즘 |
| Spark AQE | 런타임 통계 기반 Skew 파티션 자동 분할 |
| Kafka | Partition Reassignment (수동/자동) |
| DynamoDB | Adaptive Capacity — hot 파티션에 자동으로 추가 처리량 할당 |
| Elasticsearch | Index Lifecycle Management + Rollover로 샤드 재분배 |
Skew 모니터링 체계 구축
Skew는 데이터 특성이 변하면 언제든 재발한다. 지속적 모니터링이 필요하다.
[모니터링 지표]
├── 파티션/샤드별 데이터 크기 편차 (max / median 비율)
├── Task/Consumer별 처리 시간 편차
├── Spill 발생 빈도
└── OOM/OOMKilled 발생 빈도
[알림 기준 예시]
- max_partition_size / median_partition_size > 10 → Skew 경고
- 특정 Task 처리 시간 > 전체 median × 5 → Straggler 경고
전략 매트릭스
| 전략 | 적용 시점 | Spark | Kafka | Flink | DB 샤딩 |
| 키 설계 개선 | 설계 단계 | ✅ | ✅ | ✅ | ✅ |
| Composite Key | 설계 단계 | ✅ | ✅ | ✅ | ✅ |
| Salting | 처리 단계 | ✅ | - | ✅ | - |
| Broadcast/Replicate | 처리 단계 | ✅ | - | ✅ | - |
| Pre-Aggregation | 수집/처리 단계 | ✅ | ✅ | ✅ | - |
| Dynamic Rebalancing | 런타임 | ✅ (AQE) | ✅ | ✅ | ✅ |
| NULL 분리 처리 | 처리 단계 | ✅ | ✅ | ✅ | ✅ |
| 모니터링 | 운영 단계 | ✅ | ✅ | ✅ | ✅ |
7. 정리
- Data Skew는 키 분포 불균등 + 키 기반 파티셔닝의 구조적 결과다. Spark만의 문제가 아니라 분산 시스템의 보편적 문제다.
- K8s 환경에서 특정 Pod만 OOMKilled되는 현상은, Skew된 파티션을 할당받은 Executor가 메모리 limit을 초과하기 때문이다.
- Spark 3.x에서는 AQE의 Skew Join 최적화가 기본 활성화되어 있으므로, 먼저 AQE 설정을 확인하고 튜닝한다.
- AQE가 커버하지 못하는 Aggregation Skew에는 Salting 기법을 적용한다.
- 근본적으로는 키 설계 단계에서 Skew를 예방하는 것이 가장 효과적이다.
- Skew는 데이터 특성 변화에 따라 재발하므로, 모니터링 체계를 갖추어야 한다.
8. References
- [1] Apache Spark 공식 문서 — Performance Tuning (AQE, Skew Join): https://spark.apache.org/docs/latest/sql-performance-tuning.html#optimizing-skew-join
- [2] Apache Spark 공식 문서 — Adaptive Query Execution: https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
- [3] Martin Kleppmann, Designing Data-Intensive Applications — Chapter 6: Partitioning (Hash Partitioning, Skew and Hot Spots)
'Engineering > Database System' 카테고리의 다른 글
| 배치 vs 마이크로배치 vs 스트리밍, 그리고 Flink (0) | 2026.03.25 |
|---|---|
| MySQL 타임존 다루기 (0) | 2025.01.20 |
| [Database] MySQL ORDER BY 방식 비교와 성능 (0) | 2024.07.16 |
| [Database] 빅 데이터 (Big Data) 1장 - Big Data Storage System (0) | 2024.04.13 |
| [Database] 데이터 타입 비교 : char vs varchar (0) | 2024.01.17 |
댓글