File size: 5,317 Bytes
dbdd71f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45b7f9a
dbdd71f
e530681
dbdd71f
 
 
45b7f9a
 
 
dbdd71f
45b7f9a
 
 
dbdd71f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import pickle
import random
import shutil
from collections import Counter
from pathlib import Path

import numpy

import zipfile


SERVER_URL = "http://localhost:8000/"  # base URL of the local FHE server

# Maximum number of input rows the browser front-end will accept.
INPUT_BROWSER_LIMIT = 550

# Filesystem layout: raw data, deployment artifacts, and per-user workspaces.
DATA_DIR = Path("./data")
DEPLOYMENT_DIR = Path("./deployment")
ROOT_DIR = DEPLOYMENT_DIR / "users"

# Shared directories holding the compiled base / smoother FHE modules.
SHARED_BASE_MODULE_DIR = DEPLOYMENT_DIR / "base_modules"
SHARED_SMOOTHER_MODULE_DIR = DEPLOYMENT_DIR / "smoother_module"

# Per-user sub-directory names for evaluation keys and encrypted payloads.
KEY_SMOOTHER_MODULE_DIR = "EvaluationKey_Smoother"
KEY_BASE_MODULE_DIR = "EvaluationKey_Base_Modules"
ENCRYPTED_INPUT_DIR = "Encrypt_Input"
ENCRYPTED_OUTPUT_DIR = "Encrypt_Output"
FHE_COMPUTATION_TIMELINE = Path("server_fhe_computation_timeline.txt")  # server-side timing log

# Ancestry labels and their numeric IDs. NOTE: the IDs are not in label-list
# order — ID_POPULATION / POPULATION_ID are the authoritative mappings.
LABELS = ["European", "African", "Americas", "East Asian", "South Asian"]

ID_POPULATION = {0: "European", 3: "African", 2: "Americas", 1: "East Asian", 4: "South Asian"}

POPULATION_ID = {"European": 0, "African": 3, "Americas": 2, "East Asian": 1, "South Asian": 4}

COLORS = ["#FFD208", "#FFE46C", "#FFED9C", "#FFF6CE", "#FFD9A0"]  # chart palette, one per population

# Snapshot of load_pickle("data/meta_dict.pkl"). Key meanings (inferred from
# their use in process_data_for_base_modules — TODO confirm): A=#ancestries,
# C=total sequence length, M=window length, NW=#windows (C // M),
# CT=context size (~int(M * CTR)), CTR=context ratio, SS=smoother window size.
META = {"A": 5, "C": 1059079, "M": 10589, "NW": 100, "CT": 1059, "CTR": 0.1, "WSCM": 0.2, "SS": 75}

# Generations-back options offered when building simulated admixed genomes.
BUILD_GENS = [1, 2, 4, 6, 8, 12, 16, 24, 32, 48]

import os 

def load_pickle_from_zip(file_name, zip_path="data.zip"):
    """
    Load a pickle file from within a zip archive.

    Args:
        file_name (str): Name of the member file inside the archive.
        zip_path (str): Path to the zip archive on disk.

    Returns:
        The unpickled object.

    Raises:
        FileNotFoundError: If the zip archive does not exist.
        KeyError: If `file_name` is not a member of the archive.
    """
    if not os.path.exists(zip_path):
        raise FileNotFoundError(f"The zip file '{zip_path}' does not exist.")

    with zipfile.ZipFile(zip_path, 'r') as z:
        if file_name not in z.namelist():
            raise KeyError(f"The file '{file_name}' does not exist in the zip archive '{zip_path}'.")
        # SECURITY: pickle.load is only safe on trusted archives — it can
        # execute arbitrary code during deserialization.
        with z.open(file_name) as f:
            return pickle.load(f)

def generate_weighted_percentages():
    """Return five random percentages summing to (almost exactly) 100.

    The first entry is a dominant integer share drawn from [50, 70]; the
    remaining four split the leftover proportionally to independent
    uniform draws, rounded to two decimals.
    """
    dominant = random.randint(50, 70)
    leftover = 100 - dominant
    weights = [random.random() for _ in range(4)]

    weight_total = sum(weights)
    minors = [round(w / weight_total * leftover, 2) for w in weights]

    shares = [dominant, *minors]

    # Compensate for rounding drift so the shares total 100 again.
    drift = round(100 - sum(shares), 2)
    if drift != 0:
        shares[0] += drift  # fold the correction into the dominant share

    return shares


def select_random_ancestors():
    """Return all population IDs from ID_POPULATION in a uniformly random order."""
    ids = list(ID_POPULATION)
    random.shuffle(ids)
    return ids


def read_pickle(path):
    """Deserialize and return the object stored in the pickle file at *path*."""
    with open(path, "rb") as handle:
        return pickle.load(handle)


