|
|
import asyncio |
|
|
import http |
|
|
import json |
|
|
from datetime import datetime |
|
|
|
|
|
import requests |
|
|
import uvicorn |
|
|
from asgi_correlation_id import CorrelationIdMiddleware |
|
|
from fastapi import FastAPI, Request |
|
|
from fastapi.exceptions import RequestValidationError, HTTPException |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import FileResponse, JSONResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from pymongo import __version__ as pymongo_version |
|
|
from pymongo.errors import PyMongoError |
|
|
|
|
|
from my_ghost_writer import pymongo_operations_rw, exception_handlers |
|
|
from my_ghost_writer.constants import (app_logger, ALLOWED_ORIGIN_LIST, API_MODE, DOMAIN, IS_TESTING, STATIC_FOLDER, |
|
|
WORDSAPI_KEY, WORDSAPI_URL, RAPIDAPI_HOST, ME_CONFIG_MONGODB_USE_OK, ME_CONFIG_MONGODB_HEALTHCHECK_SLEEP, |
|
|
STATIC_FOLDER_LITEKOBOLDAINET, LOG_LEVEL, PORT) |
|
|
from my_ghost_writer.pymongo_utils import mongodb_health_check |
|
|
from my_ghost_writer.text_parsers import text_stemming |
|
|
from my_ghost_writer.type_hints import RequestTextFrequencyBody, RequestQueryThesaurusWordsapiBody |
|
|
|
|
|
|
|
|
async def mongo_health_check_background_task(): |
|
|
app_logger.info(f"starting task, ME_CONFIG_MONGODB_USE_OK:{ME_CONFIG_MONGODB_USE_OK}...") |
|
|
while ME_CONFIG_MONGODB_USE_OK: |
|
|
try: |
|
|
db_ok["mongo_ok"] = health_mongo() == "Mongodb: still alive..." |
|
|
except PyMongoError: |
|
|
db_ok["mongo_ok"] = False |
|
|
await asyncio.sleep(ME_CONFIG_MONGODB_HEALTHCHECK_SLEEP) |
|
|
|
|
|
|
|
|
async def lifespan(app: FastAPI): |
|
|
task = asyncio.create_task(mongo_health_check_background_task()) |
|
|
yield |
|
|
task.cancel() |
|
|
try: |
|
|
await task |
|
|
except asyncio.CancelledError: |
|
|
pass |
|
|
|
|
|
|
|
|
fastapi_title = "My Ghost Writer" |
|
|
app = FastAPI(title=fastapi_title, version="1.0", lifespan=lifespan) |
|
|
app_logger.info(f"allowed_origins:{ALLOWED_ORIGIN_LIST}, IS_TESTING:{IS_TESTING}, LOG_LEVEL:{LOG_LEVEL}!") |
|
|
app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=ALLOWED_ORIGIN_LIST, |
|
|
allow_credentials=True, |
|
|
allow_methods=["GET", "POST"] |
|
|
) |
|
|
db_ok = {"mongo_ok": ME_CONFIG_MONGODB_USE_OK} |
|
|
|
|
|
|
|
|
@app.middleware("http") |
|
|
async def request_middleware(request, call_next): |
|
|
from my_ghost_writer.middlewares import logging_middleware |
|
|
|
|
|
return await logging_middleware(request, call_next) |
|
|
|
|
|
|
|
|
@app.get("/health") |
|
|
def health(): |
|
|
from nltk import __version__ as nltk_version |
|
|
from fastapi import __version__ as fastapi_version |
|
|
from my_ghost_writer.__version__ import __version__ as ghost_writer_version |
|
|
app_logger.info( |
|
|
f"still alive... FastAPI version:{fastapi_version}, nltk version:{nltk_version}, my-ghost-writer version:{ghost_writer_version}!") |
|
|
return "Still alive..." |
|
|
|
|
|
|
|
|
@app.get("/health-mongo") |
|
|
def health_mongo() -> str: |
|
|
app_logger.info(f"pymongo driver version:{pymongo_version}!") |
|
|
if ME_CONFIG_MONGODB_USE_OK: |
|
|
try: |
|
|
db_ok["mongo_ok"] = mongodb_health_check() |
|
|
return "Mongodb: still alive..." |
|
|
except PyMongoError as pme: |
|
|
app_logger.error(str(pme)) |
|
|
db_ok["mongo_ok"] = False |
|
|
raise HTTPException("mongo not ok!") |
|
|
return f"ME_CONFIG_MONGODB_USE_OK:{ME_CONFIG_MONGODB_USE_OK}..." |
|
|
|
|
|
|
|
|
@app.post("/words-frequency") |
|
|
def get_words_frequency(body: RequestTextFrequencyBody | str) -> JSONResponse: |
|
|
t0 = datetime.now() |
|
|
app_logger.info(f"body type: {type(body)}.") |
|
|
app_logger.info(f"body: {body}.") |
|
|
body_validated = RequestTextFrequencyBody.model_validate_json(body) |
|
|
text = body_validated.text |
|
|
app_logger.info(f"LOG_LEVEL: '{LOG_LEVEL}', length of text: {len(text)}, type of 'text':'{type(text)}'.") |
|
|
if len(text) < 100: |
|
|
app_logger.debug(f"text from request: {text} ...") |
|
|
n_total_rows, words_stems_dict = text_stemming(text) |
|
|
dumped = json.dumps(words_stems_dict) |
|
|
app_logger.debug(f"dumped: {dumped} ...") |
|
|
t1 = datetime.now() |
|
|
duration = (t1 - t0).total_seconds() |
|
|
content_response = {'words_frequency': dumped, "duration": f"{duration:.3f}", "n_total_rows": n_total_rows} |
|
|
app_logger.info(f"content_response: {content_response["duration"]}, {content_response["n_total_rows"]} ...") |
|
|
app_logger.debug(f"content_response: {content_response} ...") |
|
|
return JSONResponse(status_code=200, content=content_response) |
|
|
|
|
|
|
|
|
@app.post("/thesaurus-wordsapi") |
|
|
def get_thesaurus_wordsapi(body: RequestQueryThesaurusWordsapiBody | str) -> JSONResponse: |
|
|
t0 = datetime.now() |
|
|
app_logger.info(f"body type: {type(body)} => {body}.") |
|
|
body_validated = RequestQueryThesaurusWordsapiBody.model_validate_json(body) |
|
|
query = body_validated.query |
|
|
use_mongo: bool = db_ok["mongo_ok"] |
|
|
app_logger.info(f"query: {type(query)} => {query}, use mongo? {use_mongo}.") |
|
|
if use_mongo: |
|
|
try: |
|
|
response = pymongo_operations_rw.get_document_by_word(query=query) |
|
|
t1 = datetime.now() |
|
|
duration = (t1 - t0).total_seconds() |
|
|
app_logger.info(f"found local data, duration: {duration:.3f}s.") |
|
|
return JSONResponse(status_code=200, content={"duration": duration, "thesaurus": response, "source": "local"}) |
|
|
except (PyMongoError, AssertionError) as pme: |
|
|
app_logger.info(f"{pme}! Let's try the remote service...") |
|
|
|
|
|
url = f"{WORDSAPI_URL}/{query}" |
|
|
app_logger.info(f"url: {type(url)} => {url}.") |
|
|
headers = { |
|
|
"x-rapidapi-key": WORDSAPI_KEY, |
|
|
"x-rapidapi-host": RAPIDAPI_HOST |
|
|
} |
|
|
response = requests.get(url, headers=headers) |
|
|
t1 = datetime.now() |
|
|
duration = (t1 - t0).total_seconds() |
|
|
app_logger.info(f"response.status_code: {response.status_code}, duration: {duration:.3f}s.") |
|
|
msg = f"API response is not 200: '{response.status_code}', query={query}, url={url}, duration: {duration:.3f}s." |
|
|
try: |
|
|
assert response.status_code < 500, msg |
|
|
try: |
|
|
assert response.status_code < 400, msg |
|
|
except AssertionError: |
|
|
msg = response.json() |
|
|
app_logger.info(f"msg_404:{msg}.") |
|
|
return JSONResponse(status_code=response.status_code, content={"msg": msg}) |
|
|
response_json = response.json() |
|
|
if use_mongo: |
|
|
app_logger.debug(f"use_mongo:{use_mongo}, inserting response '{response_json}' by query '{query}' on db...") |
|
|
pymongo_operations_rw.insert_document(response_json) |
|
|
del response_json["_id"] |
|
|
t2 = datetime.now() |
|
|
duration = (t2 - t1).total_seconds() |
|
|
app_logger.info(f"response_json: inserted json on local db, duration: {duration:.3f}s. ...") |
|
|
return JSONResponse(status_code=200, |
|
|
content={"duration": duration, "thesaurus": response_json, "source": "wordsapi"}) |
|
|
except AssertionError as ae500: |
|
|
app_logger.error(f"URL: query => {type(query)} {query}; url => {type(url)} {url}.") |
|
|
app_logger.error(f"headers type: {type(headers)}...") |
|
|
|
|
|
app_logger.error("response:") |
|
|
app_logger.error(str(response)) |
|
|
app_logger.error(str(ae500)) |
|
|
msg = f"request with query '{query}' has response with status '{http.HTTPStatus(response.status_code).phrase}'" |
|
|
app_logger.error(f"type_msg:{type(msg)}, msg:{msg}.") |
|
|
raise HTTPException(status_code=response.status_code, detail=msg) |
|
|
|
|
|
|
|
|
@app.exception_handler(RequestValidationError) |
|
|
def request_validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse: |
|
|
return exception_handlers.request_validation_exception_handler(request, exc) |
|
|
|
|
|
|
|
|
@app.exception_handler(HTTPException) |
|
|
def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse: |
|
|
return exception_handlers.http_exception_handler(request, exc) |
|
|
|
|
|
|
|
|
try: |
|
|
app.mount("/static", StaticFiles(directory=STATIC_FOLDER, html=True), name="static") |
|
|
except Exception as ex_mount_static: |
|
|
app_logger.error( |
|
|
f"Failed to mount static folder: {STATIC_FOLDER}, exception: {ex_mount_static}, API_MODE: {API_MODE}!") |
|
|
if not API_MODE: |
|
|
app_logger.exception(f"since API_MODE is {API_MODE} we will raise the exception!") |
|
|
raise ex_mount_static |
|
|
try: |
|
|
app.mount("/lite.koboldai.net", StaticFiles(directory=STATIC_FOLDER_LITEKOBOLDAINET, html=True), name="lite.koboldai.net") |
|
|
except Exception as ex_mount_static1: |
|
|
app_logger.error( |
|
|
f"Failed to mount static folder: {STATIC_FOLDER_LITEKOBOLDAINET}, exception: {ex_mount_static1}, API_MODE: {API_MODE}!") |
|
|
if not API_MODE: |
|
|
app_logger.exception(f"since API_MODE is {API_MODE} we will raise the exception!") |
|
|
raise ex_mount_static1 |
|
|
|
|
|
|
|
|
app.add_middleware(CorrelationIdMiddleware) |
|
|
|
|
|
try: |
|
|
@app.get("/") |
|
|
@app.get("/static/") |
|
|
def index() -> FileResponse: |
|
|
return FileResponse(path=STATIC_FOLDER / "index.html", media_type="text/html") |
|
|
except Exception as ex_route_main: |
|
|
app_logger.error(f"Failed to prepare the main route, exception: {ex_route_main}, API_MODE: {API_MODE}!") |
|
|
if not API_MODE: |
|
|
app_logger.exception(f"since API_MODE is {API_MODE} we will raise the exception!") |
|
|
raise ex_route_main |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
try: |
|
|
app_logger.info( |
|
|
f"Starting fastapi/gradio application {fastapi_title}, run in api mode: {API_MODE} (no static folder and main route)...") |
|
|
uvicorn.run("my_ghost_writer.app:app", host=DOMAIN, port=PORT, reload=bool(IS_TESTING)) |
|
|
except Exception as ex_run: |
|
|
print(f"fastapi/gradio application {fastapi_title}, exception:{ex_run}!") |
|
|
app_logger.exception(f"fastapi/gradio application {fastapi_title}, exception:{ex_run}!") |
|
|
|
|
|
app_logger.error(f"ALLOWED_ORIGIN_LIST: '{ALLOWED_ORIGIN_LIST}'") |
|
|
app_logger.error(f"API_MODE: '{API_MODE}'") |
|
|
app_logger.error(f"DOMAIN: '{DOMAIN}'") |
|
|
app_logger.error(f"IS_TESTING: '{IS_TESTING}'") |
|
|
app_logger.error(f"LOG_LEVEL: '{LOG_LEVEL}'") |
|
|
app_logger.error(f"PORT: '{PORT}'") |
|
|
app_logger.error(f"STATIC_FOLDER: '{STATIC_FOLDER}'") |
|
|
raise ex_run |
|
|
|