|
''' |
|
|
|
import gradio as gr |
|
import os |
|
if os.environ.get("SPACES_ZERO_GPU") is not None: |
|
import spaces |
|
else: |
|
class spaces: |
|
@staticmethod |
|
def GPU(func): |
|
def wrapper(*args, **kwargs): |
|
return func(*args, **kwargs) |
|
return wrapper |
|
|
|
@spaces.GPU |
|
def fake_gpu(): |
|
pass |
|
|
|
# Define a function to respond to user input |
|
def respond(message, history): |
|
# Create a response based on the user's message |
|
response = "You said: " + message |
|
|
|
# Append the message and response to history |
|
history.append({"role": "user", "content": message}) |
|
history.append({"role": "assistant", "content": response}) |
|
|
|
return history |
|
|
|
# Create the Gradio interface |
|
with gr.Blocks() as demo: |
|
gr.Markdown("# Chatbot Interface") |
|
|
|
# Initialize chatbot with the new message type |
|
chatbot_interface = gr.Chatbot(type='messages') # Specify type='messages' |
|
user_input = gr.Textbox(label="Your Message", placeholder="Type something...") |
|
submit_btn = gr.Button("Send") |
|
|
|
# Define the behavior of the submit button |
|
submit_btn.click(fn=respond, inputs=[user_input, chatbot_interface], outputs=chatbot_interface) |
|
|
|
# Launch the Gradio application |
|
demo.launch() |
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
from huggingface_hub import InferenceClient |
|
|
|
""" |
|
For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference |
|
""" |
|
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") |
|
|
|
|
|
def respond( |
|
message, |
|
history: list[tuple[str, str]], |
|
system_message, |
|
max_tokens, |
|
temperature, |
|
top_p, |
|
): |
|
messages = [{"role": "system", "content": system_message}] |
|
|
|
for val in history: |
|
if val[0]: |
|
messages.append({"role": "user", "content": val[0]}) |
|
if val[1]: |
|
messages.append({"role": "assistant", "content": val[1]}) |
|
|
|
messages.append({"role": "user", "content": message}) |
|
|
|
response = "" |
|
|
|
for message in client.chat_completion( |
|
messages, |
|
max_tokens=max_tokens, |
|
stream=True, |
|
temperature=temperature, |
|
top_p=top_p, |
|
): |
|
token = message.choices[0].delta.content |
|
|
|
response += token |
|
yield response |
|
|
|
|
|
""" |
|
For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface |
|
""" |
|
demo = gr.ChatInterface( |
|
respond, |
|
additional_inputs=[ |
|
gr.Textbox(value="You are a friendly Chatbot.", label="System message"), |
|
gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"), |
|
gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), |
|
gr.Slider( |
|
minimum=0.1, |
|
maximum=1.0, |
|
value=0.95, |
|
step=0.05, |
|
label="Top-p (nucleus sampling)", |
|
), |
|
], |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
demo.launch() |
|
|
|
|
|
|
|
import gradio as gr |
|
from langchain.chains import LLMChain |
|
from langchain.prompts import PromptTemplate |
|
from langchain_huggingface import HuggingFaceEndpoint |
|
from langgraph.graph import StateGraph,END,START |
|
from typing import TypedDict |
|
|
|
class InputState(TypedDict): |
|
string_var :str |
|
numeric_var :int |
|
|
|
def changeState(input: InputState): |
|
print(f"Current value: {input}") |
|
return input |
|
|
|
# Define the LLM models |
|
llm1 = HuggingFaceEndpoint(model='t5-small') |
|
llm2 = HuggingFaceEndpoint(model='t5-large') |
|
|
|
# Define the agent functions |
|
def agent1(response): |
|
return f"Agent 1: {response}" |
|
|
|
def agent2(response): |
|
return f"Agent 2: {response}" |
|
|
|
# Define the prompts and LLM chains |
|
chain1 = LLMChain(llm=llm1, prompt=PromptTemplate( |
|
input_variables=["query"], |
|
template="You are in state s1. {{query}}" |
|
)) |
|
chain2 = LLMChain(llm=llm2, prompt=PromptTemplate( |
|
input_variables=["query"], |
|
template="You are in state s2. {{query}}" |
|
)) |
|
|
|
|
|
# Create a state graph with required schemas for inputs and outputs |
|
graph = StateGraph(InputState) |
|
|
|
# Add states to the graph |
|
graph.add_node("s1",changeState) |
|
graph.add_node("s2",changeState) |
|
|
|
# Define transitions |
|
graph.add_edge(START, "s1") # Transition from s1 to s2 |
|
graph.add_edge("s1", "s2") # Transition from s2 to s1 |
|
graph.add_edge("s2", END) |
|
|
|
# Initialize the current state |
|
current_state = "s1" |
|
|
|
def handle_input(query): |
|
global current_state |
|
output = '' |
|
|
|
# Process user input based on current state |
|
if current_state == "s1": |
|
output = chain1.invoke(input=query) # Invoke chain1 with user input |
|
response = agent1(output) # Process output through Agent 1 |
|
current_state = "s2" # Transition to state s2 |
|
elif current_state == "s2": |
|
output = chain2.invoke(input=query) # Invoke chain2 with user input |
|
response = agent2(output) # Process output through Agent 2 |
|
current_state = "s1" # Transition back to state s1 |
|
|
|
return response |
|
|
|
# Create the Gradio interface |
|
with gr.Blocks() as demo: |
|
gr.Markdown("# Chatbot Interface") |
|
chatbot_interface = gr.Chatbot() |
|
user_input = gr.Textbox(label="Your Message", placeholder="Type something here...") |
|
submit_btn = gr.Button("Send") |
|
|
|
# Define the behavior of the submit button |
|
submit_btn.click( |
|
fn=lambda input_text: handle_input(input_text), # Handle user input |
|
inputs=[user_input], |
|
outputs=chatbot_interface |
|
) |
|
|
|
# Launch the Gradio application |
|
demo.launch() |
|
''' |
|
''' |
|
from typing import Annotated, Sequence, TypedDict |
|
import operator |
|
import functools |
|
|
|
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder |
|
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage |
|
from langchain_community.tools.tavily_search import TavilySearchResults |
|
from langchain_experimental.tools import PythonREPLTool |
|
from langchain.agents import create_openai_tools_agent |
|
from langchain_huggingface import HuggingFacePipeline |
|
from langgraph.graph import StateGraph, END |
|
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline |
|
|
|
# SETUP: HuggingFace Model and Pipeline |
|
#name = "meta-llama/Llama-3.2-1B" |
|
#name="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B" |
|
#name="deepseek-ai/deepseek-llm-7b-chat" |
|
#name="openai-community/gpt2" |
|
#name="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B" |
|
#name="microsoft/Phi-3.5-mini-instruct" |
|
name="Qwen/Qwen2.5-7B-Instruct-1M" |
|
|
|
tokenizer = AutoTokenizer.from_pretrained(name,truncation=True) |
|
tokenizer.pad_token = tokenizer.eos_token |
|
model = AutoModelForCausalLM.from_pretrained(name) |
|
|
|
pipe = pipeline( |
|
"text-generation", |
|
model=model, |
|
tokenizer=tokenizer, |
|
device_map="auto", |
|
max_new_tokens=500, # text to generate for outputs |
|
) |
|
print ("pipeline is created") |
|
|
|
# Wrap in LangChain's HuggingFacePipeline |
|
llm = HuggingFacePipeline(pipeline=pipe) |
|
|
|
# Members and Final Options |
|
members = ["Researcher", "Coder"] |
|
options = ["FINISH"] + members |
|
|
|
# Supervisor prompt |
|
system_prompt = ( |
|
"You are a supervisor tasked with managing a conversation between the following workers: {members}." |
|
" Given the following user request, respond with the workers to act next. Each worker will perform a task" |
|
" and respond with their results and status. When all workers are finished, respond with FINISH." |
|
) |
|
|
|
# Prompt template required for the workflow |
|
prompt = ChatPromptTemplate.from_messages( |
|
[ |
|
("system", system_prompt), |
|
MessagesPlaceholder(variable_name="messages"), |
|
("system", "Given the conversation above, who should act next? Or Should we FINISH? Select one of: {options}"), |
|
] |
|
).partial(options=str(options), members=", ".join(members)) |
|
|
|
print ("Prompt Template created") |
|
|
|
# Supervisor routing logic |
|
def route_tool_response(llm_response): |
|
""" |
|
Parse the LLM response to determine the next step based on routing logic. |
|
""" |
|
if "FINISH" in llm_response: |
|
return "FINISH" |
|
for member in members: |
|
if member in llm_response: |
|
return member |
|
return "Unknown" |
|
|
|
def supervisor_chain(state): |
|
""" |
|
Supervisor logic to interact with HuggingFacePipeline and decide the next worker. |
|
""" |
|
messages = state.get("messages", []) |
|
print(f"[TRACE] Supervisor received messages: {messages}") # Trace input messages |
|
user_prompt = prompt.format(messages=messages) |
|
|
|
try: |
|
llm_response = pipe(user_prompt, max_new_tokens=500)[0]["generated_text"] |
|
print(f"[TRACE] LLM Response: {llm_response}") # Trace LLM interaction |
|
except Exception as e: |
|
raise RuntimeError(f"LLM processing error: {e}") |
|
|
|
next_action = route_tool_response(llm_response) |
|
print(f"[TRACE] Supervisor deciding next action: {next_action}") # Trace state changes |
|
return {"next": next_action} |
|
|
|
# AgentState definition |
|
class AgentState(TypedDict): |
|
messages: Annotated[Sequence[BaseMessage], operator.add] |
|
next: str |
|
|
|
# Create tools |
|
tavily_tool = TavilySearchResults(max_results=5) |
|
python_repl_tool = PythonREPLTool() |
|
|
|
# Create agents with their respective prompts |
|
research_agent = create_openai_tools_agent( |
|
llm=llm, |
|
tools=[tavily_tool], |
|
prompt=ChatPromptTemplate.from_messages( |
|
[ |
|
SystemMessage(content="You are a web researcher."), |
|
MessagesPlaceholder(variable_name="messages"), |
|
MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder |
|
] |
|
), |
|
) |
|
|
|
print ("Created agents with their respective prompts") |
|
|
|
code_agent = create_openai_tools_agent( |
|
llm=llm, |
|
tools=[python_repl_tool], |
|
prompt=ChatPromptTemplate.from_messages( |
|
[ |
|
SystemMessage(content="You may generate safe Python code for analysis."), |
|
MessagesPlaceholder(variable_name="messages"), |
|
MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder |
|
] |
|
), |
|
) |
|
|
|
|
|
print ("create_openai_tools_agent") |
|
|
|
|
|
# Create the workflow |
|
workflow = StateGraph(AgentState) |
|
|
|
# Nodes |
|
workflow.add_node("Researcher", research_agent) # Pass the agent directly (no .run required) |
|
workflow.add_node("Coder", code_agent) # Pass the agent directly |
|
workflow.add_node("supervisor", supervisor_chain) |
|
|
|
# Add edges for workflow transitions |
|
for member in members: |
|
workflow.add_edge(member, "supervisor") |
|
|
|
workflow.add_conditional_edges( |
|
"supervisor", |
|
lambda x: x["next"], |
|
{k: k for k in members} | {"FINISH": END} # Dynamically map workers to their actions |
|
) |
|
print("[DEBUG] Workflow edges added: supervisor -> members/FINISH based on 'next'") |
|
|
|
# Define entry point |
|
workflow.set_entry_point("supervisor") |
|
|
|
print(workflow) |
|
|
|
# Compile the workflow |
|
graph = workflow.compile() |
|
|
|
#from IPython.display import display, Image |
|
#display(Image(graph.get_graph().draw_mermaid_png())) |
|
|
|
# Properly formatted initial state |
|
initial_state = { |
|
"messages": [ |
|
#HumanMessage(content="Code hello world and print it to the terminal.") # Correct format for user input |
|
HumanMessage(content="Write Code for printing \"hello world\" in Python. Keep it precise.") # Correct format for user input |
|
] |
|
} |
|
|
|
# Execute the workflow |
|
try: |
|
print(f"[TRACE] Initial workflow state: {initial_state}") |
|
result = graph.invoke(initial_state) |
|
|
|
print(f"[TRACE] Workflow Result: {result}") # Final workflow result |
|
except Exception as e: |
|
print(f"[ERROR] Workflow execution failed: {e}") |
|
|
|
''' |
|
|
|
import operator
import os
from typing import Annotated

