from datetime import datetime as dt
from pathlib import Path

import click
import yaml
from clearml import PipelineDecorator
from loguru import logger

from llm_engineering import settings
from pipelines import (
    digital_data_etl,
    end_to_end_data,
    evaluating,
    export_artifact_to_json,
    feature_engineering,
    generate_datasets,
    training,
)


def parse_yaml_config(config_path: Path) -> dict:
    """Load a YAML pipeline config file and return its contents as a dict."""
    with open(config_path, "r") as file:
        return yaml.safe_load(file)
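
# Each YAML config under configs/ is expected to expose a top-level
# "parameters" mapping, which is unpacked below as keyword arguments to
# the matching pipeline function. A minimal sketch (the keys inside
# "parameters" are illustrative, not taken from the real configs):
#
#   parameters:
#     author_full_name: "Jane Doe"        # hypothetical
#     links: ["https://example.com/..."]  # hypothetical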

@click.command(
    help="""
LLM Engineering project CLI v0.0.1.

Main entry point for pipeline execution. This is where everything comes
together: each flag below selects a pipeline, which is then run locally
through ClearML with the parameters from its YAML config.

Examples:

  \b
  # Run the ETL pipeline with the default config
  python run.py --run-etl

  \b
  # Run the ETL pipeline without cache
  python run.py --run-etl --no-cache

  \b
  # Run only the feature engineering pipeline
  python run.py --run-feature-engineering
"""
)
@click.option(
    "--no-cache",
    is_flag=True,
    default=False,
    help="Disable caching for the pipeline run.",
)
@click.option(
    "--run-end-to-end-data",
    is_flag=True,
    default=False,
    help="Whether to run all the data pipelines in one go.",
)
@click.option(
    "--run-etl",
    is_flag=True,
    default=False,
    help="Whether to run the ETL pipeline.",
)
@click.option(
    "--run-export-artifact-to-json",
    is_flag=True,
    default=False,
    help="Whether to run the artifact -> JSON export pipeline.",
)
@click.option(
    "--etl-config-filename",
    default="digital_data_etl_paul_iusztin.yaml",
    help="Filename of the ETL config file.",
)
@click.option(
    "--run-feature-engineering",
    is_flag=True,
    default=False,
    help="Whether to run the feature engineering pipeline.",
)
@click.option(
    "--run-generate-instruct-datasets",
    is_flag=True,
    default=False,
    help="Whether to run the instruct dataset generation pipeline.",
)
@click.option(
    "--run-generate-preference-datasets",
    is_flag=True,
    default=False,
    help="Whether to run the preference dataset generation pipeline.",
)
@click.option(
    "--run-training",
    is_flag=True,
    default=False,
    help="Whether to run the training pipeline.",
)
@click.option(
    "--run-evaluation",
    is_flag=True,
    default=False,
    help="Whether to run the evaluation pipeline.",
)
def main(
    no_cache: bool = False,
    run_end_to_end_data: bool = False,
    run_etl: bool = False,
    etl_config_filename: str = "digital_data_etl_paul_iusztin.yaml",
    run_export_artifact_to_json: bool = False,
    run_feature_engineering: bool = False,
    run_generate_instruct_datasets: bool = False,
    run_generate_preference_datasets: bool = False,
    run_training: bool = False,
    run_evaluation: bool = False,
    export_settings: bool = False,
) -> None:
    assert (
        run_end_to_end_data
        or run_etl
        or run_export_artifact_to_json
        or run_feature_engineering
        or run_generate_instruct_datasets
        or run_generate_preference_datasets
        or run_training
        or run_evaluation
        or export_settings
    ), "Please specify an action to run."

    if export_settings:
        logger.info("Exporting settings to ZenML secrets.")
        settings.export()

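    # Shared run metadata. Note that "enable_cache" and the per-pipeline
    # "run_name" values are recorded here but not currently consumed
    # anywhere below; they appear to be carried over from the original
    # ZenML version of this CLI.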
    pipeline_args = {
        "enable_cache": not no_cache,
    }
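    # Resolve the repository root; this assumes run.py lives one level
    # below it (e.g. in a tools/ directory).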
    root_dir = Path(__file__).resolve().parent.parent
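    # Run the pipeline and its components locally instead of enqueueing
    # them to remote ClearML agents -- convenient for development/debugging.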
    PipelineDecorator.run_locally()

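    # Every pipeline below follows the same pattern: resolve its YAML
    # config, record run metadata, then call the pipeline function with
    # the config's "parameters" mapping unpacked as keyword arguments.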
    if run_end_to_end_data:
        pipeline_args["config_path"] = root_dir / "configs" / "end_to_end_data.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"end_to_end_data_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_end_to_end = parse_yaml_config(pipeline_args["config_path"])
        end_to_end_data(**run_args_end_to_end.get("parameters", {}))

    if run_etl:
        pipeline_args["config_path"] = root_dir / "configs" / etl_config_filename
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"digital_data_etl_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_etl = parse_yaml_config(pipeline_args["config_path"])
        digital_data_etl(**run_args_etl.get("parameters", {}))

    if run_export_artifact_to_json:
        pipeline_args["config_path"] = root_dir / "configs" / "export_artifact_to_json.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"export_artifact_to_json_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_export = parse_yaml_config(pipeline_args["config_path"])
        export_artifact_to_json(**run_args_export.get("parameters", {}))

    if run_feature_engineering:
        pipeline_args["config_path"] = root_dir / "configs" / "feature_engineering.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"feature_engineering_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_fe = parse_yaml_config(pipeline_args["config_path"])
        logger.debug(f"Pipeline args: {pipeline_args}")
        logger.debug(f"Run args: {run_args_fe}")
        feature_engineering(**run_args_fe.get("parameters", {}))

    if run_generate_instruct_datasets:
        pipeline_args["config_path"] = root_dir / "configs" / "generate_instruct_datasets.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"generate_instruct_datasets_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_cd = parse_yaml_config(pipeline_args["config_path"])
        generate_datasets(**run_args_cd.get("parameters", {}))

    if run_generate_preference_datasets:
        pipeline_args["config_path"] = root_dir / "configs" / "generate_preference_datasets.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"generate_preference_datasets_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_cd = parse_yaml_config(pipeline_args["config_path"])
        generate_datasets(**run_args_cd.get("parameters", {}))

    if run_training:
        pipeline_args["config_path"] = root_dir / "configs" / "training.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_cd = parse_yaml_config(pipeline_args["config_path"])
        training(**run_args_cd.get("parameters", {}))

    if run_evaluation:
        pipeline_args["config_path"] = root_dir / "configs" / "evaluating.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"evaluation_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_cd = parse_yaml_config(pipeline_args["config_path"])
        evaluating(**run_args_cd.get("parameters", {}))


if __name__ == "__main__":
    main()
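
# Multiple pipeline flags can be combined in a single invocation, e.g.:
#   python run.py --run-etl --run-feature-engineering --no-cache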