Spaces:
Sleeping
Sleeping
File size: 8,816 Bytes
c4f4c06 169210f 57f629b c4f4c06 57f629b c4f4c06 a7b0aac c4f4c06 a7b0aac 57f629b 169210f 57f629b 169210f 57f629b 169210f 903b676 169210f 57f629b 169210f 903b676 169210f 25e3641 1e7a59d 25e3641 57f629b 25e3641 57f629b 25e3641 57f629b 25e3641 57f629b 25e3641 c4f4c06 25e3641 169210f 57f629b fb90d41 57f629b fb90d41 57f629b fb90d41 57f629b fb90d41 57f629b 169210f 57f629b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
import os
import glob
from pathlib import Path
from typing import List
from dotenv import load_dotenv
from openai import OpenAI
import gradio as gr
from pinecone import Pinecone, ServerlessSpec
# Resolve the .env file that sits beside this script and load it into the
# process environment before any setting is read.
env_path = Path(__file__).resolve().with_name('.env')
print("Loading .env from:", env_path)
load_dotenv(dotenv_path=env_path)

# Settings read from the environment; only the index name has a fallback.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_ENV = os.environ.get("PINECONE_ENV")
INDEX_NAME = os.environ.get("PINECONE_INDEX", "hr-handbook")

# Echo the first six characters of each key (or "NOT FOUND") so a missing
# key is visible in the start-up log.
if PINECONE_API_KEY:
    print("Loaded PINECONE_API_KEY:", PINECONE_API_KEY[:6] + "...")
else:
    print("Loaded PINECONE_API_KEY:", "NOT FOUND")
if OPENAI_API_KEY:
    print("Loaded OPENAI_API_KEY:", OPENAI_API_KEY[:6] + "...")
else:
    print("Loaded OPENAI_API_KEY:", "NOT FOUND")

# Shared OpenAI client used for both embeddings and chat completions.
client = OpenAI(api_key=OPENAI_API_KEY)
# Define recommended questions before using them
# Suggested questions shown in each tab's dropdown, keyed by category name.
# The "all" list is the fallback for any category without its own entry.
recommended = {
    # Fallback questions.
    "all": [
        "Where can I find benefits information?",
        "How do I request time off?",
        "What is Made Tech’s mission",
        "How does Made Tech support learning and mentoring?",
    ],
    # Benefits and perks.
    "benefits": [
        "What does private medical insurance cover?",
        "How do I join the pension scheme?",
        "What is the maximum amount I can apply for under the Cycle to Work scheme?",
        "How do I request a flexible working pattern?",
        "How do I apply for Help to Buy Tech through TechScheme?",
        "Can I increase or decrease my pension contributions?",
        "When are Winter and Summer company parties held?",
    ],
    # Company mission, values and onboarding.
    "company": [
        "What is the company's mission?",
        "What values guide the work and culture at Made Tech?",
        "What is Made Tech's purpose?",
        "What is the role of a peer buddy in the onboarding process?",
        "What policies should I read as a new employee?",
    ],
    # How-to guides.
    "guides": [
        "How do I submit an expense?",
        "Where is the hiring policy?",
        "What is chalet time?",
        "What accounts and tools are introduced during onboarding?",
        "Will I receive a laptop before my first day?",
        "What is Chalet Time? What are the priorities for using Chalet Time?",
        "What should I do if I am planning to relocate?",
        "How can I contribute to the handbook?",
    ],
    # Role descriptions and career levels.
    "roles": [
        "What does a data scientist do?",
        "What is the duration of the Software Engineering Academy at Made Tech?",
        "How do career levels work?",
        "What types of needs does an Associate Product Manager explore in their role?",
        "How are success criteria and measurable outcomes defined?",
        "What are the responsibilities of a Delivery Support Analyst in PMO?",
        "How do Delivery Directors contribute to Made Tech's commercial growth?",
        "What are some key outcomes expected from a Delivery Director?",
    ],
    # Communities of practice.
    "communities-of-practice": [
        "How can I join a community of practice?",
        "What is the purpose of the Book Club at Made Tech?",
        "How often does the Book Club meet?",
        "What is the EDGE approach to digital transformation?",
        "How can I join the Book Club meetings?",
        "When do CoPs meet?",
    ],
}
# Initialize Pinecone
def init_pinecone(
    index_name: str,
    *,
    dimension: int = 1536,
    metric: str = "cosine",
    cloud: str = "aws",
    region: str = "us-east-1",
    ready_timeout: float = 60.0,
):
    """Return a handle to the Pinecone index *index_name*, creating it if absent.

    Args:
        index_name: Name of the index to open (and create when missing).
        dimension: Vector dimension used when the index has to be created.
            The default of 1536 is the size of the ``text-embedding-ada-002``
            vectors this file produces.
        metric: Similarity metric used when the index has to be created.
        cloud: Cloud provider for the serverless spec of a new index.
        region: Region for the serverless spec of a new index.
        ready_timeout: Maximum number of seconds to wait for a newly created
            index to report itself ready.

    Returns:
        The object returned by ``Pinecone.Index(index_name)``.

    Raises:
        TimeoutError: If a newly created index is still not ready after
            *ready_timeout* seconds.
    """
    import time  # local import: only needed on the index-creation path

    pc = Pinecone(api_key=PINECONE_API_KEY)
    if index_name not in pc.list_indexes().names():
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud=cloud, region=region),
        )
        # create_index can return before the index accepts traffic; poll its
        # status so the caller's first upsert does not hit a half-built index.
        deadline = time.monotonic() + ready_timeout
        while not pc.describe_index(index_name).status["ready"]:
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Pinecone index {index_name!r} was not ready after "
                    f"{ready_timeout} seconds"
                )
            time.sleep(1)
    return pc.Index(index_name)