import gradio as gr
from langchain.agents import create_react_agent
from langchain.tools import Tool
from langchain_huggingface import HuggingFacePipeline
from langgraph.graph import StateGraph, END
from pydantic import BaseModel
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
|
# Use the real `spaces` package only when the SPACES_ZERO_GPU environment
# variable is set; otherwise define a local stand-in so `@spaces.GPU` can
# still be applied without the package being installed.
if os.environ.get("SPACES_ZERO_GPU") is not None:
    import spaces
else:
    import functools

    class spaces:
        """Local replacement exposing only a pass-through ``GPU`` decorator."""

        @staticmethod
        def GPU(func=None, **_options):
            """Wrap ``func`` so that calling it simply calls the original.

            Usable bare (``@spaces.GPU``) or called with keyword options
            (``@spaces.GPU(duration=60)``). Options are accepted and ignored
            because this stand-in performs no GPU allocation.

            Args:
                func: The function to decorate, or ``None`` when the
                    decorator is invoked with keyword options first.
                **_options: Ignored keyword options.

            Returns:
                The wrapped function, or a decorator when ``func`` is None.
            """

            def decorate(target):
                # functools.wraps keeps __name__/__doc__ of the decorated
                # function so introspection still sees the original.
                @functools.wraps(target)
                def wrapper(*args, **kwargs):
                    return target(*args, **kwargs)

                return wrapper

            if func is None:
                return decorate
            return decorate(func)
