File size: 5,140 Bytes
f7ab812
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt
from constants import HES


def categorize_title(title: str, patterns: dict) -> str:
    """
    Return the first category whose pattern is found in a title.

    The patterns are tried in the dictionary's iteration order and the search
    stops at the first hit.

    Parameters:
    title (str): The title to categorize.
    patterns (dict): Maps each category name to the regular expression that identifies it.

    Returns:
    str: The matching category, or "Uncategorized" when no pattern matches."""
    # Lazy generator: re.search is only evaluated until the first match
    hits = (label for label, regex in patterns.items() if re.search(regex, title))
    return next(hits, "Uncategorized")


def get_category(
    df: pd.DataFrame, column: str, categories: list, cat: str
) -> pd.DataFrame:
    """
    Get the rows of a DataFrame whose title belongs to a given category.

    Every title in ``column`` is classified with ``categorize_title`` against
    four positional patterns (no year / year only / year + quarter /
    year + month name). Rows labelled ``cat`` are kept, rows with missing
    values are dropped, and the two columns are returned as ``date`` and
    ``value``.

    The input DataFrame is not modified.

    NOTE: dates are parsed with the fixed format "%Y %b", so only titles such
    as "2020 JAN" can be converted; selecting a category whose titles have
    another shape is expected to make ``pd.to_datetime`` raise.

    Parameters:
    df (pd.DataFrame): The DataFrame to filter. It must have exactly two columns (title and value).
    column (str): The column containing the titles.
    categories (list): Four category labels, matched by position to the patterns. If a label is repeated, the pattern at the later position replaces the earlier one under that label.
    cat (str): The label whose rows are returned.

    Returns:
    pd.DataFrame: Columns "date" (datetime) and "value" (float), with a fresh 0..n-1 index.

    Raises:
    IndexError: If ``categories`` has fewer than four entries.
    ValueError: If ``df`` does not have exactly two columns, or a date/value cannot be converted."""

    patterns = {
        # No 4-digit year anywhere in the title
        categories[0]: r"^(?!.*\b\d{4}\b).*$",
        # Starts with a 4-digit year and nothing else
        categories[1]: r"^\b\d{4}\b$",
        # Starts with a year and contains "Q1", "Q2", etc.
        categories[2]: r"^\b\d{4}\b.*\bQ[1-4]\b",
        # Starts with a year and contains a month name
        categories[
            3
        ]: r"^\b\d{4}\b.*\b(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)\b",
    }

    # Classify into a standalone Series instead of writing a "category"
    # column into the caller's DataFrame.
    labels = df[column].apply(categorize_title, patterns=patterns)

    # Explicit copy: the column assignments below must only touch our result.
    result = df[labels == cat].dropna().copy()
    result.columns = ["date", "value"]
    result["date"] = pd.to_datetime(result["date"], format="%Y %b")
    result["value"] = result["value"].astype(float)
    return result.reset_index(drop=True)


def read_cpih(
    file_path: str, medical: bool = True, category: str = "Month"
) -> pd.DataFrame:
    """
    Load a CPIH CSV file and extract the rows of one time granularity.

    The file is read with pandas and filtered by ``get_category`` using its
    "Title" column.

    Parameters:
    file_path (str): The path to the CSV file.
    medical (bool): Currently unused; it has no effect on the result.
    category (str): The category of the data to extract ("Month", "Year" or "Quarter").

    Returns:
    pd.DataFrame: The CPIH data, as returned by ``get_category``."""
    raw = pd.read_csv(file_path)
    # Labels are positional for get_category. "Month" is listed first and
    # last on purpose-or-accident; either way both positions share one label.
    labels = ["Month", "Year", "Quarter", "Month"]
    return get_category(raw, "Title", labels, category)


