import gradio as gr
import os
import threading
import traceback
import uuid

import pandas as pd
import torch
from langchain.chains import LLMChain
from langchain.document_loaders import CSVLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain.vectorstores import FAISS
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# Global model cache, guarded by a lock so all sessions share one model
MODEL_CACHE = {
    "model": None,
    "tokenizer": None,
    "init_lock": threading.Lock()
}

# Create directories for user data
os.makedirs("user_data", exist_ok=True)


def initialize_model_once():
    """Initialize the model once and cache it"""
    with MODEL_CACHE["init_lock"]:
        if MODEL_CACHE["model"] is None:
            # Use a smaller model for CPU environment
            model_name = "deepseek-ai/deepseek-coder-1.3b-instruct"

            # Load tokenizer and model with CPU-friendly configuration
            MODEL_CACHE["tokenizer"] = AutoTokenizer.from_pretrained(model_name)
            MODEL_CACHE["model"] = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float32,  # Use float32 for CPU
                device_map="auto",
                low_cpu_mem_usage=True,  # Optimize for low memory
                trust_remote_code=True
            )
    return MODEL_CACHE["tokenizer"], MODEL_CACHE["model"]


def create_llm_pipeline():
    """Create a new pipeline using the cached model"""
    tokenizer, model = initialize_model_once()

    # Create a CPU-friendly pipeline
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=256,  # Reduced for faster responses
        temperature=0.3,
        top_p=0.9,
        top_k=30,
        repetition_penalty=1.2,
        return_full_text=False,
    )

    # Wrap pipeline in HuggingFacePipeline for LangChain compatibility
    return HuggingFacePipeline(pipeline=pipe)


def create_conversational_chain(db, file_path):
    llm = create_llm_pipeline()

    # Load the file into pandas so statistical questions can be answered directly
    df = pd.read_csv(file_path)

    # Create improved prompt template that focuses on direct answers, not code
    template = """
Berikut ini adalah informasi tentang file CSV:

Kolom-kolom dalam file: {columns}

Beberapa baris pertama:
{sample_data}

Konteks tambahan dari vector database:
{context}

Pertanyaan: {question}

INSTRUKSI PENTING:
1. Jangan tampilkan kode Python, berikan jawaban langsung dalam Bahasa Indonesia.
2. Jika pertanyaan terkait statistik data (rata-rata, maksimum, dll.), lakukan perhitungan dan berikan hasilnya.
3. Jawaban harus singkat, jelas, dan akurat berdasarkan data yang ada.
4. Gunakan format yang sesuai untuk angka (desimal 2 digit untuk nilai non-integer).
5. Jangan menyebutkan proses perhitungan, fokus pada hasil akhir.
Jawaban: """ PROMPT = PromptTemplate( template=template, input_variables=["columns", "sample_data", "context", "question"] ) # Create retriever retriever = db.as_retriever(search_kwargs={"k": 3}) # Reduced k for better performance # Process query with better error handling def process_query(query, chat_history): try: # Get information from dataframe for context columns_str = ", ".join(df.columns.tolist()) sample_data = df.head(2).to_string() # Reduced to 2 rows for performance # Get context from vector database docs = retriever.get_relevant_documents(query) context = "\n\n".join([doc.page_content for doc in docs]) # Dynamically calculate answers for common statistical queries def preprocess_query(): query_lower = query.lower() result = None # Handle statistical queries directly if "rata-rata" in query_lower or "mean" in query_lower or "average" in query_lower: for col in df.columns: if col.lower() in query_lower and pd.api.types.is_numeric_dtype(df[col]): try: result = f"Rata-rata {col} adalah {df[col].mean():.2f}" except: pass elif "maksimum" in query_lower or "max" in query_lower or "tertinggi" in query_lower: for col in df.columns: if col.lower() in query_lower and pd.api.types.is_numeric_dtype(df[col]): try: result = f"Nilai maksimum {col} adalah {df[col].max():.2f}" except: pass elif "minimum" in query_lower or "min" in query_lower or "terendah" in query_lower: for col in df.columns: if col.lower() in query_lower and pd.api.types.is_numeric_dtype(df[col]): try: result = f"Nilai minimum {col} adalah {df[col].min():.2f}" except: pass elif "total" in query_lower or "jumlah" in query_lower or "sum" in query_lower: for col in df.columns: if col.lower() in query_lower and pd.api.types.is_numeric_dtype(df[col]): try: result = f"Total {col} adalah {df[col].sum():.2f}" except: pass elif "baris" in query_lower or "jumlah data" in query_lower or "row" in query_lower: result = f"Jumlah baris data adalah {len(df)}" elif "kolom" in query_lower or "field" in query_lower: if "nama" in query_lower or "list" in query_lower or "sebutkan" in query_lower: result = f"Kolom dalam data: {', '.join(df.columns.tolist())}" return result # Try direct calculation first direct_answer = preprocess_query() if direct_answer: return {"answer": direct_answer} # If no direct calculation, use the LLM chain = LLMChain(llm=llm, prompt=PROMPT) raw_result = chain.run( columns=columns_str, sample_data=sample_data, context=context, question=query ) # Clean the result cleaned_result = raw_result.strip() # If result is empty after cleaning, use a fallback if not cleaned_result: return {"answer": "Tidak dapat memproses jawaban. Silakan coba pertanyaan lain."} return {"answer": cleaned_result} except Exception as e: import traceback print(f"Error in process_query: {str(e)}") print(traceback.format_exc()) return {"answer": f"Terjadi kesalahan saat memproses pertanyaan: {str(e)}"} return process_query class ChatBot: def __init__(self, session_id): self.session_id = session_id self.chat_history = [] self.chain = None self.user_dir = f"user_data/{session_id}" self.csv_file_path = None os.makedirs(self.user_dir, exist_ok=True) def process_file(self, file): if file is None: return "Mohon upload file CSV terlebih dahulu." 
        try:
            # Handle file from Gradio
            file_path = file.name if hasattr(file, 'name') else str(file)
            self.csv_file_path = file_path

            # Copy to user directory
            user_file_path = f"{self.user_dir}/uploaded.csv"

            # Verify the CSV can be loaded
            try:
                df = pd.read_csv(file_path)
                print(f"CSV verified: {df.shape[0]} rows, {len(df.columns)} columns")

                # Save a copy in user directory
                df.to_csv(user_file_path, index=False)
                self.csv_file_path = user_file_path
            except Exception as e:
                return f"Error membaca CSV: {str(e)}"

            # Load the CSV rows as LangChain documents
            try:
                loader = CSVLoader(file_path=file_path, encoding="utf-8",
                                   csv_args={'delimiter': ','})
                data = loader.load()
                print(f"Documents loaded: {len(data)}")
            except Exception as e:
                return f"Error loading documents: {str(e)}"

            # Create vector database with optimized settings
            try:
                db_path = f"{self.user_dir}/db_faiss"

                # Use CPU-friendly embeddings with smaller dimensions
                embeddings = HuggingFaceEmbeddings(
                    model_name='sentence-transformers/all-MiniLM-L6-v2',
                    model_kwargs={'device': 'cpu'}
                )

                db = FAISS.from_documents(data, embeddings)
                db.save_local(db_path)
                print(f"Vector database created at {db_path}")
            except Exception as e:
                return f"Error creating vector database: {str(e)}"

            # Create custom chain
            try:
                self.chain = create_conversational_chain(db, self.csv_file_path)
                print("Chain created successfully")
            except Exception as e:
                return f"Error creating chain: {str(e)}"

            # Add basic file info to chat history for context
            file_info = (f"CSV berhasil dimuat dengan {df.shape[0]} baris dan "
                         f"{len(df.columns)} kolom. Kolom: {', '.join(df.columns.tolist())}")
            self.chat_history.append(("System", file_info))

            return "File CSV berhasil diproses! Anda dapat mulai chat dengan model untuk analisis data."
        except Exception as e:
            print(traceback.format_exc())
            return f"Error pemrosesan file: {str(e)}"

    def chat(self, message, history):
        if self.chain is None:
            return "Mohon upload file CSV terlebih dahulu."

        try:
            # Process the question with the chain
            result = self.chain(message, self.chat_history)

            # Get the answer with fallback
            answer = result.get("answer", "Maaf, tidak dapat menghasilkan jawaban. Silakan coba pertanyaan lain.")

            # Ensure we never return empty
            if not answer or answer.strip() == "":
                answer = "Maaf, tidak dapat menghasilkan jawaban yang sesuai. Silakan coba pertanyaan lain."

            # Update internal chat history
            self.chat_history.append((message, answer))

            # Return just the answer for Gradio
            return answer
        except Exception as e:
            print(traceback.format_exc())
            return f"Error: {str(e)}"


# UI Code
def create_gradio_interface():
    with gr.Blocks(title="Chat with CSV using DeepSeek") as interface:
        session_id = gr.State(lambda: str(uuid.uuid4()))
        chatbot_state = gr.State(lambda: None)
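        # gr.State values are tracked per browser session, so each visitor gets
        # an independent session_id and ChatBot instance (and user_data folder).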

        gr.HTML("<h1>Chat with CSV using DeepSeek</h1>")
        gr.HTML("<p>Asisten analisis CSV untuk berbagai kebutuhan</p>")
") with gr.Row(): with gr.Column(scale=1): file_input = gr.File( label="Upload CSV Anda", file_types=[".csv"] ) process_button = gr.Button("Proses CSV") with gr.Accordion("Informasi Model", open=False): gr.Markdown(""" **Fitur**: - Tanya jawab berbasis data - Analisis statistik otomatis - Support berbagai format CSV - Manajemen sesi per pengguna """) with gr.Column(scale=2): chatbot_interface = gr.Chatbot( label="Riwayat Chat", height=400 ) message_input = gr.Textbox( label="Ketik pesan Anda", placeholder="Tanyakan tentang data CSV Anda...", lines=2 ) submit_button = gr.Button("Kirim") clear_button = gr.Button("Bersihkan Chat") # Process file handler def handle_process_file(file, sess_id): chatbot = ChatBot(sess_id) result = chatbot.process_file(file) return chatbot, [(None, result)] process_button.click( fn=handle_process_file, inputs=[file_input, session_id], outputs=[chatbot_state, chatbot_interface] ) # Chat handlers def user_message_submitted(message, history, chatbot, sess_id): history = history + [(message, None)] return history, "", chatbot, sess_id def bot_response(history, chatbot, sess_id): if chatbot is None: chatbot = ChatBot(sess_id) history[-1] = (history[-1][0], "Mohon upload file CSV terlebih dahulu.") return chatbot, history user_message = history[-1][0] response = chatbot.chat(user_message, history[:-1]) history[-1] = (user_message, response) return chatbot, history submit_button.click( fn=user_message_submitted, inputs=[message_input, chatbot_interface, chatbot_state, session_id], outputs=[chatbot_interface, message_input, chatbot_state, session_id] ).then( fn=bot_response, inputs=[chatbot_interface, chatbot_state, session_id], outputs=[chatbot_state, chatbot_interface] ) message_input.submit( fn=user_message_submitted, inputs=[message_input, chatbot_interface, chatbot_state, session_id], outputs=[chatbot_interface, message_input, chatbot_state, session_id] ).then( fn=bot_response, inputs=[chatbot_interface, chatbot_state, session_id], outputs=[chatbot_state, chatbot_interface] ) # Clear chat handler def handle_clear_chat(chatbot): if chatbot is not None: chatbot.chat_history = [] return chatbot, [] clear_button.click( fn=handle_clear_chat, inputs=[chatbot_state], outputs=[chatbot_state, chatbot_interface] ) return interface # Launch the interface demo = create_gradio_interface() demo.launch(share=True)