|
|
|
@spaces.GPU
def fake_gpu():
    """Do nothing; a placeholder function carrying the ``spaces.GPU`` decorator.

    NOTE(review): presumably exists so that at least one GPU-decorated
    function is registered when running on a ZeroGPU Space - confirm.
    """
    return None
|
|
|
|
|
|
|
|
|
def create_llm(
    model_name="Qwen/Qwen2.5-7B-Instruct-1M",
    max_new_tokens=200,
    device=-1,
):
    """Build a LangChain LLM backed by a local transformers pipeline.

    Args:
        model_name: Hugging Face model id used for both the tokenizer and
            the causal language model.
        max_new_tokens: Generation budget passed to the pipeline.
        device: Device index passed to the pipeline. The default of -1
            keeps the previous behaviour (transformers treats -1 as CPU).

    Returns:
        A ``HuggingFacePipeline`` wrapping a text-generation pipeline.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)

    llm_pipeline = pipeline(
        task="text-generation",
        model=model,
        tokenizer=tokenizer,
        device=device,
        max_new_tokens=max_new_tokens,
    )
    return HuggingFacePipeline(pipeline=llm_pipeline)
|
|
|
|
|
|
|
|
|
# Instantiate the shared LLM once at import time.
llm = create_llm()


def _tool_details(details):
    """Return the tool input as a mapping with a ``get`` method.

    A single-input ``Tool`` passes its ``func`` one argument, which is
    normally a string. A JSON object string is decoded here so the tool
    functions below can look fields up by name; mappings pass through.

    Raises:
        ValueError: If the input is neither a mapping nor a JSON object.
    """
    import json

    if isinstance(details, str):
        try:
            details = json.loads(details)
        except json.JSONDecodeError as err:
            raise ValueError(
                "Tool input must be a mapping or a JSON object string."
            ) from err
    if not hasattr(details, "get"):
        raise ValueError("Tool input must be a mapping or a JSON object string.")
    return details


def _run_registration_check(details):
    """Look up the visitor's name and mobile number and check registration."""
    fields = _tool_details(details)
    return registration_tool(
        fields.get("visitor_name"), fields.get("visitor_mobile")
    )


