Schema Evolution

Introduction

Schema evolution is a core feature of sparksneeze that automatically adapts the target schema as the structure of the source data changes over time. This lets data pipelines handle evolving data sources without manual intervention.

All strategies that support schema evolution provide two key parameters:

  • auto_expand (default: True) - Automatically add new columns from source to target

  • auto_shrink (default: False) - Automatically remove columns from target that don’t exist in source

How Schema Evolution Works

Schema evolution occurs during strategy execution in the following order:

  1. Source Analysis: Compare source schema with existing target schema

  2. Type Compatibility: Verify column types are compatible or can be safely cast

  3. Schema Expansion: Add missing columns to target if auto_expand=True

  4. Schema Shrinkage: Remove extra columns from target if auto_shrink=True

  5. Data Alignment: Align source data with evolved target schema

  6. Metadata Application: Apply metadata with appropriate hash calculations
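The column-level part of this sequence (steps 1, 3 and 4) can be modelled in plain Python. The sketch below is a simplified illustration, not sparksneeze's implementation: schemas are reduced to ordered lists of column names, step 2 (type compatibility) is left out, and the helper name `plan_schema_evolution` is hypothetical.

```python
from typing import Dict, List


def plan_schema_evolution(
    source_cols: List[str],
    target_cols: List[str],
    auto_expand: bool = True,
    auto_shrink: bool = False,
) -> Dict[str, List[str]]:
    """Hypothetical helper: decide which columns to add/drop and the final target schema."""
    source_set, target_set = set(source_cols), set(target_cols)

    # Step 1: source analysis, i.e. columns present on only one side
    new_in_source = [c for c in source_cols if c not in target_set]
    missing_from_source = [c for c in target_cols if c not in source_set]

    # Steps 3 and 4: apply the expand/shrink flags
    to_add = new_in_source if auto_expand else []
    to_drop = missing_from_source if auto_shrink else []

    # Keep the existing target order, remove dropped columns, append new ones
    final = [c for c in target_cols if c not in to_drop] + to_add
    return {"add": to_add, "drop": to_drop, "final": final}


plan = plan_schema_evolution(
    source_cols=["id", "name", "age", "email", "phone"],
    target_cols=["id", "name", "age"],
)
print(plan["final"])  # ['id', 'name', 'age', 'email', 'phone']
```

With the defaults (auto_expand=True, auto_shrink=False), the plan can only ever add columns; dropping requires opting in explicitly.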

Auto-Expand Behavior

When auto_expand=True (default behavior):

# Day 1: Target has columns [id, name, age]
# Day 2: Source has columns [id, name, age, email, phone]

# Result: Target schema expands to [id, name, age, email, phone]
# Existing data gets NULL values for new columns
# New data populates all columns normally

Key Points:

  • New columns are added to the target schema

  • Existing records get NULL values for new columns

  • Existing metadata (including hashes) remains unchanged

  • New records get metadata calculated on the full expanded schema
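At the row level, expansion amounts to giving every existing record a NULL (None) for each added column while leaving its stored metadata untouched. In this plain-Python model, rows are dicts and `_hash` stands in for a metadata hash column; both the representation and the column name are assumptions for illustration, and `expand_existing_rows` is a hypothetical helper.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[object]]


def expand_existing_rows(rows: List[Row], added_cols: List[str]) -> List[Row]:
    """Return copies of existing rows with NULL (None) for each newly added column."""
    expanded = []
    for row in rows:
        new_row = dict(row)  # copy; the stored '_hash' value is carried over unchanged
        for col in added_cols:
            new_row.setdefault(col, None)
        expanded.append(new_row)
    return expanded


existing = [{"id": 1, "name": "Ada", "age": 36, "_hash": "h-original"}]
print(expand_existing_rows(existing, ["email", "phone"]))
# [{'id': 1, 'name': 'Ada', 'age': 36, '_hash': 'h-original', 'email': None, 'phone': None}]
```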

Auto-Shrink Behavior

When auto_shrink=True:

# Day 1: Target has columns [id, name, age, email, phone] with 100 existing records
# Day 2: Source has columns [id, name, age] (missing email, phone)

# Result: Target schema shrinks to [id, name, age]
# IMPORTANT: email and phone columns are physically DROPPED
# Data in dropped columns is permanently lost
# Existing records preserve their original hash values
# New records get hash calculated on shrunk schema

Critical Behavior:

Warning

When auto_shrink=True, data in removed columns is permanently lost. This operation cannot be undone.
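Because the dropped values are gone once the shrink runs, one option is to capture them beforehand. The sketch below models a shrink on dict rows and returns the removed values separately so they can be archived. `shrink_rows` is a hypothetical helper, not part of sparksneeze, and it assumes each row has an `id` key that identifies it.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def shrink_rows(rows: List[Row], dropped_cols: List[str]) -> Tuple[List[Row], List[Row]]:
    """Split each row into (kept columns, dropped columns)."""
    kept, removed = [], []
    for row in rows:
        kept.append({k: v for k, v in row.items() if k not in dropped_cols})
        # Keep the (assumed) 'id' key next to the removed values so they can be traced back
        removed.append({"id": row.get("id"), **{c: row[c] for c in dropped_cols if c in row}})
    return kept, removed


rows = [{"id": 1, "name": "Ada", "age": 36, "email": "ada@example.com", "phone": "555-0100"}]
kept, removed = shrink_rows(rows, ["email", "phone"])
print(kept)     # [{'id': 1, 'name': 'Ada', 'age': 36}]
print(removed)  # [{'id': 1, 'email': 'ada@example.com', 'phone': '555-0100'}]
```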

Hash Calculation During Schema Evolution

One of the most important aspects of schema evolution is maintaining data integrity through proper hash calculations:

Existing Data Hash Preservation

When schema evolution occurs (expand or shrink), existing target data maintains its original hash values:

# Example: Target evolution from [id, name, age, salary] to [id, name, age]

# Existing records keep original hash: hash(id, name, age, salary)
# Even though 'salary' column is dropped, hash remains unchanged
# This preserves data lineage and audit trails
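The preservation rule can be illustrated with a toy row hash. The sketch assumes SHA-256 over the column values joined in schema order and a `_hash` column holding the result; sparksneeze's actual hash function and metadata column names may differ. The point is that dropping `salary` changes the schema but never triggers a recomputation of the stored value.

```python
import hashlib
from typing import Dict, List


def row_hash(row: Dict[str, object], columns: List[str]) -> str:
    """Toy hash: SHA-256 over the values of `columns`, in order."""
    payload = "|".join(str(row.get(c)) for c in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


old_schema = ["id", "name", "age", "salary"]
row = {"id": 1, "name": "Ada", "age": 36, "salary": 5000}
row["_hash"] = row_hash(row, old_schema)

# Shrink to [id, name, age]: the column is removed, the stored hash is not touched
del row["salary"]

print(row["_hash"] == row_hash({"id": 1, "name": "Ada", "age": 36, "salary": 5000}, old_schema))  # True
print(row["_hash"] == row_hash(row, ["id", "name", "age"]))  # False
```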

New Data Hash Calculation

New source data gets hash values calculated based on the final evolved schema:

# After schema shrink from [id, name, age, salary] to [id, name, age]

# New records get hash: hash(id, name, age)
# Hash calculation excludes dropped columns

Mixed Hash Scenarios

This creates scenarios where a single target contains records with different hash calculation bases:

# Target after auto_shrink evolution:
# - 100 existing records: hash(id, name, age, salary)
# - 50 new records: hash(id, name, age)

# This is normal and expected behavior
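A consequence of mixed bases is that recomputing hashes with the current schema only validates the newer records. The sketch below uses a toy hash (assumed here: SHA-256 over values in schema order, stored in a hypothetical `_hash` column; not necessarily sparksneeze's algorithm) to show why a naive validation pass flags older rows even though nothing is wrong with them.

```python
import hashlib
from typing import Dict, List


def row_hash(row: Dict[str, object], columns: List[str]) -> str:
    """Toy hash: SHA-256 over the values of `columns`, in order."""
    payload = "|".join(str(row.get(c)) for c in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


old_schema = ["id", "name", "age", "salary"]
new_schema = ["id", "name", "age"]  # after auto_shrink dropped 'salary'

# Existing record: hashed before the shrink, then 'salary' was dropped
existing = {"id": 1, "name": "Ada", "age": 36}
existing["_hash"] = row_hash({**existing, "salary": 5000}, old_schema)

# New record: hashed on the evolved schema only
new = {"id": 2, "name": "Grace", "age": 45}
new["_hash"] = row_hash(new, new_schema)

# Naive validation against the current schema
for r in (existing, new):
    ok = r["_hash"] == row_hash(r, new_schema)
    print(r["id"], "matches current schema" if ok else "hashed on an earlier schema")
# 1 hashed on an earlier schema
# 2 matches current schema
```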

Strategy-Specific Evolution Behavior

Append Strategy

  • Existing target data: Completely preserved, including original metadata and hashes

  • Schema changes: Applied to target structure

  • New source data: Aligned to evolved target schema

  • Data loss: Only occurs with auto_shrink=True (columns dropped permanently)

from sparksneeze.strategy import Append

# Conservative: Only expand schema, never shrink
strategy = Append(auto_expand=True, auto_shrink=False)

# Aggressive: Allow both expansion and shrinkage
strategy = Append(auto_expand=True, auto_shrink=True)
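With auto_shrink=False, the target can keep columns that the source no longer sends, so each incoming row has to be aligned to the target layout before it is appended. A plain-Python sketch of that alignment step, with rows as dicts and a hypothetical `align_to_schema` helper (not sparksneeze's API):

```python
from typing import Dict, List, Optional


def align_to_schema(row: Dict[str, object], target_cols: List[str]) -> Dict[str, Optional[object]]:
    """Reorder a source row to the target column order; absent columns become None."""
    return {col: row.get(col) for col in target_cols}


target_cols = ["id", "name", "age", "email", "phone"]  # email/phone kept (auto_shrink=False)
source_row = {"age": 29, "id": 3, "name": "Linus"}     # source no longer sends email/phone

print(align_to_schema(source_row, target_cols))
# {'id': 3, 'name': 'Linus', 'age': 29, 'email': None, 'phone': None}
```

Note that this helper silently ignores source columns the target lacks, which is why expansion has to happen before alignment in the order described earlier.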

Truncate Strategy

  • Existing target data: Completely cleared before processing

  • Schema changes: Applied to target structure

  • New source data: All data processed uniformly with evolved schema

  • Hash consistency: All records share the same hash calculation base

from sparksneeze.strategy import Truncate

# All data processed with consistent schema evolution
strategy = Truncate(auto_expand=True, auto_shrink=True)
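Since Truncate clears the target first, every row that ends up in it is hashed against the same final column list. A toy model, assuming dict rows, SHA-256 over values in schema order and a hypothetical `_hash` column; other metadata columns are omitted and this is not sparksneeze's code:

```python
import hashlib
from typing import Dict, List


def row_hash(row: Dict[str, object], columns: List[str]) -> str:
    """Toy hash: SHA-256 over the values of `columns`, in order."""
    payload = "|".join(str(row.get(c)) for c in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def truncate_and_load(source_rows: List[Dict[str, object]], final_cols: List[str]) -> List[Dict[str, object]]:
    """Discard the old target and rebuild it from the source on one schema."""
    target = []
    for src in source_rows:
        row = {c: src.get(c) for c in final_cols}
        row["_hash"] = row_hash(row, final_cols)
        target.append(row)
    return target


final_cols = ["id", "name", "age"]
loaded = truncate_and_load(
    [{"id": 1, "name": "Ada", "age": 36}, {"id": 2, "name": "Grace", "age": 45}],
    final_cols,
)
print(all(r["_hash"] == row_hash(r, final_cols) for r in loaded))  # True
```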

Upsert and Historize Strategies

  • Key preservation: Key columns are never dropped during auto_shrink

  • Existing data: Hash calculations follow the same mixed-hash pattern as Append

  • Schema evolution: Applied before key-based operations
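The key-preservation rule can be expressed as a filter on the shrink plan: key columns are removed from the drop list even when the source no longer carries them. The sketch assumes the strategy knows its key columns as a list of names; `columns_to_drop` is a hypothetical helper, not sparksneeze's API.

```python
from typing import List


def columns_to_drop(source_cols: List[str], target_cols: List[str], key_cols: List[str]) -> List[str]:
    """Columns auto_shrink would remove, never including key columns."""
    source_set, key_set = set(source_cols), set(key_cols)
    return [c for c in target_cols if c not in source_set and c not in key_set]


print(columns_to_drop(
    source_cols=["name", "age"],
    target_cols=["customer_id", "name", "age", "email"],
    key_cols=["customer_id"],
))  # ['email']
```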

Data Loss Scenarios

When Data is Lost:

  1. auto_shrink=True: Columns not in source are permanently dropped

  2. Type incompatibility: Incompatible data types may cause data loss during casting

  3. Delta Lake: Column drops are irreversible (even with time travel)
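For the type-incompatibility case, it can help to count how many values a cast would fail to convert before running the strategy. The sketch below mimics Spark's behaviour with ANSI mode disabled (spark.sql.ansi.enabled=false), where an invalid cast yields NULL instead of raising an error. It is a plain-Python approximation for illustration, and `count_lossy_casts` is a hypothetical name.

```python
from typing import Callable, Iterable, Optional


def try_cast(value: object, cast: Callable[[object], object]) -> Optional[object]:
    """Return the cast value, or None when the cast fails (like a NULL result)."""
    if value is None:
        return None
    try:
        return cast(value)
    except (TypeError, ValueError):
        return None


def count_lossy_casts(values: Iterable[object], cast: Callable[[object], object]) -> int:
    """Count non-null inputs that would become NULL after casting."""
    return sum(1 for v in values if v is not None and try_cast(v, cast) is None)


ages_as_text = ["36", "45", "unknown", None, "29"]
print(count_lossy_casts(ages_as_text, int))  # 1
```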

Prevention Strategies:

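from sparksneeze import sparksneeze
from sparksneeze.strategy import Append

# 1. Use conservative settings: expand the schema, never drop columns
strategy = Append(auto_expand=True, auto_shrink=False)

# 2. Back up the target before schema evolution
#    (shown with plain PySpark; assumes an active SparkSession `spark`)
spark.table("target_table").write.saveAsTable("backup_table_name")

# 3. Test schema evolution on sample data first
#    (sample_df and test_target stand for a sample DataFrame and a scratch target)
test_result = sparksneeze(sample_df, test_target, strategy).run()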

Best Practices

  1. Start Conservative: Use auto_expand=True, auto_shrink=False initially

  2. Monitor Schema Changes: Log and review schema evolution operations

  3. Backup Critical Data: Always backup before enabling auto_shrink

  4. Test Evolution: Validate schema evolution on development data first

  5. Document Changes: Track schema evolution decisions and impacts
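For practice 2, one lightweight approach is to log the column difference between source and target before each run. A sketch using Python's standard logging module; the logger name `schema_audit` and the helper `log_schema_diff` are illustrative choices, not part of sparksneeze.

```python
import logging
from typing import List

logger = logging.getLogger("schema_audit")


def log_schema_diff(source_cols: List[str], target_cols: List[str]) -> bool:
    """Log added/removed columns; return True if the schemas differ."""
    added = [c for c in source_cols if c not in target_cols]
    removed = [c for c in target_cols if c not in source_cols]
    if added:
        logger.info("Columns new in source (auto_expand would add): %s", added)
    if removed:
        logger.warning("Columns missing from source (auto_shrink would drop): %s", removed)
    return bool(added or removed)


logging.basicConfig(level=logging.INFO)
changed = log_schema_diff(["id", "name", "email"], ["id", "name", "age"])
```

Logging the would-be drops as warnings makes them easy to spot before auto_shrink is ever enabled.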

Example: Complete Schema Evolution Workflow

from sparksneeze import sparksneeze
from sparksneeze.strategy import Append
from sparksneeze.metadata import MetadataConfig

# Setup with schema evolution enabled
metadata_config = MetadataConfig()
strategy = Append(
    auto_expand=True,    # Allow new columns
    auto_shrink=False,   # Prevent data loss
    metadata_config=metadata_config
)

# Execute with automatic schema adaptation
# (source_df is an existing Spark DataFrame)
result = sparksneeze(
    source_df,
    "target_table",
    strategy
).run()

if result.success:
    print(f"Schema evolution completed: {result.message}")
    # Check for any schema changes in logs
else:
    print(f"Schema evolution failed: {result.message}")

Troubleshooting Schema Evolution

Common Issues:

  1. Type Mismatch: Source and target have incompatible column types

    • Solution: Ensure compatible types or implement custom type casting

  2. Metadata Conflicts: Existing metadata conflicts with new schema

    • Solution: Use consistent MetadataConfig across operations

  3. Hash Validation Failures: Mixed hash calculations cause validation issues

    • Solution: Understand that mixed hashes are expected behavior after evolution

  4. Data Loss Surprise: Unexpected data loss from auto_shrink

    • Solution: Always backup before enabling auto_shrink operations
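For the type-mismatch case, a common approach is to allow only widening casts that cannot lose information. The table below is an assumed, simplified rule set written with Spark SQL type names; it is not sparksneeze's actual compatibility logic, which may accept more or fewer conversions.

```python
from typing import Dict, List, Set, Tuple

# Assumed rule set: source type -> target types it can be cast to without losing information
SAFE_WIDENINGS: Dict[str, Set[str]] = {
    "byte": {"short", "int", "long", "float", "double", "string"},
    "short": {"int", "long", "float", "double", "string"},
    "int": {"long", "double", "string"},  # int -> float omitted: float has only 24 mantissa bits
    "long": {"string"},                   # long -> double omitted: can round large values
    "float": {"double", "string"},
    "double": {"string"},
    "date": {"timestamp", "string"},
}


def is_safe_cast(source_type: str, target_type: str) -> bool:
    """True if values of source_type fit target_type under the assumed rules."""
    return source_type == target_type or target_type in SAFE_WIDENINGS.get(source_type, set())


def incompatible_columns(source: Dict[str, str], target: Dict[str, str]) -> List[Tuple[str, str, str]]:
    """Return (column, source_type, target_type) for shared columns that need a lossy cast."""
    return [
        (col, s_type, target[col])
        for col, s_type in source.items()
        if col in target and not is_safe_cast(s_type, target[col])
    ]


source_types = {"id": "int", "amount": "double", "created": "date"}
target_types = {"id": "long", "amount": "float", "created": "timestamp"}
print(incompatible_columns(source_types, target_types))  # [('amount', 'double', 'float')]
```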

Debug Schema Evolution:

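import logging

from sparksneeze import sparksneeze

# Attach a handler to the root logger; without one, DEBUG messages are not printed
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")

# Enable detailed schema evolution logging
logging.getLogger("sparksneeze.schema_evolution").setLevel(logging.DEBUG)

# Run operation with verbose output
result = sparksneeze(source_df, target, strategy).run()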