"""Logic for uploading farm analysis data to BigQuery."""

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import uuid
from datetime import datetime, timezone

# --- Configuration: Set your project, dataset, and table details here ---
PROJECT_ID = "gem-creation"
DATASET_ID = "aura_mind_glow_data"
TABLE_ID = "farm_analysis"

def get_bigquery_client():
    """Returns an authenticated BigQuery client."""
    try:
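        # bigquery.Client uses Application Default Credentials (for example a
        # GOOGLE_APPLICATION_CREDENTIALS service-account key or
        # `gcloud auth application-default login`).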
        client = bigquery.Client(project=PROJECT_ID)
        print("βœ… Successfully authenticated with BigQuery.")
        return client
    except Exception as e:
        print(f"❌ Error authenticating with BigQuery: {e}")
        return None

def create_dataset_if_not_exists(client):
    """Creates the BigQuery dataset if it doesn't exist."""
    dataset_id = f"{PROJECT_ID}.{DATASET_ID}"
    try:
        client.get_dataset(dataset_id)  # Make an API request.
        print(f"ℹ️ Dataset {dataset_id} already exists.")
    except NotFound:
        print(f"🟑 Dataset {dataset_id} not found. Creating dataset...")
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"  # You can change the location if needed
        dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
        print(f"βœ… Created dataset {client.project}.{dataset.dataset_id}")


def create_table_if_not_exists(client):
    """Creates the BigQuery table if it doesn't exist."""
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    try:
        client.get_table(table_id)  # Make an API request.
        print(f"ℹ️ Table {table_id} already exists.")
    except NotFound:
        print(f"🟑 Table {table_id} not found. Creating table...")
        schema = [
            bigquery.SchemaField("analysis_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("farmer_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("gps_latitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("gps_longitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("crop_type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("crop_variety", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("ai_diagnosis", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("confidence_score", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("recommended_action", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("farmer_feedback", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("treatment_applied", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("outcome_image_id", "STRING", mode="NULLABLE"),
        ]
        table = bigquery.Table(table_id, schema=schema)
        table = client.create_table(table)  # Make an API request.
        print(f"βœ… Created table {table.project}.{table.dataset_id}.{table.table_id}")

def upload_diagnosis_to_bigquery(diagnosis_data: dict):
    """Uploads a single diagnosis record (from a dictionary) to BigQuery."""
    client = get_bigquery_client()
    if client is None:
        print("❌ BigQuery client not available. Cannot upload diagnosis.")
        return "BigQuery client not available."

    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)

    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

    if "analysis_id" not in diagnosis_data:
        diagnosis_data["analysis_id"] = str(uuid.uuid4())
    if "timestamp" not in diagnosis_data:
        diagnosis_data["timestamp"] = datetime.now().isoformat()

    rows_to_insert = [diagnosis_data]
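    # Note: insert_rows_json performs a streaming insert, so new rows sit in
    # the streaming buffer briefly before appearing in table previews.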

    errors = client.insert_rows_json(table_id, rows_to_insert)
    if not errors:
        print(f"βœ… Diagnosis record {diagnosis_data.get('analysis_id')} uploaded successfully.")
        return "Diagnosis uploaded successfully."
    else:
        print(f"❌ Encountered errors while inserting diagnosis record: {errors}")
        return f"Error uploading diagnosis: {errors}"


def upload_csv_to_bigquery(csv_file_path: str):
    """
    Uploads the contents of a CSV file to the specified BigQuery table.

    Args:
        csv_file_path (str): The local path to the CSV file.
    """
    client = get_bigquery_client()
    if client is None:
        print("❌ BigQuery client not available. Cannot upload CSV.")
        return

    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)

    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

    # Configure the load job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # Skip the header row
        # No autodetect or explicit schema: the job uses the table's existing schema.
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
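    # BigQuery maps CSV columns to the schema by position, so the file is
    # assumed to list its columns in the same order as the table schema
    # (analysis_id, timestamp, farmer_id, ..., outcome_image_id).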

    print(f"πŸš€ Starting CSV upload from '{csv_file_path}' to table '{table_id}'...")

    try:
        with open(csv_file_path, "rb") as source_file:
            load_job = client.load_table_from_file(source_file, table_id, job_config=job_config)

        load_job.result()  # Wait for the job to complete

        destination_table = client.get_table(table_id)
        # To get the number of rows uploaded in this job, we look at the job's output statistics
        rows_uploaded = load_job.output_rows
        print(f"βœ… Job finished. Loaded {rows_uploaded} new rows. The table '{table_id}' now has a total of {destination_table.num_rows} rows.")
        return "CSV upload successful."
    except Exception as e:
        print(f"❌ An error occurred during the CSV upload: {e}")
        return f"Error during CSV upload: {e}"


if __name__ == "__main__":
    csv_file_to_upload = "farm_analysis_data.csv"
    
    print("--- Running BigQuery CSV Uploader Test ---")
    upload_csv_to_bigquery(csv_file_to_upload)
    print("--- Test complete ---")