# aura-mind-glow / bigquery_uploader.py
# This file contains the logic for uploading diagnosis records to BigQuery.
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import uuid
from datetime import datetime, timezone

PROJECT_ID = "gem-creation"
DATASET_ID = "aura_mind_glow_data"
TABLE_ID = "farm_analysis"
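
# NOTE: these identifiers are specific to the author's Google Cloud project;
# point PROJECT_ID / DATASET_ID / TABLE_ID at your own environment before use.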

def get_bigquery_client():
    """Returns a BigQuery client, or None if the client cannot be created."""
    try:
        client = bigquery.Client(project=PROJECT_ID)
        # Constructing the client does not verify credentials; authentication
        # errors typically surface on the first API request.
        print("BigQuery client created.")
        return client
    except Exception as e:
        print(f"Error creating BigQuery client: {e}")
        return None

def create_dataset_if_not_exists(client):
"""Creates the BigQuery dataset if it doesn't exist."""
dataset_id = f"{PROJECT_ID}.{DATASET_ID}"
try:
client.get_dataset(dataset_id) # Make an API request.
print(f"Dataset {dataset_id} already exists.")
except NotFound:
print(f"Dataset {dataset_id} is not found. Creating dataset...")
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset = client.create_dataset(dataset, timeout=30) # Make an API request.
print(f"Created dataset {client.project}.{dataset.dataset_id}")

def create_table_if_not_exists(client):
"""Creates the BigQuery table if it doesn't exist."""
table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
try:
client.get_table(table_id) # Make an API request.
print(f"Table {table_id} already exists.")
except NotFound:
print(f"Table {table_id} is not found. Creating table...")
schema = [
bigquery.SchemaField("analysis_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("farmer_id", "STRING", mode="NULLABLE"),
bigquery.SchemaField("gps_latitude", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("gps_longitude", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("crop_type", "STRING", mode="NULLABLE"),
bigquery.SchemaField("crop_variety", "STRING", mode="NULLABLE"),
bigquery.SchemaField("ai_diagnosis", "STRING", mode="NULLABLE"),
bigquery.SchemaField("confidence_score", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("recommended_action", "STRING", mode="NULLABLE"),
bigquery.SchemaField("farmer_feedback", "STRING", mode="NULLABLE"),
bigquery.SchemaField("treatment_applied", "STRING", mode="NULLABLE"),
bigquery.SchemaField("outcome_image_id", "STRING", mode="NULLABLE"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table) # Make an API request.
print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

def upload_diagnosis_to_bigquery(diagnosis_data: dict):
"""Uploads a single diagnosis record to BigQuery."""
client = get_bigquery_client()
if client is None:
print("BigQuery client not available. Cannot upload diagnosis.")
return "BigQuery client not available."
create_dataset_if_not_exists(client)
create_table_if_not_exists(client)
table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
# Add required fields if not present
if "analysis_id" not in diagnosis_data:
diagnosis_data["analysis_id"] = str(uuid.uuid4())
if "timestamp" not in diagnosis_data:
diagnosis_data["timestamp"] = datetime.now().isoformat()
rows_to_insert = [diagnosis_data]
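    # Note: streaming inserts into a just-created table can fail transiently
    # while the new table becomes available; retrying with a short backoff is
    # a common mitigation.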
errors = client.insert_rows_json(table_id, rows_to_insert)
    if not errors:
print(f"Diagnosis record {diagnosis_data.get('analysis_id')} uploaded successfully to BigQuery.")
return "Diagnosis uploaded successfully."
else:
print(f"Encountered errors while inserting diagnosis record: {errors}")
return f"Error uploading diagnosis: {errors}"