# Load text files
def load_documents(root_dir: str) -> List[dict]:
    """Read every ``*.txt`` file under *root_dir* into a document record.

    Each record is a dict with three keys:

    * ``"id"``: the file path as a string (as yielded by ``rglob``),
    * ``"text"``: the file content, decoded as UTF-8 with undecodable bytes
      dropped,
    * ``"category"``: the first directory component of the path *relative to
      root_dir*, or ``"general"`` for files directly inside *root_dir*.

    The category is computed from the relative path so it is the same
    top-level folder name whether *root_dir* is ``"."``, a named folder or an
    absolute path. Using the raw path instead would pick the file name for
    ``load_documents(".")`` and a parent directory for absolute roots.

    Args:
        root_dir: Directory to search recursively.

    Returns:
        The document records, ordered by path.
    """
    root = Path(root_dir)
    docs = []
    # Sorted so ids and batches are stable across runs and file systems.
    for path in sorted(root.rglob("*.txt")):
        if not path.is_file():
            # A directory whose name ends in ".txt" cannot be read as text.
            continue
        relative_parts = path.relative_to(root).parts
        category = relative_parts[0] if len(relative_parts) > 1 else "general"
        content = path.read_text(encoding="utf-8", errors="ignore")
        docs.append({"id": str(path), "text": content, "category": category})
    return docs
# Embed and upsert
def index_documents(index, docs: List[dict], *, batch_size: int = 100,
                    max_text_bytes: int = 30_000):
    """Embed *docs* and upsert them into *index* in batches.

    Every vector is stored with both its ``category`` and its ``text`` in the
    metadata. The text has to be stored because ``retrieve`` builds the answer
    context from ``metadata["text"]``; without it every lookup returns an
    empty context.

    Args:
        index: Object exposing ``upsert(vectors)``, e.g. the handle returned
            by ``init_pinecone``.
        docs: Records with ``"id"``, ``"text"`` and ``"category"`` keys, as
            produced by ``load_documents``.
        batch_size: Number of documents embedded and upserted per request.
        max_text_bytes: Upper bound, in UTF-8 bytes, on the text stored in
            each vector's metadata. The default leaves headroom under
            Pinecone's per-vector metadata size limit.
    """
    # Blank documents are skipped: the embeddings endpoint rejects empty
    # input strings and whitespace-only files carry nothing to retrieve.
    indexable = [doc for doc in docs if doc["text"].strip()]
    for batch_start in range(0, len(indexable), batch_size):
        batch = indexable[batch_start:batch_start + batch_size]
        embeddings = client.embeddings.create(
            input=[doc["text"] for doc in batch],
            model="text-embedding-ada-002",
        )
        vectors = []
        for doc, emb in zip(batch, embeddings.data):
            # Truncate on bytes, then drop any multi-byte character that was
            # cut in half at the boundary.
            stored_text = (
                doc["text"]
                .encode("utf-8")[:max_text_bytes]
                .decode("utf-8", errors="ignore")
            )
            metadata = {"category": doc["category"], "text": stored_text}
            vectors.append((doc["id"], emb.embedding, metadata))
        index.upsert(vectors)
# Query Pinecone
def retrieve(query: str, index, category: str = None, k: int = 5) -> List[str]:
    """Return the stored text of the top-*k* index matches for *query*.

    The query is embedded with ``text-embedding-ada-002`` and passed to
    ``index.query``. When *category* is truthy the search is restricted to
    vectors whose ``category`` metadata equals it. Matches without metadata,
    or without a non-empty ``text`` entry in their metadata, are left out.
    """
    embedding_response = client.embeddings.create(
        input=[query], model="text-embedding-ada-002"
    )
    query_vector = embedding_response.data[0].embedding

    # Only pass a filter when a category was actually requested.
    category_filter = {"filter": {"category": {"$eq": category}}} if category else {}
    result = index.query(
        vector=query_vector, top_k=k, include_metadata=True, **category_filter
    )

    texts = []
    for match in result["matches"]:
        if "metadata" not in match:
            continue
        text = match["metadata"].get("text")
        if text:
            texts.append(text)
    return texts
