# This file will contain the logic for uploading data to BigQuery.
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import uuid
from datetime import datetime
# --- Configuration: Set your project, dataset, and table details here ---
PROJECT_ID = "gem-creation"
DATASET_ID = "aura_mind_glow_data"
TABLE_ID = "farm_analysis"

def get_bigquery_client():
    """Returns an authenticated BigQuery client."""
    try:
        client = bigquery.Client(project=PROJECT_ID)
        print("✅ Successfully authenticated with BigQuery.")
        return client
    except Exception as e:
        print(f"❌ Error authenticating with BigQuery: {e}")
        return None
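
# Authentication note (assumption about the runtime environment, not part of the
# original module): bigquery.Client() resolves credentials via Application Default
# Credentials, so the host is expected to provide them, e.g. through a service
# account key file:
#   export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"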

def create_dataset_if_not_exists(client):
    """Creates the BigQuery dataset if it doesn't exist."""
    dataset_id = f"{PROJECT_ID}.{DATASET_ID}"
    try:
        client.get_dataset(dataset_id)  # Make an API request.
        print(f"ℹ️ Dataset {dataset_id} already exists.")
    except NotFound:
        print(f"💡 Dataset {dataset_id} not found. Creating dataset...")
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"  # You can change the location if needed
        dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
        print(f"✅ Created dataset {client.project}.{dataset.dataset_id}")

def create_table_if_not_exists(client):
    """Creates the BigQuery table if it doesn't exist."""
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    try:
        client.get_table(table_id)  # Make an API request.
        print(f"ℹ️ Table {table_id} already exists.")
    except NotFound:
        print(f"💡 Table {table_id} not found. Creating table...")
        schema = [
            bigquery.SchemaField("analysis_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("farmer_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("gps_latitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("gps_longitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("crop_type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("crop_variety", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("ai_diagnosis", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("confidence_score", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("recommended_action", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("farmer_feedback", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("treatment_applied", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("outcome_image_id", "STRING", mode="NULLABLE"),
        ]
        table = bigquery.Table(table_id, schema=schema)
        table = client.create_table(table)  # Make an API request.
        print(f"✅ Created table {table.project}.{table.dataset_id}.{table.table_id}")

def upload_diagnosis_to_bigquery(diagnosis_data: dict):
    """Uploads a single diagnosis record (from a dictionary) to BigQuery."""
    client = get_bigquery_client()
    if client is None:
        print("❌ BigQuery client not available. Cannot upload diagnosis.")
        return "BigQuery client not available."
    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    # Fill in the required fields if the caller did not provide them.
    if "analysis_id" not in diagnosis_data:
        diagnosis_data["analysis_id"] = str(uuid.uuid4())
    if "timestamp" not in diagnosis_data:
        diagnosis_data["timestamp"] = datetime.now().isoformat()
    rows_to_insert = [diagnosis_data]
    errors = client.insert_rows_json(table_id, rows_to_insert)
    if not errors:
        print(f"✅ Diagnosis record {diagnosis_data.get('analysis_id')} uploaded successfully.")
        return "Diagnosis uploaded successfully."
    else:
        print(f"❌ Encountered errors while inserting diagnosis record: {errors}")
        return f"Error uploading diagnosis: {errors}"

def upload_csv_to_bigquery(csv_file_path: str):
    """
    Uploads the contents of a CSV file to the specified BigQuery table.

    Args:
        csv_file_path (str): The local path to the CSV file.
    """
    client = get_bigquery_client()
    if client is None:
        print("❌ BigQuery client not available. Cannot upload CSV.")
        return
    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    # Configure the load job.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # Skip the header row
        # autodetect is intentionally omitted so the job uses the table's existing schema.
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    print(f"🚀 Starting CSV upload from '{csv_file_path}' to table '{table_id}'...")
    try:
        with open(csv_file_path, "rb") as source_file:
            load_job = client.load_table_from_file(source_file, table_id, job_config=job_config)
        load_job.result()  # Wait for the job to complete.
        destination_table = client.get_table(table_id)
        # The number of rows uploaded by this job comes from the job's output statistics.
        rows_uploaded = load_job.output_rows
        print(f"✅ Job finished. Loaded {rows_uploaded} new rows. The table '{table_id}' now has a total of {destination_table.num_rows} rows.")
        return "CSV upload successful."
    except Exception as e:
        print(f"❌ An error occurred during the CSV upload: {e}")
        return f"Error during CSV upload: {e}"
if __name__ == "__main__":
csv_file_to_upload = "farm_analysis_data.csv"
print("--- Running BigQuery CSV Uploader Test ---")
upload_csv_to_bigquery(csv_file_to_upload)
print("--- Test complete ---") |