Architecture Overview
Celeborn has three distinct components with different operational characteristics. Understanding these boundaries is the prerequisite for every deployment decision.
Masters are control plane only — they do not touch shuffle data. Workers are the data plane. All performance decisions (storage, placement, thread tuning) apply to Workers, not Masters. Masters should span AZs for Raft quorum. Workers should be co-located with Spark executors.
Minimum production setup: 3 Masters + 4+ Workers. Never run multiple Masters or Workers on the same node.
When to Use Celeborn
Celeborn is a strategic choice, not a universal performance accelerator. Based on Data on EKS TPC-DS 3TB benchmarks, the trade-offs are clear.
✓ Right fit for Celeborn
- Large shuffle workloads (>100GB per job)
- Spark Dynamic Resource Allocation needed
- Long-running complex pipelines (>30min)
- High executor churn causing shuffle loss
- Multi-tenant clusters needing shuffle isolation
- Hadoop ESS migration to Kubernetes
- Storage-compute decoupling required
✗ Not the right fit
- Small shuffle queries (sub-GB per stage)
- OLAP-style queries with minimal shuffle
- Latency-sensitive jobs where +16% overhead matters
- Simple ETL with bounded data volumes
- Clusters without dedicated storage-dense nodes
- Teams without infra capacity to operate a stateful service
Pinterest's Moka platform (replacing a 14,000-node Hadoop estate processing 120 PB/day) adopted Celeborn on EKS and saw overall Spark job performance improve by 5% on average, with the primary gains coming from operational stability, fewer shuffle timeouts, and true dynamic executor scaling.
Node Groups & AZ Topology
Node topology is the most impactful infrastructure decision for Celeborn performance. Get this right before anything else.
Dedicated Node Groups
Use separate EKS managed node groups for Celeborn Workers and Spark Executors. Never co-locate them — Workers are I/O- and network-saturated, Executors are CPU- and memory-saturated.
Same AZ Co-location
Celeborn Worker nodes and Spark Executor nodes must be in the same AZ. Cross-AZ shuffle traffic adds 1–5ms per push and destroys throughput at scale.
EC2 Cluster Placement Group
Place both node groups in the same EC2 Cluster Placement Group. This ensures physical proximity within the AZ for maximum network bandwidth (up to 25–100 Gbps ENA).
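If you manage the node groups with eksctl rather than Karpenter, the placement group can be declared directly on the nodegroup. A minimal sketch, assuming a pre-created cluster placement group named celeborn-cpg; the cluster name, AZ, and sizes are placeholders, and the placement field should be verified against your eksctl version:

```yaml
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: analytics-cluster
  region: us-east-1
managedNodeGroups:
  - name: celeborn-workers
    instanceType: r7g.16xlarge
    desiredCapacity: 5
    availabilityZones: ["us-east-1a"]   # same AZ as the Spark executor nodegroup
    placement:
      groupName: celeborn-cpg           # pre-created EC2 Cluster Placement Group
    labels:
      role: celeborn-worker
    taints:
      - key: dedicated
        value: celeborn
        effect: NoSchedule
```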
Masters Span AZs
Masters are control plane only — not on the data path. Spread 3 Masters across 3 AZs for Raft quorum resilience. An AZ failure loses one Master but the other two maintain quorum.
```yaml
apiVersion: karpenter.sh/v1
kind: NodePool
metadata:
  name: celeborn-workers
spec:
  template:
    metadata:
      labels:
        role: celeborn-worker
    spec:
      nodeClassRef:
        group: karpenter.k8s.aws
        kind: EC2NodeClass
        name: celeborn-nodeclass
      requirements:
        - key: "topology.kubernetes.io/zone"
          operator: In
          values: ["us-east-1a"]          # pin to same AZ as Spark executors
        - key: "karpenter.k8s.aws/instance-family"
          operator: In
          values: ["i4i", "r6g", "r6gd"]
        - key: "karpenter.sh/capacity-type"
          operator: In
          values: ["on-demand"]           # NEVER spot for workers
      taints:
        - key: dedicated
          value: celeborn
          effect: NoSchedule
  disruption:
    consolidationPolicy: WhenEmpty        # never consolidate live workers
    budgets:
      - nodes: "0"                        # block all involuntary disruption (add schedule/duration to scope to business hours)
```
A spot reclamation mid-job kills all in-flight shuffle data on that worker. Stage recomputation follows. At scale with dozens of concurrent jobs, one spot reclamation cascades into multiple job failures. Use On-Demand + ODCRs instead.
StatefulSet & Helm Deployment
Celeborn's official Helm chart deploys both Masters and Workers as StatefulSets. This shapes every operational decision around upgrades, scaling, and node replacement.
```yaml
# ── Masters ──────────────────────────────────────────────
master:
  replicas: 3
  resources:
    requests:
      cpu: "4"
      memory: "8Gi"
  podAntiAffinity: required              # 1 master per node, spread AZs
  topologySpreadConstraints:
    - topologyKey: "topology.kubernetes.io/zone"
      maxSkew: 1
      whenUnsatisfiable: DoNotSchedule

# ── Workers ──────────────────────────────────────────────
worker:
  replicas: 5                            # min 4 for production
  resources:
    requests:
      cpu: "16"
      memory: "64Gi"
  nodeSelector:
    role: celeborn-worker
  tolerations:
    - key: dedicated
      value: celeborn
      effect: NoSchedule
  terminationGracePeriodSeconds: 3600

# ── Celeborn config ───────────────────────────────────────
celeborn:
  celeborn.master.ha.enabled: true
  celeborn.master.ha.ratis.raft.server.storage.dir: "/mnt/ratis"
  celeborn.metrics.enabled: true
  celeborn.worker.storage.dirs: "/mnt/disk1:disktype=SSD:capacity=1000Gi,/mnt/disk2:disktype=SSD:capacity=1000Gi"
  celeborn.worker.flusher.buffer.size: "256k"
  celeborn.rpc.io.serverThreads: 64
  celeborn.rpc.io.clientThreads: 64
  celeborn.worker.graceful.shutdown.enabled: true
  celeborn.worker.graceful.shutdown.timeout: "3540s"   # 60s less than terminationGracePeriod
```
VolumeClaimTemplates and podManagementPolicy are immutable in StatefulSets. You cannot change them via helm upgrade. Any change requires a blue/green migration — see the Upgrades section.
Storage: NVMe Instance Store vs. EBS
This is the single most consequential infrastructure decision for Celeborn on EKS.
NVMe Instance Store (i4i, i3en, im4gn)
- Highest raw shuffle throughput
- Lowest write/read latency (~100μs vs ~1ms EBS)
- No EBS throttle ceiling
- Requires Local Static Provisioner
- Data lost on node termination or recycle
- Karpenter consolidation destroys data
- Requires replication enabled for safety
EBS GP3 (recommended for production)
- ~5.7% lower throughput vs NVMe (Data on EKS benchmark)
- Volume survives node recycles and host retirements
- Detach/reattach during rolling upgrades
- Independent capacity scaling
- Provision 2+ volumes per worker for parallelism
- 16,000 IOPS + 1,000 MB/s per volume
- Simpler operations — no Local Static Provisioner
The Data on EKS benchmark confirmed that RAID 0 vs individual disks makes no measurable difference — just 0.0008% variance. Configure Celeborn to stripe across individual disks natively via multiple entries in celeborn.worker.storage.dirs.
```properties
# 2 EBS GP3 volumes — production baseline
celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1000Gi,/mnt/disk2:disktype=SSD:capacity=1000Gi

# 4 NVMe instance store disks — maximum performance
celeborn.worker.storage.dirs: /mnt/nvme0:disktype=SSD,/mnt/nvme1:disktype=SSD,/mnt/nvme2:disktype=SSD,/mnt/nvme3:disktype=SSD

# Rule of thumb: diskCount × 8 for push threads, diskCount × 8 for fetch threads
celeborn.worker.push.io.threads: 32      # 4 disks × 8
celeborn.worker.fetch.io.threads: 32     # 4 disks × 8

# Increase flusher buffer for HDFS/remote writes
celeborn.worker.flusher.buffer.size: 4m  # 4 MB for HDFS, 256k is fine for local disk
```
Install kubernetes-sigs/sig-storage-local-static-provisioner to automatically discover and provision NVMe block devices as PersistentVolumes. Local PVs have node affinity — if a pod reschedules to a different node it will be stuck in Pending. Set PVC deletion policy to Delete to allow pods to create new volumes on new nodes.
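For context, each discovered device surfaces as a local PersistentVolume along the lines of the sketch below (hostname, capacity, and class name are placeholders). The required nodeAffinity is what strands a rescheduled pod in Pending on another node, and a Delete reclaim policy is what lets the provisioner clean the disk and offer it for a fresh claim:

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: local-pv-nvme0-ip-10-0-1-23
spec:
  capacity:
    storage: 3500Gi
  accessModes: ["ReadWriteOnce"]
  persistentVolumeReclaimPolicy: Delete
  storageClassName: nvme-local
  volumeMode: Filesystem
  local:
    path: /mnt/fast-disks/nvme0
  nodeAffinity:                          # pins the PV to the node that physically owns the disk
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values: ["ip-10-0-1-23.ec2.internal"]
```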
Thread → Disk Mapping: r7g.16xlarge Deep Dive
The r7g.16xlarge is the recommended EKS Celeborn worker instance: 64 vCPU Graviton3, 512 GiB DDR5, 30 Gbps network, and a 20 Gbps (2,500 MB/s) aggregate EBS bandwidth ceiling.
push.io.threads = fetch.io.threads = diskCount × 8 | CELEBORN_WORKER_OFFHEAP_MEMORY = diskCount × 16g (per-disk buffer) × 1.2 (Netty overhead)
At the Celeborn default of 8 push and 8 fetch threads, a 4-disk worker gets only 2 threads per disk in each pool. Correcting this is the single highest-impact tuning change.
| EBS Disks | Total IOPS | Requested MB/s | Effective MB/s | Per-disk MB/s | Verdict |
|---|---|---|---|---|---|
| 2 × GP3 | 32,000 | 2,000 | 2,000 | 1,000 ✓ | Each disk gets full provisioned throughput |
| 3 × GP3 | 48,000 | 3,000 | 2,500 ⚠ | 833 ⚠ | Instance cap hit — disks share 2,500 MB/s |
| 4 × GP3 ★ | 64,000 | 4,000 | 2,500 ⚠ | 625 ⚠ | Best balance: max IOPS parallelism + I/O queue depth despite cap |
| 6 × GP3 | 96,000 | 6,000 | 2,500 ✗ | 417 ✗ | Throughput gain saturated — extra disks add IOPS only |
| 8 × GP3 | 128,000 | 8,000 | 2,500 ✗ | 313 ✗ | Only justified if IOPS-bound (many small files, high concurrency) |
More disks mean more parallel I/O queues, lower per-disk queue depth, better IOPS distribution across concurrent Spark jobs, and higher total IOPS (64,000 vs 32,000). Provision each volume at 625–1,000 MB/s of throughput and let the 2,500 MB/s instance cap apportion bandwidth across them naturally.
```yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: gp3-high-iops
provisioner: ebs.csi.aws.com
parameters:
  type: gp3
  iops: "16000"        # must be explicit — default gp3 = 3,000 IOPS only
  throughput: "1000"   # MB/s per volume; instance cap (2,500) shared across all
volumeBindingMode: WaitForFirstConsumer   # CRITICAL: binds PV to AZ where pod schedules
reclaimPolicy: Delete
allowVolumeExpansion: true
```

```properties
# ── Derived thread and memory config ─────────────────────
celeborn.worker.push.io.threads: 32    # 4 disks × 8
celeborn.worker.fetch.io.threads: 32   # 4 disks × 8
CELEBORN_WORKER_MEMORY: 16g
CELEBORN_WORKER_OFFHEAP_MEMORY: 77g    # 4 × 16g × 1.2 Netty overhead
```
Without WaitForFirstConsumer, the EBS PV binds to an arbitrary AZ when the PVC is created — before the pod schedules. If the PV lands in us-east-1b and your NodePool pins workers to us-east-1a, the pod will be permanently stuck in Pending with "node had no available volume zone".
Key Configuration Knobs
Most defaults are tuned for small clusters and need adjustment for production EKS deployments.
| Parameter | Default | Recommended | What it controls |
|---|---|---|---|
| celeborn.worker.storage.dirs | — | 1 entry per disk | Mount points for shuffle data. Each entry = a parallel write lane. |
| celeborn.worker.push.io.threads | 8 | diskCount × 8 | Threads handling incoming push from Spark executors. Severely underutilizes CPU at default. |
| celeborn.worker.fetch.io.threads | 8 | diskCount × 8 | Threads serving shuffle reads to reducers. |
| celeborn.worker.flusher.buffer.size | 256k | 256k (local) / 4m (HDFS) | Write buffer per flusher thread. |
| CELEBORN_WORKER_OFFHEAP_MEMORY | 2g | diskCount × 16g × 1.2 | Off-heap Netty push buffers. r7g.16xlarge + 4 disks = 77g. The primary bottleneck at scale. |
| CELEBORN_WORKER_MEMORY | 2g | 16g (r7g.16xlarge) | JVM heap. Off-heap does the heavy lifting. |
| celeborn.worker.replicate.shuffle.files.enabled | false | true (NVMe) / optional (EBS) | Replicates partition data to a second worker. |
| celeborn.worker.graceful.shutdown.enabled | false | true | Enables graceful shutdown for Karpenter drains and rolling updates. |
| celeborn.worker.graceful.shutdown.timeout | 600s | terminationGracePeriod - 60s | Set 60s less than pod terminationGracePeriodSeconds. |
| celeborn.rpc.io.serverThreads | 8 | 64 | RPC server I/O threads. Increase for high connection counts. |
| celeborn.rpc.io.clientThreads | 8 | 64 | RPC client threads for replication and master communication. |
| celeborn.worker.commitFiles.timeout | 120s | 240s | Timeout for commit operations on large shuffle stages. |
| celeborn.worker.replicate.fastFail.duration | 60s | 240s | How long to wait before failing replication. |
At default settings (8 push + 8 fetch), Celeborn used only ~3–4 cores out of 64 on an r7g.16xlarge. With the corrected 4-disk formula (32 push + 32 fetch), utilization jumped to ~15 active cores.
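Pulled together, the table's recommendations for a 4-disk worker look like the sketch below as a Helm celeborn block (it consolidates the values above rather than being an exhaustive worker config; JVM sizing is set through environment variables, not celeborn.* properties):

```yaml
celeborn:
  celeborn.worker.storage.dirs: "/mnt/disk1:disktype=SSD:capacity=1000Gi,/mnt/disk2:disktype=SSD:capacity=1000Gi,/mnt/disk3:disktype=SSD:capacity=1000Gi,/mnt/disk4:disktype=SSD:capacity=1000Gi"
  celeborn.worker.push.io.threads: 32            # 4 disks × 8
  celeborn.worker.fetch.io.threads: 32           # 4 disks × 8
  celeborn.worker.flusher.buffer.size: "256k"    # 4m only when flushing to HDFS/remote
  celeborn.rpc.io.serverThreads: 64
  celeborn.rpc.io.clientThreads: 64
  celeborn.worker.commitFiles.timeout: "240s"
  celeborn.worker.replicate.fastFail.duration: "240s"
  celeborn.worker.graceful.shutdown.enabled: true
  celeborn.worker.graceful.shutdown.timeout: "3540s"   # terminationGracePeriodSeconds minus 60s

# JVM sizing goes through environment variables:
#   CELEBORN_WORKER_MEMORY=16g
#   CELEBORN_WORKER_OFFHEAP_MEMORY=77g   (4 disks × 16g × 1.2)
```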
Replication & High Availability
Celeborn provides two independent HA mechanisms at different layers.
Master HA — Raft Consensus
3 Masters elected via Apache Ratis (Raft). Leader failure triggers re-election in seconds. Workers and Clients automatically reconnect to the new leader. Deploy Masters across 3 AZs.
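The Helm chart derives the Ratis peer list from the StatefulSet, but the underlying HA configuration is worth knowing for debugging. A sketch with illustrative node IDs, hostnames, and ports; verify the exact celeborn.master.ha.* keys against the Celeborn version you run:

```yaml
celeborn:
  celeborn.master.ha.enabled: true
  celeborn.master.ha.node.1.host: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local
  celeborn.master.ha.node.1.port: 9097
  celeborn.master.ha.node.1.ratis.port: 9872
  celeborn.master.ha.node.2.host: celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local
  celeborn.master.ha.node.2.port: 9097
  celeborn.master.ha.node.2.ratis.port: 9872
  celeborn.master.ha.node.3.host: celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
  celeborn.master.ha.node.3.port: 9097
  celeborn.master.ha.node.3.ratis.port: 9872
  celeborn.master.ha.ratis.raft.server.storage.dir: "/mnt/ratis"
```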
Worker Data Replication
When celeborn.worker.replicate.shuffle.files.enabled=true, each partition is pushed to two workers. Essential with NVMe instance store. ~2× write bandwidth cost.
```properties
# Server-side replication
celeborn.worker.replicate.shuffle.files.enabled: true
celeborn.worker.replicate.fastFail.duration: 240s

# Client-side replication toggle — disable when HDFS is the DFS tier
spark.celeborn.client.push.replicate.enabled: true

# Rack-aware replication — places primary and replica in different AZs
celeborn.client.reserveSlots.rackaware.enabled: true
```
Worker Sizing & Capacity Planning
Worker slot count drives capacity. Celeborn computes slots as: total usable disk size ÷ average shuffle file size. When slots are exhausted, the Master rejects new allocations.
| Instance Type | vCPU | RAM | Storage | Best For |
|---|---|---|---|---|
| r7g.16xlarge ★ | 64 | 512 GiB DDR5 | 4 × EBS GP3 (20 Gbps cap) | Recommended production baseline. Graviton3, 30 Gbps network. 4-disk config = 64,000 IOPS, 2,500 MB/s effective. |
| r7g.8xlarge | 32 | 256 GiB DDR5 | 2–3 × EBS GP3 (10 Gbps cap) | Half-size baseline. 10 Gbps EBS cap = 1,250 MB/s max. |
| i4i.8xlarge | 32 | 256 GB | 2 × 3.75TB NVMe | Max raw throughput, no EBS cap. Requires replication + Local Static Provisioner. |
| i3en.6xlarge | 24 | 192 GB | 2 × 7.5TB NVMe | Very large shuffle data volumes (multi-TB per job). |
| r6gd.8xlarge | 32 | 256 GB | 1 × 1.9TB NVMe | Memory-heavy workloads, cost-optimized. Single NVMe limits parallelism. |
4 × 1 TiB EBS GP3 volumes give 4 TiB of usable shuffle storage per node. At an average shuffle file size of 64 MB, that works out to ~65,000 slots per worker. For 50–100 concurrent Spark jobs, start with 5 workers and add more when Grafana shows slot utilization above 80%.
Autoscaling Workers
Celeborn workers are hard to autoscale dynamically. They are stateful and you cannot kill a worker without draining it first. Most production deployments run a fixed worker fleet sized for peak load.
What works: Scale Up
Adding new worker replicas is safe. New pods register with the Master immediately. Existing jobs are unaffected.
What's risky: Scale Down
Removing workers must go through the decommission API. A worker abruptly killed while holding active shuffle partitions causes job failures.
Time-based Scaling (Pinterest pattern)
Scale up worker clusters ahead of known batch job waves rather than reacting to real-time metrics.
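A minimal sketch of the pattern, assuming a celeborn namespace, a worker StatefulSet named celeborn-worker, and a ServiceAccount allowed to patch it (all names and the image tag are placeholders). It scales up ahead of the nightly batch wave instead of waiting for slot alerts:

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: celeborn-scale-up-nightly
  namespace: celeborn
spec:
  schedule: "30 23 * * *"                    # 30 min before the nightly batch wave
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: celeborn-scaler   # needs patch on statefulsets
          restartPolicy: OnFailure
          containers:
            - name: scale
              image: bitnami/kubectl:1.33
              command:
                - /bin/sh
                - -c
                - kubectl -n celeborn scale statefulset celeborn-worker --replicas=8
# Scale-down must NOT be symmetric: decommission workers via the HTTP API first,
# then reduce replicas once their active shuffle count reaches zero.
```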
Karpenter do-not-disrupt
Annotate worker pods with karpenter.sh/do-not-disrupt: "true" to prevent involuntary disruption. Only remove after triggering the decommission API.
Graceful Decommission
Abruptly killing a Celeborn worker pod destroys all in-flight shuffle partitions on that worker for every active job.
```yaml
spec:
  template:
    metadata:
      annotations:
        karpenter.sh/do-not-disrupt: "true"
    spec:
      terminationGracePeriodSeconds: 3600
      containers:
        - name: celeborn-worker
          lifecycle:
            preStop:
              exec:
                command:
                  - "/bin/sh"
                  - "-c"
                  - "/opt/celeborn/sbin/decommission-worker.sh"
# decommission-worker.sh calls the Celeborn HTTP API:
#   curl -X POST http://localhost:9096/exit?type=Decommission
```
Start terminationGracePeriodSeconds at 3600s (1 hour). Set Celeborn's internal celeborn.worker.graceful.shutdown.timeout to exactly 60 seconds less than this value so the process can exit cleanly before the kubelet sends SIGKILL.
Upgrades — Blue/Green is the Only Safe Path
StatefulSet immutable fields mean you cannot do a simple helm upgrade for all changes.
| Change Type | Strategy | Notes |
|---|---|---|
| Config-only (no immutable fields) | Rolling update via helm upgrade | Safe. In-flight jobs continue on old workers while new jobs route to upgraded workers. |
| VolumeClaimTemplate change | Blue/Green StatefulSet migration | Deploy new StatefulSet with updated PVC spec. Decommission old workers one by one. |
| podManagementPolicy change | Blue/Green StatefulSet migration | Cannot be changed in-place. |
| Major version upgrade (e.g. 0.4 → 0.5) | Blue/Green with staged traffic shift | Deploy new version alongside old. Update Spark job configs to point to new master endpoints. |
Deploy new StatefulSet (green)
Deploy the new Celeborn version. New Masters and Workers come up alongside the old ones. Validate all new Workers are registered and healthy.
Update Spark job endpoints
Update spark.celeborn.master.endpoints in your SparkApplication CRDs to point to the new Master addresses. New jobs use the new cluster.
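If jobs are submitted through the Spark Operator, the cutover is a one-line change per SparkApplication. A sketch showing only the field that changes; the green master service names are placeholders:

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: daily-etl
  namespace: spark-jobs
spec:
  sparkConf:
    # excerpt: point at the green masters; blue endpoints removed
    spark.celeborn.master.endpoints: "celeborn-green-master-0.celeborn-green-master-svc.celeborn.svc.cluster.local:9097,celeborn-green-master-1.celeborn-green-master-svc.celeborn.svc.cluster.local:9097,celeborn-green-master-2.celeborn-green-master-svc.celeborn.svc.cluster.local:9097"
```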
Decommission old workers
Call the decommission API on each old worker. Wait for active shuffle count to drop to zero before proceeding.
Delete old StatefulSet
Once all old workers are drained and no active jobs reference them, delete the old StatefulSet.
Multi-tier Storage & S3 Status
Celeborn supports a tiered storage model: memory → local disk → DFS/object store. This is not a spill-on-AZ-failure mechanism.
There are known bugs in mixed SSD + S3 tier configurations where the S3 eviction path uses the wrong writer (LocalTierWriter), causing NoSuchFileException. Do not use S3 as a DFS tier in production until these are resolved. Watch CELEBORN-2143. HDFS is the stable choice today.
HDFS (stable, tested)
Fully supported and battle-tested. Operationally heavier — adds another stateful system.
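A sketch of what enabling the HDFS tier looks like on the workers, assuming an existing namenode at hdfs://namenode:9000; the celeborn.storage.* key names follow the Celeborn docs and should be verified against the version you deploy:

```yaml
celeborn:
  celeborn.storage.availableTypes: "SSD,HDFS"
  celeborn.storage.hdfs.dir: "hdfs://namenode:9000/celeborn"
  celeborn.worker.flusher.buffer.size: "4m"   # larger buffer for remote writes (see storage section)

# Client side (Spark conf): let HDFS provide durability instead of worker replication
#   spark.celeborn.client.push.replicate.enabled=false
```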
Larger EBS volumes (pragmatic)
The simplest approach: provision large EBS GP3 volumes so local disk never fills. Avoids the DFS tier entirely for most workloads.
S3 Express One Zone (future)
Single-digit millisecond latency, same-AZ access. Wait for the S3 integration to stabilize.
Stage Recomputation (default fallback)
Without any DFS tier, Celeborn's LifecycleManager triggers Spark stage recomputation. Acceptable if recomputation cost is low and jobs run on EBS-backed workers.
AZ Resilience on AWS
| Risk | Severity | AWS Mitigation |
|---|---|---|
| AZ capacity unavailable for target instance type | Medium | Use EC2 On-Demand Capacity Reservations (ODCRs) for your Celeborn worker instance family in the target AZ. |
| AZ-level failure (power, networking) | High | Masters across 3 AZs survive. Workers and in-flight shuffle data lost. Jobs fail and recompute. Plan for it in SLAs. |
| Single worker node failure | Medium | Enable partition replication. Reads succeed from replica. |
| Scheduled host maintenance / instance retirement | Low | EBS-backed workers: volume survives retirement, reattaches to replacement node. NVMe: data lost. |
| Extended AZ degradation (>weeks) | Medium | Monitor AWS Health API. Have a runbook to shift target AZ if needed. |
Benchmarks — Data on EKS TPC-DS 3TB
Results from the Data on EKS TPC-DS 3TB benchmark on Amazon EKS (EKS 1.33, r6g.8xlarge workers).
Overall result: Celeborn showed +16% overall runtime vs native Spark shuffle on TPC-DS 3TB. Performance is highly query-dependent — large shuffle queries improved, small shuffle queries regressed significantly.
Top 5 Query Improvements with Celeborn
Top 5 Query Regressions with Celeborn
Disabling spark.sql.adaptive.localShuffleReader.enabled had the largest single impact on stability. With it left enabled (the Spark default), the worst-performing query regressed by ~240%; setting it to false reduced that regression to ~40%. Always disable it when using Celeborn.
Monitoring & Key Metrics
Use the community-maintained Grafana dashboards from the Celeborn GitHub repo (assets/grafana/).
| Metric | Alert Threshold | What it means |
|---|---|---|
| Worker slot utilization | Alert at >80% | Approaching capacity. Scale workers before jobs fail with allocation errors. |
| Active shuffle count | Baseline + watch | Number of in-progress shuffle operations. Spike = job surge incoming. |
| Push data latency (p99) | Alert at >100ms | Network congestion or worker overload. |
| Fetch data latency (p99) | Alert at >200ms | Reducer read latency. High values indicate storage bottleneck. |
| Lost worker count | Alert at >0 | Workers that missed heartbeat. Immediate action required. |
| Excluded worker count | Alert at >0 | Workers isolated by Master due to disk or health issues. |
| Worker disk utilization | Alert at >85% | Worker will be excluded by Master when disks fill. |
| Netty direct memory | Alert at >80% of OFFHEAP | Off-heap push buffer pressure. Increase CELEBORN_WORKER_OFFHEAP_MEMORY. |
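To get these metrics into Prometheus, a PodMonitor against the worker HTTP port is usually enough. A sketch assuming the Prometheus Operator is installed; the pod label selectors and metrics path are assumptions to verify against your chart's rendered pods and Celeborn's metrics settings:

```yaml
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: celeborn-workers
  namespace: celeborn
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: celeborn       # assumed label, match your chart's pod labels
      app.kubernetes.io/role: worker         # assumed label
  podMetricsEndpoints:
    - port: http                             # named container port for the worker HTTP server (9096)
      path: /metrics/prometheus              # Celeborn's default Prometheus path
      interval: 30s
```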
Spark Integration Configuration
```properties
# ── Core shuffle manager ─────────────────────────────────────
spark.shuffle.manager                  org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.shuffle.service.enabled          false
spark.serializer                       org.apache.spark.serializer.KryoSerializer

# ── Master endpoints (all 3 for HA) ──────────────────────────
spark.celeborn.master.endpoints        celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local:9097,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local:9097,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local:9097

# ── AQE — critical settings ──────────────────────────────────
spark.sql.adaptive.enabled                        true
spark.sql.adaptive.localShuffleReader.enabled     false   # CRITICAL — see benchmark note
spark.sql.adaptive.skewJoin.enabled               true

# ── Dynamic allocation ───────────────────────────────────────
spark.dynamicAllocation.enabled                   true
spark.dynamicAllocation.shuffleTracking.enabled   false   # must be false with Celeborn
spark.shuffle.sort.io.plugin.class                org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO   # Spark >= 3.5

# ── Writer mode ──────────────────────────────────────────────
spark.celeborn.client.spark.shuffle.writer        hash     # hash or sort — hash uses more memory

# ── Replication ──────────────────────────────────────────────
spark.celeborn.client.push.replicate.enabled      true     # false if using HDFS as DFS tier

# ── Timeouts ─────────────────────────────────────────────────
spark.celeborn.client.push.stageEnd.timeout       120s
spark.network.timeout                             600s
```
Production Learnings
Distilled from Pinterest (Moka on EKS), Data on EKS benchmarks, and large-scale Alibaba deployments.
Pinterest: Celeborn solved noisy-neighbor shuffle issues from YARN ESS
Their previous Hadoop External Shuffle Service caused disk contention and shuffle timeouts. Celeborn's dedicated worker fleet eliminated this class of failure entirely and gave them 5% average job speedup at scale.
Thread count defaults are production anti-patterns
The defaults (8 push threads, 8 fetch threads) were tuned for small single-machine deployments. Apply the diskCount × 8 rule: a 2-disk EBS worker needs at least 16 push and 16 fetch threads, and the recommended 4-disk r7g.16xlarge needs 32 of each. Leaving the defaults in place keeps roughly 90% of the worker's cores idle during shuffle operations.
Off-heap memory is the most common bottleneck
CELEBORN_WORKER_OFFHEAP_MEMORY defaults to 2g. At peak push load across dozens of concurrent jobs, this saturates immediately. Size it from the diskCount × 16g × 1.2 formula (77g on a 4-disk r7g.16xlarge), and on dedicated worker nodes it is reasonable to grow toward half of instance memory if Netty direct-memory alerts keep firing.
Do not use Celeborn as a universal performance booster
Celeborn adds ~16% overhead on mixed workloads. Its real value is operational stability for large shuffle jobs, enabling true dynamic allocation, and decoupling storage from compute.
Size for peak, not average
Monitor peak slot utilization over a 2-week window and add 30% headroom. Proactive scaling beats reactive scaling for a stateful shuffle service.
EBS beats NVMe for most production teams
The 5.7% performance difference is rarely worth the operational complexity of NVMe instance store. Use EBS GP3 with 2+ volumes per worker and size the IOPS correctly.
| Component | Decision | Rationale |
|---|---|---|
| Celeborn Masters | 3 replicas, span 3 AZs | Raft quorum, control plane HA. Transparent to jobs on leader failover. |
| Celeborn Workers | Dedicated nodegroup, same AZ as Executors | No noisy neighbors. Co-location minimizes shuffle network latency. |
| Storage | EBS GP3 (2+ volumes per worker) | Operational safety. Volumes survive node events. |
| Replication | Enabled for NVMe, optional for EBS | NVMe nodes are ephemeral. EBS volumes persist. |
| Instance type | On-demand only, ODCRs in target AZ | No spot. ODCR pre-reserves capacity. |
| Disruption | karpenter.sh/do-not-disrupt + preStop hook | Prevent involuntary kills. Always decommission before termination. |
| DFS tier | HDFS (stable) or large EBS (pragmatic) | S3 tier not production-ready yet. |
| Spark AQE | localShuffleReader=false | Most impactful single setting — prevents severe query regressions. |