The Challenge

At Osfin.ai, we faced a significant data engineering challenge: processing up to 250 million records daily for our dispute management system. The existing pipeline was slow, error-prone, and couldn't scale with growing data volumes.

Our goal was to achieve a 3x improvement in processing speed while maintaining data quality and reducing operational overhead.

Architecture Overview

We designed a distributed ETL pipeline using Apache Spark on AWS EMR:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   S3 Raw    │────▶│  Spark EMR  │────▶│  S3 Clean   │
│    Data     │     │  Processing │     │    Data     │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │  Redshift   │
                    │  Warehouse  │
                    └─────────────┘

Key Optimizations

1. Partitioning Strategy

The single most impactful optimization was proper data partitioning. We partitioned data by date and customer_id, which matched our query patterns:

df.write \
  .partitionBy("date", "customer_id") \
  .parquet("s3://bucket/processed/")

This reduced query times by 70% for date-range queries.
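
To make the resulting layout concrete, here is a minimal sketch of the Hive-style directory structure that partitionBy produces, and of why a date-range filter only needs to touch the matching directories. The helper names partition_path and prune_by_date and the sample values are hypothetical and exist only for illustration; Spark performs this pruning internally.

```python
from datetime import date
from typing import List


def partition_path(base: str, day: date, customer_id: str) -> str:
    """Build the Hive-style path written for one (date, customer_id) pair."""
    return f"{base.rstrip('/')}/date={day.isoformat()}/customer_id={customer_id}/"


def prune_by_date(paths: List[str], start: date, end: date) -> List[str]:
    """Keep only partition directories whose date falls in [start, end]."""
    kept = []
    for path in paths:
        # Find the "date=YYYY-MM-DD" path segment and parse its value.
        segment = next(s for s in path.split("/") if s.startswith("date="))
        day = date.fromisoformat(segment[len("date="):])
        if start <= day <= end:
            kept.append(path)
    return kept


# Three daily partitions for one hypothetical customer.
paths = [
    partition_path("s3://bucket/processed/", date(2025, 1, d), "c1")
    for d in (1, 2, 3)
]

# A date-range query only has to list and read two of the three directories.
selected = prune_by_date(paths, date(2025, 1, 2), date(2025, 1, 3))
```

One caveat: every distinct (date, customer_id) pair becomes its own directory, so the number of partition directories grows with both the number of days and the number of customers.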

2. Broadcast Joins

For joining large transaction tables with smaller reference data (like customer metadata), we used broadcast joins:

from pyspark.sql.functions import broadcast

result = transactions.join(
    broadcast(customer_metadata),
    "customer_id"
)

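Broadcasting copies the small table to every executor, so the large transactions table never has to be shuffled across the network. We applied this to reference tables under 100MB, which is above Spark's default 10MB automatic broadcast threshold, hence the explicit broadcast() hint.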
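
The idea behind a broadcast join can be sketched in plain Python: build an in-memory lookup from the small table once, then probe it while streaming over the large table. This is an illustrative model under simplified assumptions, not Spark's implementation; the function name broadcast_hash_join and the sample rows are hypothetical.

```python
from typing import Dict, Iterable, Iterator, List


def broadcast_hash_join(
    large_rows: Iterable[dict],
    small_rows: List[dict],
    key: str,
) -> Iterator[dict]:
    """Inner-join a large row stream against a small in-memory table."""
    # Build phase: index the small table once. In Spark, this lookup
    # table is what gets copied to every executor.
    lookup: Dict[object, List[dict]] = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    # Probe phase: each large row is matched locally, so the large
    # side never has to be repartitioned by key.
    for row in large_rows:
        for match in lookup.get(row[key], []):
            yield {**match, **row}


transactions = [
    {"customer_id": 1, "amount": 250.0},
    {"customer_id": 2, "amount": 1200.0},
    {"customer_id": 3, "amount": 75.0},  # no metadata: dropped by the inner join
]
customer_metadata = [
    {"customer_id": 1, "tier": "gold"},
    {"customer_id": 2, "tier": "silver"},
]

joined = list(broadcast_hash_join(transactions, customer_metadata, "customer_id"))
```

The trade-off is memory: the lookup table must fit comfortably on every worker, which is why the technique is reserved for small reference tables.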

3. Caching Intermediate Results

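from pyspark.sql.functions import col

# Cache a DataFrame that several downstream steps reuse
validated_data = raw_data.filter(col("status").isNotNull())
validated_data.cache()  # lazy: materialized by the first action that runs

# Both results read from the cache instead of recomputing the filter
summary = validated_data.groupBy("category").count()
details = validated_data.filter(col("amount") > 1000)

# Call validated_data.unpersist() only after the downstream actions have run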

4. Optimized File Sizes

Small files are a killer for Spark performance. We coalesced output to optimal file sizes:

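import math

# Target ~128MB per file
target_size_mb = 128
# avg_row_size: estimated on-disk bytes per row, measured separately
# Note: df.count() triggers a full pass over the data
current_size_mb = df.count() * avg_row_size / (1024 * 1024)
# Round up so files stay at or below the target size
num_partitions = max(1, math.ceil(current_size_mb / target_size_mb))

# coalesce() can only reduce the partition count, and it avoids a full shuffle
df.coalesce(num_partitions).write.parquet(output_path)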

5. Predicate Pushdown

When reading Parquet files, we pushed filters down to the file read level:

from pyspark.sql.functions import col

# Both filters are pushed down to the Parquet reader
df = spark.read.parquet("s3://bucket/data/") \
    .filter(col("date") >= "2025-01-01") \
    .filter(col("status") == "ACTIVE")

Handling Data Quality

We implemented a multi-layer validation approach:

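from pyspark.sql.functions import size, struct, udf
from pyspark.sql.types import ArrayType, StringType

def validate_record(row):
    """Return a list of validation error codes for one record."""
    errors = []

    # Required fields
    if row.customer_id is None:
        errors.append("missing_customer_id")

    # Data type validation
    if not isinstance(row.amount, (int, float)):
        errors.append("invalid_amount_type")
    # Business rules (checked only when the amount is numeric,
    # so a null amount no longer raises a TypeError)
    elif row.amount < 0:
        errors.append("negative_amount")

    return errors

# Wrap the function as a UDF that returns an array of error codes
validate_udf = udf(validate_record, ArrayType(StringType()))

# Separate valid and invalid records
df_with_errors = df.withColumn(
    "validation_errors",
    validate_udf(struct(*df.columns))
)

valid_records = df_with_errors.filter(size("validation_errors") == 0)
invalid_records = df_with_errors.filter(size("validation_errors") > 0)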

Monitoring and Alerting

We built comprehensive monitoring using CloudWatch:

  • Processing time per batch: Alert if > 2x baseline
  • Record count validation: Alert if deviation > 10%
  • Error rate: Alert if > 1% of records fail validation
  • S3 data lag: Alert if data older than 4 hours
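
The four alert rules above can be expressed as a small threshold check. In the pipeline itself these are CloudWatch alarms; the sketch below only models the decision logic, and the BatchMetrics fields, the evaluate_alerts function, and the sample numbers are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class BatchMetrics:
    processing_seconds: float
    baseline_seconds: float
    record_count: int
    expected_record_count: int
    failed_records: int
    data_age_hours: float


def evaluate_alerts(m: BatchMetrics) -> List[str]:
    """Return the names of all alert rules that a batch violates."""
    alerts = []

    # Processing time per batch: alert if more than 2x baseline
    if m.processing_seconds > 2 * m.baseline_seconds:
        alerts.append("processing_time")

    # Record count validation: alert if deviation exceeds 10%
    if m.expected_record_count > 0:
        deviation = abs(m.record_count - m.expected_record_count) / m.expected_record_count
        if deviation > 0.10:
            alerts.append("record_count")

    # Error rate: alert if more than 1% of records fail validation
    if m.record_count > 0 and m.failed_records / m.record_count > 0.01:
        alerts.append("error_rate")

    # S3 data lag: alert if data is older than 4 hours
    if m.data_age_hours > 4:
        alerts.append("data_lag")

    return alerts


healthy = BatchMetrics(7200, 7200, 250_000_000, 250_000_000, 750_000, 1.0)
degraded = BatchMetrics(16000, 7200, 200_000_000, 250_000_000, 5_000_000, 5.0)

healthy_alerts = evaluate_alerts(healthy)
degraded_alerts = evaluate_alerts(degraded)
```

Keeping the thresholds in one function makes them easy to unit test before they are wired into alarms.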

Results

Metric                           Before    After     Improvement
Processing Time (250M records)   6 hours   2 hours   3x faster
Cost per Run                     $150      $60       60% savings
Error Rate                       2.5%      0.3%      88% reduction
Manual Interventions/Week        5         0         Eliminated

Lessons Learned

  1. Profile before optimizing: Use Spark UI to identify actual bottlenecks
  2. Right-size your cluster: More nodes isn't always better
  3. Data skew kills performance: Monitor partition sizes
  4. Test at scale: Optimizations that work on 1% of data may fail at 100%
  5. Automate everything: Manual processes don't scale
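
For the data skew lesson, one simple way to monitor partition sizes is to compare the largest partition against the median. This is a minimal sketch, assuming the per-partition row counts have already been collected; the skew_ratio function and the sample counts are hypothetical.

```python
from statistics import median
from typing import Sequence


def skew_ratio(partition_sizes: Sequence[int]) -> float:
    """Ratio of the largest partition to the median partition size."""
    if not partition_sizes:
        raise ValueError("partition_sizes must not be empty")
    mid = median(partition_sizes)
    if mid == 0:
        # Avoid division by zero when at least half the partitions are empty.
        return float("inf") if max(partition_sizes) > 0 else 1.0
    return max(partition_sizes) / mid


# Hypothetical per-partition row counts
balanced = [1000, 1100, 950, 1050]
skewed = [1000, 1100, 950, 25000]

balanced_ratio = skew_ratio(balanced)  # close to 1: tasks finish together
skewed_ratio = skew_ratio(skewed)      # far above 1: one task dominates the stage
```

In PySpark, the input counts could be gathered with something like df.groupBy(spark_partition_id()).count(), where spark_partition_id comes from pyspark.sql.functions.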