# Generate answer
def generate_answer(query: str, docs: List[str]) -> str:
    """Ask ``gpt-3.5-turbo`` to answer *query* from the retrieved *docs*.

    The documents are joined with blank lines into a single context block
    that is sent, together with the question, as the user message. The system
    message tells the model to say it does not know when the context lacks
    the answer. Returns the model's reply with surrounding whitespace removed.
    """
    system_prompt = (
        "You are a helpful HR assistant. Use the provided context to answer the question.\n"
        "If the answer is not contained in the context, reply that you don't know."
    )
    context = "\n\n".join(docs)
    user_prompt = f"Context:\n{context}\n\nQuestion: {query}"

    system_message = {"role": "system", "content": system_prompt}
    user_message = {"role": "user", "content": user_prompt}
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[system_message, user_message],
    )
    reply = completion.choices[0].message.content
    return reply.strip()
# Gradio logic
def answer_question(query: str, category: str):
    """Answer *query* using handbook passages retrieved for *category*.

    This is the callback behind each tab's Submit button. It reads the
    module-level ``pinecone_index`` that the ``__main__`` block assigns.

    Args:
        query: The question typed by the user; may be empty or ``None``.
        category: Category used to filter the retrieval.

    Returns:
        The generated answer, or a short prompt to enter a question when
        *query* is blank.
    """
    question = (query or "").strip()
    if not question:
        # A blank box would otherwise be sent to the embeddings API, which
        # rejects empty input and surfaces as an error in the UI.
        return "Please enter a question."
    docs = retrieve(question, pinecone_index, category)
    return generate_answer(question, docs)
# Main logic
if __name__ == "__main__":
    # Script entry point: connect to the index, optionally (re)index the local
    # .txt files, then build and launch the Gradio UI.
    # pinecone_index becomes a module-level global; answer_question() reads it
    # by name.
    pinecone_index = init_pinecone(INDEX_NAME)
    # Indexing runs on every start unless SKIP_INDEXING parses to a non-zero
    # integer.
    if not int(os.getenv("SKIP_INDEXING", "0")):
        documents = load_documents(".")
        index_documents(pinecone_index, documents)
    # One tab per category: top-level folders that contain .txt files plus the
    # keys of `recommended`. `-` binds tighter than `|`, so "all" is removed
    # only from the recommended keys, not from the folder names.
    categories = sorted(set(Path(p).parts[0] for p in glob.glob('*/*.txt')) | set(recommended.keys()) - {"all"})
    with gr.Blocks() as demo:
        # Inline stylesheet for the banner image and the tab buttons.
        gr.HTML("""
<style>
#banner-img {
display: flex;
justify-content: center;
margin-bottom: 20px;
}
#banner-img img {
max-width: 800px;
width: 100%;
height: auto;
border-radius: 10px;
}
.gradio-container .gr-tabnav button {
background: linear-gradient(to right, #36d1dc, #5b86e5) !important;
color: white !important;
border: none !important;
border-radius: 8px !important;
padding: 10px 16px;
margin: 0 4px;
font-weight: bold;
transition: 0.3s;
}
.gradio-container .gr-tabnav button:hover {
background: linear-gradient(to right, #5b86e5, #36d1dc) !important;
transform: scale(1.05);
}
.gradio-container .gr-tabnav button[aria-selected="true"] {
background: #1e3c72 !important;
font-weight: bold;
}
</style>
""")
        with gr.Row():
            # Banner image expected next to this script; banner_value is None
            # when bannerhr.png does not exist.
            banner_path = Path(__file__).resolve().parent / "bannerhr.png"
            banner_value = str(banner_path) if banner_path.exists() else None
            gr.Image(value=banner_value, show_label=False, show_download_button=False, elem_id="banner-img")
        with gr.Tabs():
            for cat in categories:
                with gr.Tab(cat.capitalize()):
                    # NOTE(review): the nesting of the widgets below was
                    # reconstructed (the source had lost its indentation);
                    # confirm it against the intended layout.
                    with gr.Row():
                        with gr.Column():
                            # Categories without their own list fall back to
                            # the "all" questions.
                            example_choices = recommended.get(cat, recommended["all"])
                            example_value = example_choices[0] if example_choices else ""
                            examples = gr.Dropdown(
                                choices=example_choices,
                                label="Recommended questions",
                                value=example_value
                            )
                        with gr.Column():
                            # The question box starts pre-filled with the first
                            # recommended question.
                            query = gr.Textbox(
                                label="Ask a question",
                                value=example_value
                            )
                            submit = gr.Button("Submit")
                            answer = gr.Textbox(label="Answer")
                    # Choosing a recommended question copies it into the
                    # question box.
                    examples.change(lambda q: q, inputs=examples, outputs=query)
                    # cat is bound as a default argument so each tab's handler
                    # keeps its own category instead of the loop's last value.
                    submit.click(lambda q, cat=cat: answer_question(q, cat), inputs=[query], outputs=answer)
    demo.launch()
|