File size: 6,529 Bytes
719d0db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import torch.nn as nn
import numpy as np

class CFTourGenerator(nn.Module):
    def __init__(self, cf_solver):
        super().__init__()
        self.solver = cf_solver
        self.problem = cf_solver.problem

    def forward(self, factual_tour, vehicle_id, cf_step, cf_next_node_id, node_feats, dist_matrix=None):
        """
        solve an input instance with visited edges fixed

        Parameters
        ----------
        factual_tour: list [seq_length]
        cf_step: int
        cf_next_node_id: int
        node_feats: 

        Returns
        -------
        cf_tour: np.array [seq_length]
        """
        fixed_paths = self.get_fixed_paths(factual_tour, vehicle_id, cf_step, cf_next_node_id)
        cf_tours = self.solver.solve(node_feats, fixed_paths, dist_matrix=dist_matrix)
        if cf_tours is None:
            return
        if (cf_step > 0):
            for vehicle_id, cf_tour in enumerate(cf_tours):
                if cf_next_node_id in cf_tour:
                    if cf_step == 1:
                        if cf_tour[1] != cf_next_node_id:
                            cf_tours[vehicle_id] = np.flipud(cf_tour)
                        break
                    else:
                        if (factual_tour[vehicle_id][1] != cf_tour[1]):
                            cf_tours[vehicle_id] = np.flipud(cf_tour) # make direction of the cf tour the same as factual one
                        break
        print("aaaa", cf_tours)
        return cf_tours

    def get_fixed_paths(self, factual_tour, vehicle_id, cf_step, cf_next_node_id):
        visited_paths = np.append(factual_tour[vehicle_id][:cf_step], cf_next_node_id)
        return visited_paths

    # def get_avail_edges(self, factual_tour, cf_step, cf_next_node_id):
    #     visited_paths = np.append(factual_tour[:cf_step], cf_next_node_id)
    #     avail_edges = []
    #     # add fixed edges
    #     for i in range(len(visited_paths) - 1):
    #         avail_edges.append([visited_paths[i], visited_paths[i + 1]])
    #     print(avail_edges)

    #     # add rest avaialbel edges
    #     num_nodes = np.max(factual_tour) + 1
    #     visited = np.array([0] * num_nodes)
    #     for id in visited_paths:
    #         visited[id] = 1
    #     visited[factual_tour[0]] = 0
    #     visited[cf_next_node_id] = 0
    #     mask = visited < 1
    #     node_id = np.arange(num_nodes)
    #     feasible_node_id = node_id[mask]
    #     for j in range(len(feasible_node_id) - 1):
    #         for i in range(j + 1, len(feasible_node_id)):
    #             if ((feasible_node_id[j] == factual_tour[0]) and (feasible_node_id[i] == cf_next_node_id)) or ((feasible_node_id[i] == factual_tour[0]) and (feasible_node_id[j] == cf_next_node_id)):
    #                 continue
    #             avail_edges.append([feasible_node_id[j], feasible_node_id[i]])
    #     return np.array(avail_edges)

#-----------
# unit test
#-----------
if __name__ == "__main__":
    import argparse
    import random
    import matplotlib.pyplot as plt
    # FYI: 
    # - https://yu-nix.com/archives/python-path-get/
    # - https://www.delftstack.com/ja/howto/python/python-get-parent-directory/
    # - https://stackoverflow.com/questions/2817264/how-to-get-the-parent-dir-location
    import os
    import sys
    CURR_DIR = os.path.dirname(os.path.abspath(__file__))
    PARENT_DIR = os.path.abspath(os.path.join(CURR_DIR, os.pardir))
    sys.path.append(PARENT_DIR)
    from utils.util_vis import visualize_factual_and_cf_tours
    from lkh.lkh import LKH
    from models.ortools.ortools import ORTools
    from data_generator.tsptw.tsptw_dataset import generate_tsptw_instance

    parser = argparse.ArgumentParser(description='')
    # general settings
    parser.add_argument("--problem", type=str, default="tsptw")
    parser.add_argument("--random_seed", type=int, default=1234)
    parser.add_argument("--num_samples", type=int, default=5)
    parser.add_argument("--num_nodes", type=int, default=100)
    parser.add_argument("--coord_dim", type=int, default=2)
    # LKH settings
    parser.add_argument("--max_trials", type=int, default=1000)
    parser.add_argument("--lkh_dir", type=str, default="lkh", help="Path to the binary of LKH")
    parser.add_argument("--io_dir", type=str, default="lkh_io_files")
    args = parser.parse_args()

    # models
    # cf_solver = LKH(args.problem, args.max_trials, args.random_seed, lkh_dir=args.lkh_dir, io_dir=args.io_dir)
    cf_solver = ORTools(args.problem)
    cf_generator = CFTourGenerator(cf_solver)

    # dataset
    if args.problem == "tsp":
        np.random.seed(args.random_seed)
        node_feats = np.random.uniform(size=[args.num_samples, args.num_nodes, args.coord_dim])
    elif args.problem == "tsptw":
        coords, time_window, grid_size = generate_tsptw_instance(num_nodes=args.num_nodes, grid_size=100, max_tw_gap=10, max_tw_size=1000, is_integer_instance=True, da_silva_style=True)
        node_feats = np.concatenate([coords, time_window], -1)
        node_feats = node_feats[None, :, :]

    # function ot automatically generate couterfactual visit 
    def get_random_cf_visit(factual_tour, random_seed=1234):
        # random.seed(random_seed)
        num_nodes = np.max(factual_tour) + 1
        step = random.randrange(len(factual_tour) - 2) # remove the last step (returning to the start-point)
        visited = np.array([0] * num_nodes)
        for i in range(step+1):
            visited[factual_tour[i]] = 1
        mask = visited < 1
        node_id = np.arange(num_nodes)
        feasible_node_id = node_id[mask]
        cf_next_id = random.choice(feasible_node_id) # select counterfactual
        return step, cf_next_id

    for i in range(len(node_feats)):
        # obtain a factual tour
        factual_tour = cf_solver.solve(node_feats[i])

        # counterfactual visit
        cf_step, cf_next_node_id = get_random_cf_visit(factual_tour, random_seed=args.random_seed)

        print(cf_step, cf_next_node_id)
        # obtain a counterfactual tour
        cf_tour = cf_generator(factual_tour, cf_step, cf_next_node_id, node_feats[i])

        print(factual_tour)
        print(cf_tour)

        # visualize the factual and counterfactual tours
        if args.problem == "tsp":
            coords = node_feats[i]
        elif args.problem == "tsptw":
            coord_dim = 2
            coords = node_feats[i, :, :coord_dim]
        visualize_factual_and_cf_tours(factual_tour, cf_tour, coords, cf_step, f"test{i}.png")
        break