Spaces:
Sleeping
Sleeping
# This file will contain the logic for uploading data to BigQuery.
import uuid
from datetime import datetime, timezone

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
# Fixed GCP project / dataset / table that every upload in this module targets.
PROJECT_ID = "gem-creation"
DATASET_ID = "aura_mind_glow_data"
TABLE_ID = "farm_analysis"
def get_bigquery_client():
    """Build and return an authenticated BigQuery client, or None on failure."""
    try:
        bq_client = bigquery.Client(project=PROJECT_ID)
    except Exception as exc:
        # Best-effort: report the auth/config problem and signal failure to the caller.
        print(f"Error authenticating with BigQuery: {exc}")
        return None
    print("Successfully authenticated with BigQuery.")
    return bq_client
def create_dataset_if_not_exists(client):
    """Ensure the module's dataset exists, creating it (US region) when missing."""
    dataset_id = f"{PROJECT_ID}.{DATASET_ID}"
    try:
        client.get_dataset(dataset_id)  # API call: raises NotFound when absent.
    except NotFound:
        print(f"Dataset {dataset_id} is not found. Creating dataset...")
        new_dataset = bigquery.Dataset(dataset_id)
        new_dataset.location = "US"
        created = client.create_dataset(new_dataset, timeout=30)  # API call.
        print(f"Created dataset {client.project}.{created.dataset_id}")
    else:
        print(f"Dataset {dataset_id} already exists.")
def create_table_if_not_exists(client):
    """Ensure the farm_analysis table exists, creating it with the expected schema."""
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    try:
        client.get_table(table_id)  # API call: raises NotFound when absent.
    except NotFound:
        print(f"Table {table_id} is not found. Creating table...")
        # (name, type, mode) triples describing the farm-analysis row layout.
        field_specs = [
            ("analysis_id", "STRING", "REQUIRED"),
            ("timestamp", "TIMESTAMP", "REQUIRED"),
            ("farmer_id", "STRING", "NULLABLE"),
            ("gps_latitude", "FLOAT", "NULLABLE"),
            ("gps_longitude", "FLOAT", "NULLABLE"),
            ("crop_type", "STRING", "NULLABLE"),
            ("crop_variety", "STRING", "NULLABLE"),
            ("ai_diagnosis", "STRING", "NULLABLE"),
            ("confidence_score", "FLOAT", "NULLABLE"),
            ("recommended_action", "STRING", "NULLABLE"),
            ("farmer_feedback", "STRING", "NULLABLE"),
            ("treatment_applied", "STRING", "NULLABLE"),
            ("outcome_image_id", "STRING", "NULLABLE"),
        ]
        schema = [bigquery.SchemaField(n, t, mode=m) for n, t, m in field_specs]
        created = client.create_table(bigquery.Table(table_id, schema=schema))  # API call.
        print(f"Created table {created.project}.{created.dataset_id}.{created.table_id}")
    else:
        print(f"Table {table_id} already exists.")
def upload_diagnosis_to_bigquery(diagnosis_data: dict) -> str:
    """Upload a single diagnosis record to the farm_analysis BigQuery table.

    Ensures the dataset and table exist, fills in the required
    ``analysis_id`` / ``timestamp`` fields when the caller did not supply
    them, and streams the row in via the JSON insert API.

    Args:
        diagnosis_data: Column-name -> value mapping for one row. The
            caller's dict is not modified.

    Returns:
        A human-readable status string describing success or failure.
    """
    client = get_bigquery_client()
    if client is None:
        print("BigQuery client not available. Cannot upload diagnosis.")
        return "BigQuery client not available."

    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)

    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

    # Work on a copy so filling in defaults does not mutate the caller's dict.
    record = dict(diagnosis_data)
    record.setdefault("analysis_id", str(uuid.uuid4()))
    # BigQuery interprets TIMESTAMP values as UTC; a naive local-time
    # datetime.now() would silently skew every record by the host's UTC
    # offset, so generate an explicit timezone-aware UTC timestamp.
    record.setdefault("timestamp", datetime.now(timezone.utc).isoformat())

    errors = client.insert_rows_json(table_id, [record])
    if not errors:
        print(f"Diagnosis record {record.get('analysis_id')} uploaded successfully to BigQuery.")
        return "Diagnosis uploaded successfully."
    print(f"Encountered errors while inserting diagnosis record: {errors}")
    return f"Error uploading diagnosis: {errors}"