Source code for bardi.pipeline

"""Defines a pipeline and a framework for steps to run in it"""

import os
import tracemalloc
from abc import ABCMeta, abstractmethod
from datetime import datetime
from typing import List, Literal, Tuple, TypedDict, Union

import pyarrow as pa

from bardi.data import Dataset, write_file


class DataWriteConfig(TypedDict):
    data_format: str
    data_format_args: Union[dict, None]
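# Illustrative sketch (not part of the original module): a DataWriteConfig is an
# ordinary dict matching the TypedDict above. Using "csv" here assumes the data
# handlers' write_file function supports CSV output; "data_format_args" may be None.
example_csv_write_config: DataWriteConfig = {
    "data_format": "csv",
    "data_format_args": None,
}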
class Step(metaclass=ABCMeta):
    """Blueprint for creating new steps for data pre-processing pipelines"""

    @abstractmethod
    def __init__(self):
        """Constructor method"""
        self._data_write_config: DataWriteConfig = {
            "data_format": "parquet",
            "data_format_args": {"compression": "snappy", "use_dictionary": False},
        }  # Default write config
    @abstractmethod
    def run(
        self,
        data: pa.Table,
        artifacts: dict,
    ) -> Tuple[Union[pa.Table, None], Union[dict, None]]:
        """Implement a run method in the step that will be called by the
        pipeline's run_pipeline method.

        Parameters
        ----------
        data : PyArrow.Table
            Expect to receive a PyArrow Table of data
        artifacts : dict
            Expect to receive a dict of artifacts from preceding steps,
            but artifacts can be ignored in the method if values are not
            needed from it

        Returns
        -------
        Tuple[Union[pa.Table, None], Union[dict, None]]
            If the method performs a transformation of data, return the data
            as a PyArrow Table. This will replace the pipeline object's
            processed_data attribute. If the method creates new artifacts
            besides the data, return a dictionary with keys corresponding to
            the name of each artifact and values holding the artifact's data
            or value.
        """
        pass
    def set_write_config(self, data_config: DataWriteConfig) -> None:
        """Default implementation of the set_write_config method.

        Provides a method to customize the write configuration used by the
        step's write_data method if desired.

        Parameters
        ----------
        data_config : DataWriteConfig
            A typed dictionary defining the data_format (i.e., parquet,
            csv, etc.) and any particular data_format_args available in
            the pyarrow API.
        """
        self._data_write_config = data_config
    def get_parameters(self) -> dict:
        """Default implementation of the get_parameters method.

        Called by the pipeline's get_parameters method. Implement a custom
        method if needed, such as removing large items from the dictionary.

        Returns
        -------
        dict
            A dictionary copy of the object's configuration.
        """
        params = vars(self).copy()
        return params
    def write_data(
        self,
        write_path: str,
        data: Union[pa.Table, None],
        data_filename: Union[str, None] = None,
    ) -> None:
        """Default implementation of the write_data method.

        Reuses the write patterns that already exist in the data handlers.

        Parameters
        ----------
        write_path : str
            A directory where a file will be written.
        data : Union[pa.Table, None]
            PyArrow Table of data.
        data_filename : Union[str, None]
            Overwrites the default file name (no filetype extension).
        """
        # Add the filename with the appropriate filetype extension to the write path
        if not data_filename:
            data_filename = f"{self.__class__.__name__}Data"
        file_path = os.path.join(
            write_path, f'{data_filename}.{self._data_write_config["data_format"]}'
        )

        # Call the data handler write_file function
        write_file(
            data=data,
            path=file_path,
            format=self._data_write_config["data_format"],
            **(self._data_write_config["data_format_args"] or {}),  # tolerate None args
        )
    def write_artifacts(self, write_path: str, artifacts: Union[dict, None]):
        """Default implementation of the write_artifacts method.

        Since only some steps require this method, the default behavior is
        'pass'. Implement a custom method to write the specific artifacts
        produced in the step.

        Parameters
        ----------
        write_path : str
            A directory where a file should be written.
        artifacts : Union[dict, None]
            A dictionary of artifacts from the step that should be written
            to a file.
        """
        pass
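# Illustrative sketch (not part of the original module): a minimal custom Step.
# The column name "text" and the use of pyarrow.compute are assumptions chosen for
# the example; a real step can apply any transformation and return any artifacts.
import pyarrow.compute as pc


class LowercaseTextStep(Step):
    """Example step that lowercases a text column and reports the row count."""

    def __init__(self, field: str = "text"):
        super().__init__()
        self.field = field

    def run(
        self, data: pa.Table, artifacts: dict
    ) -> Tuple[Union[pa.Table, None], Union[dict, None]]:
        # Replace the configured column with its lowercased form
        lowered = pc.utf8_lower(data[self.field])
        data = data.set_column(
            data.schema.get_field_index(self.field), self.field, lowered
        )
        # Return the transformed table and a new artifact; either position may be None
        return data, {"row_count": data.num_rows}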
class Pipeline:
    """Organize Steps into a pipeline to operate on data from a Dataset

    Attributes
    ----------
    dataset : bardi.data.data_handlers.Dataset
        A bardi dataset object with data to pre-process.
    write_path : str
        Directory in the file system to write outputs to.
    write_outputs : Literal["pipeline-outputs", "debug", False]
        Configuration for which outputs to write.

        * 'pipeline-outputs' will write all artifacts and final data.
        * 'debug' will write all artifacts and data from each step.
        * False will not write any files.
    data_write_config : DataWriteConfig
        Supply a custom write configuration specifying file types. Default
        will save data as parquet files.
    data_filename : str
        Supply a filename for the final output data.
    """

    def __init__(
        self,
        dataset: Dataset = None,
        write_path: str = None,
        write_outputs: Literal["pipeline-outputs", "debug", False] = "pipeline-outputs",
        data_write_config: DataWriteConfig = None,
        data_filename: str = "bardi_processed_data",
    ):
        """Constructor method"""
        # Reference a bardi dataset object that the pipeline will operate on
        self.dataset: Dataset = dataset

        # Pipeline configuration
        self.steps: List[Step] = []
        self.num_steps = 0
        self.write_outputs = write_outputs
        self.write_path = write_path
        self.data_filename = data_filename

        # File writing configuration
        if data_write_config:
            self.data_write_config = data_write_config
        else:
            self.data_write_config = {
                "data_format": "parquet",
                "data_format_args": {"compression": "snappy", "use_dictionary": False},
            }  # Default config, but can be overwritten

        if write_outputs:
            # Check for a missing path as well as a non-directory path so the
            # intended ValueError is raised instead of an error from os.path.isdir
            if not self.write_path or not os.path.isdir(self.write_path):
                raise ValueError(
                    "Pipeline write_outputs was configured to "
                    "write, however a write_path was not "
                    "correctly specified. Please provide a "
                    "path to a directory to write files into."
                )

        # Tracking information produced during pipeline execution
        self.artifacts = {}
        self.performance = {}
    def add_step(self, step: Step) -> None:
        """Adds a step object to the list of pipeline execution steps.

        Also overwrites the step's data write configuration with a consistent
        pipeline data write configuration.

        Parameters
        ----------
        step : bardi.pipeline.Step
            A bardi Step object to be added to the list of pipeline
            execution steps.
        """
        if isinstance(step, Step):
            step.set_write_config(data_config=self.data_write_config)
            self.steps.append(step)
            self.num_steps += 1
        else:
            raise TypeError("Only objects of type Step may be added to a bardi pipeline.")
    def run_pipeline(self) -> None:
        """Calls the run, write_data, and write_artifacts methods for each
        respective step object added to the pipeline.
        """
        if isinstance(self.dataset, Dataset):
            self.processed_data = self.dataset.data
        else:
            raise TypeError(
                "Pipeline's Dataset must reference a bardi "
                "Dataset object. Please utilize bardi's data_"
                "handlers to create a bardi Dataset object with "
                "your data."
            )

        # For each step of the pipeline, call its run method
        pipeline_start_time = datetime.now()
        for step_position, step in enumerate(self.steps, start=1):
            # Call the run method and time the execution
            step_start_time = datetime.now()
            if self.write_outputs == "debug":
                tracemalloc.start()
            results = step.run(data=self.processed_data, artifacts=self.artifacts)
            if self.write_outputs == "debug":
                step_max_mem = tracemalloc.get_traced_memory()[1] / 1000000
                tracemalloc.stop()
            step_end_time = datetime.now()
            step_run_time = step_end_time - step_start_time

            # Record the performance
            self.performance[str(type(step))] = {
                "time": str(step_run_time),
            }
            if self.write_outputs == "debug":
                self.performance[str(type(step))]["memory (MB)"] = str(step_max_mem)
                print(f"{str(type(step))} run time: {step_run_time}")
                print(f"{str(type(step))} max memory (MB): {step_max_mem}")

            if isinstance(results, tuple):
                if isinstance(results[0], pa.Table):
                    # Set the pipeline's processed_data attribute to the newest result
                    self.processed_data = results[0]

                    # Write the step's data output to a file if pipeline is configured
                    # to do so
                    if self.write_outputs == "debug":
                        step.write_data(write_path=self.write_path, data=self.processed_data)
                    elif (
                        step_position == self.num_steps
                        and self.write_outputs == "pipeline-outputs"
                    ):
                        step.write_data(
                            write_path=self.write_path,
                            data=self.processed_data,
                            data_filename=self.data_filename,
                        )
                if isinstance(results[1], dict):
                    # If artifacts were returned by the step, add them to
                    # the pipeline's total set of artifacts
                    self.artifacts = {**self.artifacts, **results[1]}

                    # Write the step's artifacts to files if pipeline is configured
                    # to do so
                    if self.write_outputs:
                        step.write_artifacts(write_path=self.write_path, artifacts=self.artifacts)
            else:
                raise TypeError(
                    "Pipeline expected step to return a tuple of "
                    "PyArrow Table of data and a dictionary of "
                    "artifacts. If the step doesn't return one of "
                    "these, that position in the tuple can be "
                    "empty, but it still needs to return a tuple."
                )

        # Record the total pipeline performance
        pipeline_end_time = datetime.now()
        pipeline_run_time = pipeline_end_time - pipeline_start_time
        if self.write_outputs == "debug":
            print(f"Pipeline run time: {pipeline_run_time}")
        self.performance[str(type(self))] = str(pipeline_run_time)
    def get_parameters(self, condensed: bool = True) -> dict:
        """Returns the parameters of the pipeline's dataset and parameters
        of each step.

        Parameters
        ----------
        condensed : bool
            If True, the step configuration dictionary will exclude any
            attributes set to None or False.

        Returns
        -------
        dict
            Dictionary of parameters used to configure each pipeline step
            object.
        """
        pipeline_params = {"dataset": {}, "steps": {}, "performance": {}}

        # Get parameters for the pipeline's dataset
        if self.dataset:
            dataset_params = self.dataset.get_parameters()
            if condensed:
                condensed_params = {}
                for attribute in dataset_params.keys():
                    if dataset_params[attribute] not in [False, None]:
                        condensed_params[attribute] = dataset_params[attribute]
                pipeline_params["dataset"][str(type(self.dataset))] = condensed_params
            else:
                pipeline_params["dataset"][str(type(self.dataset))] = dataset_params

        # Get parameters for each step in the pipeline
        for step in self.steps:
            step_params = step.get_parameters()
            if condensed:
                condensed_params = {}
                for attribute in step_params.keys():
                    if step_params[attribute] not in [False, None]:
                        condensed_params[attribute] = step_params[attribute]
                pipeline_params["steps"][str(type(step))] = condensed_params
            else:
                pipeline_params["steps"][str(type(step))] = step_params

        # Get the pipeline performance
        pipeline_params["performance"] = self.performance

        return pipeline_params
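# Illustrative sketch (not part of the original module): wiring a pipeline together.
# The `from_pandas` helper is an assumption about bardi's data handlers; use whichever
# handler builds a Dataset from your data. `LowercaseTextStep` is the example step
# sketched above.
if __name__ == "__main__":
    import pandas as pd

    from bardi.data import data_handlers

    df = pd.DataFrame({"text": ["Hello World", "BARDI Pipeline"]})
    dataset = data_handlers.from_pandas(df)  # assumed helper; any bardi Dataset works

    # write_outputs=False keeps the run in-memory, so no write_path is required
    pipeline = Pipeline(dataset=dataset, write_outputs=False)
    pipeline.add_step(LowercaseTextStep(field="text"))
    pipeline.run_pipeline()

    print(pipeline.processed_data.to_pandas())
    print(pipeline.get_parameters())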