|  | import os | 
					
						
						|  | import gradio as gr | 
					
						
						|  | import pandas as pd | 
					
						
						|  |  | 
					
						
						|  | def get_or_create_env_var(var_name, default_value): | 
					
						
						|  |  | 
					
						
						|  | value = os.environ.get(var_name) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if value is None: | 
					
						
						|  | os.environ[var_name] = default_value | 
					
						
						|  | value = default_value | 
					
						
						|  |  | 
					
						
						|  | return value | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | env_var_name = 'GRADIO_OUTPUT_FOLDER' | 
					
						
						|  | default_value = 'output/' | 
					
						
						|  |  | 
					
						
						|  | output_folder = get_or_create_env_var(env_var_name, default_value) | 
					
						
						|  | print(f'The value of {env_var_name} is {output_folder}') | 
					
						
						|  |  | 
					
						
						|  | def get_file_path_with_extension(file_path): | 
					
						
						|  |  | 
					
						
						|  | basename = os.path.basename(file_path) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | return basename | 
					
						
						|  |  | 
					
						
						|  | def get_file_path_end(file_path): | 
					
						
						|  |  | 
					
						
						|  | basename = os.path.basename(file_path) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | filename_without_extension, _ = os.path.splitext(basename) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | return filename_without_extension | 
					
						
						|  |  | 
					
						
						|  | def detect_file_type(filename): | 
					
						
						|  | """Detect the file type based on its extension.""" | 
					
						
						|  | if (filename.endswith('.csv')) | (filename.endswith('.csv.gz')) | (filename.endswith('.zip')): | 
					
						
						|  | return 'csv' | 
					
						
						|  | elif filename.endswith('.xlsx'): | 
					
						
						|  | return 'xlsx' | 
					
						
						|  | elif filename.endswith('.parquet'): | 
					
						
						|  | return 'parquet' | 
					
						
						|  | elif filename.endswith('.pdf'): | 
					
						
						|  | return 'pdf' | 
					
						
						|  | elif filename.endswith('.jpg'): | 
					
						
						|  | return 'jpg' | 
					
						
						|  | elif filename.endswith('.jpeg'): | 
					
						
						|  | return 'jpeg' | 
					
						
						|  | elif filename.endswith('.png'): | 
					
						
						|  | return 'png' | 
					
						
						|  | else: | 
					
						
						|  | raise ValueError("Unsupported file type.") | 
					
						
						|  |  | 
					
						
						|  | def read_file(filename): | 
					
						
						|  | """Read the file based on its detected type.""" | 
					
						
						|  | file_type = detect_file_type(filename) | 
					
						
						|  |  | 
					
						
						|  | if file_type == 'csv': | 
					
						
						|  | return pd.read_csv(filename, low_memory=False) | 
					
						
						|  | elif file_type == 'xlsx': | 
					
						
						|  | return pd.read_excel(filename) | 
					
						
						|  | elif file_type == 'parquet': | 
					
						
						|  | return pd.read_parquet(filename) | 
					
						
						|  |  | 
					
						
						|  | def ensure_output_folder_exists(): | 
					
						
						|  | """Checks if the 'output/' folder exists, creates it if not.""" | 
					
						
						|  |  | 
					
						
						|  | folder_name = "output/" | 
					
						
						|  |  | 
					
						
						|  | if not os.path.exists(folder_name): | 
					
						
						|  |  | 
					
						
						|  | os.makedirs(folder_name) | 
					
						
						|  | print(f"Created the 'output/' folder.") | 
					
						
						|  | else: | 
					
						
						|  | print(f"The 'output/' folder already exists.") | 
					
						
						|  |  | 
					
						
						|  | def put_columns_in_df(in_file): | 
					
						
						|  | new_choices = [] | 
					
						
						|  | concat_choices = [] | 
					
						
						|  | all_sheet_names = [] | 
					
						
						|  | number_of_excel_files = 0 | 
					
						
						|  |  | 
					
						
						|  | for file in in_file: | 
					
						
						|  | file_name = file.name | 
					
						
						|  | file_type = detect_file_type(file_name) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | file_end = get_file_path_with_extension(file_name) | 
					
						
						|  |  | 
					
						
						|  | if file_type == 'xlsx': | 
					
						
						|  | number_of_excel_files += 1 | 
					
						
						|  | new_choices = [] | 
					
						
						|  | print("Running through all xlsx sheets") | 
					
						
						|  | anon_xlsx = pd.ExcelFile(file_name) | 
					
						
						|  | new_sheet_names = anon_xlsx.sheet_names | 
					
						
						|  |  | 
					
						
						|  | for sheet_name in new_sheet_names: | 
					
						
						|  |  | 
					
						
						|  | df = pd.read_excel(file_name, sheet_name=sheet_name) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | print(f"Sheet Name: {sheet_name}") | 
					
						
						|  | print(df.head()) | 
					
						
						|  |  | 
					
						
						|  | new_choices.extend(list(df.columns)) | 
					
						
						|  |  | 
					
						
						|  | all_sheet_names.extend(new_sheet_names) | 
					
						
						|  |  | 
					
						
						|  | else: | 
					
						
						|  | df = read_file(file_name) | 
					
						
						|  | new_choices = list(df.columns) | 
					
						
						|  |  | 
					
						
						|  | concat_choices.extend(new_choices) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | concat_choices = list(set(concat_choices)) | 
					
						
						|  |  | 
					
						
						|  | if number_of_excel_files > 0: | 
					
						
						|  | return gr.Dropdown(choices=concat_choices, value=concat_choices[0]), gr.Dropdown(choices=all_sheet_names, value=all_sheet_names[0], visible=True), file_end | 
					
						
						|  | else: | 
					
						
						|  | return gr.Dropdown(choices=concat_choices, value=concat_choices[0]), gr.Dropdown(visible=False), file_end | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def add_folder_to_path(folder_path: str): | 
					
						
						|  | ''' | 
					
						
						|  | Check if a folder exists on your system. If so, get the absolute path and then add it to the system Path variable if it doesn't already exist. | 
					
						
						|  | ''' | 
					
						
						|  |  | 
					
						
						|  | if os.path.exists(folder_path) and os.path.isdir(folder_path): | 
					
						
						|  | print(folder_path, "folder exists.") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | absolute_path = os.path.abspath(folder_path) | 
					
						
						|  |  | 
					
						
						|  | current_path = os.environ['PATH'] | 
					
						
						|  | if absolute_path not in current_path.split(os.pathsep): | 
					
						
						|  | full_path_extension = absolute_path + os.pathsep + current_path | 
					
						
						|  | os.environ['PATH'] = full_path_extension | 
					
						
						|  |  | 
					
						
						|  | else: | 
					
						
						|  | print(f"Directory {folder_path} already exists in PATH.") | 
					
						
						|  | else: | 
					
						
						|  | print(f"Folder not found at {folder_path} - not added to PATH") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def reveal_feedback_buttons(): | 
					
						
						|  | return gr.Radio(visible=True), gr.Textbox(visible=True), gr.Button(visible=True), gr.Markdown(visible=True) | 
					
						
						|  |  | 
					
						
						|  | def wipe_logs(feedback_logs_loc, usage_logs_loc): | 
					
						
						|  | try: | 
					
						
						|  | os.remove(feedback_logs_loc) | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print("Could not remove feedback logs file", e) | 
					
						
						|  | try: | 
					
						
						|  | os.remove(usage_logs_loc) | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print("Could not remove usage logs file", e) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | async def get_connection_params(request: gr.Request): | 
					
						
						|  | base_folder = "" | 
					
						
						|  |  | 
					
						
						|  | if request: | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | print("Session hash:", request.session_hash) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | CUSTOM_CLOUDFRONT_HEADER_var = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER', '') | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | CUSTOM_CLOUDFRONT_HEADER_VALUE_var = get_or_create_env_var('CUSTOM_CLOUDFRONT_HEADER_VALUE', '') | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if CUSTOM_CLOUDFRONT_HEADER_var and CUSTOM_CLOUDFRONT_HEADER_VALUE_var: | 
					
						
						|  | if CUSTOM_CLOUDFRONT_HEADER_var in request.headers: | 
					
						
						|  | supplied_cloudfront_custom_value = request.headers[CUSTOM_CLOUDFRONT_HEADER_var] | 
					
						
						|  | if supplied_cloudfront_custom_value == CUSTOM_CLOUDFRONT_HEADER_VALUE_var: | 
					
						
						|  | print("Custom Cloudfront header found:", supplied_cloudfront_custom_value) | 
					
						
						|  | else: | 
					
						
						|  | raise(ValueError, "Custom Cloudfront header value does not match expected value.") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if request.username: | 
					
						
						|  | out_session_hash = request.username | 
					
						
						|  | base_folder = "user-files/" | 
					
						
						|  | print("Request username found:", out_session_hash) | 
					
						
						|  |  | 
					
						
						|  | elif 'x-cognito-id' in request.headers: | 
					
						
						|  | out_session_hash = request.headers['x-cognito-id'] | 
					
						
						|  | base_folder = "user-files/" | 
					
						
						|  | print("Cognito ID found:", out_session_hash) | 
					
						
						|  |  | 
					
						
						|  | else: | 
					
						
						|  | out_session_hash = request.session_hash | 
					
						
						|  | base_folder = "temp-files/" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | output_folder = base_folder + out_session_hash + "/" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | return out_session_hash, output_folder, out_session_hash | 
					
						
						|  | else: | 
					
						
						|  | print("No session parameters found.") | 
					
						
						|  | return "","" |