Source code for sparksneeze.core

"""Core functionality for the sparksneeze library."""

from typing import Union, Optional
from pyspark.sql import DataFrame, SparkSession

from .strategy import BaseStrategy, SparkSneezeResult
from .data_sources import DataSource, create_data_source
from .data_targets import DataTarget, create_data_target
from .logging import get_logger
from .spark_utils import create_spark_session_with_delta


class SparkSneezeRunner:
    """Main runner class for sparksneeze operations.

    The SparkSneezeRunner orchestrates the execution of data warehouse
    transformations using the strategy pattern. It handles source/target
    abstraction, Spark session management, and error handling.

    Examples:
        Basic usage with DataFrame source:

        >>> from pyspark.sql import SparkSession
        >>> from sparksneeze import SparkSneezeRunner
        >>> from sparksneeze.strategy import DropCreate
        >>>
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
        >>> runner = SparkSneezeRunner(df, "/path/to/target", DropCreate())
        >>> result = runner.run(spark)

        Using file paths:

        >>> runner = SparkSneezeRunner(
        ...     "/path/to/source.parquet",
        ...     "/path/to/target.delta",
        ...     DropCreate()
        ... )
        >>> result = runner.run()  # Auto-creates Spark session

        With custom strategy configuration:

        >>> from sparksneeze.strategy import Append
        >>> strategy = Append(auto_expand=True, auto_shrink=False)
        >>> runner = SparkSneezeRunner("source_table", "target_table", strategy)
        >>> result = runner.run()
    """

    def __init__(
        self,
        source_entity: Union[DataFrame, str, DataSource],
        target_entity: Union[str, DataTarget],
        strategy: BaseStrategy,
    ):
        """Initialize the runner with source, target, and strategy.

        The constructor only stores its arguments; they are not validated
        here. Source and target are resolved when :meth:`run` is called.

        Args:
            source_entity: Source data entity. Can be:
                - DataFrame: Direct Spark DataFrame
                - str: File path (e.g., "/path/to/file.parquet") or table name
                - DataSource: Custom data source implementation
            target_entity: Target data entity. Can be:
                - str: File path (e.g., "/path/to/target.delta") or table name
                - DataTarget: Custom data target implementation
            strategy: Strategy instance to execute (DropCreate, Append,
                Upsert, etc.)

        Examples:
            DataFrame to file:

            >>> df = spark.createDataFrame([(1, "test")], ["id", "value"])
            >>> runner = SparkSneezeRunner(df, "/output/data.delta", DropCreate())

            File to file:

            >>> runner = SparkSneezeRunner("/input/data.csv", "/output/data.parquet", Append())

            Table to table:

            >>> runner = SparkSneezeRunner("source_db.table", "target_db.table", Upsert(keys=["id"]))
        """
        self.source_entity = source_entity
        self.target_entity = target_entity
        self.strategy = strategy
        self.logger = get_logger(f"{__name__}.SparkSneezeRunner")

    def run(self, spark_session: Optional[SparkSession] = None) -> SparkSneezeResult:
        """Execute the strategy on the configured source and target.

        This method orchestrates the complete data transformation pipeline:

        1. Creates or uses provided Spark session
        2. Resolves source and target entities to concrete data abstractions
        3. Executes the configured strategy
        4. Returns detailed results with metadata

        Args:
            spark_session: Optional Spark session. If not provided, creates a
                local session with Delta Lake support automatically.

        Returns:
            SparkSneezeResult: Comprehensive result object containing:
                - success: Boolean indicating execution success
                - records_processed: Number of records processed
                - execution_time: Time taken for execution
                - metadata: Strategy-specific metadata
                - target_schema: Final target schema after execution

        Raises:
            RuntimeError: If Spark session creation, source/target
                resolution, or strategy execution fails. A ``RuntimeError``
                raised inside the pipeline is logged and re-raised as-is;
                any other exception (including ``TypeError``) is wrapped in
                a new ``RuntimeError`` with the original attached as
                ``__cause__``.

        Examples:
            Basic execution:

            >>> result = runner.run()
            >>> print(f"Processed {result.records_processed} records in {result.execution_time}s")

            With existing Spark session:

            >>> spark = SparkSession.builder.appName("MyApp").getOrCreate()
            >>> result = runner.run(spark_session=spark)
            >>> if result.success:
            ...     print("Strategy executed successfully")

            Error handling:

            >>> try:
            ...     result = runner.run()
            ... except RuntimeError as e:
            ...     print(f"Execution failed: {e}")
        """
        strategy_name = self.strategy.__class__.__name__

        try:
            # Create Spark session if not provided
            if spark_session is None:
                spark_session = self._create_default_spark_session()

            # Create data source and target with spark session
            source = create_data_source(self.source_entity, spark_session)
            target = create_data_target(self.target_entity, spark_session)

            # Execute strategy with proper data source and target abstractions
            return self.strategy.execute(source, target)

        except RuntimeError as e:
            # Already the exception type callers expect: log and propagate.
            self.logger.error(f"{strategy_name} failed: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error in {strategy_name}: {e}")
            # Wrap unexpected errors so callers only need to catch RuntimeError
            raise RuntimeError(
                f"Unexpected error during {strategy_name} execution: {e}"
            ) from e

    def _create_default_spark_session(self) -> SparkSession:
        """Create a default Spark session for local execution with Delta support.

        Returns:
            The session returned by ``create_spark_session_with_delta`` for
            app name ``"SparkSneeze"`` and master ``"local[*]"``.

        Raises:
            RuntimeError: If session creation fails for any reason; the
                original exception is attached as ``__cause__``.
        """
        try:
            return create_spark_session_with_delta(
                app_name="SparkSneeze", master="local[*]"
            )
        except Exception as e:
            self.logger.error(f"Failed to create Spark session: {e}")
            # Chain explicitly, matching the wrapping done in run().
            raise RuntimeError(f"Failed to create Spark session: {e}") from e
def sparksneeze(
    source_entity: Union[DataFrame, str, DataSource],
    target_entity: Union[str, DataTarget],
    strategy: BaseStrategy,
) -> SparkSneezeRunner:
    """Build a :class:`SparkSneezeRunner` for the given source, target and strategy.

    This is a convenience factory: it only constructs the runner and does
    not execute anything. Call ``run()`` on the returned object to execute
    the strategy.

    Args:
        source_entity: Source data entity (DataFrame, path, table name, or
            DataSource)
        target_entity: Target data entity (path, table name, or DataTarget)
        strategy: Strategy instance to execute

    Returns:
        SparkSneezeRunner: Runner instance ready to execute

    Examples:
        >>> from sparksneeze import sparksneeze
        >>> from sparksneeze.strategy import DropCreate
        >>>
        >>> # Using DataFrame
        >>> runner = sparksneeze(my_df, "target_table", DropCreate())
        >>> result = runner.run()
        >>>
        >>> # Using file paths
        >>> runner = sparksneeze("source.parquet", "target.delta", DropCreate())
        >>> result = runner.run()
        >>>
        >>> # Using table names
        >>> runner = sparksneeze("source_table", "target_table", DropCreate())
        >>> result = runner.run()
    """
    runner = SparkSneezeRunner(
        source_entity=source_entity,
        target_entity=target_entity,
        strategy=strategy,
    )
    return runner