-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtimeseriesu.py
More file actions
354 lines (272 loc) · 9.77 KB
/
Copy pathtimeseriesu.py
File metadata and controls
354 lines (272 loc) · 9.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
import numpy as np
from scipy.spatial.transform import Rotation as R
def apply_f_to_ts(src, f):
for k, data in src.__dict__.items():
if isinstance(data, TimeSeries):
src[k] = f(data)
setattr(src, k, src[k])
elif isinstance(data, DataSet):
apply_f_to_ts(data, f)
def trans_data(src, dest, f):
# Ideally, this meta data should not be in the same namespace as the data.
forbidden_keys = ["times", "meta_times", "metadata", "finalized", "t0"]
if not hasattr(src, '__dict__'):
if type(src) is dict:
for k, data in src.items():
dest[k] = f(data)
else:
# What to do
assert False
else:
for k, data in src.__dict__.items():
if k in forbidden_keys:
continue
if isinstance(data, dict):
dest[k] = BasicAttrDict()
trans_data(data, dest[k], f)
else:
dest[k] = f(data)
setattr(dest, k, dest[k])
def f_retimed(ts, newts, **kwargs):
from scipy.interpolate import interp1d
from scipy.spatial.transform import Rotation as R, Slerp
def f(data, ts=ts, newts=newts):
if len(data) and type(data) is R:
# Allow rotation re-timing with nearest if times out of order
if 'kind' in kwargs and kwargs['kind'] == 'nearest':
quat = data.as_quat()
quat_retimed = interp1d(ts, quat, axis=0, **kwargs)(newts)
return R.from_quat(quat_retimed)
return Slerp(ts, data)(newts)
elif len(data) and type(data[0]) == type(R.identity()):
# Keep this in case we are using lists.
rots = R.from_matrix([rot.as_matrix() for rot in data])
return Slerp(ts, rots)(newts)
elif len(data) and type(data[0]) in [str, np.str_, bytes, np.bytes_]:
print(f"WARNING: interpolating with non numericals not well supported (copying first: {data[0]})")
# I wish interp1d with kind='nearest' works here.
return np.array([data[0]] * len(newts))
return interp1d(ts, data, axis=0, **kwargs)(newts)
return f
def f_masked(mask):
def f(data, mask=mask):
# Only to deal with numpy Rotation bug
if isinstance(data, list):
if len(data) == len(mask):
new_list = []
for i, x in enumerate(data):
if mask[i]:
new_list.append(x)
return new_list
return data
# For any auxiliary vars that may have been added...
if isinstance(data, (str, int, float, np.generic)):
return data
return data[mask]
return f
def retimed_copy(src, dest, oldts, newts, **kwargs):
return trans_data(src, dest, f_retimed(oldts, newts, **kwargs))
def masked_copy(src, dest, mask):
return trans_data(src, dest, f_masked(mask))
class BasicAttrDict(dict):
pass
class DataSet(dict):
def __setattr__(self, k, v):
self[k] = v
object.__setattr__(self, k, v)
def _item_map(self, f):
ret = DataSet()
for k, v in self.items():
ret[k] = f(v)
setattr(ret, k, ret[k])
return ret
def add_point(self, key, delim='/', ts_metadata=None, **data):
key = key.strip(delim)
del_ind = key.find(delim)
if del_ind != -1:
fkey = key[:del_ind]
if fkey not in self:
self[fkey] = DataSet()
setattr(self, fkey, self[fkey])
self[fkey].add_point(key[del_ind + 1:], delim=delim, ts_metadata=ts_metadata, **data)
else:
if key not in self:
self[key] = TimeSeries(metadata=ts_metadata)
setattr(self, key, self[key])
self[key].add_point(**data)
def method_map(self, method_name, *args):
return self._item_map(lambda obj, args=args: getattr(obj, method_name)(*args))
def get_view(self, start_time, end_time):
return self.method_map('get_view', start_time, end_time)
def get_after(self, start_time):
return self.method_map('get_after', start_time)
def get_before(self, end_time):
return self.method_map('get_before', end_time)
def get_multiview(self, mask):
return self.method_map('get_multiview', mask)
def finalize(self):
[v.finalize() for v in self.values()]
class TimeSeries(dict):
def __init__(self, metadata=None):
self.times = []
self.meta_times = []
self.metadata = metadata
self.finalized = False
def __len__(self):
return len(self.times)
def _build_dict(self, d, obj, ind):
for name, val in d.items():
if isinstance(val, dict):
obj[name] = BasicAttrDict()
self._build_dict(val, obj[name], ind)
elif isinstance(val, np.ndarray) or isinstance(val, list) or type(val) is R:
obj[name] = val[ind]
else:
print("Unhandled value:", val, ". Please fix me.")
assert False
setattr(obj, name, obj[name])
def point_iter(self):
assert self.finalized
for i in range(len(self.times)):
obj = BasicAttrDict()
self._build_dict(self, obj, i)
obj.t = self.times[i]
if self.meta_times is not None and len(self.meta_times):
obj.meta_t = self.meta_times[i]
yield obj
def sub_add(self, d, **kwargs):
for name, val in kwargs.items():
first = name not in d
if isinstance(val, dict):
if first:
d[name] = BasicAttrDict()
self.sub_add(d[name], **val)
else:
if first:
d[name] = []
d[name].append(val)
if first:
setattr(d, name, d[name])
def add_point(self, time, meta_time=None, **kwargs):
assert not self.finalized
self.times.append(time)
if meta_time is not None:
self.meta_times.append(meta_time)
self.sub_add(self, **kwargs)
def _finalize(self, name, vals, d):
# Special case for Rotation objects
if vals and type(vals[0]) is R and hasattr(R, 'concatenate'):
d[name] = R.concatenate(vals)
else:
try:
test = np.array(vals)
except ValueError:
test = np.array(vals, dtype=object)
# Deal with scipy Rotation object bug
if len(test.shape) == 32:
d[name] = vals
else:
d[name] = test
setattr(d, name, d[name])
def finalize(self):
self.times = np.array(self.times)
self.meta_times = np.array(self.meta_times)
self.apply_f(self._finalize, self)
if len(self.times):
self.t0 = self.times[0]
self.finalized = True
def apply_f(self, f, d, *args, **kwargs):
for name, vals in d.items():
if isinstance(vals, dict):
self.apply_f(f, vals, *args, **kwargs)
else:
f(name, vals, d, *args, **kwargs)
def _delete_inds(self, name, vals, d, inds):
""" inds is a boolean mask """
# Only to deal with numpy Rotation bug
if isinstance(vals, list):
d[name] = [val for i, val in enumerate(vals) if not inds[i]]
elif type(vals) is R:
d[name] = d[name][np.logical_not(inds)]
else:
d[name] = np.delete(d[name], inds, axis=0)
setattr(d, name, d[name])
def _reorder_inds(self, name, vals, d, inds):
d[name] = vals[inds]
setattr(d, name, d[name])
def remove_dup_times(self):
assert self.finalized
timedups = np.hstack((np.diff(self.times) == 0, False))
self.times = np.delete(self.times, timedups)
if self.meta_times is not None and len(self.meta_times):
self.meta_times = np.delete(self.meta_times, timedups)
self.apply_f(self._delete_inds, self, timedups)
return timedups.sum()
def order_times(self):
assert self.finalized
if np.all(self.times[:-1] <= self.times[1:]):
return
# Use a sorting algorithm that's fast for almost sorted
sort_inds = np.argsort(self.times, kind='stable')
self.times = self.times[sort_inds]
if self.meta_times is not None and len(self.meta_times):
self.meta_times = self.meta_times[sort_inds]
if hasattr(self, 't0'):
self.t0 = self.times[0]
self.apply_f(self._reorder_inds, self, sort_inds)
def get_masked_view(self, timemask):
assert self.finalized
ret = TimeSeries(metadata=self.metadata)
ret.finalized = True
ret.times = self.times[timemask].copy()
if hasattr(self, 't0'):
if len(ret.times):
ret.t0 = ret.times[0]
else:
ret.t0 = self.t0
if self.meta_times is not None and len(self.meta_times):
ret.meta_times = self.meta_times[timemask].copy()
else:
ret.meta_times = self.meta_times
masked_copy(self, ret, timemask)
return ret
def get_multiview(self, timess):
mask = np.zeros(len(self.times), dtype=bool)
for times in timess:
mask = np.logical_or(mask, np.logical_and(times[0] <= self.times, self.times <= times[1]))
return self.get_masked_view(mask)
def get_view(self, start_time, end_time):
return self.get_multiview([(start_time, end_time)])
def get_after(self, start_time):
return self.get_view(start_time, np.inf)
def get_before(self, end_time):
return self.get_view(-np.inf, end_time)
def get_all(self):
return self.get_view(-np.inf, np.inf)
def retime(self, fieldstr, newts, fill_value=0.0, **kwargs):
from scipy.interpolate import interp1d
fieldlist = fieldstr.split('.')
obj = self
for field in fieldlist:
obj = getattr(obj, field)
return interp1d(self.times, obj, axis=0, bounds_error=False, fill_value=fill_value, **kwargs)(newts)
def retimeall(self, newts, interponly=True, **kwargs):
if interponly:
newts = newts[np.logical_and(newts >= self.times[0], newts <= self.times[-1])]
elif 'fill_value' not in kwargs:
kwargs['fill_value'] = 'extrapolate'
ret = TimeSeries(metadata=self.metadata)
ret.finalized = True
ret.times = newts
if hasattr(self, 't0'):
if len(ret.times):
ret.t0 = ret.times[0]
else:
ret.t0 = self.t0
ret.meta_times = None
retimed_copy(self, ret, self.times, newts, **kwargs)
return ret
def syncwith(self, ts, **kwargs):
timemask = np.logical_and(ts.times >= self.times[0], ts.times <= self.times[-1])
ret = self.retimeall(ts.times[timemask], **kwargs)
return ret, ts.get_masked_view(timemask)