-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathtag.py
413 lines (364 loc) · 14.9 KB
/
tag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# -*- coding: utf-8 -*-
# Copyright © 2014 - 2022, German Neuroinformatics Node (G-Node)
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted under the terms of the BSD License. See
# LICENSE file in the root of the Project.
import warnings
import numpy as np
from .entity import Entity
from .source_link_container import SourceLinkContainer
from .container import Container, LinkContainer
from .datatype import DataType
from .data_array import DataArray
from .data_view import DataView
from .feature import Feature
from .exceptions import (OutOfBounds, IncompatibleDimensions,
UninitializedEntity, InvalidUnit)
from .dimension_type import DimensionType
from .link_type import LinkType
from . import util
from .section import Section
from .dimensions import SliceMode
class FeatureContainer(Container):
    """
    Container specialisation for Features.

    Besides the regular Container lookup, a Feature can also be addressed
    through the ID or the name of the DataArray it links to.
    """

    def __getitem__(self, item):
        try:
            return Container.__getitem__(self, item)
        except KeyError as exc:
            # Regular lookup failed: interpret 'item' as the ID or name of
            # the linked data and return the first Feature that matches.
            match = next(
                (feat for feat in self
                 if feat.data.id == item or feat.data.name == item),
                None
            )
            if match is None:
                raise exc
            return match

    def __contains__(self, item):
        # A Feature object is looked up through its own ID.
        key = item.id if isinstance(item, Feature) else item
        if Container.__contains__(self, key):
            return True
        # Otherwise check whether any Feature links to data matching 'key'.
        return any(feat.data.id == key or feat.data.name == key
                   for feat in self)
