Data on EKS · Production Guide

Apache Celeborn on Kubernetes

A comprehensive field guide for deploying, scaling, and operating Apache Celeborn as a Remote Shuffle Service on Amazon EKS — covering architecture, storage choices, node topology, graceful upgrades, and production-tested configuration knobs.

Apache Celeborn 0.5+ · Amazon EKS · Spark on Kubernetes · Karpenter · Remote Shuffle Service
01

Architecture Overview

Celeborn has three distinct components with different operational characteristics. Understanding these boundaries is the prerequisite for every deployment decision.

Spark Application Layer
  • Spark Driver: LifecycleManager embedded here. Manages shuffle metadata, requests slots from the Master.
  • Spark Executors (×N): ShuffleClient embedded here. Pushes shuffle data directly to Workers.

    ↕ RPC (Driver ↔ Masters)    ↕ Data Push (Executors ↔ Workers)

Celeborn Control Plane
  • Masters (×3, Raft HA): Manage resources, allocate slots, sync state via Raft. Span multiple AZs. NOT on the data path.
  • Master (Leader): The active leader handles all slot allocation; followers stand ready for failover in seconds.

    ↕ Push / Fetch

Celeborn Data Plane (same AZ as Executors)
  • Worker Pod A: Receives pushed shuffle data. Merges partitions. Serves reads. NVMe or EBS backed.
  • Worker Pod B: Replica target if replication is enabled. Dedicated node group, 1 pod per node.
  • Worker Pod C: Load-balanced by slot count. Unhealthy workers are excluded via Master heartbeat.
💡 Critical distinction

Masters are control plane only — they do not touch shuffle data. Workers are the data plane. All performance decisions (storage, placement, thread tuning) apply to Workers, not Masters. Masters should span AZs for Raft quorum. Workers should be co-located with Spark executors.

Minimum production setup: 3 Masters + 4+ Workers. Never run multiple Masters or Workers on the same node.
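
One way to enforce the one-per-node rule is required pod anti-affinity on the hostname topology key. A minimal sketch; the pod labels are assumptions to be matched against your chart's actual labels:

YAML: Worker anti-affinity sketch (labels are illustrative)
affinity:
  podAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchLabels:
            app.kubernetes.io/name: celeborn     # assumed label; match your chart
            app.kubernetes.io/role: worker       # assumed label; match your chart
        topologyKey: kubernetes.io/hostname      # at most one Worker per node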

02

When to Use Celeborn

Celeborn is a strategic choice, not a universal performance accelerator. Based on Data on EKS TPC-DS 3TB benchmarks, the trade-offs are clear.

✓ Right fit for Celeborn

  • Large shuffle workloads (>100GB per job)
  • Spark Dynamic Resource Allocation needed
  • Long-running complex pipelines (>30min)
  • High executor churn causing shuffle loss
  • Multi-tenant clusters needing shuffle isolation
  • Hadoop ESS migration to Kubernetes
  • Storage-compute decoupling required

✗ Not the right fit

  • Small shuffle queries (sub-GB per stage)
  • OLAP-style queries with minimal shuffle
  • Latency-sensitive jobs where +16% overhead matters
  • Simple ETL with bounded data volumes
  • Clusters without dedicated storage-dense nodes
  • Teams without infra capacity to operate a stateful service
📊 Pinterest Production Result

Pinterest's Moka platform (replacing 14,000-node Hadoop processing 120PB/day) adopted Celeborn on EKS and saw overall Spark job performance improve 5% on average, with primary gains in operational stability, reduced shuffle timeouts, and enabling true dynamic executor scaling.

03

Node Groups & AZ Topology

Node topology is the most impactful infrastructure decision for Celeborn performance. Get this right before anything else.

🎯 Dedicated Node Groups

Separate EKS managed node groups for Celeborn Workers and Spark Executors. Never co-locate them — Workers are I/O and network saturated, Executors are CPU/memory saturated.

🗺️ Same AZ Co-location

Celeborn Worker nodes and Spark Executor nodes must be in the same AZ. Cross-AZ shuffle traffic adds 1–5ms per push and destroys throughput at scale.

EC2 Cluster Placement Group

Place both node groups in the same EC2 Cluster Placement Group. This ensures physical proximity within the AZ for maximum network bandwidth (up to 25–100 Gbps ENA).
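
Placement groups are created outside Kubernetes. A CLI sketch, with illustrative names, creating the group and referencing it from the launch template both node groups share:

SHELL: create and reference a cluster placement group (names illustrative)
aws ec2 create-placement-group \
  --group-name celeborn-shuffle-pg \
  --strategy cluster

# Reference the group in the node groups' launch template
aws ec2 create-launch-template \
  --launch-template-name celeborn-worker-lt \
  --launch-template-data '{"Placement":{"GroupName":"celeborn-shuffle-pg"}}'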

🔒 Masters Span AZs

Masters are control plane only — not on the data path. Spread 3 Masters across 3 AZs for Raft quorum resilience. An AZ failure loses one Master but the other two maintain quorum.

YAML: celeborn-worker-nodepool.yaml (Karpenter)
apiVersion: karpenter.sh/v1
kind: NodePool
metadata:
  name: celeborn-workers
spec:
  template:
    metadata:
      labels:
        role: celeborn-worker
    spec:
      nodeClassRef:
        group: karpenter.k8s.aws   # group and kind are required in karpenter.sh/v1
        kind: EC2NodeClass
        name: celeborn-nodeclass
      requirements:
        - key: "topology.kubernetes.io/zone"
          operator: In
          values: ["us-east-1a"]  # pin to same AZ as Spark executors
        - key: "karpenter.k8s.aws/instance-family"
          operator: In
          values: ["i4i", "r6g", "r6gd"]
        - key: "karpenter.sh/capacity-type"
          operator: In
          values: ["on-demand"]  # NEVER spot for workers
      taints:
        - key: dedicated
          value: celeborn
          effect: NoSchedule
  disruption:
    consolidationPolicy: WhenEmpty  # never consolidate live workers
    budgets:
      - nodes: "0"  # no involuntary disruption during business hours
🚫 Never use Spot Instances for Celeborn Workers

A spot reclamation mid-job kills all in-flight shuffle data on that worker. Stage recomputation follows. At scale with dozens of concurrent jobs, one spot reclamation cascades into multiple job failures. Use On-Demand capacity with On-Demand Capacity Reservations (ODCRs) instead.
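
A CLI sketch for pre-reserving that capacity; instance type, AZ, and count are illustrative and should match the worker NodePool:

SHELL: create an On-Demand Capacity Reservation (values illustrative)
aws ec2 create-capacity-reservation \
  --instance-type r7g.16xlarge \
  --instance-platform Linux/UNIX \
  --availability-zone us-east-1a \
  --instance-count 5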

04

StatefulSet & Helm Deployment

Celeborn's official Helm chart deploys both Masters and Workers as StatefulSets. This shapes every operational decision around upgrades, scaling, and node replacement.

YAML: values.yaml — Helm chart key overrides
# ── Masters ──────────────────────────────────────────────
master:
  replicas: 3
  resources:
    requests:
      cpu: "4"
      memory: "8Gi"
  podAntiAffinity: required     # 1 master per node, spread AZs
  topologySpreadConstraints:
    - topologyKey: "topology.kubernetes.io/zone"
      maxSkew: 1
      whenUnsatisfiable: DoNotSchedule

# ── Workers ──────────────────────────────────────────────
worker:
  replicas: 5                     # min 4 for production
  resources:
    requests:
      cpu: "16"
      memory: "64Gi"
  nodeSelector:
    role: celeborn-worker
  tolerations:
    - key: dedicated
      value: celeborn
      effect: NoSchedule
  terminationGracePeriodSeconds: 3600

# ── Celeborn config ───────────────────────────────────────
celeborn:
  celeborn.master.ha.enabled: true
  celeborn.master.ha.ratis.raft.server.storage.dir: "/mnt/ratis"
  celeborn.metrics.enabled: true
  celeborn.worker.storage.dirs: "/mnt/disk1:disktype=SSD:capacity=1000Gi,/mnt/disk2:disktype=SSD:capacity=1000Gi"
  celeborn.worker.flusher.buffer.size: "256k"
  celeborn.rpc.io.serverThreads: 64
  celeborn.rpc.io.clientThreads: 64
  celeborn.worker.graceful.shutdown.enabled: true
  celeborn.worker.graceful.shutdown.timeout: "3540s"  # 60s less than terminationGracePeriod
⚠️ StatefulSet immutable fields

VolumeClaimTemplates and podManagementPolicy are immutable in StatefulSets. You cannot change them via helm upgrade. Any change requires a blue/green migration — see the Upgrades section.

05

Storage: NVMe Instance Store vs. EBS

This is the single most consequential infrastructure decision for Celeborn on EKS.

NVMe Instance Store (i4i, i3en, im4gn)

  • Highest raw shuffle throughput
  • Lowest write/read latency (~100μs vs ~1ms EBS)
  • No EBS throttle ceiling
  • Requires Local Static Provisioner
  • Data lost on node termination or recycle
  • Karpenter consolidation destroys data
  • Requires replication enabled for safety

EBS GP3 (recommended for production)

  • ~5.7% lower throughput vs NVMe (Data on EKS benchmark)
  • Volume survives node recycles and host retirements
  • Detach/reattach during rolling upgrades
  • Independent capacity scaling
  • Provision 2+ volumes per worker for parallelism
  • 16,000 IOPS + 1,000 MB/s per volume
  • Simpler operations — no Local Static Provisioner

The Data on EKS benchmark confirmed that RAID 0 vs individual disks makes no measurable difference — just 0.0008% variance. Configure Celeborn to stripe across individual disks natively via multiple entries in celeborn.worker.storage.dirs.

CONFIG: Multi-disk configuration — 2× EBS or 4× NVMe
# 2 EBS GP3 volumes — production baseline
celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1000Gi,/mnt/disk2:disktype=SSD:capacity=1000Gi

# 4 NVMe instance store disks — maximum performance
celeborn.worker.storage.dirs: /mnt/nvme0:disktype=SSD,/mnt/nvme1:disktype=SSD,/mnt/nvme2:disktype=SSD,/mnt/nvme3:disktype=SSD

# Rule of thumb: diskCount × 8 for push threads, diskCount × 8 for fetch threads
celeborn.worker.push.io.threads: 32   # 4 disks × 8
celeborn.worker.fetch.io.threads: 32  # 4 disks × 8

# Increase flusher buffer for HDFS/remote writes
celeborn.worker.flusher.buffer.size: 4m  # 4MB for HDFS, 256k ok for local disk
🔧 Using NVMe with Kubernetes Local Static Provisioner

Install kubernetes-sigs/sig-storage-local-static-provisioner to automatically discover and provision NVMe block devices as PersistentVolumes. Local PVs have node affinity — if a pod reschedules to a different node it will be stuck in Pending. Set PVC deletion policy to Delete to allow pods to create new volumes on new nodes.
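
A minimal provisioner sketch, assuming the NVMe devices are already formatted and mounted under a single discovery directory; the paths and class name are illustrative:

YAML: local static provisioner sketch (paths illustrative)
apiVersion: v1
kind: ConfigMap
metadata:
  name: local-provisioner-config
data:
  storageClassMap: |
    nvme-local:                  # StorageClass name exposed to PVCs
      hostDir: /mnt/k8s-disks    # host directory containing the NVMe mounts
      mountDir: /mnt/k8s-disks
      volumeMode: Filesystem
      fsType: ext4
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: nvme-local
provisioner: kubernetes.io/no-provisioner   # static local PVs only
volumeBindingMode: WaitForFirstConsumer
reclaimPolicy: Delete                       # lets replacement pods claim fresh disks on new nodes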

06

Thread → Disk Mapping: r7g.16xlarge Deep Dive

The r7g.16xlarge is the recommended EKS Celeborn worker instance: 64 vCPU Graviton3, 512 GiB DDR5, 30 Gbps network, and a 20 Gbps (2,500 MB/s) aggregate EBS bandwidth ceiling.

📌 The two golden rules for thread tuning

push.io.threads = fetch.io.threads = diskCount × 8  |  OFFHEAP = diskCount × per-disk buffer (16g) × 1.2
At the Celeborn default of 8 total threads, a 4-disk worker gets only 2 threads per disk. Correcting this is the single highest-impact tuning change.

512 GiB DDR5 Memory Allocation (4-disk baseline)
  • CELEBORN_WORKER_MEMORY = 16g (JVM heap)
  • CELEBORN_WORKER_OFFHEAP_MEMORY = 4 × 16g × 1.2 ≈ 77g (Netty push buffers)
  • OS, JVM overhead, and page cache: the remaining ~419 GiB
EBS Disks | Total IOPS | Requested MB/s | Effective MB/s | Per-disk MB/s | Verdict
2 × GP3   | 32,000  | 2,000 | 2,000    | 1,000  | Each disk gets full provisioned throughput
3 × GP3   | 48,000  | 3,000 | 2,500 ⚠ | 833 ⚠ | Instance cap hit — disks share 2,500 MB/s
4 × GP3 ★ | 64,000  | 4,000 | 2,500 ⚠ | 625 ⚠ | Best balance: max IOPS parallelism + I/O queue depth despite cap
6 × GP3   | 96,000  | 6,000 | 2,500 ✗ | 417 ✗ | Throughput gain saturated — extra disks add IOPS only
8 × GP3   | 128,000 | 8,000 | 2,500 ✗ | 313 ✗ | Only justified if IOPS-bound (many small files, high concurrency)
⚠️ 4 disks is the sweet spot for r7g.16xlarge — not 2

More disks means more parallel I/O queues, lower per-disk queue depth, better IOPS distribution across concurrent Spark jobs, and higher total IOPS (64,000 vs 32,000). Set per-volume throughput to 625 MB/s (the exact share of the cap) or to 1,000 MB/s and let the instance cap share it naturally.

YAML: gp3-high-iops StorageClass + derived worker config — r7g.16xlarge, 4× EBS GP3
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: gp3-high-iops
provisioner: ebs.csi.aws.com
parameters:
  type: gp3
  iops: "16000"        # must be explicit — default gp3 = 3,000 IOPS only
  throughput: "1000"  # MB/s per volume; instance cap (2,500) shared across all
volumeBindingMode: WaitForFirstConsumer  # CRITICAL: binds PV to AZ where pod schedules
reclaimPolicy: Delete
allowVolumeExpansion: true

# ── Derived thread and memory config ─────────────────────────────────────────────
celeborn.worker.push.io.threads: 32   # 4 disks × 8
celeborn.worker.fetch.io.threads: 32  # 4 disks × 8
CELEBORN_WORKER_MEMORY: 16g
CELEBORN_WORKER_OFFHEAP_MEMORY: 77g  # 4 × 16g × 1.2 Netty overhead
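
The volumeClaimTemplates on the Worker StatefulSet tie the class above to the four data disks. A rough sketch; the claim name and size are illustrative, with one entry per disk:

YAML: Worker volumeClaimTemplates sketch (illustrative)
volumeClaimTemplates:
  - metadata:
      name: disk1                     # mounted at /mnt/disk1; repeat for disk2–disk4
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: gp3-high-iops
      resources:
        requests:
          storage: 1000Gi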
🔒 WaitForFirstConsumer is non-negotiable

Without it, the EBS PV binds to a random AZ when the PVC is created — before the pod schedules. If the PV lands in us-east-1b and your NodePool pins workers to us-east-1a, the pod will be permanently stuck in Pending with "node had no available volume zone".

07

Key Configuration Knobs

Most defaults are tuned for small clusters and need adjustment for production EKS deployments.

Parameter | Default | Recommended | What it controls
celeborn.worker.storage.dirs | — | one entry per physical disk | Mount points for shuffle data. Each entry = a parallel write lane.
celeborn.worker.push.io.threads | 8 | diskCount × 8 | Threads handling incoming push from Spark executors. Severely underutilizes CPU at default.
celeborn.worker.fetch.io.threads | 8 | diskCount × 8 | Threads serving shuffle reads to reducers.
celeborn.worker.flusher.buffer.size | 256k | 256k local disk / 4m HDFS | Write buffer per flusher thread.
CELEBORN_WORKER_OFFHEAP_MEMORY | 2g | diskCount × 16g × 1.2 | Off-heap Netty push buffers. r7g.16xlarge + 4 disks = 77g. The primary bottleneck at scale.
CELEBORN_WORKER_MEMORY | 2g | 16g | JVM heap. Off-heap does the heavy lifting.
celeborn.worker.replicate.shuffle.files.enabled | false | true with NVMe | Replicates partition data to a second worker.
celeborn.worker.graceful.shutdown.enabled | false | true | Enables graceful shutdown for Karpenter drains and rolling updates.
celeborn.worker.graceful.shutdown.timeout | 600s | 3540s (grace period minus 60s) | Set 60s less than pod terminationGracePeriodSeconds.
celeborn.rpc.io.serverThreads | 8 | 64 | RPC server I/O threads. Increase for high connection counts.
celeborn.rpc.io.clientThreads | 8 | 64 | RPC client threads for replication and master communication.
celeborn.worker.commitFiles.timeout | 120s | — | Timeout for commit operations on large shuffle stages.
celeborn.worker.replicate.fastFail.duration | 60s | 240s | How long to wait before failing replication.
🧵 Default thread counts severely under-utilize compute

At default settings (8 push + 8 fetch), Celeborn used only ~3–4 cores out of 64 on an r7g.16xlarge. With the corrected 4-disk formula (32 push + 32 fetch), utilization jumped to ~15 active cores.

08

Replication & High Availability

Celeborn provides two independent HA mechanisms at different layers.

🏛️ Master HA — Raft Consensus

3 Masters elected via Apache Ratis (Raft). Leader failure triggers re-election in seconds. Workers and Clients automatically reconnect to the new leader. Deploy Masters across 3 AZs.

📋 Worker Data Replication

When celeborn.worker.replicate.shuffle.files.enabled=true, each partition is pushed to two workers. Essential with NVMe instance store. ~2× write bandwidth cost.

CONFIG: Replication configuration
# Server-side replication
celeborn.worker.replicate.shuffle.files.enabled: true
celeborn.worker.replicate.fastFail.duration: 240s

# Client-side replication toggle — disable when HDFS is the DFS tier
spark.celeborn.client.push.replicate.enabled: true

# Rack-aware replication — places primary and replica in different AZs
celeborn.client.reserveSlots.rackaware.enabled: true
09

Worker Sizing & Capacity Planning

Worker slot count drives capacity. Celeborn computes slots as: total usable disk size ÷ average shuffle file size. When slots are exhausted, the Master rejects new allocations.

Instance Type | vCPU | RAM | Storage | Best For
r7g.16xlarge ★ | 64 | 512 GiB DDR5 | 4 × EBS GP3 (20 Gbps cap) | Recommended production baseline. Graviton3, 30 Gbps network. 4-disk config = 64,000 IOPS, 2,500 MB/s effective.
r7g.8xlarge | 32 | 256 GiB DDR5 | 2–3 × EBS GP3 (10 Gbps cap) | Half-size baseline. 10 Gbps EBS cap = 1,250 MB/s max.
i4i.8xlarge | 32 | 256 GB | 2 × 3.75TB NVMe | Max raw throughput, no EBS cap. Requires replication + Local Static Provisioner.
i3en.6xlarge | 24 | 192 GB | 2 × 7.5TB NVMe | Very large shuffle data volumes (multi-TB per job).
r6gd.8xlarge | 32 | 256 GB | 1 × 1.9TB NVMe | Memory-heavy workloads, cost-optimized. Single NVMe limits parallelism.
📐 r7g.16xlarge capacity sizing (4-disk config)

4 × 1 TiB EBS GP3 = 4 TiB usable shuffle storage per node. At an average shuffle partition size of 64 MB, that is ~65,000 slots per worker (arithmetic below). For 50–100 concurrent Spark jobs, start with 5 workers and add more when Grafana shows slot utilization above 80%.
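
The slot math behind that figure, using the formula from the start of this section:

CONFIG: slot capacity arithmetic (4 × 1 TiB disks, 64 MB average partition)
# slots ≈ total usable disk ÷ average shuffle file size
# 4 × 1 TiB = 4 × 1,048,576 MiB = 4,194,304 MiB
# 4,194,304 MiB ÷ 64 MiB ≈ 65,536 slots per worker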

10

Autoscaling Workers

Celeborn workers are hard to autoscale dynamically. They are stateful and you cannot kill a worker without draining it first. Most production deployments run a fixed worker fleet sized for peak load.

✓ What works: Scale Up

Adding new worker replicas is safe. New pods register with the Master immediately. Existing jobs are unaffected.
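
Assuming the chart's default StatefulSet name and a celeborn namespace, a scale-up is a one-liner:

SHELL: add workers (names assume chart defaults)
kubectl -n celeborn scale statefulset celeborn-worker --replicas=7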

⚠️ What's risky: Scale Down

Removing workers must go through the decommission API. A worker abruptly killed while holding active shuffle partitions causes job failures.

🕐 Time-based Scaling (Pinterest pattern)

Scale up worker clusters ahead of known batch job waves rather than reacting to real-time metrics.

🔐 Karpenter do-not-disrupt

Annotate worker pods with karpenter.sh/do-not-disrupt: "true" to prevent involuntary disruption. Only remove after triggering the decommission API.

11

Graceful Decommission

Abruptly killing a Celeborn worker pod destroys all in-flight shuffle partitions on that worker for every active job.

01. Drain Triggered: Karpenter or a manual drain signals node eviction
02. SIGTERM → preStop: pod receives SIGTERM; the preStop hook fires immediately
03. Decommission API: worker stops accepting new slots, completes in-flight ops
04. Grace Period Wait: Kubernetes waits up to terminationGracePeriodSeconds
05. Clean Exit: worker exits cleanly, or receives SIGKILL after the grace period
YAML: Worker StatefulSet — graceful shutdown config
spec:
  template:
    metadata:
      annotations:
        karpenter.sh/do-not-disrupt: "true"
    spec:
      terminationGracePeriodSeconds: 3600
      containers:
      - name: celeborn-worker
        lifecycle:
          preStop:
            exec:
              command:
                - "/bin/sh"
                - "-c"
                - "/opt/celeborn/sbin/decommission-worker.sh"

# decommission-worker.sh calls the Celeborn HTTP API:
# curl -X POST http://localhost:9096/exit?type=Decommission
⏱️ Tune terminationGracePeriodSeconds to your workload

Start at 3600s (1 hour). Set Celeborn's internal celeborn.worker.graceful.shutdown.timeout to exactly 60 seconds less than this value to allow clean process exit before SIGKILL.

12

Upgrades — Blue/Green is the Only Safe Path

StatefulSet immutable fields mean you cannot do a simple helm upgrade for all changes.

Change Type | Strategy | Notes
Config-only (no immutable fields) | Rolling update via helm upgrade | Safe. In-flight jobs continue on old workers while new jobs route to upgraded workers.
VolumeClaimTemplate change | Blue/Green StatefulSet migration | Deploy a new StatefulSet with the updated PVC spec. Decommission old workers one by one.
podManagementPolicy change | Blue/Green StatefulSet migration | Cannot be changed in-place.
Major version upgrade (e.g. 0.4 → 0.5) | Blue/Green with staged traffic shift | Deploy the new version alongside the old. Update Spark job configs to point to the new master endpoints.
1. Deploy new StatefulSet (green)

Deploy the new Celeborn version. New Masters and Workers come up alongside the old ones. Validate all new Workers are registered and healthy.

2. Update Spark job endpoints

Update spark.celeborn.master.endpoints in your SparkApplication CRDs to point to the new Master addresses. New jobs use the new cluster.

3. Decommission old workers

Call the decommission API on each old worker and wait for its active shuffle count to drop to zero before proceeding (a polling sketch follows these steps).

4. Delete old StatefulSet

Once all old workers are drained and no active jobs reference them, delete the old StatefulSet.
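
A polling sketch for the drain wait in step 3. The /exit call matches the decommission endpoint shown in the preStop hook earlier; the /shuffles listing is an assumption to verify against your Celeborn version:

SHELL: decommission an old worker and wait for drain (/shuffles endpoint assumed)
WORKER=celeborn-worker-old-0.celeborn-worker-old-svc.celeborn.svc.cluster.local
curl -s -X POST "http://${WORKER}:9096/exit?type=Decommission"   # same API as the preStop hook
until [ -z "$(curl -s "http://${WORKER}:9096/shuffles")" ]; do   # empty body = no active shuffles
  sleep 30
done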

13

Multi-tier Storage & S3 Status

Celeborn supports a tiered storage model: memory → local disk → DFS/object store. This is not a spill-on-AZ-failure mechanism.

Celeborn Worker Storage Hierarchy (hot → cold)
① Push Region (Memory)
Incoming push data buffered in off-heap memory. Flushed when buffer threshold exceeded.
② Local Disk (SSD/NVMe)
Primary persistent storage. Workers prefer this tier always.
③ DFS / Object Store
Overflow tier when local disk runs low. HDFS is stable. S3 under active development.
🚧 S3 as a DFS tier is not production-ready yet (as of v0.6.x)

There are known bugs in mixed SSD + S3 tier configurations where the S3 eviction path uses the wrong writer (LocalTierWriter), causing NoSuchFileException. Do not use S3 as a DFS tier in production until these are resolved. Watch CELEBORN-2143. HDFS is the stable choice today.

📦 HDFS (stable, tested)

Fully supported and battle-tested. Operationally heavier — adds another stateful system.

💾 Larger EBS volumes (pragmatic)

The simplest approach: provision large EBS GP3 volumes so local disk never fills. Avoids the DFS tier entirely for most workloads.

S3 Express One Zone (future)

Single-digit millisecond latency, same-AZ access. Wait for the S3 integration to stabilize.

🔄 Stage Recomputation (default fallback)

Without any DFS tier, Celeborn's LifecycleManager triggers Spark stage recomputation. Acceptable if recomputation cost is low and jobs run on EBS-backed workers.

14

AZ Resilience on AWS

Risk | Severity | AWS Mitigation
AZ capacity unavailable for target instance type | Medium | Use EC2 On-Demand Capacity Reservations (ODCRs) for your Celeborn worker instance family in the target AZ.
AZ-level failure (power, networking) | High | Masters across 3 AZs survive. Workers and in-flight shuffle data are lost. Jobs fail and recompute. Plan for it in SLAs.
Single worker node failure | Medium | Enable partition replication. Reads succeed from the replica.
Scheduled host maintenance / instance retirement | Low | EBS-backed workers: volume survives retirement, reattaches to the replacement node. NVMe: data lost.
Extended AZ degradation (>weeks) | Medium | Monitor the AWS Health API. Keep a runbook to shift the target AZ if needed.
15

Benchmarks — Data on EKS TPC-DS 3TB

Results from the Data on EKS TPC-DS 3TB benchmark on Amazon EKS (EKS 1.33, r6g.8xlarge workers).

Overall result: Celeborn showed +16% overall runtime vs native Spark shuffle on TPC-DS 3TB. Performance is highly query-dependent — large shuffle queries improved, small shuffle queries regressed significantly.

Top 5 Query Improvements with Celeborn

  • q99-v2.4: +10.1%
  • q21-v2.4: +9.2%
  • q22-v2.4: +6.8%
  • q15-v2.4: +6.5%
  • q45-v2.4: +5.8%

Top 5 Query Regressions with Celeborn

  • q91-v2.4: -135.8%
  • q92-v2.4: -121.3%
  • q39b-v2.4: -100.9%
  • q65-v2.4: -78.9%
  • q68-v2.4: -73.3%
📌 Key benchmark finding: set spark.sql.adaptive.localShuffleReader.enabled=false

This single parameter had the largest impact on stability. With it enabled, the worst-performing query regressed by ~240%. Setting it to false reduced that to ~40%. Always disable this when using Celeborn.

16

Monitoring & Key Metrics

Use the community-maintained Grafana dashboards from the Celeborn GitHub repo (assets/grafana/).

Metric | Alert Threshold | What it means
Worker slot utilization | >80% | Approaching capacity. Scale workers before jobs fail with allocation errors.
Active shuffle count | Baseline + watch | Number of in-progress shuffle operations. A spike = job surge incoming.
Push data latency (p99) | >100ms | Network congestion or worker overload.
Fetch data latency (p99) | >200ms | Reducer read latency. High values indicate a storage bottleneck.
Lost worker count | >0 | Workers that missed heartbeat. Immediate action required.
Excluded worker count | >0 | Workers isolated by the Master due to disk or health issues.
Worker disk utilization | >85% | The worker will be excluded by the Master when its disks fill.
Netty direct memory | >80% of OFFHEAP | Off-heap push buffer pressure. Increase CELEBORN_WORKER_OFFHEAP_MEMORY.
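
As a sketch, the disk-utilization threshold expressed as a PrometheusRule. The metric name is a placeholder: Celeborn metric names vary by version and relabeling, so take real names from the community dashboards:

YAML: example alert rule (metric name is a placeholder)
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: celeborn-worker-alerts
spec:
  groups:
    - name: celeborn
      rules:
        - alert: CelebornWorkerDiskHigh
          expr: celeborn_worker_disk_used_ratio > 0.85   # placeholder metric name
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: Celeborn worker disk above 85%; the Master will exclude it when disks fill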
17

Spark Integration Configuration

PROPERTIES: spark-defaults.conf — Celeborn integration
# ── Core shuffle manager ─────────────────────────────────────
spark.shuffle.manager                          org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.shuffle.service.enabled                  false
spark.serializer                               org.apache.spark.serializer.KryoSerializer

# ── Master endpoints (all 3 for HA) ─────────────────────────
spark.celeborn.master.endpoints  celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local:9097,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local:9097,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local:9097

# ── AQE — critical settings ──────────────────────────────────
spark.sql.adaptive.enabled                     true
spark.sql.adaptive.localShuffleReader.enabled  false  # CRITICAL — see benchmark note
spark.sql.adaptive.skewJoin.enabled            true

# ── Dynamic allocation ───────────────────────────────────────
spark.dynamicAllocation.enabled                true
spark.dynamicAllocation.shuffleTracking.enabled false  # must be false with Celeborn
spark.shuffle.sort.io.plugin.class             org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO  # Spark >= 3.5

# ── Writer mode ──────────────────────────────────────────────
spark.celeborn.client.spark.shuffle.writer     hash  # hash or sort — hash uses more memory

# ── Replication ──────────────────────────────────────────────
spark.celeborn.client.push.replicate.enabled   true   # false if using HDFS as DFS tier

# ── Timeouts ─────────────────────────────────────────────────
spark.celeborn.client.push.stageEnd.timeout    120s
spark.network.timeout                          600s
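
For Spark Operator users, the same settings go under sparkConf in the SparkApplication CRD. A fragment sketch; the app name is illustrative, and the Celeborn Spark client jar must already be in the Spark image:

YAML: SparkApplication fragment (illustrative)
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: etl-with-celeborn
spec:
  sparkConf:
    # assumes the celeborn spark client jar is baked into the image
    "spark.shuffle.manager": "org.apache.spark.shuffle.celeborn.SparkShuffleManager"
    "spark.celeborn.master.endpoints": "celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local:9097"
    "spark.sql.adaptive.localShuffleReader.enabled": "false"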
18

Production Learnings

Distilled from Pinterest (Moka on EKS), Data on EKS benchmarks, and large-scale Alibaba deployments.

Pinterest: Celeborn solved noisy-neighbor shuffle issues from YARN ESS

Their previous Hadoop External Shuffle Service caused disk contention and shuffle timeouts. Celeborn's dedicated worker fleet eliminated this class of failure entirely and gave them 5% average job speedup at scale.

Thread count defaults are production anti-patterns

The defaults (8 push threads, 8 fetch threads) were tuned for small single-machine deployments. Follow diskCount × 8: a worker with 2 EBS disks needs at least 16 push and 16 fetch threads, and a 4-disk worker needs 32 of each. Failing to tune this leaves ~90% of your compute idle during shuffle operations.

Off-heap memory is the most common bottleneck

CELEBORN_WORKER_OFFHEAP_MEMORY defaults to 2g. At peak push load across dozens of concurrent jobs, this saturates immediately. Size it with the diskCount × 16g × 1.2 formula (77g on a 4-disk r7g.16xlarge) rather than leaving the default.

Do not use Celeborn as a universal performance booster

Celeborn adds ~16% overhead on mixed workloads. Its real value is operational stability for large shuffle jobs, enabling true dynamic allocation, and decoupling storage from compute.

Size for peak, not average

Monitor peak slot utilization over a 2-week window and add 30% headroom. Proactive scaling beats reactive scaling for a stateful shuffle service.

EBS beats NVMe for most production teams

The 5.7% performance difference is rarely worth the operational complexity of NVMe instance store. Use EBS GP3 with 2+ volumes per worker and size the IOPS correctly.

Component | Decision | Rationale
Celeborn Masters | 3 replicas, span 3 AZs | Raft quorum, control plane HA. Transparent to jobs on leader failover.
Celeborn Workers | Dedicated node group, same AZ as Executors | No noisy neighbors. Co-location minimizes shuffle network latency.
Storage | EBS GP3 (2+ volumes per worker) | Operational safety. Volumes survive node events.
Replication | Enabled for NVMe, optional for EBS | NVMe nodes are ephemeral. EBS volumes persist.
Instance type | On-Demand only, ODCRs in target AZ | No Spot. ODCR pre-reserves capacity.
Disruption | karpenter.sh/do-not-disrupt + preStop hook | Prevent involuntary kills. Always decommission before termination.
DFS tier | HDFS (stable) or large EBS (pragmatic) | S3 tier not production-ready yet.
Spark AQE | localShuffleReader=false | Most impactful single setting — prevents severe query regressions.