bardi.pipeline
bardi.pipeline module
Defines a pipeline and a framework for steps to run in it
- class bardi.pipeline.DataWriteConfig[source]
Bases: TypedDict
- data_format: str
- data_format_args: dict | None
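A write configuration is just a dictionary with this shape. A minimal sketch, assuming parquet output and a pyarrow writer option passed through data_format_args (the compression keyword is illustrative, not a bardi requirement):

```python
from bardi.pipeline import DataWriteConfig

# Illustrative write configuration: parquet output with a pyarrow
# writer option passed through data_format_args.
write_config: DataWriteConfig = {
    "data_format": "parquet",
    "data_format_args": {"compression": "snappy"},
}
```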
- class bardi.pipeline.Pipeline(dataset: Dataset = None, write_path: str = None, write_outputs: Literal['pipeline-outputs', 'debug', False] = 'pipeline-outputs', data_write_config: DataWriteConfig = None, data_filename: str = 'bardi_processed_data')[source]
Bases: object
Organize Steps into a pipeline to operate on data from a Dataset
- dataset
A bardi dataset object with data to pre-process.
- Type:
Dataset
- write_path
Directory in the file system to write outputs to.
- Type:
str
- write_outputs
Configuration for which outputs to write.
‘pipeline-outputs’ will write all artifacts and final data.
‘debug’ will write all artifacts and data from each step.
False will not write any files.
- Type:
Literal[“pipeline-outputs”, “debug”, False]
- data_write_config
Supply a custom write configuration specifying file types. By default, data is saved as parquet files.
- Type:
DataWriteConfig
- data_filename
Supply a filename for the final output data.
- Type:
str
- add_step(step: Step) → None [source]
Adds a step object to the list of pipeline execution steps.
Also overwrites the step's data write configuration with the pipeline's data write configuration so that outputs are written consistently.
- Parameters:
step (bardi.pipeline.Step) – A bardi Step object to be added to the list of pipeline execution steps.
- get_parameters(condensed: bool = True) → dict [source]
Returns the parameters of the pipeline’s dataset and parameters of each step.
- Parameters:
condensed (bool) – If True, the step configuration dictionary will exclude any attributes set to None or False.
- Returns:
Dictionary of parameters used to configure each pipeline step object.
- Return type:
dict
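A minimal usage sketch. The dataset variable is assumed to be a bardi Dataset built elsewhere (e.g., by one of bardi's data handlers), MyStep stands in for any concrete Step subclass, and run_pipeline is assumed to take no required arguments (it is referenced below in Step.run's documentation as the method that executes the steps):

```python
from bardi.pipeline import Pipeline

# `dataset` is a bardi Dataset created elsewhere; `MyStep` is a
# custom Step subclass (see the sketch under Step.run below).
pipeline = Pipeline(
    dataset=dataset,
    write_path="./pipeline_outputs",
    write_outputs="pipeline-outputs",
    data_filename="bardi_processed_data",
)

pipeline.add_step(MyStep())

# Execute the steps in order (method name taken from Step.run's docs).
pipeline.run_pipeline()

# Record the configuration that produced the outputs.
params = pipeline.get_parameters(condensed=True)
```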
- class bardi.pipeline.Step[source]
Bases: object
Blueprint for creating new steps for data pre-processing pipelines
- get_parameters() dict [source]
Default implementation of the get_parameters method.
Called by the pipeline’s get_parameters method. Implement a custom method if needed, such as removing large items from the dictionary.
- Returns:
A dictionary copy of the object’s configuration.
- Return type:
dict
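A sketch of a custom override that follows the suggestion above, dropping a large item before returning the configuration (the vocab attribute name and the use of vars() are illustrative assumptions, not bardi's actual implementation):

```python
from bardi.pipeline import Step


class VocabStep(Step):
    # run(...) and other methods omitted for brevity.

    def get_parameters(self) -> dict:
        # Copy the object's configuration, then drop a large item
        # (illustrative attribute name) to keep pipeline metadata small.
        params = dict(vars(self))
        params.pop("vocab", None)
        return params
```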
- abstract run(data: Table, artifacts: dict) → Tuple[Table | None, dict | None] [source]
Implement a run method in the step that will be called by the pipeline’s run_pipeline method.
- Parameters:
data (PyArrow.Table) – Expect to receive a PyArrow table of data.
artifacts (dict) – Expect to receive a dict of artifacts from preceding steps; the artifacts can be ignored if no values are needed from them.
- Returns:
If the method performs a transformation of the data, return the data as a PyArrow table; this will replace the pipeline object's processed_data attribute. If the method creates new artifacts besides the data, return a dictionary with keys corresponding to the names of the artifacts and values holding each artifact's data or value.
- Return type:
Tuple[Union[pa.Table, None], Union[dict, None]]
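A minimal sketch of a concrete Step implementing run; the column name, the lowercase transformation, and the returned artifact are illustrative only:

```python
from typing import Tuple, Union

import pyarrow as pa
import pyarrow.compute as pc

from bardi.pipeline import Step


class LowercaseTextStep(Step):
    """Illustrative step that lowercases a single text column."""

    def __init__(self, field: str = "text"):
        self.field = field

    def run(
        self, data: pa.Table, artifacts: dict
    ) -> Tuple[Union[pa.Table, None], Union[dict, None]]:
        # Transform the data: replace the column with a lowercased copy.
        lowered = pc.utf8_lower(data[self.field])
        data = data.set_column(
            data.column_names.index(self.field), self.field, lowered
        )
        # Return the transformed table plus a small artifact for later steps.
        return data, {"lowercased_field": self.field}
```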
- set_write_config(data_config: DataWriteConfig) → None [source]
Default implementation of the set_write_config method.
Override this method if the step needs to customize the write configuration used when its outputs are written.
- Parameters:
data_config (DataWriteConfig) – A typed dictionary defining the data_format (i.e., parquet, csv, etc.) and any particular data_format_args available in the pyarrow API.
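For example, switching a step's data output to CSV could look like this (sketch; `step` is assumed to be any concrete Step instance):

```python
# Write this step's data output as CSV instead of the default parquet.
step.set_write_config({"data_format": "csv", "data_format_args": {}})
```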
- write_artifacts(write_path: str, artifacts: dict | None)[source]
Default implementation of the write_artifacts method.
Since only some steps require this method, the default behavior is ‘pass’.
Implement a custom method to write the specific artifacts produced in the step.
- Parameters:
write_path (str) – A directory where a file should be written.
artifacts (Union[dict, None]) – A dictionary of artifacts from the step that should be written to a file.
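A sketch of a custom override that serializes a step's artifacts to JSON (the file name and the assumption that every artifact is JSON-serializable are illustrative only):

```python
import json
import os
from typing import Union

from bardi.pipeline import Step


class MyArtifactStep(Step):
    # run(...) omitted for brevity; assume it returns artifacts
    # alongside the transformed data.

    def write_artifacts(self, write_path: str, artifacts: Union[dict, None]) -> None:
        # Write the step's artifacts to a single JSON file
        # (illustrative file name; assumes JSON-serializable values).
        if not artifacts:
            return
        out_file = os.path.join(write_path, "my_step_artifacts.json")
        with open(out_file, "w") as f:
            json.dump(artifacts, f, indent=2)
```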
- write_data(write_path: str, data: Table | None, data_filename: str | None = None) → None [source]
Default implementation of the write_data method.
Reuses the write patterns that already exist in the data handlers.
- Parameters:
write_path (str) – A directory where a file will be written.
data (Union[pa.Table, None]) – PyArrow Table of data.
data_filename (Union[str, None]) – Overrides the default file name (provided without a file type extension).