def _run_schedule_lookup(details):
    """Return the available slots for the doctor named in the input."""
    return doctor_slots_tool(_tool_details(details).get("doctor_name"))


def _run_payment_confirmation(details):
    """Confirm the payment identified by ``transaction_id`` in the input."""
    return confirm_payment_tool(_tool_details(details).get("transaction_id"))


def _run_send_email(details):
    """Send the confirmation email described by the input fields."""
    fields = _tool_details(details)
    return email_tool(
        fields.get("visitor_email"),
        fields.get("appointment_details"),
        fields.get("hospital_location"),
    )


registration_agent = Tool(
    name="registration_check",
    description="Check if a patient is registered.",
    func=_run_registration_check,
)

scheduling_agent = Tool(
    name="schedule_appointment",
    description="Fetch available time slots for a doctor.",
    func=_run_schedule_lookup,
)

payment_agent = Tool(
    name="process_payment",
    description="Generate a payment link and confirm the payment.",
    func=_run_payment_confirmation,
)

email_agent = Tool(
    name="send_email",
    description="Send appointment confirmation email to the visitor.",
    func=_run_send_email,
)
|
|
|
|
|
|
|
|
|
def registration_tool(visitor_name: str, visitor_mobile: str) -> bool:
    """Report whether the name/mobile pair matches a known visitor.

    The lookup runs against a hard-coded, in-function list of visitors.
    Both the name and the mobile number must match the same record.
    """
    known_visitors = [{"visitor_name": "John Doe", "visitor_mobile": "1234567890"}]
    for record in known_visitors:
        same_name = record["visitor_name"] == visitor_name
        if same_name and record["visitor_mobile"] == visitor_mobile:
            return True
    return False
