The Challenge
At Osfin.ai, we faced a significant data engineering challenge: processing up to 250 million records daily for our dispute management system. The existing pipeline was slow, error-prone, and couldn't scale with growing data volumes.
Our goal was to achieve a 3x improvement in processing speed while maintaining data quality and reducing operational overhead.
Architecture Overview
We designed a distributed ETL pipeline using Apache Spark on AWS EMR:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   S3 Raw    │────▶│  Spark EMR  │────▶│  S3 Clean   │
│    Data     │     │ Processing  │     │    Data     │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │  Redshift   │
                                        │  Warehouse  │
                                        └─────────────┘
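To make the flow concrete, here is a minimal sketch of the driver job that ties the stages together. The paths, the raw JSON input format, and the deduplication step are illustrative placeholders, not our production configuration; Redshift is assumed to load the cleaned Parquet from S3.

from pyspark.sql import SparkSession

# Hypothetical locations; the real pipeline parameterizes these per batch
RAW_PATH = "s3://bucket/raw/date=2025-01-01/"
CLEAN_PATH = "s3://bucket/processed/"

spark = SparkSession.builder.appName("dispute-etl").getOrCreate()

# 1. Read the batch's raw data from S3 (format assumed to be JSON here)
raw_df = spark.read.json(RAW_PATH)

# 2. Transform and validate on the EMR cluster (detailed in the sections below)
clean_df = raw_df.dropDuplicates().filter("status IS NOT NULL")

# 3. Write cleaned, partitioned Parquet back to S3; Redshift loads it from there
clean_df.write.mode("overwrite").partitionBy("date", "customer_id").parquet(CLEAN_PATH)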
Key Optimizations
1. Partitioning Strategy
The single most impactful optimization was proper data partitioning. We partitioned data by date and customer_id, which matched our query patterns:
df.write \
    .partitionBy("date", "customer_id") \
    .parquet("s3://bucket/processed/")
This reduced query times by 70% for date-range queries.
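Downstream reads benefit from the same layout because Spark prunes any date=/customer_id= directories that cannot match the filter. A sketch of a consumer-side query (the path and customer ID are illustrative):

from pyspark.sql.functions import col

# Only the partitions matching both filters are listed and scanned
jan_disputes = (
    spark.read.parquet("s3://bucket/processed/")
    .filter(col("date").between("2025-01-01", "2025-01-31"))
    .filter(col("customer_id") == "CUST-123")  # hypothetical customer ID
)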
2. Broadcast Joins
For joining large transaction tables with smaller reference data (like customer metadata), we used broadcast joins:
from pyspark.sql.functions import broadcast

result = transactions.join(
    broadcast(customer_metadata),
    "customer_id"
)
This eliminated the expensive shuffle a sort-merge join would otherwise require: the small reference table (under 100MB in our case) is copied to every executor, so the large transaction table never moves across the network.
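Spark can also broadcast automatically when it can estimate the smaller side's size; the threshold is configurable. A sketch of aligning it with the 100MB rule of thumb above (the value is our choice, not a Spark default):

# Tables whose estimated size is below this threshold are broadcast automatically
# (the default is 10MB); an explicit broadcast() hint still helps when Spark's
# size estimate is missing or too conservative.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))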
3. Caching Intermediate Results
from pyspark.sql.functions import col

# Cache frequently accessed DataFrames
validated_data = raw_data.filter(col("status").isNotNull())
validated_data.cache()

# Reuse the cached DataFrame for multiple downstream operations
summary = validated_data.groupBy("category").count()
details = validated_data.filter(col("amount") > 1000)
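Two details worth noting about this pattern: cache() is lazy, so nothing is stored until the first action runs, and the cached data should be released once the downstream work is done. A small, assumed extension of the snippet above:

# cache() is lazy; an action such as count() materializes it once
validated_data.count()

# ... downstream aggregations and filters reuse the cached data ...

# Release executor storage memory when it is no longer needed
validated_data.unpersist()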
4. Optimized File Sizes
Small files are a killer for Spark performance: every extra file adds task-scheduling and S3 request overhead. We coalesced output to optimal file sizes:
# Target ~128MB per file
target_size_mb = 128
avg_row_size = 512  # placeholder: estimate bytes per row from a sample of your data
current_size_mb = df.count() * avg_row_size / (1024 * 1024)
num_partitions = max(1, int(current_size_mb / target_size_mb))
df.coalesce(num_partitions).write.parquet(output_path)
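One caveat the sketch glosses over: coalesce() can only merge partitions downward. When the computed target exceeds the current partition count, repartition() is needed instead, at the cost of a shuffle:

# coalesce() only reduces the partition count; repartition() can increase it
if num_partitions > df.rdd.getNumPartitions():
    df = df.repartition(num_partitions)
else:
    df = df.coalesce(num_partitions)
df.write.parquet(output_path)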
5. Predicate Pushdown
When reading Parquet files, we pushed filters down to the file read level:
# Filters are pushed down to the Parquet reader
df = spark.read.parquet("s3://bucket/data/") \
    .filter(col("date") >= "2025-01-01") \
    .filter(col("status") == "ACTIVE")
Handling Data Quality
We implemented a multi-layer validation approach:
from pyspark.sql.functions import udf, struct, size
from pyspark.sql.types import ArrayType, StringType

def validate_record(row):
    errors = []
    # Required fields
    if row.customer_id is None:
        errors.append("missing_customer_id")
    # Data type validation
    if not isinstance(row.amount, (int, float)):
        errors.append("invalid_amount_type")
    # Business rules (only checked when the amount is numeric)
    elif row.amount < 0:
        errors.append("negative_amount")
    return errors

# Wrap the validator in a UDF that returns the list of error codes
validate_udf = udf(validate_record, ArrayType(StringType()))

# Separate valid and invalid records
df_with_errors = df.withColumn(
    "validation_errors",
    validate_udf(struct(*df.columns))
)
valid_records = df_with_errors.filter(size("validation_errors") == 0)
invalid_records = df_with_errors.filter(size("validation_errors") > 0)
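What happens to each branch is an operational choice. One pattern, sketched here with a hypothetical quarantine prefix, is to let valid records continue through the pipeline while invalid ones are parked for investigation and replay:

# Valid records continue down the pipeline without the helper column
valid_records.drop("validation_errors").write.mode("append").parquet("s3://bucket/processed/")

# Invalid records are quarantined with their error codes for later inspection
invalid_records.write.mode("append").parquet("s3://bucket/quarantine/")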
Monitoring and Alerting
We built comprehensive monitoring using CloudWatch, with alarms on the thresholds below (a sketch of publishing the metrics behind them follows the list):
- Processing time per batch: Alert if > 2x baseline
- Record count validation: Alert if deviation > 10%
- Error rate: Alert if > 1% of records fail validation
- S3 data lag: Alert if data older than 4 hours
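The metrics behind these alarms can be published from the driver at the end of each batch; a minimal sketch, with illustrative namespace and metric names rather than our exact production setup:

import time
import boto3

start = time.time()
# ... run the batch ...
elapsed_seconds = time.time() - start
error_rate = 100.0 * invalid_records.count() / df_with_errors.count()

cloudwatch = boto3.client("cloudwatch")
cloudwatch.put_metric_data(
    Namespace="DisputeETL",  # illustrative namespace
    MetricData=[
        {"MetricName": "BatchProcessingSeconds", "Value": elapsed_seconds, "Unit": "Seconds"},
        {"MetricName": "ValidationErrorRate", "Value": error_rate, "Unit": "Percent"},
    ],
)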
Results
| Metric | Before | After | Improvement |
|---|---|---|---|
| Processing Time (250M records) | 6 hours | 2 hours | 3x faster |
| Cost per Run | $150 | $60 | 60% savings |
| Error Rate | 2.5% | 0.3% | 88% reduction |
| Manual Interventions/Week | 5 | 0 | Eliminated |
Lessons Learned
- Profile before optimizing: Use Spark UI to identify actual bottlenecks
- Right-size your cluster: More nodes isn't always better
- Data skew kills performance: Monitor partition sizes (see the skew-check sketch after this list)
- Test at scale: Optimizations that work on 1% of data may fail at 100%
- Automate everything: Manual processes don't scale
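On the data-skew point, a quick way to spot it is to count rows per key and look for outliers; a minimal sketch, assuming customer_id is the key of interest:

from pyspark.sql.functions import col, count

# A handful of keys with orders of magnitude more rows than the rest
# is the classic signature of skew
key_counts = (
    df.groupBy("customer_id")
    .agg(count("*").alias("rows"))
    .orderBy(col("rows").desc())
)
key_counts.show(20)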