def read_hes(
    file_path: str,
) -> pd.DataFrame:
    """
    Read the HES data from a CSV file and return a cleaned DataFrame.

    The "CALENDAR_MONTH_END_DATE" column is parsed into datetimes, renamed to
    "date" and shifted back by one month-begin offset. Columns that are
    entirely empty are removed first, then any row that still contains a
    missing value. Rows are sorted chronologically and re-indexed from 0.

    Parameters:
    file_path (str): The path to the CSV file.

    Returns:
    pd.DataFrame: The HES data, with a "date" column in place of "CALENDAR_MONTH_END_DATE"."""
    df = pd.read_csv(file_path)

    # Expand the two-digit year and normalise case before parsing.
    # Presumably the raw values look like "Apr-18" -> "APR 2018"; verify
    # against the data. Every "-" is replaced, and the century is fixed to 20xx.
    month_labels = df["CALENDAR_MONTH_END_DATE"].str.replace("-", " 20").str.upper()
    df["CALENDAR_MONTH_END_DATE"] = pd.to_datetime(month_labels, format="mixed")

    df = (
        df.dropna(axis=1, how="all")  # drop columns with no data at all
        .sort_values(by="CALENDAR_MONTH_END_DATE")
        .dropna()  # then drop rows with any remaining gap
        .reset_index(drop=True)
        .rename(columns={"CALENDAR_MONTH_END_DATE": "date"})
    )

    # The column is already datetime here, so no second parse is needed.
    # MonthBegin(-1) moves each date to the previous month start; a date that
    # is already the 1st moves to the 1st of the preceding month.
    df["date"] = df["date"] + pd.offsets.MonthBegin(-1)
    return df


def get_global_df(
    cpih: pd.DataFrame, cpim: pd.DataFrame, hes: pd.DataFrame
) -> pd.DataFrame:
    """
    Combine the CPIH, CPIM and HES tables into one DataFrame keyed by month.

    The three inputs are inner-joined on their "date" column, so only dates
    present in all of them survive. The "value" columns of ``cpih`` and
    ``cpim`` become "cpih" and "cpih_medical", and "date" is replaced by
    separate "year" and "month" columns.

    Parameters:
    cpih (pd.DataFrame): The CPIH data ("date" and "value" columns).
    cpim (pd.DataFrame): The CPIM data ("date" and "value" columns).
    hes (pd.DataFrame): The HES data (must contain a "date" column).

    Returns:
    pd.DataFrame: The merged DataFrame, without the "date" column."""
    price_indices = cpih.merge(cpim, on="date", how="inner")
    merged = price_indices.merge(hes, on="date", how="inner")

    # The first merge suffixes the clashing "value" columns with _x / _y
    merged = merged.rename(columns={"value_x": "cpih", "value_y": "cpih_medical"})

    dates = merged["date"]
    merged["year"] = dates.dt.year
    merged["month"] = dates.dt.month
    return merged.drop(columns=["date"])


def get_final_df(joined_data: pd.DataFrame) -> pd.DataFrame:
    """
    Build the lagged feature table used for training and testing.

    The target is the current "cpih_medical" value. Features are the previous
    one to three values of "cpih_medical" (cpim_lag1..3) and of "cpih"
    (cpih_lag1..3), plus the HES columns shifted by one row. Rows containing
    any missing value, including the leading rows emptied by the lags, are
    dropped.

    Note: a "date" column (first day of each year/month) is added to
    ``joined_data`` itself as a side effect.

    Parameters:
    joined_data (pd.DataFrame): The merged DataFrame, with "year", "month", "cpih", "cpih_medical" and the HES columns.

    Returns:
    pd.DataFrame: The final DataFrame, re-indexed from 0."""
    # Rebuild a proper date from year/month, pinned to the 1st of the month
    year_month_day = joined_data[["year", "month"]].assign(day=1)
    joined_data["date"] = pd.to_datetime(year_month_day)

    final_data = joined_data[["date"]].copy()
    final_data["target"] = joined_data["cpih_medical"]

    # Lagged copies of both price indices, medical index first
    for source, prefix in (("cpih_medical", "cpim"), ("cpih", "cpih")):
        for lag in (1, 2, 3):
            final_data[f"{prefix}_lag{lag}"] = joined_data[source].shift(lag)

    # HES comes from the project's constants module; presumably a list of
    # HES column names — confirm there.
    final_data[HES] = joined_data[HES].shift(1)

    return final_data.dropna().reset_index(drop=True)