class BaseTag(Entity):
    """
    Base class for Tag and MultiTag.

    Provides the ``units`` and ``features`` properties, feature creation and
    the helpers that convert positions/extents (optionally given with units)
    into index slices of a DataArray.
    """

    def __init__(self, nixfile, nixparent, h5group):
        super(BaseTag, self).__init__(nixfile, nixparent, h5group)
        # Cached containers; they start out empty and are filled by the
        # corresponding properties on first access.
        self._sources = None
        self._references = None
        self._features = None

    @property
    def units(self):
        """
        Property containing the units of the tag. The tag must provide a
        unit for each dimension of the position or extent vector.
        This is a read-write property.

        Reading returns a tuple; entries stored as bytes are decoded to str.
        Assigning an empty value (None, empty list, ...) removes the stored
        units. A single string is accepted and treated as a one-element list.

        :type: list of str
        """
        return tuple(u.decode() if isinstance(u, bytes) else u
                     for u in self._h5group.get_data("units"))

    @units.setter
    def units(self, units):
        if not units:
            if self._h5group.has_data("units"):
                del self._h5group["units"]
        else:
            if isinstance(units, str):
                # A bare string would otherwise be iterated character by
                # character ("mV" -> ["m", "V"]); treat it as a single unit.
                units = [units]
            sanitized = []
            for unit in units:
                util.check_attr_type(unit, str)
                sanitized.append(util.units.sanitizer(unit))
            self._h5group.write_data("units", sanitized, DataType.String)
        if self.file.auto_update_timestamps:
            self.force_updated_at()

    def create_feature(self, data, link_type):
        """
        Create a new feature.

        :param data: The data array of this feature.
        :type data: nixio.DataArray
        :param link_type: The link type of this feature. A string is
                          lower-cased and converted to a LinkType.
        :type link_type: nixio.LinkType

        :returns: The created feature object.
        :rtype: nixio.Feature
        """
        if isinstance(link_type, str):
            link_type = link_type.lower()
        link_type = LinkType(link_type)
        features = self._h5group.open_group("features")
        return Feature.create_new(self.file, self, features, data, link_type)

    def _calc_data_slices(self, data, position, extent, stop_rule):
        """
        Translate a position (and optional extent) into one slice per
        dimension of ``data``.

        :param data: The array whose dimensions are used for the lookup.
        :param position: Start position, one entry per tagged dimension.
        :param extent: Extent per tagged dimension, or None.
        :param stop_rule: SliceMode applied to the end of the range when the
                          extent of a dimension is greater than zero.

        :returns: A tuple with one entry per dimension of ``data``. An entry
                  is a slice, or None when the dimension's ``range_indices``
                  returned None for the requested range.
        :rtype: tuple
        """
        dimensions = data.dimensions
        # Read the units once; without units every dimension is unit-less.
        tag_units = self.units
        units = tag_units if tag_units else [None] * len(dimensions)

        refslice = []
        for idx, dim in enumerate(dimensions):
            if idx >= len(position):
                # no position, we take the whole slice for this dimension
                refslice.append(slice(0, data.shape[idx]))
                continue

            start_pos, scaling = self._scale_position(position[idx],
                                                      units[idx], dim)
            if extent is not None and idx < len(extent):
                # The extent is relative to the start position and is given
                # in the same unit, hence the same scaling applies.
                stop_pos = extent[idx] * scaling + start_pos
                # A zero (or negative) extent is always looked up inclusively.
                slice_mode = (stop_rule if extent[idx] > 0.0
                              else SliceMode.Inclusive)
            else:
                stop_pos = start_pos
                slice_mode = SliceMode.Inclusive

            range_indices = dim.range_indices(start_pos, stop_pos, slice_mode)
            if range_indices is None:
                refslice.append(None)
            else:
                # range_indices holds the first and last index; +1 turns the
                # last index into an exclusive slice stop.
                refslice.append(slice(range_indices[0], range_indices[1] + 1))
        return tuple(refslice)

    @staticmethod
    def _slices_in_data(data, slices):
        """
        Check that all slices are set and end within the extent of ``data``.

        :param data: Object providing ``data_extent``.
        :param slices: Sequence of slices (entries may be None), or None.

        :returns: False if ``slices`` is None or contains a None entry,
                  otherwise whether every slice stop is <= the data extent
                  of the respective dimension.
        :rtype: bool
        """
        if slices is None or not all(slices):
            return False
        stops = tuple(sl.stop for sl in slices)
        return bool(np.all(np.less_equal(stops, data.data_extent)))

    @staticmethod
    def _unit_scaling(unit, dimunit, caller):
        """
        Return the factor that converts ``unit`` into ``dimunit``.

        :raises IncompatibleDimensions: If the units cannot be converted;
                                        ``caller`` is used as the error label.
        """
        try:
            return util.units.scaling(unit, dimunit)
        except InvalidUnit:
            raise IncompatibleDimensions(
                "Cannot scale Tag unit {} to match dimension unit {}".format(unit, dimunit),
                caller
            )

    @staticmethod
    def _scale_position(pos, unit, dim):
        """
        Convert a position given in ``unit`` into the unit of ``dim``.

        :param pos: The position value.
        :param unit: Unit of the position, or None.
        :param dim: The dimension descriptor the position refers to.

        :returns: Tuple of (scaled position, scaling factor).
        :raises IncompatibleDimensions: If a unit is given for a
            SetDimension, if a unit is given but the dimension has none, or
            if the units cannot be converted into each other.
        """
        caller = "Tag._scale_position"
        scaling = 1.0
        if dim.dimension_type == DimensionType.Set:
            # Set dimensions carry no unit; only "none"/empty is accepted.
            if unit and unit != "none":
                raise IncompatibleDimensions(
                    "Cannot apply a position with unit to a SetDimension",
                    caller
                )
        elif unit is not None:
            dimunit = dim.unit
            if dimunit is None:
                raise IncompatibleDimensions(
                    "If a unit is given for the position the dimension must not be without a unit!",
                    caller
                )
            scaling = BaseTag._unit_scaling(unit, dimunit, caller)
        return pos * scaling, scaling

    @staticmethod
    def _pos_to_idx(pos, unit, dim, mode):
        """
        Convert a position given in ``unit`` into an index along ``dim``.

        :param pos: The position value.
        :param unit: Unit of the position, or None.
        :param dim: The dimension descriptor the position refers to.
        :param mode: Passed through to ``dim.index_of``.

        :returns: The index as returned by ``dim.index_of``.
        :rtype: int
        :raises IncompatibleDimensions: If a unit is given for a
            SetDimension, if a unit is given for a SampledDimension without
            unit, or if the units cannot be converted into each other.
        """
        caller = "Tag._pos_to_idx"
        dimtype = dim.dimension_type
        if dimtype == DimensionType.Set:
            if unit and unit != "none":
                raise IncompatibleDimensions(
                    "Cannot apply a position with unit to a SetDimension",
                    caller
                )
            return int(dim.index_of(pos, mode))

        dimunit = dim.unit
        # Only sampled dimensions insist on having a unit when the position
        # has one; other dimensions fall through with a scaling of 1.
        if dimtype == DimensionType.Sample and not dimunit and unit is not None:
            raise IncompatibleDimensions(
                "Units of position and SampledDimension "
                "must both be given!",
                caller
            )
        scaling = 1.0
        if dimunit and unit is not None:
            scaling = BaseTag._unit_scaling(unit, dimunit, caller)
        return int(dim.index_of(pos * scaling, mode))

    @property
    def features(self):
        """
        A property containing all features. Features can be obtained
        via their index or their ID. Features can be deleted from the list.
        Adding new features is done using the create_feature method.
        This is a read only attribute.

        :type: Container of nixio.Feature.
        """
        if self._features is None:
            self._features = FeatureContainer("features", self.file,
                                              self, Feature)
        return self._features
