TraumaBackend / trauma /core /database.py
brestok's picture
init
50553ea
raw
history blame
3.79 kB
from datetime import datetime
from enum import Enum
from typing import Dict, Any, Type
from bson import ObjectId
from pydantic import GetCoreSchemaHandler, BaseModel, Field, AnyUrl
from pydantic.json_schema import JsonSchemaValue
from pydantic_core import core_schema
class PyObjectId:
@classmethod
def __get_pydantic_core_schema__(
cls, source: type, handler: GetCoreSchemaHandler
) -> core_schema.CoreSchema:
return core_schema.with_info_after_validator_function(
cls.validate, core_schema.str_schema()
)
@classmethod
def __get_pydantic_json_schema__(
cls, schema: core_schema.CoreSchema, handler: GetCoreSchemaHandler
) -> JsonSchemaValue:
return {"type": "string"}
@classmethod
def validate(cls, value: str) -> ObjectId:
if not ObjectId.is_valid(value):
raise ValueError(f"Invalid ObjectId: {value}")
return ObjectId(value)
def __getattr__(self, item):
return getattr(self.__dict__['value'], item)
def __init__(self, value: str = None):
if value is None:
self.value = ObjectId()
else:
self.value = self.validate(value)
def __str__(self):
return str(self.value)
class MongoBaseModel(BaseModel):
id: str = Field(default_factory=lambda: str(PyObjectId()))
class Config:
arbitrary_types_allowed = True
def to_mongo(self) -> Dict[str, Any]:
def model_to_dict(model: BaseModel) -> Dict[str, Any]:
doc = {}
for name, value in model._iter():
key = model.__fields__[name].alias or name
if isinstance(value, BaseModel):
doc[key] = model_to_dict(value)
elif isinstance(value, list) and all(isinstance(i, BaseModel) for i in value):
doc[key] = [model_to_dict(item) for item in value]
elif value and isinstance(value, Enum):
doc[key] = value.value
elif isinstance(value, datetime):
doc[key] = value.isoformat()
elif value and isinstance(value, AnyUrl):
doc[key] = str(value)
else:
doc[key] = value
return doc
result = model_to_dict(self)
return result
@classmethod
def from_mongo(cls, data: Dict[str, Any]):
def restore_enums(inst: Any, model_cls: Type[BaseModel]) -> None:
for name, field in model_cls.__fields__.items():
value = getattr(inst, name)
if field and isinstance(field.annotation, type) and issubclass(field.annotation, Enum):
setattr(inst, name, field.annotation(value))
elif isinstance(value, BaseModel):
restore_enums(value, value.__class__)
elif isinstance(value, list):
for i, item in enumerate(value):
if isinstance(item, BaseModel):
restore_enums(item, item.__class__)
elif isinstance(field.annotation, type) and issubclass(field.annotation, Enum):
value[i] = field.annotation(item)
elif isinstance(value, dict):
for k, v in value.items():
if isinstance(v, BaseModel):
restore_enums(v, v.__class__)
elif isinstance(field.annotation, type) and issubclass(field.annotation, Enum):
value[k] = field.annotation(v)
if data is None:
return None
instance = cls(**data)
restore_enums(instance, instance.__class__)
return instance