Streaming CDC Pipeline Recipe
This recipe demonstrates building a complete real-time Change Data Capture (CDC) pipeline using Delta Lake and Apache Spark Structured Streaming.
Overview
Change Data Capture is essential for keeping analytical datasets synchronized with operational systems. This example shows how to:
- Ingest CDC events from various sources (Kafka, files)
- Process events with exactly-once semantics
- Apply changes to target tables using MERGE operations
- Handle schema evolution in streaming contexts
- Monitor pipeline health and performance
Architecture
```
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│    Source DB    │     │   CDC Capture    │     │   Message Bus   │
│  (MySQL, etc.)  │────▶│    (Debezium)    │────▶│     (Kafka)     │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                 │
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   File System   │     │                  │     │    Streaming    │
│  (JSON files)   │────▶│  Spark Streaming │────▶│   Processing    │
└─────────────────┘     │                  │     └─────────────────┘
                        │                  │
┌─────────────────┐     │  CDC Processor   │     ┌─────────────────┐
│   Batch Files   │────▶│                  │     │  Deduplication  │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                 │
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   Delta Lake    │     │      Schema      │     │     Target      │
│     Tables      │◀────│    Evolution     │◀────│     Tables      │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                 │
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   Change Data   │     │    Monitoring    │     │    Alerting     │
│      Feed       │◀────│    Dashboard     │────▶│     System      │
└─────────────────┘     └──────────────────┘     └─────────────────┘
```
Pipeline Flow:
- Ingestion: CDC events from Kafka topics or JSON files
- Parsing: Extract table name, operation type, before/after states
- Processing: Add metadata, handle schema evolution, deduplication
- Storage: Write to Delta tables with ACID guarantees
- Monitoring: Track pipeline health and performance metrics
Key Features Demonstrated
1. Streaming Data Ingestion
- Kafka Integration: Real-time event consumption
- File-based Sources: For testing and development
- Schema Evolution: Handle changing data structures
- Watermarking: Handle late-arriving data
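The ingestion step can be sketched in a few lines of PySpark. This is a minimal sketch only: the topic and broker match the configuration example later in this README, while the rate limit and watermark values are illustrative assumptions.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cdc-ingest").getOrCreate()

# Real-time consumption from Kafka. Topic and broker follow the pipeline
# configuration example below; the rate limit is an illustrative value.
raw_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "cdc.customers")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 10000)  # bound the size of each micro-batch
    .load()
)

# Tolerate late-arriving data: keep state for events up to 10 minutes behind
# the Kafka message timestamp, which the source exposes as `timestamp`.
raw_events = raw_events.withWatermark("timestamp", "10 minutes")
```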
2. CDC Processing Patterns
- Deduplication: Remove duplicate events
- Idempotent Processing: Handle reprocessed events
- Transaction Ordering: Maintain data consistency
- Error Handling: Graceful failure recovery
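A sketch of the deduplication and per-key ordering steps, assuming the parsed event columns (timestamp, transaction_id, primary_key) described in the CDC event structure section below; the watermark window and helper names are illustrative, not the recipe's defaults.

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def deduplicate_cdc_events(cdc_events):
    """Drop replayed events while their dedup state is still retained.

    Assumes the parsed event columns shown in the CDC event structure below;
    transaction_id plus primary_key identifies one change event.
    """
    return (
        cdc_events
        .withWatermark("timestamp", "10 minutes")
        .dropDuplicates(["transaction_id", "primary_key"])
    )

def latest_change_per_key(microbatch_df):
    """Inside foreachBatch, keep only the newest change per key so the
    downstream MERGE sees exactly one row per primary key."""
    w = Window.partitionBy("primary_key").orderBy(F.col("timestamp").desc())
    return (
        microbatch_df
        .withColumn("_rn", F.row_number().over(w))
        .filter("_rn = 1")
        .drop("_rn")
    )
```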
3. Target Table Updates
- MERGE Operations: Upsert/delete patterns
- Primary Key Handling: Complex key structures
- Change Data Feed: Track all modifications
- ACID Guarantees: Transactional consistency
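The MERGE pattern can be sketched with the delta-spark Python API inside a foreachBatch handler. The handler name, the /path/to/customers target, and the customer_id key are taken from the configuration example below; column handling is simplified to the fields in the sample event, so treat this as a sketch rather than the recipe's apply_changes_to_target implementation.

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

def upsert_customers(microbatch_df, batch_id):
    """foreachBatch handler: apply customer changes with a Delta MERGE.

    Assumes one row per customer_id (see the deduplication sketch above),
    the /path/to/customers target, and the fields from the sample event.
    """
    changes = (
        microbatch_df
        .withColumn("customer_id", F.get_json_object("primary_key", "$.customer_id"))
        .withColumn("name", F.get_json_object("after", "$.name"))
    )

    target = DeltaTable.forPath(microbatch_df.sparkSession, "/path/to/customers")
    (
        target.alias("t")
        .merge(changes.alias("s"), "t.customer_id = s.customer_id")
        .whenMatchedDelete(condition="s.operation = 'DELETE'")
        .whenMatchedUpdate(set={"name": "s.name"})
        .whenNotMatchedInsert(
            condition="s.operation != 'DELETE'",
            values={"customer_id": "s.customer_id", "name": "s.name"},
        )
        .execute()
    )
```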
4. Monitoring & Observability
- Pipeline Health: Query status monitoring
- Performance Metrics: Throughput and latency tracking
- Error Reporting: Comprehensive error handling
- Alerting: Automated issue detection
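Basic health monitoring can be done by polling each StreamingQuery's progress. The helper below is a hypothetical standalone sketch (not the class method described later); metrics shown and the polling interval are illustrative.

```python
import time

def poll_pipeline_health(queries, interval_seconds=30):
    """Poll StreamingQuery progress for basic throughput/latency metrics.

    `queries` is a dict of name -> StreamingQuery returned by
    writeStream.start(); the metrics and output format are illustrative.
    """
    while any(q.isActive for q in queries.values()):
        for name, query in queries.items():
            progress = query.lastProgress  # dict, or None before the first batch
            if progress:
                rows_per_sec = progress.get("processedRowsPerSecond", 0.0)
                batch_ms = progress.get("durationMs", {}).get("triggerExecution", 0)
                print(f"[{name}] rows/s={rows_per_sec:.1f} batch={batch_ms} ms")
            if query.exception() is not None:
                print(f"[{name}] FAILED: {query.exception()}")
        time.sleep(interval_seconds)
```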
Running the Example
```bash
# Install dependencies
pip install -r requirements.txt

# Run the CDC pipeline demo
python solution.py
```
Expected Output
The script will:
- Generate sample CDC events for customers and orders
- Start streaming pipelines for both tables
- Process events in real-time
- Apply changes to target Delta tables
- Display pipeline health and final results
Key Classes
StreamingCDCProcessor
Main class managing the CDC pipeline:
- start_cdc_pipeline(): Initialize all streaming queries
- _process_cdc_events(): Handle event processing and deduplication
- apply_changes_to_target(): Apply changes via MERGE operations
- monitor_pipeline_health(): Track pipeline status
CDC Event Structure
```json
{
  "table_name": "customers",
  "operation": "INSERT",
  "before": null,
  "after": "{\"customer_id\": \"CUST_0001\", \"name\": \"John Doe\"}",
  "timestamp": "2024-01-01T10:00:00Z",
  "transaction_id": "TXN_1001",
  "primary_key": "{\"customer_id\": \"CUST_0001\"}"
}
```
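A parsing sketch for this event shape: the schema mirrors the JSON fields above, with before/after kept as JSON strings and unpacked per table when changes are applied. `raw_events` refers to the Kafka reader from the ingestion sketch earlier.

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# Schema mirroring the event above; `before`/`after` stay as JSON strings.
cdc_schema = StructType([
    StructField("table_name", StringType()),
    StructField("operation", StringType()),
    StructField("before", StringType()),
    StructField("after", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("transaction_id", StringType()),
    StructField("primary_key", StringType()),
])

# `raw_events` is the Kafka stream from the ingestion sketch earlier.
cdc_events = (
    raw_events
    .select(F.from_json(F.col("value").cast("string"), cdc_schema).alias("event"))
    .select("event.*")
)
```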
Configuration
Pipeline Configuration
```python
config = {
    "checkpoint_dir": "/tmp/cdc-checkpoints",
    "tables": {
        "customers": {
            "source_type": "kafka",  # or "file"
            "kafka": {
                "topic": "cdc.customers",
                "bootstrap_servers": "localhost:9092"
            },
            "target_table": "/path/to/customers",
            "primary_key": ["customer_id"]
        }
    }
}
```
Source Types Supported
- Kafka: Production-ready event streaming
- File: Development and testing
- Custom: Extensible for other sources
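For development, the file-based source can be sketched as below. The directory path and trigger limit are assumptions; streaming file sources require an explicit schema, so the cdc_schema from the parsing sketch above is reused.

```python
# Development/testing source: stream JSON files of CDC events from a local
# directory. The path is an assumption for illustration.
file_events = (
    spark.readStream
    .schema(cdc_schema)                # schema from the CDC event section
    .option("maxFilesPerTrigger", 10)  # small batches for local testing
    .json("/tmp/cdc-events/customers")
)
```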
Production Considerations
Scalability
- Partitioning: Distribute load across workers
- Batch Sizing: Optimize micro-batch intervals
- Resource Allocation: Configure appropriate memory/CPU
Reliability
- Checkpointing: Enable fault tolerance
- Idempotency: Handle duplicate processing
- Backpressure: Handle load spikes gracefully
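A sketch of how these reliability pieces fit together when a query is started: each query gets its own checkpoint location, the trigger interval bounds micro-batch cadence, and foreachBatch plus MERGE keeps reprocessed batches idempotent. Paths, intervals, and the handler name come from the earlier sketches and are assumptions.

```python
# Fault tolerance: the checkpoint directory lets the query resume from the
# last committed offsets after a restart; maxOffsetsPerTrigger (set on the
# reader) caps load spikes.
query = (
    cdc_events.writeStream
    .foreachBatch(upsert_customers)  # handler from the MERGE sketch above
    .option("checkpointLocation", "/tmp/cdc-checkpoints/customers")
    .trigger(processingTime="30 seconds")
    .start()
)
```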
Monitoring
- Metrics Collection: Track throughput and latency
- Health Checks: Automated pipeline validation
- Alerting: Proactive issue notification
Troubleshooting
Common Issues
- Checkpoint Conflicts: Clear checkpoints for fresh starts
- Schema Mismatches: Validate source/target schemas
- Memory Issues: Adjust batch sizes and intervals
- Network Timeouts: Configure appropriate timeouts
Recovery Procedures
- Restart from Checkpoint: Automatic recovery
- Manual Replay: Reprocess specific time ranges
- Data Repair: Fix corrupted target tables
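A manual replay sketch using the Kafka source's startingOffsetsByTimestamp option with a fresh checkpoint; the partition list and the epoch-millisecond start time are illustrative assumptions.

```python
import json

# Re-read a Kafka topic from a specific point in time. Timestamps are epoch
# milliseconds per partition; the three partitions and the
# 2024-01-01T10:00:00Z start below are assumptions for illustration.
replay_start = {"cdc.customers": {"0": 1704103200000,
                                  "1": 1704103200000,
                                  "2": 1704103200000}}

replay_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "cdc.customers")
    .option("startingOffsetsByTimestamp", json.dumps(replay_start))
    .load()
)
# Write replay_events with a new checkpointLocation so it does not collide
# with the primary pipeline's offsets.
```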
Performance Tuning
Streaming Optimizations
```python
# File sink metadata log maintenance: allow expired log entries to be
# deleted, but keep them visible for 24 hours (86400000 ms) first.
spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "86400000")
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", "true")

# State store maintenance: how many delta files may accumulate before they
# are consolidated into a snapshot.
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10")
```
CDC-Specific Tuning
- Deduplication Windows: Balance accuracy vs performance
- Merge Optimization: Use partition pruning
- Caching: Cache frequently accessed data
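A sketch of a partition-pruned MERGE, under the hypothetical assumption that the target table is partitioned by an event_date column that also exists on the incoming changes; restricting the match condition to the dates present in the micro-batch lets Delta skip untouched partitions.

```python
from delta.tables import DeltaTable

def merge_with_partition_pruning(microbatch_df, batch_id):
    """MERGE sketch that prunes partitions (hypothetical event_date column)."""
    dates = [row["event_date"] for row in
             microbatch_df.select("event_date").distinct().collect()]
    if not dates:
        return  # nothing to merge in this micro-batch
    date_list = ", ".join(f"'{d}'" for d in dates)

    target = DeltaTable.forPath(microbatch_df.sparkSession, "/path/to/customers")
    (
        target.alias("t")
        .merge(
            microbatch_df.alias("s"),
            # Restricting t.event_date lets Delta skip untouched partitions.
            f"t.event_date IN ({date_list}) AND t.customer_id = s.customer_id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```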
Integration Patterns
With Kafka
```python
# Reader options for the Spark Kafka source. Spark tracks offsets in the
# query's checkpoint, so consumer settings such as auto.offset.reset and
# enable.auto.commit are not supported here; use startingOffsets instead.
kafka_options = {
    "kafka.bootstrap.servers": "broker1:9092,broker2:9092",
    "subscribe": "cdc.customers",
    "startingOffsets": "earliest",    # replaces auto.offset.reset
    "failOnDataLoss": "false",        # tolerate expired/compacted offsets
    "maxOffsetsPerTrigger": "10000"   # cap records per micro-batch
}
```
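A short usage sketch for the options above; `spark` is an active SparkSession, as in the earlier sketches.

```python
# Apply the options above to a streaming reader.
kafka_stream = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)
```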
With Delta Live Tables
- Incremental Processing: Build on CDC foundation
- Quality Enforcement: Add data validation rules
- Lineage Tracking: End-to-end data lineage
Next Steps
- Implement custom CDC sources (Debezium, etc.)
- Add data quality validation rules
- Integrate with schema registries
- Implement advanced monitoring dashboards
- Add support for complex transformations