"""Logic for uploading farm-analysis diagnosis records to BigQuery.

Creates the target dataset/table on demand and streams single diagnosis
records into it via the BigQuery streaming-insert API.
"""

import uuid
from datetime import datetime, timezone

from google.cloud import bigquery
from google.cloud.exceptions import NotFound

PROJECT_ID = "gem-creation"
DATASET_ID = "aura_mind_glow_data"
TABLE_ID = "farm_analysis"


def get_bigquery_client():
    """Return an authenticated BigQuery client, or None on auth failure.

    Returns:
        bigquery.Client | None: a client bound to PROJECT_ID, or None if
        authentication/construction raised (error is printed, not raised).
    """
    try:
        client = bigquery.Client(project=PROJECT_ID)
        print("Successfully authenticated with BigQuery.")
        return client
    except Exception as e:
        # Deliberate best-effort: callers check for None instead of catching.
        print(f"Error authenticating with BigQuery: {e}")
        return None


def create_dataset_if_not_exists(client):
    """Create the configured dataset in the US location if it is missing.

    Args:
        client: an authenticated bigquery.Client.
    """
    dataset_id = f"{PROJECT_ID}.{DATASET_ID}"
    try:
        client.get_dataset(dataset_id)  # Make an API request.
        print(f"Dataset {dataset_id} already exists.")
    except NotFound:
        print(f"Dataset {dataset_id} is not found. Creating dataset...")
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"
        dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
        print(f"Created dataset {client.project}.{dataset.dataset_id}")


def create_table_if_not_exists(client):
    """Create the farm_analysis table with its fixed schema if it is missing.

    Args:
        client: an authenticated bigquery.Client.
    """
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    try:
        client.get_table(table_id)  # Make an API request.
        print(f"Table {table_id} already exists.")
    except NotFound:
        print(f"Table {table_id} is not found. Creating table...")
        schema = [
            bigquery.SchemaField("analysis_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("farmer_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("gps_latitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("gps_longitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("crop_type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("crop_variety", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("ai_diagnosis", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("confidence_score", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("recommended_action", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("farmer_feedback", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("treatment_applied", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("outcome_image_id", "STRING", mode="NULLABLE"),
        ]
        table = bigquery.Table(table_id, schema=schema)
        table = client.create_table(table)  # Make an API request.
        print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")


def upload_diagnosis_to_bigquery(diagnosis_data: dict):
    """Upload a single diagnosis record to the farm_analysis table.

    Ensures the dataset and table exist first, then streams the record in
    with insert_rows_json.

    Args:
        diagnosis_data: the record to insert. NOTE: mutated in place —
            missing "analysis_id" (random UUID) and "timestamp" (current
            UTC time, ISO-8601) fields are filled in, so the caller can
            read the generated analysis_id back after the call.

    Returns:
        str: a human-readable status message (success or error).
    """
    client = get_bigquery_client()
    if client is None:
        print("BigQuery client not available. Cannot upload diagnosis.")
        return "BigQuery client not available."

    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)

    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

    # Fill in the REQUIRED schema fields if the caller did not supply them.
    if "analysis_id" not in diagnosis_data:
        diagnosis_data["analysis_id"] = str(uuid.uuid4())
    if "timestamp" not in diagnosis_data:
        # Timezone-aware UTC: the column is a BigQuery TIMESTAMP, and a naive
        # local time would be misinterpreted depending on the server locale.
        diagnosis_data["timestamp"] = datetime.now(timezone.utc).isoformat()

    rows_to_insert = [diagnosis_data]
    errors = client.insert_rows_json(table_id, rows_to_insert)
    if not errors:
        print(
            f"Diagnosis record {diagnosis_data.get('analysis_id')} "
            "uploaded successfully to BigQuery."
        )
        return "Diagnosis uploaded successfully."
    else:
        print(f"Encountered errors while inserting diagnosis record: {errors}")
        return f"Error uploading diagnosis: {errors}"