-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexperiment_result_utils.py
319 lines (278 loc) · 9.71 KB
/
experiment_result_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
import math
import mlflow
from mlflow.tracking import MlflowClient
import optuna
import os
import torch
from policy_utils import *
from constants import *
def get_or_create_experiment(experiment_name):
if experiment := mlflow.get_experiment_by_name(experiment_name):
return experiment.experiment_id
else:
return mlflow.create_experiment(experiment_name)
def save(path, dict):
# Ensure the parent directory exists
parent_dir = os.path.dirname(path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
with open(path, "wb") as f: # Use "wb" mode for writing binary files
torch.save(dict, f)
print(f"Dictionary saved at {path}")
def load(path):
dict = torch.load(path)
print(f"Dictionary loaded from {path}")
return dict
def save_to_mlflow(dict, name="Dict"):
temp_path = "./results.pth"
save(temp_path, dict)
mlflow.log_artifact(temp_path)
os.remove(temp_path)
print("Dictionary saved to MLflow and local file removed")
def load_from_mlflow(artifact_uri):
# Download the artifact from MLflow
local_path = mlflow.artifacts.download_artifacts(artifact_uri=artifact_uri)
# Load the artifact into memory
dict = load(local_path)
print("Dictionary loaded from MLflow")
return dict
def get_run_id_by_name(experiment_id, run_name):
client = MlflowClient()
filter_string = f"tags.`mlflow.runName` = '{run_name}'"
runs = client.search_runs(experiment_ids=[experiment_id], filter_string=filter_string)
if not runs:
raise ValueError(f"No run found with name: {run_name}")
return runs[0].info.run_id
def get_nested_runs(experiment_id, run_name=None):
client = MlflowClient()
runs = client.search_runs(experiment_ids=[experiment_id])
if run_name is None:
nested_runs = [run for run in runs if run.data.tags.get('mlflow.parentRunId') is not None]
else:
nested_runs = [run for run in runs if run_name in run.data.tags.get('mlflow.runName') and run.data.tags.get('mlflow.parentRunId') is not None]
return nested_runs
def get_nested_artifacts(experiment_id, run_name=None):
nested_runs = get_nested_runs(experiment_id, run_name=run_name)
client = MlflowClient()
artifacts = []
for run in nested_runs:
base_uri = run.info.artifact_uri
if len(client.list_artifacts(run.info.run_id)) == 0:
continue
resource_uri = client.list_artifacts(run.info.run_id)[0].path
artifact_uri = base_uri+"/"+resource_uri
result = load_from_mlflow(artifact_uri)
artifacts.append(result)
return artifacts
def get_parent_runs(experiment_id):
client = MlflowClient()
runs = client.search_runs(experiment_ids=[experiment_id])
parent_runs = [run for run in runs if run.data.tags.get('mlflow.parentRunId') is None]
return parent_runs
def get_parent_artifacts(experiment_id):
parent_runs = get_parent_runs(experiment_id)
client = MlflowClient()
artifacts = []
print(parent_runs)
for run in parent_runs:
base_uri = run.info.artifact_uri
print(client.list_artifacts(run.info.run_id))
list_artifacts = [artifact.path for artifact in client.list_artifacts(run.info.run_id) if artifact.path.endswith(".pth")]
if len(list_artifacts) > 0:
resource_uri = list_artifacts[0]
artifact_uri = base_uri+"/"+resource_uri
result = load_from_mlflow(artifact_uri)
artifacts.append(result)
return artifacts
"""
Test policies on the original problem
Args:
- tmdp (TMDP): the teleporting MDP
- thetas (ndarray): collection of parameter vectors associated to the policies
- episodes (int): number of episodes to run
- temp (float): temperature value
return (ndarray): the average reward collected over trajectories for each policy
"""
def test_policies(tmdp:TMDP, thetas, episodes=100, temp=1e-5, deterministic=True):
returns = []
tau = tmdp.tau
for theta in thetas:
pi = get_softmax_policy(theta, temperature=temp)
if deterministic:
pi = get_policy(pi)
tmdp.reset()
tmdp.update_tau(0.)
episode = 0
cum_r = 0
traj_count = 0
while episode < episodes:
s = tmdp.env.s
a = select_action(pi[s])
s_prime, reward, flags, prob = tmdp.step(a)
cum_r += reward
if flags["done"]:
break
tmdp.reset()
traj_count += 1
episode += 1
cum_r = cum_r/traj_count if traj_count > 0 else cum_r
returns.append(cum_r)
tmdp.update_tau(tau)
return returns
def test_policies_len(tmdp:TMDP, thetas, episodes=100, temp=1e-5, deterministic=True, mu=None):
returns = []
episode_lengths = []
tau = tmdp.tau
mu_train = tmdp.env.mu
# Set original mu if mu is not None
if mu is not None:
tmdp.env.mu = mu
for theta in thetas:
pi = get_softmax_policy(theta, temperature=temp)
if deterministic:
pi = get_policy(pi)
tmdp.reset()
tmdp.update_tau(0.)
episode = 0
cum_r = 0
done = False
while episode < episodes and not done:
s = tmdp.env.s
a = select_action(pi[s])
s_prime, reward, flags, prob = tmdp.step(a)
cum_r += reward
if flags["done"]:
done = True
tmdp.reset()
episode += 1
returns.append(cum_r)
episode_lengths.append(episode)
tmdp.update_tau(tau)
tmdp.env.mu = mu_train
return returns, episode_lengths
def test_Q_policies(tmdp:TMDP, Qs, episodes=100):
returns = []
tau = tmdp.tau
for Q in Qs:
pi = get_policy(Q)
tmdp.reset()
tmdp.update_tau(0.)
episode = 0
cum_r = 0
traj_count = 0
while episode < episodes:
s = tmdp.env.s
a = select_action(pi[s])
s_prime, reward, flags, prob = tmdp.step(a)
cum_r += reward
if flags["done"]:
tmdp.reset()
traj_count += 1
break
episode += 1
cum_r = cum_r/traj_count if traj_count > 0 else cum_r
returns.append(cum_r)
tmdp.update_tau(tau)
return returns
def test_Q_policies_len(tmdp:TMDP, Qs, episodes=100, mu=None):
returns = []
episode_lengths = []
tau = tmdp.tau
mu_train = tmdp.env.mu
# Set original mu if mu is not None
if mu is not None:
tmdp.env.mu = mu
for Q in Qs:
pi = get_policy(Q)
tmdp.reset()
tmdp.update_tau(0.)
episode = 0
cum_r = 0
done = False
while episode < episodes and not done:
s = tmdp.env.s
a = select_action(pi[s])
s_prime, reward, flags, prob = tmdp.step(a)
cum_r += reward
if flags["done"]:
done = True
tmdp.reset()
episode += 1
returns.append(cum_r)
episode_lengths.append(episode)
tmdp.update_tau(tau)
tmdp.env.mu = mu_train
return returns, episode_lengths
def get_artifacts_from_experiment(experiment_id):
runs = mlflow.search_runs(experiment_ids=experiment_id)
artifacts = []
for run_id in runs["run_id"]:
artifacts.append(mlflow.get_artifact_uri(run_id=run_id))
return artifacts
def plot_avg_test_return(returns, title, figsize=(10, 8)):
fig = plt.figure(figsize=figsize)
avg_returns = np.average(returns, axis=0)
std_dev = np.std(returns, axis=0)
n_samples = len(returns)
std_err = std_dev / np.sqrt(n_samples)
ci = 1.96
upper_bound = avg_returns + ci * std_err
lower_bound = avg_returns - ci * std_err
plt.plot(avg_returns, label='Average Return', color='r')
plt.fill_between(range(len(avg_returns)), lower_bound, upper_bound, color='r', alpha=0.2, label='95% Confidence Interval')
plt.legend()
plt.title(title)
plt.xlabel('Episode')
plt.ylabel('Avg Return')
plt.show()
return fig
def generate_M_labels(length, x):
assert x >= 2, "Error: x must be >= than 2"
labels = []
for i in range(x):
if i == 0:
labels.append("0")
else:
value = round(length/(x-i)/1_000_000, 2)
labels.append(f"{value}M")
return labels
def generate_uniform_labels(x_min, x_max):
step = 1_000_000
num_labels = int(np.ceil((x_max - x_min) / step)) + 1
labels = [f"{i}M" for i in range(num_labels)]
labels[0] = '0'
return labels
def adjust_y_ticks(ax, y_value, remove_last=False):
y_ticks = ax.get_yticks()
# Find the tick value closest to y_value and remove it
closest_tick = min(y_ticks, key=lambda x: abs(x - y_value))
new_y_ticks = list(y_ticks)
if remove_last:
new_y_ticks.remove(new_y_ticks[-1])
new_y_ticks.remove(closest_tick)
new_y_ticks.append(y_value)
new_y_ticks.sort()
return new_y_ticks
def pad_to_same_length(results):
# Find the maximum length of the lists
max_len = max(len(result) for result in results)
# Pad each list to the maximum length
for result in results:
if len(result) < max_len:
last_element = result[-1]
result.extend([last_element] * (max_len - len(result)))
return results
def river_swim_uniform_curr_xi(tau, nS):
assert tau >= 0 and tau < 1, 'tau must be in [0, 1]'
low = int(np.floor((nS - 1) * tau))
if tau < 0.01:
low = 1
dim = (nS - 1) - low
xi = np.zeros(nS)
if dim > 0:
xi[low:nS-1] = 1.0 / dim
xi[-2] = 1.0 - sum(xi[:-2])
return xi