def compute_distribution(y, size=5):
    """Return the empirical class distribution of the label sequence *y*.

    Args:
        y: Sized iterable of integer class labels in ``range(size)``.
        size: Length of the output probability vector (default 5 populations).

    Returns:
        numpy array of length *size* with per-class relative frequencies.
    """
    total = len(y)
    dist = numpy.zeros(size)
    for label, count in Counter(y).items():
        dist[label] = count / total
    return dist


def slide_window(data, smooth_win_size, y=None):
    """Expand each position of *data* into a centered smoothing window.

    Args:
        data: Array of shape (N, W, A) — N samples, W windows, A values each.
        smooth_win_size: Width of the sliding smoothing window.
        y: Optional labels of shape (N, W), flattened alongside the windows.

    Returns:
        Tuple (X_slide, y_slide): X_slide has shape (N * W, smooth_win_size * A);
        y_slide is None when *y* is None, otherwise shape (N * W,).
    """
    n_seq, width, n_feat = data.shape

    # Mirror-pad the window axis so edge positions get full-width windows.
    half = (smooth_win_size + 1) // 2
    padded = numpy.pad(data, ((0, 0), (half, half), (0, 0)), mode="reflect")
    views = numpy.lib.stride_tricks.sliding_window_view(padded, (1, smooth_win_size, n_feat))
    flat = views[:, :width, :].reshape(n_seq * width, -1)
    labels = y.reshape(n_seq * width) if y is not None else None

    return flat, labels


# def read_vcf(vcf_file):
#     return allel.read_vcf(vcf_file, region=None, fields="*")


def clean_dir(directory):
    """Delete *directory* and all its contents; silently no-op when absent."""
    if not (directory.exists() and directory.is_dir()):
        return
    print(f"Removing existing model directory: {directory}")
    shutil.rmtree(directory)


def process_data_for_base_modules(meta, X_t):
    """Cut the full sequence into overlapping per-window slices with context.

    Args:
        meta: Metadata dict; uses "NW" (window count), "CT" (context size),
            "M" (window length) and "C" (total sequence length).
        X_t: 2-D array of shape (samples, meta["C"]).

    Returns:
        Tuple (X_b, n_windows, M_, rem): the windowed array of shape
        (samples, len(starts), M_), the window count, the context-padded
        window length, and the remainder C - M * NW not covered by windows.
    """
    n_windows = meta["NW"]  # meta["C"] // meta["M"]
    context = meta["CT"]  # int(meta["M"] * meta['CTR'])

    # Mirror-pad both ends so the first and last windows get full context.
    if context != 0.0:
        left = numpy.flip(X_t[:, 0:context], axis=1)
        right = numpy.flip(X_t[:, -context:], axis=1)
        X_t = numpy.concatenate([left, X_t, right], axis=1)

    padded_len = meta["M"] + 2 * context
    starts = numpy.arange(0, meta["C"], meta["M"])[:-2]
    X_b = numpy.lib.stride_tricks.sliding_window_view(X_t, padded_len, axis=1)[:, starts, :]
    remainder = meta["C"] - meta["M"] * n_windows

    return X_b, n_windows, padded_len, remainder


def extract_model_number(path):
    """Extract the trailing integer from a model path like ``model_12``.

    Accepts either a string or a ``pathlib.Path`` (coerced via ``str`` —
    the original crashed with an uncaught AttributeError on Path input).

    Args:
        path: Path whose final ``_``-separated token is the model number.

    Returns:
        int | None: The parsed number, or None when parsing fails.
    """
    try:
        return int(str(path).split("_")[-1])
    except (ValueError, IndexError):
        print(f"Error: Unable to extract model number from path: {path}")
        return None


def is_none(obj) -> bool:
    """
    Check if the object is None or empty.

    Args:
        obj (any): The input to be checked.

    Returns:
        bool: True if the object is None, or has a length of zero;
        False otherwise (including length-less objects such as ints).
    """
    # The original `obj is not None and ...` guard was redundant: the `or`
    # short-circuits when obj is None. Behavior is unchanged.
    return obj is None or (hasattr(obj, "__len__") and len(obj) == 0)


def load_pickle(path: str):
    """Load a pickled object from disk.

    The original annotated the return as ``numpy.array`` (a function, not a
    type); the actual return is whatever was pickled, e.g. the genome dict.

    Args:
        path (str): Path to the pickle file.

    Returns:
        The deserialized object.
    """
    with open(path, "rb") as f:
        return pickle.load(f)


def write_pickle(path: str, data) -> None:
    """Serialize *data* to *path* with pickle.

    The original annotated the return as ``numpy.array``; the function
    returns nothing.

    Args:
        path (str): Destination file path.
        data: Any picklable object.
    """
    with open(path, "wb") as f:
        pickle.dump(data, f)


def write_bytes(path, data):
    """Write *data* (bytes) to *path*, a pathlib.Path, truncating any existing file."""
    with path.open("wb") as out:
        out.write(data)


def read_bytes(path):
    """Return the full binary contents of *path*, a pathlib.Path."""
    with path.open("rb") as src:
        return src.read()