|
|
|
def register_visitor(visitor_name: str, visitor_mobile: str) -> bool:
    """Register a visitor; this stub always reports success.

    Neither argument is inspected or stored by the current implementation.
    """
    registration_succeeded = True
    return registration_succeeded
|
|
|
def doctor_slots_tool(doctor_name: str):
    """Return the list of open time slots for ``doctor_name``.

    Slots come from a hard-coded table; an unknown doctor yields an
    empty list.
    """
    slots_by_doctor = {
        "Dr. Smith": ["10:00 AM", "2:00 PM"],
        "Dr. Brown": ["12:00 PM"],
    }
    if doctor_name in slots_by_doctor:
        return slots_by_doctor[doctor_name]
    return []
|
|
|
def payment_tool(amount: float):
    """Build the mock payment URL for the given amount.

    The amount is embedded as the ``amount`` query parameter.
    """
    payment_link = f"http://mock-payment-link.com/pay?amount={amount}"
    return payment_link
|
|
|
def confirm_payment_tool(transaction_id: str) -> dict:
    """Return a mock confirmation result for ``transaction_id``.

    The literal id "TIMEOUT" yields a failed result with reason "timeout",
    "SUCCESS" yields a successful result, and any other id yields a failed
    result with reason "other_error".
    """
    # Guard clauses, checked in the same order as the mock's rules.
    if transaction_id == "TIMEOUT":
        return {"status": "FAILED", "reason_code": "timeout"}
    if transaction_id == "SUCCESS":
        return {"status": "SUCCESS", "reason_code": None}
    return {"status": "FAILED", "reason_code": "other_error"}
