Metadata Module

Overview

SparkSneeze automatically adds standardized metadata fields to all data it processes, supporting tracking, auditing, and data lineage. Because every strategy uses the same fields, data governance stays consistent across strategies and operations.

Every record processed by SparkSneeze receives metadata fields that track:

  • Validity periods - When records are valid from/to

  • Active status - Whether records are currently active

  • Data fingerprinting - Hash of data columns for change detection

  • System information - Strategy, version, and processing details

Metadata Fields

All metadata fields use a configurable prefix (default: _META):

_META_valid_from (TimestampType)

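Record validity start timestamp. Set once per batch to the processing time, so all records in a batch share the same timestamp. It can be overridden with default_valid_from in MetadataConfig or, for Historize, with the strategy's valid_from parameter.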

_META_valid_to (TimestampType)

Record validity end timestamp. Set to 2999-12-31 23:59:59 for active records, indicating they are currently valid.
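As a rough illustration of the two validity fields, the pure-Python sketch below stamps a batch of records, using plain dicts as stand-ins for DataFrame rows. The helper stamp_batch and its parameters are hypothetical, not part of SparkSneeze; it only shows the idea of capturing the timestamp once per batch and using the 2999-12-31 23:59:59 sentinel for open records.

```python
from datetime import datetime, timezone

# Sentinel end date for open (currently valid) records, as described above
OPEN_END = datetime(2999, 12, 31, 23, 59, 59)

def stamp_batch(records, prefix="_META", now=None):
    """Hypothetical helper: add validity fields to every record in one batch.

    The timestamp is captured once, before the loop, so every record in the
    batch receives exactly the same valid_from value.
    """
    # Naive UTC timestamp, so it compares cleanly with the naive OPEN_END
    batch_ts = now if now is not None else datetime.now(timezone.utc).replace(tzinfo=None)
    stamped = []
    for rec in records:
        out = dict(rec)  # copy so the input records are not mutated
        out[f"{prefix}_valid_from"] = batch_ts
        out[f"{prefix}_valid_to"] = OPEN_END
        stamped.append(out)
    return stamped

rows = stamp_batch([{"id": 1}, {"id": 2}])
print(rows[0]["_META_valid_from"] == rows[1]["_META_valid_from"])  # True
```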

_META_active (BooleanType)

Active record indicator. Always True for records processed by DropCreate, Truncate, Append, and Upsert strategies. The Historize strategy may set this to False for superseded records.

_META_row_hash (StringType)

Hash of data columns using Spark's MurmurHash3 algorithm. By default, the hash covers every data column and automatically excludes:

  • All metadata fields (_META_*)

  • Key columns (for Upsert and Historize strategies)

When hash_columns is set in MetadataConfig, only the listed columns are hashed; key columns are still excluded.
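The column-selection rules above can be sketched in plain Python. Both helpers are hypothetical. In particular, row_hash uses SHA-256 from hashlib only so the example runs without Spark; SparkSneeze uses Spark's MurmurHash3, so real hash values will differ.

```python
import hashlib
import json

def select_hash_columns(columns, prefix="_META", keys=(), hash_columns=None):
    """Hypothetical helper: pick the columns that feed the row hash."""
    # Start from the explicit hash_columns list if given, else from every column
    candidates = columns if hash_columns is None else hash_columns
    return [
        c for c in candidates
        if not c.startswith(prefix + "_")  # metadata fields are never hashed
        and c not in keys                  # key columns are excluded
    ]

def row_hash(record, hash_cols):
    """Stand-in hash (SHA-256, not Spark's MurmurHash3) over the selected columns."""
    # json.dumps encodes None as null and quotes strings, so None and "None" differ
    payload = json.dumps([record.get(c) for c in hash_cols], default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

cols = ["employee_id", "name", "email", "last_login", "_META_active"]
hashed = select_hash_columns(cols, keys=["employee_id"], hash_columns=["name", "email"])
print(hashed)  # ['name', 'email']

before = {"employee_id": 7, "name": "Ann", "email": "ann@example.com", "last_login": "2025-06-01"}
after = dict(before, last_login="2025-06-19")  # only a non-hashed column changed
print(row_hash(before, hashed) == row_hash(after, hashed))  # True
```

Restricting hash_columns this way means a change to an operational column such as last_login does not register as a content change.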

_META_system_info (StringType)

JSON containing system metadata about the processing operation:

{
  "sparksneeze_version": "0.1.4",
  "strategy": "DropCreate",
  "created_at": "2025-06-19T10:30:00Z",
  "user": "system"
}
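A payload with the same shape as the example above can be produced with the standard json module. The helper build_system_info is hypothetical, and defaulting user to "system" simply mirrors the example value; how SparkSneeze actually fills these fields is not shown here.

```python
import json
from datetime import datetime, timezone

def build_system_info(version, strategy, user="system", now=None):
    """Hypothetical helper: build a system-info JSON string like the example above."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Expects a timezone-aware datetime; it is converted to UTC before formatting
    ts = now.astimezone(timezone.utc)
    return json.dumps({
        "sparksneeze_version": version,
        "strategy": strategy,
        "created_at": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),  # ISO-8601, UTC, "Z" suffix
        "user": user,
    })

info = build_system_info("0.1.4", "DropCreate",
                         now=datetime(2025, 6, 19, 10, 30, tzinfo=timezone.utc))
print(json.loads(info)["created_at"])  # 2025-06-19T10:30:00Z
```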

Configuration

MetadataConfig Class

Customize metadata behavior using the MetadataConfig class:

class MetadataConfig
Parameters:
  • prefix (str) – Prefix for metadata field names (default: "_META")

  • valid_from_field (str) – Name of the valid-from field (default: "valid_from")

  • valid_to_field (str) – Name of the valid-to field (default: "valid_to")

  • active_field (str) – Name of the active field (default: "active")

  • row_hash_field (str) – Name of the row hash field (default: "row_hash")

  • system_info_field (str) – Name of the system info field (default: "system_info")

  • default_valid_from (datetime) – Default valid-from timestamp (default: current time)

  • default_valid_to (datetime) – Default valid-to timestamp (default: 2999-12-31 23:59:59)

  • hash_columns (List[str]) – Columns to include in the row hash (default: auto-detect, i.e. all data columns except metadata and key columns)
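To show how prefix and the field-name parameters combine into the documented column names, here is a hypothetical dataclass mirroring the parameter list. It is not SparkSneeze's MetadataConfig: the underscore join between prefix and field name is inferred from names such as _META_valid_from, and using None to mean "current time" or "auto-detect" is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

@dataclass
class MetadataConfigSketch:
    """Hypothetical stand-in mirroring the documented parameters; not the real class."""
    prefix: str = "_META"
    valid_from_field: str = "valid_from"
    valid_to_field: str = "valid_to"
    active_field: str = "active"
    row_hash_field: str = "row_hash"
    system_info_field: str = "system_info"
    default_valid_from: Optional[datetime] = None  # assumption: None means "current time"
    default_valid_to: datetime = datetime(2999, 12, 31, 23, 59, 59)
    hash_columns: Optional[List[str]] = None       # assumption: None means "auto-detect"

    def full_name(self, name: str) -> str:
        # Assumption: prefix and field name are joined with an underscore
        return f"{self.prefix}_{name}"

    def field_names(self) -> List[str]:
        # Full column names, in the order the fields are documented
        return [self.full_name(n) for n in (
            self.valid_from_field, self.valid_to_field, self.active_field,
            self.row_hash_field, self.system_info_field)]

print(MetadataConfigSketch(prefix="_AUDIT").field_names()[0])  # _AUDIT_valid_from
```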

Usage Examples

Basic Usage (Default Metadata)

from pyspark.sql import SparkSession

from sparksneeze import sparksneeze
from sparksneeze.strategy import DropCreate

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Automatic metadata with default configuration
result = sparksneeze(df, "my_table", DropCreate()).run()

# Resulting table will have columns:
# - Original data columns (id, name)
# - _META_valid_from
# - _META_valid_to
# - _META_active
# - _META_row_hash
# - _META_system_info

Custom Prefix

from sparksneeze import sparksneeze
from sparksneeze.metadata import MetadataConfig
from sparksneeze.strategy import Append

# df is an existing Spark DataFrame
# Use custom prefix for metadata fields
config = MetadataConfig(prefix="_AUDIT")
strategy = Append(metadata_config=config)

result = sparksneeze(df, "my_table", strategy).run()

# Resulting columns: _AUDIT_valid_from, _AUDIT_valid_to, etc.

Specific Hash Columns

from sparksneeze import sparksneeze
from sparksneeze.metadata import MetadataConfig
from sparksneeze.strategy import Upsert

# df is an existing Spark DataFrame with an employee_id column
# Only hash specific columns for change detection
config = MetadataConfig(
    hash_columns=["name", "email", "department"]
)

strategy = Upsert(key="employee_id", metadata_config=config)
result = sparksneeze(df, "employees", strategy).run()

# Hash will only include name, email, department
# Key column (employee_id) automatically excluded

Custom Validity Periods

from datetime import datetime

from sparksneeze import sparksneeze
from sparksneeze.metadata import MetadataConfig
from sparksneeze.strategy import Historize

# df is an existing Spark DataFrame with a user_id column
# Default validity period for this configuration
config = MetadataConfig(
    default_valid_from=datetime(2025, 1, 1),
    default_valid_to=datetime(2025, 12, 31)
)

strategy = Historize(
    key="user_id",
    metadata_config=config,
    valid_from=datetime(2025, 6, 1)  # Overrides default_valid_from for this operation
)

result = sparksneeze(df, "user_history", strategy).run()
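The comment on valid_from implies a precedence order: a value passed to the strategy overrides default_valid_from, and default_valid_from in turn replaces the current time. The hypothetical helper below expresses that assumed order; it is not SparkSneeze code.

```python
from datetime import datetime, timezone
from typing import Optional

def resolve_valid_from(strategy_value: Optional[datetime],
                       config_default: Optional[datetime],
                       now: Optional[datetime] = None) -> datetime:
    """Hypothetical precedence rule: strategy argument > config default > current time."""
    if strategy_value is not None:
        return strategy_value
    if config_default is not None:
        return config_default
    return now if now is not None else datetime.now(timezone.utc)

print(resolve_valid_from(datetime(2025, 6, 1), datetime(2025, 1, 1)))  # 2025-06-01 00:00:00
```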

Strategy-Specific Behavior

DropCreate Strategy

  • All records get _META_active=True

  • _META_valid_from set to current timestamp

  • _META_valid_to set to 2999-12-31

  • Hash includes all data columns

Truncate Strategy

  • Same behavior as DropCreate

  • All records treated as new and active

Append Strategy

  • New records get _META_active=True

  • Same timestamp behavior as DropCreate

  • Hash includes all data columns

Upsert Strategy

  • All records get _META_active=True

  • Key columns automatically excluded from hash

  • Enables change detection on non-key columns

Historize Strategy

  • Accepts valid_from and valid_to parameters that override the MetadataConfig defaults for that operation

  • Key columns automatically excluded from hash

  • Active status managed by historization logic; superseded versions may be set to _META_active=False

  • Supports slowly changing dimensions (SCD Type 2)
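SparkSneeze's exact Historize algorithm is not described here, so the following is a generic SCD Type 2 merge in plain Python, meant only to illustrate how _META_active, the validity fields, and the row hash typically interact. Assumptions: a single key column, incoming rows already carry a row hash, a closed version's valid_to is set to the new load timestamp, and deleted keys are not handled. The function historize and its signature are hypothetical.

```python
from datetime import datetime

OPEN_END = datetime(2999, 12, 31, 23, 59, 59)  # sentinel for open versions

def historize(current, incoming, key, load_ts, prefix="_META"):
    """Generic SCD Type 2 merge over lists of dicts (illustration only).

    current:  existing rows, including their prefixed metadata fields
    incoming: new rows that already carry a prefixed row_hash
    Keys missing from `incoming` are left untouched (deletes are not handled).
    """
    active, row_hash, valid_from, valid_to = (
        f"{prefix}_{name}" for name in ("active", "row_hash", "valid_from", "valid_to")
    )
    incoming_by_key = {r[key]: r for r in incoming}
    unchanged = set()
    result = []
    for old in current:
        row = dict(old)  # copy so the caller's rows are not mutated
        new = incoming_by_key.get(row[key])
        if row[active] and new is not None:
            if new[row_hash] == row[row_hash]:
                unchanged.add(row[key])  # same content: keep the open version
            else:
                row[active] = False      # superseded: close this version
                row[valid_to] = load_ts
        result.append(row)
    # Changed and brand-new keys get a fresh open version starting at load_ts
    for k, new in incoming_by_key.items():
        if k not in unchanged:
            result.append({**new, active: True, valid_from: load_ts, valid_to: OPEN_END})
    return result

t0, t1 = datetime(2025, 1, 1), datetime(2025, 6, 1)
current = [
    {"user_id": 1, "name": "Ann", "_META_row_hash": "h1",
     "_META_active": True, "_META_valid_from": t0, "_META_valid_to": OPEN_END},
    {"user_id": 2, "name": "Bob", "_META_row_hash": "h2",
     "_META_active": True, "_META_valid_from": t0, "_META_valid_to": OPEN_END},
]
incoming = [
    {"user_id": 1, "name": "Ann", "_META_row_hash": "h1"},      # unchanged
    {"user_id": 2, "name": "Robert", "_META_row_hash": "h2b"},  # changed
    {"user_id": 3, "name": "Cy", "_META_row_hash": "h3"},       # new
]
for r in historize(current, incoming, "user_id", t1):
    print(r["user_id"], r["name"], r["_META_active"])
```

Comparing row hashes rather than every column is what lets an unchanged record (user 1) keep its existing open version instead of gaining a redundant new one.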

Querying Data with Metadata

Active Records Only

SELECT * FROM my_table
WHERE _META_active = true

Current State (Latest Records)

SELECT * FROM my_table
WHERE _META_active = true
AND _META_valid_from <= current_timestamp()
AND _META_valid_to > current_timestamp()
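The same filter can be expressed over plain dicts, which also makes the point-in-time variant explicit. rows_as_of is a hypothetical helper, not a SparkSneeze function. With require_active=True it mirrors the query above; for a historical "as of" view, the closed versions (active=False) are the ones that were valid at that moment, so the active filter has to be dropped.

```python
from datetime import datetime

def rows_as_of(rows, ts, prefix="_META", require_active=True):
    """Hypothetical helper: rows valid at ts, mirroring the WHERE clause above."""
    return [
        r for r in rows
        if (r[f"{prefix}_active"] or not require_active)
        and r[f"{prefix}_valid_from"] <= ts < r[f"{prefix}_valid_to"]
    ]

t0, t1 = datetime(2025, 1, 1), datetime(2025, 6, 1)
END = datetime(2999, 12, 31, 23, 59, 59)
history = [
    {"id": 1, "_META_active": False, "_META_valid_from": t0, "_META_valid_to": t1},
    {"id": 1, "_META_active": True, "_META_valid_from": t1, "_META_valid_to": END},
]
print(len(rows_as_of(history, datetime(2025, 7, 1))))                        # 1 (open version)
print(len(rows_as_of(history, datetime(2025, 3, 1), require_active=False)))  # 1 (closed version)
```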

Change Detection

-- Find records whose content changed between runs
-- (inner join: records added or removed between runs are not listed)
SELECT a.id, a._META_row_hash as old_hash, b._META_row_hash as new_hash
FROM previous_table a
JOIN current_table b ON a.id = b.id
WHERE a._META_row_hash != b._META_row_hash
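Because the query above uses an inner join, it reports only ids present in both runs. A fuller comparison, sketched here in Python over hypothetical {id: row_hash} maps, also surfaces inserted and deleted ids; in SQL the equivalent would typically be a full outer join.

```python
def classify_changes(previous, current):
    """Compare {id: row_hash} maps from two runs (hypothetical helper)."""
    prev_ids, curr_ids = set(previous), set(current)
    common = prev_ids & curr_ids
    return {
        "inserted": sorted(curr_ids - prev_ids),
        "deleted": sorted(prev_ids - curr_ids),
        "updated": sorted(i for i in common if previous[i] != current[i]),
        "unchanged": sorted(i for i in common if previous[i] == current[i]),
    }

print(classify_changes({1: "a", 2: "b", 3: "c"}, {1: "a", 2: "x", 4: "d"}))
# {'inserted': [4], 'deleted': [3], 'updated': [2], 'unchanged': [1]}
```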

Data Lineage Tracking

-- Track which strategy processed each record
SELECT
    get_json_object(_META_system_info, '$.strategy') as strategy,
    get_json_object(_META_system_info, '$.created_at') as processed_at,
    count(*) as record_count
FROM my_table
GROUP BY 1, 2
ORDER BY processed_at DESC
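The lineage query can be mirrored in Python when _META_system_info values have been collected locally, for example for a quick check in a test. The helper records_per_run is hypothetical; it assumes every created_at uses the single ISO-8601 format shown earlier, so that string order matches chronological order.

```python
import json
from collections import Counter

def records_per_run(system_infos):
    """Count records per (strategy, created_at), like the GROUP BY query above.

    system_infos: iterable of _META_system_info JSON strings (one per record).
    Returns [((strategy, created_at), count), ...], newest created_at first.
    """
    counts = Counter()
    for raw in system_infos:
        info = json.loads(raw)
        counts[(info.get("strategy"), info.get("created_at"))] += 1
    # Timestamps share one ISO-8601 format, so string order is chronological
    return sorted(counts.items(), key=lambda item: item[0][1] or "", reverse=True)

infos = [
    '{"strategy": "Append", "created_at": "2025-06-19T10:30:00Z"}',
    '{"strategy": "Append", "created_at": "2025-06-19T10:30:00Z"}',
    '{"strategy": "DropCreate", "created_at": "2025-06-18T09:00:00Z"}',
]
print(records_per_run(infos))
# [(('Append', '2025-06-19T10:30:00Z'), 2), (('DropCreate', '2025-06-18T09:00:00Z'), 1)]
```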

Best Practices

  1. Consistent Configuration: Use the same MetadataConfig across related tables so field names and validity defaults match

  2. Hash Column Selection: For large tables, consider specifying hash_columns to include only business-critical fields

  3. Query Patterns: Always filter on _META_active=true when querying current data

  4. Archival Strategy: Use metadata timestamps to implement data retention policies

  5. Change Detection: Leverage _META_row_hash for efficient change detection in ETL pipelines

  6. Monitoring: Query _META_system_info to track data processing patterns and strategy usage
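For the archival practice, one possible retention rule is sketched below: closed versions whose validity ended before a cutoff become archival candidates, while open versions are always kept. Both the rule and the helper archival_candidates are assumptions for illustration, not SparkSneeze behavior.

```python
from datetime import datetime, timedelta

def archival_candidates(rows, now, retention, prefix="_META"):
    """Hypothetical retention rule based on the validity fields.

    Closed versions (active=False) whose validity ended before now - retention
    are returned as archival candidates; open versions are never returned.
    """
    cutoff = now - retention
    return [
        r for r in rows
        if not r[f"{prefix}_active"] and r[f"{prefix}_valid_to"] < cutoff
    ]

now = datetime(2025, 6, 1)
rows = [
    {"id": 1, "_META_active": False, "_META_valid_to": datetime(2024, 1, 1)},
    {"id": 2, "_META_active": False, "_META_valid_to": datetime(2025, 5, 1)},
    {"id": 3, "_META_active": True, "_META_valid_to": datetime(2999, 12, 31, 23, 59, 59)},
]
print([r["id"] for r in archival_candidates(rows, now, timedelta(days=365))])  # [1]
```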