bardi.pipeline

bardi.pipeline module

Defines a pipeline and a framework for steps to run in it

class bardi.pipeline.DataWriteConfig[source]

Bases: TypedDict

data_format: str
data_format_args: dict | None
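
Example (a minimal sketch; the Parquet compression argument shown is an illustrative assumption and is expected to be passed through to the PyArrow writer):

    from bardi.pipeline import DataWriteConfig

    # Default-style configuration: write data as parquet files
    parquet_config: DataWriteConfig = {
        "data_format": "parquet",
        "data_format_args": {"compression": "snappy"},
    }

    # Or another format, with no extra writer arguments
    csv_config: DataWriteConfig = {
        "data_format": "csv",
        "data_format_args": None,
    }
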
class bardi.pipeline.Pipeline(dataset: Dataset = None, write_path: str = None, write_outputs: Literal['pipeline-outputs', 'debug', False] = 'pipeline-outputs', data_write_config: DataWriteConfig = None, data_filename: str = 'bardi_processed_data')[source]

Bases: object

Organize Steps into a pipeline to operate on data from a Dataset

dataset

A bardi dataset object with data to pre-process.

Type:

bardi.data.data_handlers.Dataset

write_path

Directory in the file system to write outputs to.

Type:

str

write_outputs

Configuration for which outputs to write.

  • ‘pipeline-outputs’ will write all artifacts and final data.

  • ‘debug’ will write all artifacts and data from each step.

  • False will not write any files.

Type:

Literal[“pipeline-outputs”, “debug”, False]

data_write_config

Supply a custom write configuration specifying file types. By default, data is saved as parquet files.

Type:

DataWriteConfig

data_filename

Supply a filename for the final output data.

Type:

str
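
Example (a minimal construction sketch; my_dataset stands in for a bardi.data.data_handlers.Dataset built elsewhere, and the path and filename are placeholder values):

    from bardi.pipeline import Pipeline

    pipeline = Pipeline(
        dataset=my_dataset,
        write_path="./pipeline_outputs",
        write_outputs="pipeline-outputs",
        data_filename="processed_data",
    )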

add_step(step: Step) None[source]

Adds a step object to the list of pipeline execution steps.

Also overwrites the step’s data write configuration with a consistent pipeline data write configuration.

Parameters:

step (bardi.pipeline.Step) – A bardi Step object to be added to the list of pipeline execution steps.

get_parameters(condensed: bool = True) dict[source]

Returns the parameters of the pipeline’s dataset and parameters of each step.

Parameters:

condensed (bool) – If True, the step configuration dictionary will exclude any attributes set to None or False.

Returns:

Dictionary of parameters used to configure each pipeline step object.

Return type:

dict

run_pipeline() None[source]

Calls the run and write_outputs methods of each step object added to the pipeline.
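
Example (a minimal end-to-end sketch; my_dataset and the LowercaseText step are illustrative placeholders, see the custom step sketch under Step.run below):

    from bardi.pipeline import Pipeline

    pipeline = Pipeline(dataset=my_dataset, write_path="./pipeline_outputs")
    pipeline.add_step(LowercaseText(field="text"))

    # Execute every step in order; outputs are written per write_outputs
    pipeline.run_pipeline()

    # Record the configuration that produced the outputs
    params = pipeline.get_parameters(condensed=True)
    print(params)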

class bardi.pipeline.Step[source]

Bases: object

Blueprint for creating new steps for data pre-processing pipelines

get_parameters() dict[source]

Default implementation of the get_parameters method.

Called by the pipeline’s get_parameters method. Override with a custom method if needed, for example to remove large items from the returned dictionary.

Returns:

A dictionary copy of the object’s configuration.

Return type:

dict

abstract run(data: Table, artifacts: dict) Tuple[Table | None, dict | None][source]

Implement a run method in the step that will be called by the pipeline’s run_pipeline method.

Parameters:
  • data (PyArrow.Table) – Expect to receive a PyArrow table of data

  • artifacts (dict) – Expect to receive a dict of artifacts from preceding steps; the artifacts can be ignored if the step does not need any values from them

Returns:

If the method performs a transformation of the data, return the data as a PyArrow Table; it will replace the pipeline object’s processed_data attribute. If the method creates new artifacts besides the data, return a dictionary whose keys are the artifact names and whose values are the artifacts’ data or values.

Return type:

Tuple[Union[pa.Table, None], Union[dict, None]]
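
Example (a minimal sketch of a custom step, assuming a text column named "text"; the column name and the lowercasing transform are illustrative, not part of bardi):

    import pyarrow as pa
    import pyarrow.compute as pc

    from bardi.pipeline import Step

    class LowercaseText(Step):
        """Hypothetical step that lowercases a configured text column."""

        def __init__(self, field: str):
            self.field = field

        def run(self, data: pa.Table, artifacts: dict):
            # Lowercase the configured column and swap it into the table
            lowered = pc.utf8_lower(data[self.field])
            idx = data.schema.get_field_index(self.field)
            data = data.set_column(idx, self.field, lowered)
            # No extra artifacts are produced, so return None for them
            return data, None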

set_write_config(data_config: DataWriteConfig) None[source]

Default implementation of the set_write_config method.

Provide a method to customize the write configuration used by the step’s write_outputs method if desired.

Parameters:

data_config (DataWriteConfig) – A typed dictionary defining the data_format (e.g., parquet, csv) and any particular data_format_args available in the pyarrow API.
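
Example (a brief sketch; since the pipeline’s add_step overwrites a step’s write configuration, calling this directly typically only matters when a step is used outside a pipeline):

    step = LowercaseText(field="text")  # illustrative custom step sketched above
    step.set_write_config({"data_format": "csv", "data_format_args": None})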

write_artifacts(write_path: str, artifacts: dict | None)[source]

Default implementation of the write_artifacts method.

Since only some steps require this method, the default behavior is ‘pass’.

Implement a custom method to write the specific artifacts produced in the step.

Parameters:
  • write_path (str) – A directory where a file should be written.

  • artifacts (Union[dict, None]) – A dictionary of artifacts from the step that should be written to a file.
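
Example (a minimal sketch of an override for a step that produces a vocabulary artifact; the artifact key "vocab" and the JSON output are illustrative assumptions):

    import json
    import os
    from typing import Union

    from bardi.pipeline import Step

    class VocabStep(Step):
        def run(self, data, artifacts):
            # Illustrative: produce a small vocabulary artifact
            vocab = {"<pad>": 0, "<unk>": 1}
            return data, {"vocab": vocab}

        def write_artifacts(self, write_path: str, artifacts: Union[dict, None]) -> None:
            # Write the vocabulary artifact to a JSON file in write_path
            if not artifacts:
                return
            with open(os.path.join(write_path, "vocab.json"), "w") as f:
                json.dump(artifacts["vocab"], f, indent=2)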

write_data(write_path: str, data: Table | None, data_filename: str | None = None) None[source]

Default implementation of the write_data method.

Reuses the existing write patterns in the data handlers.

Parameters:
  • write_path (str) – A directory where a file will be written.

  • data (Union[pa.Table, None]) – PyArrow Table of data.

  • data_filename (Union[str, None]) – Overwrite the default file name (no filetype extension).
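
Example (a brief sketch of calling the default write_data directly; the path, table, and filename are placeholders, and the file extension is expected to follow the step’s write configuration):

    step.write_data(
        write_path="./pipeline_outputs",
        data=processed_table,  # a pyarrow.Table
        data_filename="my_step_output",
    )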