|
|
|
def email_tool(visitor_email: str, appointment_details: str, hospital_location: str) -> bool:
    """Simulate sending a confirmation email by printing its contents.

    Prints the recipient, the appointment details and the hospital
    location on separate lines, then returns True.
    """
    outgoing_lines = (
        f"Sending email to {visitor_email}...",
        f"Appointment Details: {appointment_details}",
        f"Hospital Location: {hospital_location}",
    )
    for line in outgoing_lines:
        print(line)
    return True
|
|
|
|
|
|
|
|
|
class VisitorState(BaseModel):
    """State carried through the appointment-booking workflow graph."""

    # Visitor identity and contact details supplied by the UI.
    visitor_name: str = ""
    visitor_mobile: str = ""
    visitor_email: str = ""

    # Requested doctor and department.
    doctor_name: str = ""
    department_name: str = ""

    # Slot chosen during scheduling.
    selected_slot: str = ""

    # User-facing messages. The operator.add reducer makes each node's
    # returned list get appended to the existing messages instead of
    # replacing them, so the final result holds the whole transcript.
    messages: Annotated[list, operator.add] = []

    # Progress flags set by the payment and final steps.
    payment_confirmed: bool = False
    email_sent: bool = False
|
|
|
def input_state(state: VisitorState):
    """InputState: emit the prompt asking the visitor for their details.

    The incoming state is not read; the returned update carries one
    message and names "RegistrationState" under the "next" key.
    """
    prompt_text = "Please provide your name, mobile number, and email."
    update = {"messages": [prompt_text], "next": "RegistrationState"}
    return update
|
|
|
def registration_state(state: VisitorState):
    """Registration State: check the visitor and register them if needed.

    Returns an update naming "SchedulingState" when the visitor is already
    registered or is registered successfully, and END when registration
    fails.
    """
    name = state.visitor_name
    mobile = state.visitor_mobile

    # Already known: nothing to do beyond reporting it.
    if registration_tool(name, mobile):
        return {"messages": ["Visitor is registered."], "next": "SchedulingState"}

    # Unknown visitor: attempt to register them now.
    if register_visitor(name, mobile):
        return {"messages": ["Visitor has been successfully registered."], "next": "SchedulingState"}

    return {"messages": ["Registration failed. Please try again later."], "next": END}
|
|
|
def scheduling_state(state: VisitorState):
    """SchedulingState: pick the first available slot for the doctor.

    Returns:
        A state update. When a slot exists, the update carries the chosen
        slot under "selected_slot" so later steps can read it; attribute
        assignment on ``state`` alone is not written back by the graph.
        When no slot exists, the update names END under "next".
    """
    available_slots = doctor_slots_tool(state.doctor_name)
    if not available_slots:
        return {"messages": [f"No available slots for {state.doctor_name}."], "next": END}

    selected_slot = available_slots[0]
    return {
        "messages": [f"Slot selected for {state.doctor_name}: {selected_slot}"],
        "selected_slot": selected_slot,
        "next": "PaymentState",
    }
|
|
|
def payment_state(state: VisitorState):
    """PaymentState: generate a payment link and confirm the payment.

    Returns:
        A state update whose "messages" list starts with the payment-link
        message followed by the outcome message. On success the update also
        sets "payment_confirmed" to True; returning it (rather than mutating
        ``state``) is what makes the value visible to later steps.
    """
    payment_link = payment_tool(500)
    messages = [f"Please proceed to pay at: {payment_link}"]

    # The confirmation is mocked with a fixed transaction id.
    payment_response = confirm_payment_tool("SUCCESS")

    if payment_response["status"] == "SUCCESS":
        messages.append("Payment successful. Appointment is being finalized.")
        return {"messages": messages, "payment_confirmed": True, "next": "FinalState"}

    if payment_response["reason_code"] == "timeout":
        messages.append("Payment timed out. Retrying payment...")
        return {"messages": messages, "next": "PaymentState"}

    messages.append("Payment failed due to an error. Please try again later.")
    return {"messages": messages, "next": END}
