# Copyright 2019 DeepMind Technologies Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A simple point-mass environment in N-dimensions."""
from typing import Optional
import gym
from imitation.envs import resettable_env
from imitation.policies import serialize as policy_serialize
from imitation.util import registry, serialize
import numpy as np
from stable_baselines.common import policies
import tensorflow as tf
from evaluating_rewards import serialize as reward_serialize
from evaluating_rewards.rewards import base


class PointMassEnv(resettable_env.ResettableEnv):
"""A simple point-mass environment."""

    def __init__(
self,
ndim: int = 2,
dt: float = 1e-1,
ctrl_coef: float = 1.0,
threshold: float = -1,
var: float = 1.0,
):
"""Builds a PointMass environment.
Args:
ndim: Number of dimensions.
dt: Size of timestep.
ctrl_coef: Weight for control cost.
threshold: Distance to goal within which episode terminates.
(Set negative to disable episode termination.)
            var: Variance of each component of the initial state distribution.
"""
super().__init__()
self.ndim = ndim
self.dt = dt
self.ctrl_coef = ctrl_coef
self.threshold = threshold
self.var = var
substate_space = gym.spaces.Box(-np.inf, np.inf, shape=(ndim,))
subspaces = {k: substate_space for k in ["pos", "vel", "goal"]}
self._state_space = gym.spaces.Dict(spaces=subspaces)
self._observation_space = gym.spaces.Box(-np.inf, np.inf, shape=(3 * ndim,))
self._action_space = gym.spaces.Box(-1, 1, shape=(ndim,))
self.viewer = None
self._agent_transform = None
self._goal_transform = None

    def initial_state(self):
"""Choose initial state randomly from region at least 1-step from goal."""
while True:
pos = self.rand_state.randn(self.ndim) * np.sqrt(self.var)
vel = self.rand_state.randn(self.ndim) * np.sqrt(self.var)
goal = self.rand_state.randn(self.ndim) * np.sqrt(self.var)
dist = np.linalg.norm(pos - goal)
min_dist_next = dist - self.dt * np.linalg.norm(vel)
if min_dist_next > self.threshold:
break
return {"pos": pos, "vel": vel, "goal": goal}

    def transition(self, state, action):
action = np.array(action)
action = action.clip(-1, 1)
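        # Explicit Euler step of a double integrator: position advances with the
        # current velocity, and velocity advances with the clipped acceleration.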
return {
"pos": state["pos"] + self.dt * state["vel"],
"vel": state["vel"] + self.dt * action,
"goal": state["goal"],
}

    def reward(self, state, action, new_state):
del state
dist = np.linalg.norm(new_state["pos"] - new_state["goal"])
ctrl_penalty = np.dot(action, action)
return -dist - self.ctrl_coef * ctrl_penalty

    def terminal(self, state, step: int) -> bool:
        """Terminates when the agent is within `threshold` of the goal.

        Set `threshold` negative to disable early termination, making the
        environment fixed-horizon.
        """
dist = np.linalg.norm(state["pos"] - state["goal"])
return bool(dist < self.threshold)

    def obs_from_state(self, state):
obs = np.concatenate([state["pos"], state["vel"], state["goal"]], axis=-1)
return obs.astype(np.float32)

    def state_from_obs(self, obs):
return {
"pos": obs[..., 0 : self.ndim],
"vel": obs[..., self.ndim : 2 * self.ndim],
"goal": obs[..., 2 * self.ndim : 3 * self.ndim],
}

    def render(self, mode="human"):
        if self.viewer is None:
            from gym.envs.classic_control import rendering  # pylint:disable=import-outside-toplevel

            self.viewer = rendering.Viewer(500, 500)
            self.viewer.set_bounds(-5, 5, -5, 5)

            def make_circle(**kwargs):
                obj = rendering.make_circle(**kwargs)
                transform = rendering.Transform()
                obj.add_attr(transform)
                self.viewer.add_geom(obj)
                return obj, transform

            goal, self._goal_transform = make_circle(radius=0.2)
            goal.set_color(1.0, 0.85, 0.0)  # golden
            _, self._agent_transform = make_circle(radius=0.1)

        def project(arr):
            if self.ndim == 1:
                assert len(arr) == 1
                return arr[0], 0
            elif self.ndim == 2:
                assert len(arr) == 2
                return tuple(arr)
            else:
                raise ValueError(f"Rendering unsupported for ndim={self.ndim}")

        self._goal_transform.set_translation(*project(self.cur_state["goal"]))
        self._agent_transform.set_translation(*project(self.cur_state["pos"]))
        return self.viewer.render(return_rgb_array=(mode == "rgb_array"))

    def close(self):
if self.viewer:
self.viewer.close()
self.viewer = None
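

# Illustrative usage sketch (assumes the standard gym-style reset/step interface
# that resettable_env.ResettableEnv layers on top of the methods above):
def _example_env_step() -> float:
    """Resets the environment and takes one random step (sketch only)."""
    env = PointMassEnv(ndim=2)
    obs = env.reset()
    state = env.state_from_obs(obs)  # recover the dict form: pos, vel, goal
    assert np.allclose(env.obs_from_state(state), obs)
    _, rew, _, _ = env.step(env.action_space.sample())
    return rew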


class PointMassGroundTruth(base.BasicRewardModel, serialize.LayersSerializable):
"""RewardModel representing the true (dense) reward in PointMass."""

    def __init__(
        self, observation_space: gym.Space, action_space: gym.Space, ctrl_coef: float = 1.0
    ):
        """Builds PointMassGroundTruth.

        Args:
            observation_space: Observation space of environment.
            action_space: Action space of environment.
            ctrl_coef: The multiplier for the quadratic control penalty.
        """
serialize.LayersSerializable.__init__(**locals(), layers={})
self.ndim, remainder = divmod(observation_space.shape[0], 3)
assert remainder == 0
self.ctrl_coef = ctrl_coef
base.BasicRewardModel.__init__(self, observation_space, action_space)
self._reward = self.build_reward()

    def build_reward(self):
        """Builds a Tensor computing the reward from next observation and action."""
pos = self._proc_next_obs[:, 0 : self.ndim]
goal = self._proc_next_obs[:, 2 * self.ndim : 3 * self.ndim]
dist = tf.norm(pos - goal, axis=-1)
ctrl_cost = tf.reduce_sum(tf.square(self._proc_act), axis=-1)
return -dist - self.ctrl_coef * ctrl_cost

    @property
def reward(self):
"""Reward tensor."""
return self._reward
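

# A minimal NumPy sketch (added for illustration) mirroring the TF graph that
# PointMassGroundTruth.build_reward constructs above:
def _ground_truth_reward_np(
    act: np.ndarray, next_obs: np.ndarray, ndim: int, ctrl_coef: float = 1.0
) -> np.ndarray:
    """NumPy equivalent of the ground-truth reward (illustrative sketch)."""
    pos = next_obs[..., 0:ndim]
    goal = next_obs[..., 2 * ndim : 3 * ndim]
    dist = np.linalg.norm(pos - goal, axis=-1)
    ctrl_cost = np.sum(np.square(act), axis=-1)
    return -dist - ctrl_coef * ctrl_cost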


class PointMassSparseReward(base.BasicRewardModel, serialize.LayersSerializable):
    """A sparse reward for the point mass being close to the goal.

    Should produce similar behavior to PointMassGroundTruth. However, it is not
    equivalent up to potential shaping.
    """

    def __init__(
self,
observation_space: gym.Space,
action_space: gym.Space,
ctrl_coef: float = 1.0,
threshold: float = 0.05,
goal_offset: Optional[np.ndarray] = None,
):
"""Constructs a PointMassSparseReward instance.
Args:
observation_space: Observation space of environment.
action_space: Action of environment.
ctrl_coef: The multiplier for the quadratic control penalty.
threshold: How near the point mass must be to the goal to receive reward.
goal_offset: If specified, shifts the goal in the direction specified.
The larger this is, the more dissimilar the reward model and resulting
policy will be from PointMassGroundTruth.
"""
serialize.LayersSerializable.__init__(**locals(), layers={})
self.ndim, remainder = divmod(observation_space.shape[0], 3)
assert remainder == 0
self.ctrl_coef = ctrl_coef
self.threshold = threshold
self.goal_offset = goal_offset
base.BasicRewardModel.__init__(self, observation_space, action_space)
self._reward = self.build_reward()

    def build_reward(self):
        """Builds a Tensor computing the reward from current observation and action."""
pos = self._proc_obs[:, 0 : self.ndim]
goal = self._proc_obs[:, 2 * self.ndim : 3 * self.ndim]
if self.goal_offset is not None:
goal += self.goal_offset[np.newaxis, :]
dist = tf.norm(pos - goal, axis=-1)
goal_reward = tf.to_float(dist < self.threshold)
ctrl_cost = tf.reduce_sum(tf.square(self._proc_act), axis=-1)
return goal_reward - self.ctrl_coef * ctrl_cost

    @property
def reward(self):
"""Reward tensor."""
return self._reward
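

# For intuition (illustrative note): the sparse reward is an indicator on goal
# proximity minus the quadratic control cost, roughly
#   reward = float(dist < threshold) - ctrl_coef * np.dot(act, act)
# where dist is the distance from the position to the (possibly offset) goal.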


def _point_mass_dist(obs: tf.Tensor, ndim: int) -> tf.Tensor:
pos = obs[:, 0:ndim]
goal = obs[:, 2 * ndim : 3 * ndim]
return tf.norm(pos - goal, axis=-1)


# pylint false positive: thinks `reward` is missing, but it is defined in
# `base.PotentialShaping`.
class PointMassShaping(
    base.PotentialShaping, base.BasicRewardModel, serialize.LayersSerializable
):  # pylint:disable=abstract-method
"""Potential shaping term, based on distance to goal."""

    def __init__(
self,
observation_space: gym.Space,
action_space: gym.Space,
discount: float = 1.0,
):
"""Builds PointMassShaping.
Args:
observation_space: The observation space.
action_space: The action space.
discount: The initial discount rate to use.
"""
params = dict(locals())
base.BasicRewardModel.__init__(self, observation_space, action_space)
self.ndim, remainder = divmod(observation_space.shape[0], 3)
assert remainder == 0
old_potential = -_point_mass_dist(self._proc_obs, self.ndim)
new_potential = -_point_mass_dist(self._proc_next_obs, self.ndim)
end_potential = tf.constant(0.0)
base.PotentialShaping.__init__(
self, old_potential, new_potential, end_potential, self._proc_dones, discount
)
        # Set the discount now so callers need not run a TF variable initializer.
        self.set_discount(discount)
serialize.LayersSerializable.__init__(**params, layers={"discount": self._discount})
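

# Note: given the potentials above, `base.PotentialShaping` presumably computes
# the standard potential-based shaping term of Ng, Harada & Russell (1999),
#   F(s, a, s') = discount * Phi(s') - Phi(s),
# here with Phi(s) = -||pos - goal||.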


class PointMassDenseReward(base.LinearCombinationModelWrapper):
"""Sparse reward plus potential shaping."""

    def __init__(
self, observation_space: gym.Space, action_space: gym.Space, discount: float = 1.0, **kwargs
):
sparse = PointMassSparseReward(observation_space, action_space, **kwargs)
# pylint thinks PointMassShaping is abstract but it's concrete.
shaping = PointMassShaping( # pylint:disable=abstract-class-instantiated
observation_space, action_space, discount
)
models = {"sparse": (sparse, tf.constant(1.0)), "shaping": (shaping, tf.constant(10.0))}
super().__init__(models)
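

# Equivalently (note added for clarity): the dense reward is
# 1.0 * sparse + 10.0 * shaping. Since the second term is potential-based
# shaping, it leaves optimal policies unchanged when its discount matches the
# discount used during policy optimization.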


class PointMassPolicy(policies.BasePolicy):
"""Hard-coded policy that accelerates towards goal."""

    def __init__(
self, observation_space: gym.Space, action_space: gym.Space, magnitude: float = 1.0
):
super().__init__(
sess=None,
ob_space=observation_space,
ac_space=action_space,
n_env=1,
n_steps=1,
n_batch=1,
)
self.ndim, remainder = divmod(observation_space.shape[0], 3)
assert remainder == 0
self.magnitude = magnitude

    def step(self, obs, state=None, mask=None, deterministic=False):
del deterministic
pos = obs[:, 0 : self.ndim]
vel = obs[:, self.ndim : 2 * self.ndim]
goal = obs[:, 2 * self.ndim : 3 * self.ndim]
target_vel = goal - pos
target_vel = target_vel / np.linalg.norm(target_vel, axis=1).reshape(-1, 1)
delta_vel = target_vel - vel
delta_vel_norm = np.linalg.norm(delta_vel, ord=np.inf, axis=1).reshape(-1, 1)
act = delta_vel / np.maximum(delta_vel_norm, 1e-4)
act = act.clip(-1, 1)
return act, None, None, None

    def proba_step(self, obs, state=None, mask=None):
raise NotImplementedError()
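

# Illustrative rollout sketch (assumes the gym-style reset/step interface from
# resettable_env.ResettableEnv). `PointMassPolicy.step` expects a batch of
# observations, hence the `obs[None]` below.
def _example_policy_rollout(num_steps: int = 100) -> float:
    """Returns the undiscounted return of one hard-coded-policy rollout."""
    env = PointMassEnv(ndim=2, threshold=0.05)
    policy = PointMassPolicy(env.observation_space, env.action_space)
    obs = env.reset()
    total_reward = 0.0
    for _ in range(num_steps):
        act, _, _, _ = policy.step(obs[None])  # batch of one observation
        obs, rew, done, _ = env.step(act[0])
        total_reward += rew
        if done:
            break
    return total_reward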


# Register custom policies with imitation.
policy_serialize.policy_registry.register(
key="evaluating_rewards/PointMassHardcoded-v0",
value=registry.build_loader_fn_require_space(registry.dummy_context(PointMassPolicy)),
)

# Register custom rewards with evaluating_rewards.
reward_serialize.reward_registry.register(
key="evaluating_rewards/PointMassGroundTruth-v0",
value=registry.build_loader_fn_require_space(PointMassGroundTruth),
)
reward_serialize.reward_registry.register(
key="evaluating_rewards/PointMassSparseWithCtrl-v0",
value=registry.build_loader_fn_require_space(PointMassSparseReward),
)
reward_serialize.reward_registry.register(
key="evaluating_rewards/PointMassSparseNoCtrl-v0",
value=registry.build_loader_fn_require_space(PointMassSparseReward, ctrl_coef=0.0),
)
reward_serialize.reward_registry.register(
key="evaluating_rewards/PointMassDenseWithCtrl-v0",
value=registry.build_loader_fn_require_space(PointMassDenseReward),
)
reward_serialize.reward_registry.register(
key="evaluating_rewards/PointMassDenseNoCtrl-v0",
value=registry.build_loader_fn_require_space(PointMassDenseReward, ctrl_coef=0.0),
)
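
# These registrations let other components construct the models by key: the
# policy loader in `imitation` can build "evaluating_rewards/PointMassHardcoded-v0",
# and reward keys such as "evaluating_rewards/PointMassSparseNoCtrl-v0" resolve
# through `reward_serialize.reward_registry`.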