# app.py
import os
import json
import torch
import pandas as pd
import gradio as gr
from sqlalchemy import create_engine, text
from transformers import (
    TrainingArguments,
    Trainer,
    TrainerCallback,
    AutoModelForCausalLM,
    AutoTokenizer,
    DataCollatorForLanguageModeling
)
from datasets import Dataset
from peft import (
    prepare_model_for_kbit_training,
    LoraConfig,
    get_peft_model
)
from datetime import datetime
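# NOTE: assumed Space dependencies (not pinned in this file): torch, transformers,
# datasets, peft, bitsandbytes, accelerate, gradio, sqlalchemy, pandas, and a
# Postgres driver such as psycopg2-binary for the DATABASE_URL connection.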
# Changed to a model that doesn't require flash-attention
MODEL_NAME = "deepseek-ai/deepseek-coder-6.7b-base"
OUTPUT_DIR = "/tmp/finetuned_models"
LOGS_DIR = "/tmp/training_logs"

class TrainingInterface:
    def __init__(self):
        self.current_status = "Idle"
        self.progress = 0
        self.is_training = False

    def get_database_url(self):
        database_url = os.environ.get('DATABASE_URL')
        if not database_url:
            raise Exception("DATABASE_URL not found in environment variables")
        return database_url
    def fetch_training_data(self, progress=gr.Progress()):
        try:
            database_url = self.get_database_url()
            engine = create_engine(database_url)
            progress(0, desc="Connecting to database...")
            with engine.connect() as conn:
                result = conn.execute(text("SELECT COUNT(*) FROM bents"))
                total_rows = result.scalar()
                query = text("SELECT chunk_id, text FROM bents")
                df = pd.read_sql_query(query, conn)
                progress(0.5, desc="Data fetched successfully")
                return df
        except Exception as e:
            raise gr.Error(f"Database error: {str(e)}")
    def prepare_training_data(self, df, progress=gr.Progress()):
        formatted_data = []
        try:
            total_rows = len(df)
            for idx, (_, row_data) in enumerate(df.iterrows()):
                progress(idx / total_rows, desc="Preparing training data...")
                chunk_id = str(row_data['chunk_id']).strip()
                chunk_text = str(row_data['text']).strip()
                if chunk_id and chunk_text:
                    # Changed format for deepseek-coder
                    formatted_text = f"Question: {chunk_id}\nAnswer: {chunk_text}"
                    formatted_data.append({"text": formatted_text})
            if not formatted_data:
                raise ValueError("No valid training data found")
            return formatted_data
        except Exception as e:
            raise gr.Error(f"Data preparation error: {str(e)}")
    def stop_training(self):
        self.is_training = False
        return "Training stopped by user."

    def train_model(
        self,
        learning_rate=2e-4,
        num_epochs=3,
        batch_size=4,
        progress=gr.Progress()
    ):
        try:
            self.is_training = True
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            specific_output_dir = os.path.join(OUTPUT_DIR, f"run_{timestamp}")
            os.makedirs(specific_output_dir, exist_ok=True)
            os.makedirs(LOGS_DIR, exist_ok=True)

            progress(0.1, desc="Fetching data...")
            if not self.is_training:
                return "Training cancelled."
            df = self.fetch_training_data()
            formatted_data = self.prepare_training_data(df)

            progress(0.2, desc="Loading model...")
            if not self.is_training:
                return "Training cancelled."
            tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
            # Safeguard: fall back to the EOS token if the tokenizer ships without
            # a pad token, since the data collator pads batches
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token
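            # NOTE: load_in_8bit relies on bitsandbytes; newer transformers releases
            # prefer quantization_config=BitsAndBytesConfig(load_in_8bit=True) over
            # passing load_in_8bit directly (kept as-is here).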
            model = AutoModelForCausalLM.from_pretrained(
                MODEL_NAME,
                torch_dtype=torch.float16,
                load_in_8bit=True,
                device_map="auto"
            )
            progress(0.3, desc="Setting up LoRA...")
            if not self.is_training:
                return "Training cancelled."
            # Updated LoRA config for deepseek-coder model
            lora_config = LoraConfig(
                r=16,
                lora_alpha=32,
                target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
                lora_dropout=0.05,
                bias="none",
                task_type="CAUSAL_LM"
            )
            model = prepare_model_for_kbit_training(model)
            model = get_peft_model(model, lora_config)
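            # Optional sanity check: model.print_trainable_parameters() reports how
            # few parameters the LoRA adapters leave trainable.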
            progress(0.4, desc="Configuring training...")
            if not self.is_training:
                return "Training cancelled."
            training_args = TrainingArguments(
                output_dir=specific_output_dir,
                num_train_epochs=num_epochs,
                per_device_train_batch_size=batch_size,
                learning_rate=learning_rate,
                fp16=True,
                # effective batch size = per_device_train_batch_size * gradient_accumulation_steps
                gradient_accumulation_steps=8,
                gradient_checkpointing=True,
                logging_dir=os.path.join(LOGS_DIR, f"run_{timestamp}"),
                logging_steps=10,
                save_strategy="epoch",
                evaluation_strategy="no",  # Changed to "no" since we don't have eval data
                save_total_limit=2,
                remove_unused_columns=False,
            )
            dataset = Dataset.from_dict({
                'text': [item['text'] for item in formatted_data]
            })
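            # The language-modeling collator below expects tokenized features, so map
            # the raw text to input_ids first (max_length=512 is an assumed cap, not
            # taken from the original code).
            dataset = dataset.map(
                lambda batch: tokenizer(batch['text'], truncation=True, max_length=512),
                batched=True,
                remove_columns=['text']
            )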
            # mlm=False gives the standard causal (next-token) language-modeling objective
            data_collator = DataCollatorForLanguageModeling(
                tokenizer=tokenizer,
                mlm=False
            )
            # Bridges Trainer events to the Gradio progress bar and honours stop requests
            class ProgressCallback(TrainerCallback):
                def __init__(self, progress_callback, training_interface):
                    self.progress_callback = progress_callback
                    self.training_interface = training_interface

                def on_train_begin(self, args, state, control, **kwargs):
                    if not self.training_interface.is_training:
                        control.should_training_stop = True
                    self.progress_callback(0.5, desc="Training started...")

                def on_epoch_begin(self, args, state, control, **kwargs):
                    if not self.training_interface.is_training:
                        control.should_training_stop = True
                    epoch_progress = state.epoch / args.num_train_epochs
                    total_progress = 0.5 + (epoch_progress * 0.4)
                    self.progress_callback(
                        total_progress,
                        desc=f"Training epoch {int(state.epoch) + 1}/{args.num_train_epochs}..."
                    )
            trainer = Trainer(
                model=model,
                args=training_args,
                train_dataset=dataset,
                data_collator=data_collator,
                callbacks=[ProgressCallback(progress, self)]
            )

            if not self.is_training:
                return "Training cancelled."
            trainer.train()

            if not self.is_training:
                return "Training cancelled."
            progress(0.9, desc="Saving model...")
            trainer.save_model()
            tokenizer.save_pretrained(specific_output_dir)

            progress(1.0, desc="Training completed!")
            return f"Training completed! Model saved in {specific_output_dir}"
        except Exception as e:
            self.is_training = False
            raise gr.Error(f"Training error: {str(e)}")

def create_training_interface():
    interface = TrainingInterface()
    with gr.Blocks(title="DeepSeek Coder Training Interface") as app:
        gr.Markdown("# DeepSeek Coder Fine-tuning Interface")
        with gr.Row():
            with gr.Column():
                learning_rate = gr.Slider(
                    minimum=1e-5,
                    maximum=1e-3,
                    value=2e-4,
                    label="Learning Rate"
                )
                num_epochs = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=3,
                    step=1,
                    label="Number of Epochs"
                )
                batch_size = gr.Slider(
                    minimum=1,
                    maximum=8,
                    value=4,
                    step=1,
                    label="Batch Size"
                )
        with gr.Row():
            train_button = gr.Button("Start Training", variant="primary")
            stop_button = gr.Button("Stop Training", variant="secondary")
        output_text = gr.Textbox(
            label="Training Status",
            placeholder="Training status will appear here...",
            lines=10
        )
        train_button.click(
            fn=interface.train_model,
            inputs=[learning_rate, num_epochs, batch_size],
            outputs=output_text
        )
        stop_button.click(
            fn=interface.stop_training,
            inputs=[],
            outputs=output_text
        )
    return app

if __name__ == "__main__":
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    os.makedirs(LOGS_DIR, exist_ok=True)
    app = create_training_interface()
    # On older Gradio releases, progress bars require queuing (app.queue().launch())
    app.launch()