class Tag(BaseTag):
    """
    A tag defined by one position and an optional extent, which is applied
    to the DataArrays it references.
    """

    @classmethod
    def create_new(cls, nixfile, nixparent, h5parent, name, type_, position):
        """
        Create a new Tag entity and set its position.

        :returns: The newly created entity.
        """
        newentity = super(Tag, cls).create_new(nixfile, nixparent, h5parent,
                                               name, type_)
        newentity.position = position
        return newentity

    @staticmethod
    def _as_sequence(value):
        """
        Wrap a scalar in a list; return None and sequences unchanged.

        NumPy scalars provide ``__getitem__`` but not ``len()``, so a value
        only counts as a sequence when it offers both.
        """
        if value is None:
            return None
        if hasattr(value, "__getitem__") and hasattr(value, "__len__"):
            return value
        return [value]

    @property
    def position(self):
        """
        The position defined by the tag. This is a read-write property.
        Reading returns a tuple. A scalar may be assigned and is stored as a
        one-element position; assigning None or an empty sequence removes the
        stored position.

        :type: list of float
        """
        return tuple(self._h5group.get_data("position"))

    @position.setter
    def position(self, pos):
        pos = self._as_sequence(pos)
        if pos is None or len(pos) == 0:
            if self._h5group.has_data("position"):
                del self._h5group["position"]
        else:
            self._h5group.write_data("position", pos, DataType.Double)
        if self.file.auto_update_timestamps:
            self.force_updated_at()

    @property
    def extent(self):
        """
        The extent defined by the tag. This is an optional read-write
        property and may be set to None. Reading returns a tuple. A scalar
        may be assigned and is stored as a one-element extent; assigning None
        or an empty sequence removes the stored extent.

        :type: list of float
        """
        return tuple(self._h5group.get_data("extent"))

    @extent.setter
    def extent(self, ext):
        ext = self._as_sequence(ext)
        if ext is None or len(ext) == 0:
            if self._h5group.has_data("extent"):
                del self._h5group["extent"]
        else:
            self._h5group.write_data("extent", ext, DataType.Double)
        if self.file.auto_update_timestamps:
            self.force_updated_at()

    def retrieve_data(self, refidx):
        """
        Deprecated alias of :meth:`tagged_data`; emits a DeprecationWarning.
        """
        msg = ("Call to deprecated method Tag.retrieve_data. "
               "Use Tag.tagged_data instead.")
        # stacklevel=2 attributes the warning to the calling code.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return self.tagged_data(refidx)

    def tagged_data(self, refidx, stop_rule=SliceMode.Exclusive):
        """
        Return the part of a referenced DataArray that is tagged by this Tag.

        :param refidx: Key used to look up the reference in ``references``.
        :param stop_rule: SliceMode applied to the end of the tagged range.

        :returns: A view on the tagged data. If a dimension yields no valid
                  range the view is created with the incomplete slices.
        :rtype: nixio.DataView
        :raises OutOfBounds: If the tag has no references, if an integer
            ``refidx`` is not smaller than the number of references, or if
            the slice exceeds the extent of the referenced data.
        :raises IncompatibleDimensions: If position and extent differ in
            length.
        """
        references = self.references
        # Read position and extent once and reuse them below.
        position = self.position
        extent = self.extent
        if len(references) == 0:
            raise OutOfBounds("There are no references in this tag!")

        if isinstance(refidx, int) and refidx >= len(references):
            raise OutOfBounds("Reference index out of bounds.")

        ref = references[refidx]
        if extent and len(position) != len(extent):
            raise IncompatibleDimensions(
                "Number of dimensions in position and extent "
                "do not match ", "Tag.tagged_data")

        slices = self._calc_data_slices(ref, position, extent, stop_rule)
        if not all(slices):
            # At least one dimension produced no index range; hand the
            # incomplete slices to the DataView instead of raising.
            return DataView(ref, slices)

        if not self._slices_in_data(ref, slices):
            raise OutOfBounds("References data slice out of the extent of the "
                              "DataArray!")
        return DataView(ref, slices)

    def retrieve_feature_data(self, featidx):
        """
        Deprecated alias of :meth:`feature_data`; emits a DeprecationWarning.
        """
        msg = ("Call to deprecated method Tag.retrieve_feature_data. "
               "Use Tag.feature_data instead.")
        # stacklevel=2 attributes the warning to the calling code.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return self.feature_data(featidx)

    def feature_data(self, featidx, stop_rule=SliceMode.Exclusive):
        """
        Return the data of a feature of this Tag.

        :param featidx: Key used to look up the feature in ``features``; the
                        ID or name of the linked DataArray is accepted too.
        :param stop_rule: SliceMode applied to the end of the tagged range
                          (only relevant for tagged features).

        :returns: For LinkType.Tagged the slice of the feature data selected
                  by position and extent, otherwise the complete data.
        :rtype: nixio.DataView
        :raises OutOfBounds: If the tag has no features or the slice exceeds
            the extent of the feature data.
        :raises KeyError: If no feature matches ``featidx``.
        :raises UninitializedEntity: If the feature has no data.
        """
        features = self.features
        if len(features) == 0:
            raise OutOfBounds("There are no features associated with this tag!")

        # The feature container itself falls back to matching the ID or name
        # of the linked data, so no additional search is required here.
        feat = features[featidx]
        data = feat.data
        if data is None:
            raise UninitializedEntity()

        if feat.link_type == LinkType.Tagged:
            slices = self._calc_data_slices(data, self.position, self.extent,
                                            stop_rule)
            if not self._slices_in_data(data, slices):
                raise OutOfBounds("Requested data slice out of the extent "
                                  "of the Feature!")
            return DataView(data, slices)

        # For untagged and indexed return the full data
        fullslices = tuple(slice(0, stop) for stop in data.shape)
        return DataView(data, fullslices)

    @property
    def references(self):
        """
        A property containing all data arrays referenced by the tag. Referenced
        data arrays can be obtained by index or their id. References can be
        removed from the list, removing a referenced DataArray will not remove
        it from the file. New references can be added using the append method
        of the list.
        This is a read only attribute.

        :type: Container of nixio.DataArray
        """
        if self._references is None:
            self._references = LinkContainer("references", self, DataArray,
                                             self._parent.data_arrays)
        return self._references

    @property
    def sources(self):
        """
        A property containing all Sources referenced by the Tag. Sources
        can be obtained by index or their id. Sources can be removed from the
        list, but removing a referenced Source will not remove it from the
        file. New Sources can be added using the append method of the list.
        This is a read only attribute.
        """
        if self._sources is None:
            self._sources = SourceLinkContainer(self)
        return self._sources

    # metadata
    @property
    def metadata(self):
        """
        Associated metadata of the entity. Sections attached to the entity via
        this attribute can provide additional annotations. This is an optional
        read-write property, and can be None if no metadata is available.

        :type: nixio.Section
        """
        if "metadata" in self._h5group:
            return Section(self.file, None,
                           self._h5group.open_group("metadata"))
        return None

    @metadata.setter
    def metadata(self, sect):
        if not isinstance(sect, Section):
            raise TypeError("{} is not of type Section".format(sect))
        self._h5group.create_link(sect, "metadata")

    @metadata.deleter
    def metadata(self):
        if "metadata" in self._h5group:
            self._h5group.delete("metadata")