API Reference
Core Module
Core functionality for the sparksneeze library.
- class sparksneeze.core.SparkSneezeRunner(source_entity, target_entity, strategy)
Bases: object
Main runner class for sparksneeze operations.
The SparkSneezeRunner orchestrates the execution of data warehouse transformations using the strategy pattern. It handles source/target abstraction, Spark session management, and error handling.
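The strategy pattern mentioned here can be illustrated in plain Python. This is a minimal sketch under stated assumptions and not sparksneeze's implementation: lists of dicts stand in for Spark DataFrames, and `Strategy`, `DropCreateSketch`, `AppendSketch`, and `MiniRunner` are hypothetical names.

```python
from abc import ABC, abstractmethod
from typing import Dict, List

Row = Dict[str, object]  # hypothetical stand-in for a DataFrame row


class Strategy(ABC):
    """Hypothetical analogue of BaseStrategy: decides how source rows land in the target."""

    @abstractmethod
    def execute(self, source: List[Row], target: List[Row]) -> List[Row]:
        """Return the new contents of the target."""


class DropCreateSketch(Strategy):
    def execute(self, source: List[Row], target: List[Row]) -> List[Row]:
        # Discard the existing target and recreate it from the source.
        return list(source)


class AppendSketch(Strategy):
    def execute(self, source: List[Row], target: List[Row]) -> List[Row]:
        # Keep the existing target rows and add the source rows after them.
        return list(target) + list(source)


class MiniRunner:
    """Holds source, target, and strategy; run() delegates the load logic."""

    def __init__(self, source: List[Row], target: List[Row], strategy: Strategy) -> None:
        if not isinstance(strategy, Strategy):
            raise TypeError("strategy must be a Strategy instance")
        self.source = source
        self.target = target
        self.strategy = strategy

    def run(self) -> List[Row]:
        # The runner never branches on the strategy type; it only calls execute().
        self.target = self.strategy.execute(self.source, self.target)
        return self.target
```

Adding a new load behavior means adding a `Strategy` subclass while `MiniRunner` stays unchanged, which is the main benefit of the pattern.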
Examples
Basic usage with DataFrame source:
>>> from pyspark.sql import SparkSession
>>> from sparksneeze import SparkSneezeRunner
>>> from sparksneeze.strategy import DropCreate
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
>>> runner = SparkSneezeRunner(df, "/path/to/target", DropCreate())
>>> result = runner.run(spark)
Using file paths:
>>> runner = SparkSneezeRunner(
...     "/path/to/source.parquet",
...     "/path/to/target.delta",
...     DropCreate()
... )
>>> result = runner.run()  # Auto-creates Spark session
With custom strategy configuration:
>>> from sparksneeze.strategy import Append
>>> strategy = Append(auto_expand=True, auto_shrink=False)
>>> runner = SparkSneezeRunner("source_table", "target_table", strategy)
>>> result = runner.run()
- __init__(source_entity, target_entity, strategy)
Initialize the runner with source, target, and strategy.
- Parameters:
source_entity (DataFrame | str | DataSource) – Source data entity. Can be:
  - DataFrame: Direct Spark DataFrame
  - str: File path (e.g., “/path/to/file.parquet”) or table name
  - DataSource: Custom data source implementation
target_entity (str | DataTarget) – Target data entity. Can be:
  - str: File path (e.g., “/path/to/target.delta”) or table name
  - DataTarget: Custom data target implementation
strategy (BaseStrategy) – Strategy instance to execute (DropCreate, Append, Upsert, etc.)
- Raises:
TypeError – If invalid types are provided for source_entity, target_entity, or strategy
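The docstring does not say how a str is classified as a file path versus a table name, or exactly where the TypeError is raised. The sketch below assumes a simple heuristic (a "/" or a known file suffix means a path, anything else is a table name); `resolve_entity`, `ResolvedEntity`, `FILE_SUFFIXES`, and `passthrough_types` are hypothetical names, not part of sparksneeze.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Any, Tuple

# Assumed heuristic, not sparksneeze's actual rule: a "/" or one of these
# suffixes marks a file path; any other string is treated as a table name.
FILE_SUFFIXES = {".parquet", ".csv", ".json", ".delta"}


@dataclass(frozen=True)
class ResolvedEntity:
    kind: str  # "passthrough", "path", or "table"
    value: Any


def resolve_entity(entity: Any, passthrough_types: Tuple[type, ...] = ()) -> ResolvedEntity:
    """Hypothetical resolver: classify a source/target entity or raise TypeError."""
    # Objects such as a DataFrame or a custom DataSource/DataTarget are used as-is.
    if passthrough_types and isinstance(entity, passthrough_types):
        return ResolvedEntity("passthrough", entity)
    if isinstance(entity, str):
        if "/" in entity or PurePosixPath(entity).suffix in FILE_SUFFIXES:
            return ResolvedEntity("path", entity)
        # e.g. "source_db.table": ".table" is not a known file suffix.
        return ResolvedEntity("table", entity)
    raise TypeError(f"Unsupported entity type: {type(entity).__name__}")
```

The DataFrame and DataSource/DataTarget classes are passed in through `passthrough_types` only so the sketch runs without Spark; a real resolver would presumably check those classes directly.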
Examples
DataFrame to file:
>>> df = spark.createDataFrame([(1, "test")], ["id", "value"])
>>> runner = SparkSneezeRunner(df, "/output/data.delta", DropCreate())
File to file:
>>> runner = SparkSneezeRunner("/input/data.csv", "/output/data.parquet", Append())
Table to table:
>>> runner = SparkSneezeRunner("source_db.table", "target_db.table", Upsert(keys=["id"]))
- run(spark_session=None)
Execute the strategy on the configured source and target.
This method orchestrates the complete data transformation pipeline:
1. Uses the provided Spark session, or creates one
2. Resolves source and target entities to concrete data abstractions
3. Executes the configured strategy
4. Returns detailed results with metadata
- Parameters:
spark_session (SparkSession | None) – Optional Spark session. If not provided, a local session with Delta Lake support is created automatically.
- Returns:
- Comprehensive result object containing:
success: Boolean indicating execution success
records_processed: Number of records processed
execution_time: Time taken for execution, in seconds
metadata: Strategy-specific metadata
target_schema: Final target schema after execution
- Return type:
SparkSneezeResult
- Raises:
RuntimeError – If strategy execution fails or Spark session creation fails
TypeError – If source/target entities cannot be resolved
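The four pipeline steps, the documented result fields, and the two documented exceptions can be tied together in a small sketch. Everything here is hypothetical: `ResultSketch` only mirrors the documented field names, and the session factory, resolver, and strategy are plain callables so the example runs without Spark. It is not the library's `run()` implementation.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ResultSketch:
    """Hypothetical mirror of the documented SparkSneezeResult fields."""
    success: bool
    records_processed: int
    execution_time: float  # seconds
    metadata: Dict[str, Any] = field(default_factory=dict)
    target_schema: Optional[List[str]] = None


def run_pipeline(
    get_session: Callable[[], Any],
    resolve: Callable[[Any], List[dict]],
    strategy: Callable[[Any, List[dict]], List[dict]],
    source: Any,
    session: Any = None,
) -> ResultSketch:
    start = time.perf_counter()

    # 1. Use the caller's session, or create one if none was given.
    if session is None:
        try:
            session = get_session()
        except Exception as exc:
            raise RuntimeError(f"Spark session creation failed: {exc}") from exc

    # 2. Resolve the source entity; a TypeError here propagates unchanged.
    rows = resolve(source)

    # 3. Execute the strategy, wrapping any failure in RuntimeError.
    try:
        written = strategy(session, rows)
    except Exception as exc:
        raise RuntimeError(f"Strategy execution failed: {exc}") from exc

    # 4. Package the outcome; the schema is the sorted column names of the first written row.
    return ResultSketch(
        success=True,
        records_processed=len(rows),
        execution_time=time.perf_counter() - start,
        metadata={"strategy": getattr(strategy, "__name__", repr(strategy))},
        target_schema=sorted(written[0]) if written else [],
    )
```

Wrapping session and strategy failures in RuntimeError while letting resolution errors surface as TypeError matches the split described under Raises; the exact wrapping the library performs is not documented here.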
Examples
Basic execution:
>>> result = runner.run()
>>> print(f"Processed {result.records_processed} records in {result.execution_time}s")
With existing Spark session:
>>> spark = SparkSession.builder.appName("MyApp").getOrCreate()
>>> result = runner.run(spark_session=spark)
>>> if result.success:
...     print("Strategy executed successfully")
Error handling:
>>> try:
...     result = runner.run()
... except RuntimeError as e:
...     print(f"Execution failed: {e}")
- sparksneeze.core.sparksneeze(source_entity, target_entity, strategy)
Create a sparksneeze runner instance.
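- Parameters:
source_entity, target_entity, strategy – Same as for SparkSneezeRunner.__init__()
- Returns:
Runner instance ready to execute
- Return type:
SparkSneezeRunner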
Examples
>>> from sparksneeze import sparksneeze
>>> from sparksneeze.strategy import DropCreate
>>>
>>> # Using a DataFrame
>>> runner = sparksneeze(my_df, "target_table", DropCreate())
>>> result = runner.run()
>>>
>>> # Using file paths
>>> runner = sparksneeze("source.parquet", "target.delta", DropCreate())
>>> result = runner.run()
>>>
>>> # Using table names
>>> runner = sparksneeze("source_table", "target_table", DropCreate())
>>> result = runner.run()