|
|
|
def final_state(state: VisitorState):
    """FinalState: send the confirmation email and finalize the appointment.

    Returns:
        A state update with one outcome message. When the email is sent the
        update also sets "email_sent" to True, so the flag is part of the
        returned update instead of only being assigned on ``state``.
    """
    if not state.payment_confirmed:
        return {"messages": ["Payment confirmation failed. Appointment could not be finalized."], "next": END}

    appointment_details = f"Doctor: {state.doctor_name}\nTime: {state.selected_slot}"
    hospital_location = "123 Main St, Springfield, USA"
    email_success = email_tool(state.visitor_email, appointment_details, hospital_location)

    if email_success:
        return {
            "messages": [f"Appointment confirmed. Details sent to your email: {state.visitor_email}"],
            "email_sent": True,
            "next": END,
        }
    return {"messages": ["Appointment confirmed, but failed to send email. Please contact support."], "next": END}
|
|
|
|
|
|
|
|
|
# Build the appointment-booking graph over VisitorState.
workflow = StateGraph(VisitorState)

# One node per workflow step.
workflow.add_node("InputState", input_state)
workflow.add_node("RegistrationState", registration_state)
workflow.add_node("SchedulingState", scheduling_state)
workflow.add_node("PaymentState", payment_state)
workflow.add_node("FinalState", final_state)


def _route_after_scheduling(state):
    """Choose the step after scheduling: payment if a slot exists, else END.

    The "next" values returned by the nodes are not fields of VisitorState,
    so they cannot be read here; the decision is re-derived from the
    doctor's slot availability instead.
    """
    doctor_name = state["doctor_name"] if isinstance(state, dict) else state.doctor_name
    return "PaymentState" if doctor_slots_tool(doctor_name) else END


workflow.add_edge("InputState", "RegistrationState")
workflow.add_edge("RegistrationState", "SchedulingState")
# Stop instead of taking payment when the doctor has no open slot.
workflow.add_conditional_edges(
    "SchedulingState",
    _route_after_scheduling,
    {"PaymentState": "PaymentState", END: END},
)
workflow.add_edge("PaymentState", "FinalState")
# Make the end of the workflow explicit.
workflow.add_edge("FinalState", END)

workflow.set_entry_point("InputState")
compiled_graph = workflow.compile()
|
|
|
|
|
|
|
|
|
def gradio_interface(visitor_name, visitor_mobile, visitor_email, doctor_name, department_name):
    """Run the booking workflow for one form submission.

    Builds a VisitorState from the five form values, invokes the compiled
    graph with its dumped fields, and returns the resulting messages joined
    by newlines.
    """
    form_values = {
        "visitor_name": visitor_name,
        "visitor_mobile": visitor_mobile,
        "visitor_email": visitor_email,
        "doctor_name": doctor_name,
        "department_name": department_name,
    }
    visitor = VisitorState(**form_values)

    outcome = compiled_graph.invoke(visitor.model_dump())
    transcript = outcome["messages"]
    return "\n".join(transcript)
|
|
|
# Labels of the five text inputs, in the order gradio_interface expects them.
_INPUT_LABELS = (
    "Visitor Name",
    "Visitor Mobile Number",
    "Visitor Email",
    "Doctor Name",
    "Department Name",
)

# Form-style UI: five text boxes in, one text box out.
iface = gr.Interface(
    fn=gradio_interface,
    inputs=[gr.Textbox(label=label) for label in _INPUT_LABELS],
    outputs="textbox",
)


# Start the web UI only when this file is run as a script.
if __name__ == "__main__":
    iface.launch()
|
|
|
|