Skip to content

global_object

global_object

GlobalObject

GlobalObject is the assimilated knowledge of EasyScience.

Every class based on EasyScience gets brought into the collective.

Source code in src/easyscience/global_object/global_object.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@singleton
class GlobalObject:
    """GlobalObject is the assimilated knowledge of `EasyScience`.

    Every class based on `EasyScience` gets brought
    into the collective.
    """

    __log = Logger()
    __map = Map()
    __stack = None
    __debug = False

    def __init__(self):
        # Logger. This is so there's a unified logging interface
        self.log: Logger = self.__log
        # Debug. Global debugging level
        self.debug: bool = self.__debug
        # Stack. This is where the undo/redo operations are stored.
        self.stack = self.__stack
        #
        self.script: ScriptManager = ScriptManager()
        # Map. This is the conduit database between all global object species
        self.map: Map = self.__map

    def instantiate_stack(self):
        """The undo/redo stack references the collective. Hence it has
        to be imported after initialization.

        :return: None
        :rtype: noneType
        """
        from easyscience.global_object.undo_redo import UndoStack

        self.stack = UndoStack()

    def generate_unique_name(self, name_prefix: str) -> str:
        """Generate a generic unique name for the object using the class
        name and a global iterator. Names are in the format
        `name_prefix_0`, `name_prefix_1`, `name_prefix_2`, etc.

        :param name_prefix: The prefix to be used for the name
        """
        names_with_prefix = [
            name for name in self.map.vertices() if name.startswith(name_prefix + '_')
        ]
        if names_with_prefix:
            name_with_prefix_count = [0]
            for name in names_with_prefix:
                # Strip away the prefix and trailing _
                name_without_prefix = name.replace(name_prefix + '_', '')
                if name_without_prefix.isdecimal():
                    name_with_prefix_count.append(int(name_without_prefix))
            unique_name = f'{name_prefix}_{max(name_with_prefix_count) + 1}'
        else:
            unique_name = f'{name_prefix}_0'
        return unique_name

generate_unique_name(name_prefix)

Generate a generic unique name for the object using the class name and a global iterator. Names are in the format name_prefix_0, name_prefix_1, name_prefix_2, etc.

:param name_prefix: The prefix to be used for the name

Source code in src/easyscience/global_object/global_object.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def generate_unique_name(self, name_prefix: str) -> str:
    """Generate a generic unique name for the object using the class
    name and a global iterator. Names are in the format
    `name_prefix_0`, `name_prefix_1`, `name_prefix_2`, etc.

    :param name_prefix: The prefix to be used for the name
    """
    names_with_prefix = [
        name for name in self.map.vertices() if name.startswith(name_prefix + '_')
    ]
    if names_with_prefix:
        name_with_prefix_count = [0]
        for name in names_with_prefix:
            # Strip away the prefix and trailing _
            name_without_prefix = name.replace(name_prefix + '_', '')
            if name_without_prefix.isdecimal():
                name_with_prefix_count.append(int(name_without_prefix))
        unique_name = f'{name_prefix}_{max(name_with_prefix_count) + 1}'
    else:
        unique_name = f'{name_prefix}_0'
    return unique_name

instantiate_stack()

The undo/redo stack references the collective. Hence it has to be imported after initialization.

:return: None :rtype: noneType

Source code in src/easyscience/global_object/global_object.py
35
36
37
38
39
40
41
42
43
44
def instantiate_stack(self):
    """The undo/redo stack references the collective. Hence it has
    to be imported after initialization.

    :return: None
    :rtype: noneType
    """
    from easyscience.global_object.undo_redo import UndoStack

    self.stack = UndoStack()

hugger

hugger

PatcherFactory

Bases: Hugger

Source code in src/easyscience/global_object/hugger/hugger.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class PatcherFactory(Hugger, metaclass=ABCMeta):
    def __init__(self):
        super().__init__()

    @staticmethod
    def is_mutable(arg) -> bool:
        ret = True
        if isinstance(arg, (int, float, complex, str, tuple, frozenset, bytes, property)):
            ret = False
        return ret

    @staticmethod
    def _caller_name(skip: int = 2):
        """Get a name of a caller in the format module.class.method
        `skip` specifies how many levels of stack to skip while getting
        caller name.

        skip=1 means "who calls me", skip=2 "who calls my caller" etc.
        An empty string is returned if skipped levels exceed stack
        height
        https://gist.github.com/techtonik/2151727#gistcomment-2333747
        """

        def stack_(frame):
            framelist = []
            while frame:
                framelist.append(frame)
                frame = frame.f_back
            return framelist

        stack = stack_(sys._getframe(1))
        start = 0 + skip
        if len(stack) < start + 1:
            return ''
        parentframe = stack[start]

        name = []
        module = inspect.getmodule(parentframe)
        # `modname` can be None when frame is executed directly in console
        # TODO(techtonik): consider using __main__
        if module:
            name.append(module.__name__)
        # detect classname
        if 'self' in parentframe.f_locals:
            # I don't know any way to detect call from the object method
            # XXX: there seems to be no way to detect static method call - it will
            #      be just a function call
            name.append(parentframe.f_locals['self'].__class__.__name__)
        codename = parentframe.f_code.co_name
        if codename != '<module>':  # top level usually
            name.append(codename)  # function or a method
        del parentframe
        return '.'.join(name)

    def _append_args(self, *args, **kwargs):
        def check(res):
            return (
                id(res) not in self._store.unique_rets
                and id(res) not in self._store.create_list
                and id(res) not in self._store.unique_args
            )

        for arg in args:
            if self.is_mutable(arg) and check(arg):
                self._store.unique_args.append(id(arg))
        for item in kwargs.values():
            if self.is_mutable(item) and check(item):
                self._store.unique_args.append(id(item))

    def _append_create(self, obj):
        this_id = id(obj)
        if this_id not in self._store.create_list:
            self._store.create_list.append(this_id)

    def _append_result(self, result) -> int:
        ret = 0

        def check(res):
            return (
                id(res) not in self._store.unique_rets
                and id(res) not in self._store.create_list
                and id(res) not in self._store.unique_args
            )

        if isinstance(result, type(None)):
            return ret
        elif isinstance(result, tuple):
            for res in result:
                # if self.is_mutable(res) and check(res):
                if check(res):
                    self._store.unique_rets.append(id(res))
            ret = len(result)
        else:
            # if self.is_mutable(result) and check(result):
            if check(result):
                self._store.unique_rets.append(id(result))
            ret = 1
        return ret

    def _append_log(self, log_entry: str):
        self._store.log.append(log_entry)

    def __options(self, item) -> Tuple[int, dict]:
        this_id = id(item)
        option = {
            'create_list': self._store.create_list,
            'return_list': self._store.unique_rets,
            'input_list': self._store.unique_args,
        }
        return this_id, option

    def _get_position(self, query: str, item) -> int:
        this_id, option = self.__options(item)
        in_list = self._in_list(query, item)
        index = None
        if in_list:
            index = option.get(query).index(this_id)
        return index

    def _in_list(self, query: str, item) -> bool:
        this_id, option = self.__options(item)
        return this_id in option.get(query, [])

    @staticmethod
    def _get_class_that_defined_method(method_in) -> classmethod:
        if inspect.ismethod(method_in):
            for cls in inspect.getmro(method_in.__self__.__class__):
                if cls.__dict__.get(method_in.__name__) is method_in:
                    return cls
            method_in = method_in.__func__  # fallback to __qualname__ parsing
        if inspect.isfunction(method_in):
            class_name = method_in.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]
            try:
                cls = getattr(inspect.getmodule(method_in), class_name)
            except AttributeError:
                cls = method_in.__globals__.get(class_name)
            if isinstance(cls, type):
                return cls

property

LoggedProperty

Bases: property

Pump up python properties.

In this case we can see who has called this property and then do something if a criteria is met. In this case if the caller is not a member of the ObjBase class. Note that all high level EasyScience objects should be built from ObjBase.

Source code in src/easyscience/global_object/hugger/property.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class LoggedProperty(property):
    """Pump up python properties.

    In this case we can see who has called this property and
    then do something if a criteria is met. In this case if the caller is not a member of
    the `ObjBase` class. Note that all high level `EasyScience` objects should be built from
    `ObjBase`.
    """

    _global_object = global_object

    def __init__(self, *args, get_id=None, my_self=None, test_class=None, **kwargs):
        super(LoggedProperty, self).__init__(*args, **kwargs)
        self._get_id = get_id
        self._my_self = my_self
        self.test_class = test_class

    @staticmethod
    def _caller_class(test_class, skip: int = 1):
        def stack_(frame):
            frame_list = []
            while frame:
                frame_list.append(frame)
                frame = frame.f_back
            return frame_list

        stack: List[Any] = stack_(sys._getframe(1))
        start = 0 + skip
        if len(stack) < start + 1:
            return ''
        parent_frame = stack[start]
        test = False
        if 'self' in parent_frame.f_locals:
            test = issubclass(parent_frame.f_locals['self'].__class__, test_class)
        return test

    def __get__(self, instance, owner=None):
        if not global_object.script.enabled:
            return super(LoggedProperty, self).__get__(instance, owner)
        test = self._caller_class(self.test_class)
        res = super(LoggedProperty, self).__get__(instance, owner)

        def result_item(item_to_be_resulted):
            if item_to_be_resulted is None:
                return None
            if global_object.map.is_known(item_to_be_resulted):
                global_object.map.change_type(item_to_be_resulted, 'returned')
            else:
                global_object.map.add_vertex(item_to_be_resulted, obj_type='returned')

        if not test and self._get_id is not None and self._my_self is not None:
            if not isinstance(res, list):
                result_item(res)
            else:
                for item in res:
                    result_item(item)
            Store().append_log(self.makeEntry('get', res))
            if global_object.debug:  # noqa: S1006
                print(f"I'm {self._my_self} and {self._get_id} has been called from the outside!")
        return res

    def __set__(self, instance, value):
        if not global_object.script.enabled:
            return super().__set__(instance, value)
        test = self._caller_class(self.test_class)
        if not test and self._get_id is not None and self._my_self is not None:
            Store().append_log(self.makeEntry('set', value))
            if global_object.debug:  # noqa: S1006
                print(
                    f"I'm {self._my_self} and {self._get_id} has been set to {value} from the outside!"
                )
        return super().__set__(instance, value)

    def makeEntry(self, log_type, returns, *args, **kwargs) -> str:
        temp = ''
        if returns is None:
            returns = []
        if not isinstance(returns, list):
            returns = [returns]
        if log_type == 'get':
            for var in returns:
                if var.unique_name in global_object.map.returned_objs:
                    index = global_object.map.returned_objs.index(var.unique_name)
                    temp += f'{Store().var_ident}{index}, '
            if len(returns) > 0:
                temp = temp[:-2]
                temp += ' = '
            if self._my_self.unique_name in global_object.map.created_objs:
                # for edge in route[::-1]:
                index = global_object.map.created_objs.index(self._my_self.unique_name)
                temp += f'{self._my_self.__class__.__name__.lower()}_{index}.{self._get_id}'
            if self._my_self.unique_name in global_object.map.created_internal:
                # We now have to trace....
                route = global_object.map.reverse_route(self._my_self)  # noqa: F841
                index = global_object.map.created_internal.index(self._my_self.unique_name)
                temp += f'{self._my_self.__class__.__name__.lower()}_{index}.{self._get_id}'
        elif log_type == 'set':
            if self._my_self.unique_name in global_object.map.created_objs:
                index = global_object.map.created_objs.index(self._my_self.unique_name)
                temp += f'{self._my_self.__class__.__name__.lower()}_{index}.{self._get_id} = '
            args = args[1:]
            for var in args:
                if var.unique_name in global_object.map.argument_objs:
                    index = global_object.map.argument_objs.index(var.unique_name)
                    temp += f'{Store().var_ident}{index}'
                elif var.unique_name in global_object.map.returned_objs:
                    index = global_object.map.returned_objs.index(var.unique_name)
                    temp += f'{Store().var_ident}{index}'
                elif var.unique_name in global_object.map.created_objs:
                    index = global_object.map.created_objs.index(var.unique_name)
                    temp += f'{self._my_self.__class__.__name__.lower()}_{index}'
                else:
                    if isinstance(var, str):
                        var = '"' + var + '"'
                    temp += f'{var}'
        else:
            print(f'{log_type} is not implemented yet. Sorry')
        temp += '\n'
        return temp

logger

Logger

Source code in src/easyscience/global_object/logger.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Logger:
    def __init__(self, log_level: int = logging.INFO):
        self.logger = logging.getLogger(__name__)
        self.level = log_level
        self.logger.setLevel(self.level)

    def getLogger(self, logger_name, color: str = '32', defaults: bool = True) -> logging:
        """Create a logger :param color:

        :param logger_name: logger name. Usually __name__ on creation
        :param defaults: Do you want to associate any current file
            loggers with this logger
        :return: A logger
        """
        logger = logging.getLogger(logger_name)
        logger.setLevel(self.level)
        # self.applyLevel(logger)
        # for handler_type in self._handlers:
        #     for handler in self._handlers[handler_type]:
        #         if handler_type == 'sys' or defaults:
        #             handler.formatter._fmt = self._makeColorText(color)
        #             logger.addHandler(handler)
        # logger.propagate = False
        # self._loggers.append(logger)
        return logger

getLogger(logger_name, color='32', defaults=True)

Create a logger :param color:

:param logger_name: logger name. Usually name on creation :param defaults: Do you want to associate any current file loggers with this logger :return: A logger

Source code in src/easyscience/global_object/logger.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def getLogger(self, logger_name, color: str = '32', defaults: bool = True) -> logging:
    """Create a logger :param color:

    :param logger_name: logger name. Usually __name__ on creation
    :param defaults: Do you want to associate any current file
        loggers with this logger
    :return: A logger
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(self.level)
    # self.applyLevel(logger)
    # for handler_type in self._handlers:
    #     for handler in self._handlers[handler_type]:
    #         if handler_type == 'sys' or defaults:
    #             handler.formatter._fmt = self._makeColorText(color)
    #             logger.addHandler(handler)
    # logger.propagate = False
    # self._loggers.append(logger)
    return logger

map

Map

Source code in src/easyscience/global_object/map.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
class Map:
    def __init__(self):
        # A dictionary of object names and their corresponding objects
        self._store = weakref.WeakValueDictionary()
        # A dict with object names as keys and a list of their object types as values, with weak references
        self.__type_dict = {}

    def _snapshot_items(self):
        """Return a stable snapshot of __type_dict items.

        Some callers iterate over __type_dict while other threads or
        weakref finalizers may modify it. Creating a list snapshot (with
        a retry loop) prevents RuntimeError: dictionary changed size
        during iteration.
        """
        while True:
            try:
                return list(self.__type_dict.items())
            except RuntimeError:
                # Dict changed during snapshot creation, retry
                continue

    def vertices(self) -> List[str]:
        """Returns the vertices of a map.

        Uses a retry loop to handle RuntimeError that can occur when the
        WeakValueDictionary is modified during iteration (e.g., by
        garbage collection).
        """
        while True:
            try:
                return list(self._store)
            except RuntimeError:
                # Dictionary changed size during iteration, retry
                continue

    def edges(self):
        """Returns the edges of a map."""
        return self.__generate_edges()

    @property
    def argument_objs(self) -> List[str]:
        return self._nested_get('argument')

    @property
    def created_objs(self) -> List[str]:
        return self._nested_get('created')

    @property
    def created_internal(self) -> List[str]:
        return self._nested_get('created_internal')

    @property
    def returned_objs(self) -> List[str]:
        return self._nested_get('returned')

    def _nested_get(self, obj_type: str) -> List[str]:
        """Access a nested object in root by key sequence."""
        # Create a stable snapshot of the dict items to avoid RuntimeError
        # when the dict is modified during iteration (e.g., by finalizers).
        while True:
            try:
                items = self._snapshot_items()
                return [key for key, item in items if obj_type in item.type]
            except RuntimeError:
                # In case the snapshot itself raises (very rare), retry
                continue

    def get_item_by_key(self, item_id: str) -> object:
        if item_id in self._store:
            return self._store[item_id]
        raise ValueError('Item not in map.')

    def is_known(self, vertex: object) -> bool:
        """Check if a vertex is known in the map.

        All objects should have a 'unique_name' attribute.
        """
        return vertex.unique_name in self._store

    def find_type(self, vertex: object) -> List[str]:
        if self.is_known(vertex):
            return self.__type_dict[vertex.unique_name].type

    def reset_type(self, obj, default_type: str):
        if obj.unique_name in self.__type_dict:
            self.__type_dict[obj.unique_name].reset_type(default_type)

    def change_type(self, obj, new_type: str):
        if obj.unique_name in self.__type_dict:
            self.__type_dict[obj.unique_name].type = new_type

    def add_vertex(self, obj: object, obj_type: str = None):
        name = obj.unique_name
        if name in self._store:
            raise ValueError(f'Object name {name} already exists in the graph.')
        # Clean up stale entry in __type_dict if the weak reference was collected
        # but the finalizer hasn't run yet
        if name in self.__type_dict:
            del self.__type_dict[name]

        self._store[name] = obj

        entry_list = _EntryList()
        entry_list.finalizer = weakref.finalize(obj, self.prune_type_dict, name)
        entry_list.type = obj_type
        self.__type_dict[name] = entry_list  # Add objects type to the list of types

    def add_edge(self, start_obj: object, end_obj: object):
        if start_obj.unique_name in self.__type_dict:
            self.__type_dict[start_obj.unique_name].append(end_obj.unique_name)
        else:
            raise AttributeError('Start object not in map.')

    def get_edges(self, start_obj) -> List[str]:
        if start_obj.unique_name in self.__type_dict:
            return list(self.__type_dict[start_obj.unique_name])
        else:
            raise AttributeError

    def __generate_edges(self) -> list:
        """A static method generating the edges of the map.

        Edges are represented as sets with one (a loop back to the
        vertex) or two vertices
        """
        edges = []
        # Iterate over a snapshot of items and snapshot neighbour lists to
        # avoid concurrent modification issues.
        for vertex, neighbours in self._snapshot_items():
            neighbours_snapshot = list(neighbours)
            for neighbour in neighbours_snapshot:
                if {neighbour, vertex} not in edges:
                    edges.append({vertex, neighbour})
        return edges

    def prune_vertex_from_edge(self, parent_obj, child_obj):
        vertex1 = parent_obj.unique_name
        if child_obj is None:
            return
        vertex2 = child_obj.unique_name

        if vertex1 in self.__type_dict and vertex2 in self.__type_dict[vertex1]:
            del self.__type_dict[vertex1][self.__type_dict[vertex1].index(vertex2)]

    def prune_type_dict(self, key: str):
        if key in self.__type_dict:
            del self.__type_dict[key]

    def prune(self, key: str):
        if key in self.__type_dict:
            del self.__type_dict[key]
            if key in self._store:
                del self._store[key]

    def find_isolated_vertices(self) -> list:
        """Returns a list of isolated vertices."""
        isolated = []
        for vertex, neighbours in self._snapshot_items():
            if not list(neighbours):
                isolated.append(vertex)
        return isolated

    def find_path(self, start_vertex: str, end_vertex: str, path=[]) -> list:
        """Find a path from start_vertex to end_vertex in map."""

        graph = self.__type_dict
        path = path + [start_vertex]
        if start_vertex == end_vertex:
            return path
        if start_vertex not in graph:
            return []
        for vertex in graph[start_vertex]:
            if vertex not in path:
                extended_path = self.find_path(vertex, end_vertex, path)
                if extended_path:
                    return extended_path
        return []

    def find_all_paths(self, start_vertex: str, end_vertex: str, path=[]) -> list:
        """Find all paths from start_vertex to end_vertex in map."""

        graph = self.__type_dict
        path = path + [start_vertex]
        if start_vertex == end_vertex:
            return [path]
        if start_vertex not in graph:
            return []
        paths = []
        for vertex in graph[start_vertex]:
            if vertex not in path:
                extended_paths = self.find_all_paths(vertex, end_vertex, path)
                for p in extended_paths:
                    paths.append(p)
        return paths

    def reverse_route(self, end_vertex: str, start_vertex: Optional[str] = None) -> List:
        """In this case we have an object and want to know the
        connections to get to another in reverse.

        We might not know the start_object. In which case we follow the
        shortest path to a base vertex.
        :param end_obj:
        :type end_obj:
        :param start_obj:
        :type start_obj:
        :return:
        :rtype:
        """
        path_length = sys.maxsize
        optimum_path = []
        if start_vertex is None:
            # We now have to find where to begin..... Iterate over a snapshot
            for possible_start, vertices in self._snapshot_items():
                vertices_snapshot = list(vertices)
                if end_vertex in vertices_snapshot:
                    temp_path = self.find_path(possible_start, end_vertex)
                    if len(temp_path) < path_length:
                        path_length = len(temp_path)
                        optimum_path = temp_path
        else:
            optimum_path = self.find_path(start_vertex, end_vertex)
        optimum_path.reverse()
        return optimum_path

    def is_connected(self, vertices_encountered=None, start_vertex=None) -> bool:
        """Determines if the map is connected."""
        if vertices_encountered is None:
            vertices_encountered = set()
        graph = self.__type_dict
        vertices = list(graph)
        if not start_vertex:
            # chose a vertex from graph as a starting point
            start_vertex = vertices[0]
        vertices_encountered.add(start_vertex)
        if len(vertices_encountered) != len(vertices):
            for vertex in list(graph[start_vertex]):
                if vertex not in vertices_encountered and self.is_connected(
                    vertices_encountered, vertex
                ):
                    return True
        else:
            return True
        return False

    def _clear(self):
        """Reset the map to an empty state.

        Only to be used for testing
        """
        self._store.clear()
        self.__type_dict.clear()
        gc.collect()

    def __repr__(self) -> str:
        return f'Map object of {len(self._store)} vertices.'

__generate_edges()

A static method generating the edges of the map.

Edges are represented as sets with one (a loop back to the vertex) or two vertices

Source code in src/easyscience/global_object/map.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def __generate_edges(self) -> list:
    """A static method generating the edges of the map.

    Edges are represented as sets with one (a loop back to the
    vertex) or two vertices
    """
    edges = []
    # Iterate over a snapshot of items and snapshot neighbour lists to
    # avoid concurrent modification issues.
    for vertex, neighbours in self._snapshot_items():
        neighbours_snapshot = list(neighbours)
        for neighbour in neighbours_snapshot:
            if {neighbour, vertex} not in edges:
                edges.append({vertex, neighbour})
    return edges

edges()

Returns the edges of a map.

Source code in src/easyscience/global_object/map.py
105
106
107
def edges(self):
    """Returns the edges of a map."""
    return self.__generate_edges()

find_all_paths(start_vertex, end_vertex, path=[])

Find all paths from start_vertex to end_vertex in map.

Source code in src/easyscience/global_object/map.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def find_all_paths(self, start_vertex: str, end_vertex: str, path=[]) -> list:
    """Find all paths from start_vertex to end_vertex in map."""

    graph = self.__type_dict
    path = path + [start_vertex]
    if start_vertex == end_vertex:
        return [path]
    if start_vertex not in graph:
        return []
    paths = []
    for vertex in graph[start_vertex]:
        if vertex not in path:
            extended_paths = self.find_all_paths(vertex, end_vertex, path)
            for p in extended_paths:
                paths.append(p)
    return paths

find_isolated_vertices()

Returns a list of isolated vertices.

Source code in src/easyscience/global_object/map.py
224
225
226
227
228
229
230
def find_isolated_vertices(self) -> list:
    """Returns a list of isolated vertices."""
    isolated = []
    for vertex, neighbours in self._snapshot_items():
        if not list(neighbours):
            isolated.append(vertex)
    return isolated

find_path(start_vertex, end_vertex, path=[])

Find a path from start_vertex to end_vertex in map.

Source code in src/easyscience/global_object/map.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def find_path(self, start_vertex: str, end_vertex: str, path=[]) -> list:
    """Find a path from start_vertex to end_vertex in map."""

    graph = self.__type_dict
    path = path + [start_vertex]
    if start_vertex == end_vertex:
        return path
    if start_vertex not in graph:
        return []
    for vertex in graph[start_vertex]:
        if vertex not in path:
            extended_path = self.find_path(vertex, end_vertex, path)
            if extended_path:
                return extended_path
    return []

is_connected(vertices_encountered=None, start_vertex=None)

Determines if the map is connected.

Source code in src/easyscience/global_object/map.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def is_connected(self, vertices_encountered=None, start_vertex=None) -> bool:
    """Determines if the map is connected."""
    if vertices_encountered is None:
        vertices_encountered = set()
    graph = self.__type_dict
    vertices = list(graph)
    if not start_vertex:
        # chose a vertex from graph as a starting point
        start_vertex = vertices[0]
    vertices_encountered.add(start_vertex)
    if len(vertices_encountered) != len(vertices):
        for vertex in list(graph[start_vertex]):
            if vertex not in vertices_encountered and self.is_connected(
                vertices_encountered, vertex
            ):
                return True
    else:
        return True
    return False

is_known(vertex)

Check if a vertex is known in the map.

All objects should have a 'unique_name' attribute.

Source code in src/easyscience/global_object/map.py
142
143
144
145
146
147
def is_known(self, vertex: object) -> bool:
    """Check if a vertex is known in the map.

    All objects should have a 'unique_name' attribute.
    """
    return vertex.unique_name in self._store

reverse_route(end_vertex, start_vertex=None)

In this case we have an object and want to know the connections to get to another in reverse.

We might not know the start_object. In which case we follow the shortest path to a base vertex. :param end_obj: :type end_obj: :param start_obj: :type start_obj: :return: :rtype:

Source code in src/easyscience/global_object/map.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def reverse_route(self, end_vertex: str, start_vertex: Optional[str] = None) -> List:
    """In this case we have an object and want to know the
    connections to get to another in reverse.

    We might not know the start_object. In which case we follow the
    shortest path to a base vertex.
    :param end_obj:
    :type end_obj:
    :param start_obj:
    :type start_obj:
    :return:
    :rtype:
    """
    path_length = sys.maxsize
    optimum_path = []
    if start_vertex is None:
        # We now have to find where to begin..... Iterate over a snapshot
        for possible_start, vertices in self._snapshot_items():
            vertices_snapshot = list(vertices)
            if end_vertex in vertices_snapshot:
                temp_path = self.find_path(possible_start, end_vertex)
                if len(temp_path) < path_length:
                    path_length = len(temp_path)
                    optimum_path = temp_path
    else:
        optimum_path = self.find_path(start_vertex, end_vertex)
    optimum_path.reverse()
    return optimum_path

vertices()

Returns the vertices of a map.

Uses a retry loop to handle RuntimeError that can occur when the WeakValueDictionary is modified during iteration (e.g., by garbage collection).

Source code in src/easyscience/global_object/map.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def vertices(self) -> List[str]:
    """Returns the vertices of a map.

    Uses a retry loop to handle RuntimeError that can occur when the
    WeakValueDictionary is modified during iteration (e.g., by
    garbage collection).
    """
    while True:
        try:
            return list(self._store)
        except RuntimeError:
            # Dictionary changed size during iteration, retry
            continue

undo_redo

CommandHolder

A holder for one or more commands which are added to the stack.

Source code in src/easyscience/global_object/undo_redo.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class CommandHolder:
    """A holder for one or more commands which are added to the
    stack.
    """

    def __init__(self, text: str = None):
        self._commands = deque()
        self._text = text
        self.__index = 0

    def append(self, command: T_):
        self._commands.appendleft(command)

    def pop(self):
        return self._commands.popleft()

    def __iter__(self) -> T_:
        while self.__index < len(self):
            index = self.__index
            self.__index += 1
            yield self._commands[index]
        self.__index = 0

    def __len__(self) -> int:
        return len(self._commands)

    @property
    def is_macro(self) -> bool:
        return len(self) > 1

    @property
    def current(self) -> T_:
        return self._commands[0]

    @property
    def text(self) -> str:
        text = ''
        if self._commands:
            text = self._commands[-1].text
        if self._text is not None:
            text = self._text
        return text

    @text.setter
    def text(self, text: str):
        self._text = text

NotarizedDict

Bases: UserDict

A simple dict drop in for EasyScience group classes.

This is used as it wraps the get/set methods

Source code in src/easyscience/global_object/undo_redo.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class NotarizedDict(UserDict):
    """A simple dict drop in for EasyScience group classes.

    This is used as it wraps the get/set methods
    """

    def __init__(self, **kwargs):
        from easyscience import global_object  # Local import to avoid circular dependency

        super().__init__(**kwargs)
        self._global_object = global_object
        self._stack_enabled = False

    @classmethod
    def _classname(cls):
        # This method just returns the name of the class
        return cls.__name__

    @dict_stack_deco
    def __setitem__(self, key, value):
        super(NotarizedDict, self).__setitem__(key, value)

    @dict_stack_deco
    def __delitem__(self, key):
        super(NotarizedDict, self).__delitem__(key)

    def __repr__(self):
        return f'{self._classname()}({self.data})'

    @dict_stack_deco
    def reorder(self, **kwargs):
        self.data = kwargs.copy()

PropertyStack

Bases: UndoCommand

Stack operator for when a property setter is wrapped.

Source code in src/easyscience/global_object/undo_redo.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
class PropertyStack(UndoCommand):
    """Stack operator for when a property setter is wrapped."""

    def __init__(self, parent, func: Callable, old_value: Any, new_value: Any, text: str = None):
        # self.setText("Setting {} to {}".format(func.__name__, new_value))
        super().__init__(self)
        self._parent = parent
        self._old_value = old_value
        self._new_value = new_value
        self._set_func = func
        self.text = f'{parent} value changed from {old_value} to {new_value}'
        if text is not None:
            self.text = text

    def undo(self) -> NoReturn:
        self._set_func(self._parent, self._old_value)

    def redo(self) -> NoReturn:
        self._set_func(self._parent, self._new_value)

UndoCommand

The Command interface pattern.

Source code in src/easyscience/global_object/undo_redo.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class UndoCommand(metaclass=abc.ABCMeta):
    """The Command interface pattern."""

    def __init__(self, obj) -> None:
        self._obj = obj
        self._text = None

    @abc.abstractmethod
    def undo(self) -> NoReturn:
        """Undo implementation which should be overwritten."""

    @abc.abstractmethod
    def redo(self) -> NoReturn:
        """Redo implementation which should be overwritten."""

    @property
    def text(self) -> str:
        return self._text

    @text.setter
    def text(self, text: str) -> NoReturn:
        self._text = text

redo() abstractmethod

Redo implementation which should be overwritten.

Source code in src/easyscience/global_object/undo_redo.py
31
32
33
@abc.abstractmethod
def redo(self) -> NoReturn:
    """Redo implementation which should be overwritten."""

undo() abstractmethod

Undo implementation which should be overwritten.

Source code in src/easyscience/global_object/undo_redo.py
27
28
29
@abc.abstractmethod
def undo(self) -> NoReturn:
    """Undo implementation which should be overwritten."""

UndoStack

Implement a version of QUndoStack without the QT.

Source code in src/easyscience/global_object/undo_redo.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
class UndoStack:
    """Implement a version of QUndoStack without the QT."""

    def __init__(self, max_history: Union[int, type(None)] = None):
        self._history = deque(maxlen=max_history)
        self._future = deque(maxlen=max_history)
        self._macro_running = False
        self._command_running = False
        self._max_history = max_history
        self._enabled = False

    @property
    def enabled(self) -> bool:
        return self._enabled

    @enabled.setter
    def enabled(self, state: bool):
        if self.enabled and self._macro_running:
            self.endMacro()
        self._enabled = state

    def force_state(self, state: bool):
        self._enabled = state

    @property
    def history(self) -> deque:
        return self._history

    @property
    def future(self) -> deque:
        return self._future

    def push(self, command: T_) -> NoReturn:
        """Add a command to the history stack."""
        # If we're not enabled, then what are we doing!
        if not self.enabled or self._command_running:
            # Do the command and leave.
            command.redo()
            return
        # If there's a macro add the command to the command holder
        if self._macro_running:
            self.history[0].append(command)
        else:
            # Else create the command holder and add it to the stack
            com = CommandHolder()
            com.append(command)
            self.history.appendleft(com)
        # Actually do the command
        command.redo()
        # Reset the future
        self._future = deque(maxlen=self._max_history)

    def pop(self) -> T_:
        """
        !! WARNING - TO BE USED WITH EXTREME CAUTION !!
        !! THIS IS PROBABLY NOT THE FN YOU'RE LOOKING FOR, IT CAN BREAK A LOT OF STUFF !!
        Sometimes you really don't want the last command. Remove it from the stack

        :return: None
        :rtype: None
        """
        pop_it = self._history.popleft()
        popped = pop_it.pop()
        if len(pop_it) > 0:
            self.history.appendleft(pop_it)
        return popped

    def clear(self) -> None:  # NoReturn:
        """Remove any commands on the stack and reset the state."""
        self._history = deque(maxlen=self._max_history)
        self._future = deque(maxlen=self._max_history)
        self._macro_running = False

    def undo(self) -> NoReturn:
        """Undo the last change to the stack."""
        if self.canUndo():
            # Move the command from the past to the future
            this_command_stack = self._history.popleft()
            self._future.appendleft(this_command_stack)

            # Execute all undo commands
            for command in this_command_stack:
                try:
                    self._command_running = True
                    command.undo()
                except Exception as e:
                    print(e)
                finally:
                    self._command_running = False

    def redo(self) -> NoReturn:
        """Redo the last `undo` command on the stack."""
        if self.canRedo():
            # Move from the future to the past
            this_command_stack = self._future.popleft()
            self._history.appendleft(this_command_stack)
            # Need to go from right to left
            this_command_stack = list(this_command_stack)
            this_command_stack.reverse()
            for command in this_command_stack:
                try:
                    self._command_running = True
                    command.redo()
                except Exception as e:
                    print(e)
                finally:
                    self._command_running = False

    def beginMacro(self, text: str) -> NoReturn:
        """Start a bulk update i.e. multiple commands under one
        undo/redo command.
        """
        if self._macro_running:
            raise AssertionError('Cannot start a macro when one is already running')
        com = CommandHolder(text)
        self.history.appendleft(com)
        self._macro_running = True

    def endMacro(self) -> NoReturn:
        """End a bulk update i.e. multiple commands under one undo/redo
        command.
        """
        if not self._macro_running:
            raise AssertionError('Cannot end a macro when one is not running')
        self._macro_running = False

    def canUndo(self) -> bool:
        """Can the last command be undone?"""
        return len(self._history) > 0 and not self._macro_running

    def canRedo(self) -> bool:
        """Can we redo a command?"""
        return len(self._future) > 0 and not self._macro_running

    def redoText(self) -> str:
        """Text associated with a redo item."""
        text = ''
        if self.canRedo():
            text = self.future[0].text
        return text

    def undoText(self) -> str:
        """Text associated with a undo item."""
        text = ''
        if self.canUndo():
            text = self.history[0].text
        return text

beginMacro(text)

Start a bulk update i.e. multiple commands under one undo/redo command.

Source code in src/easyscience/global_object/undo_redo.py
253
254
255
256
257
258
259
260
261
def beginMacro(self, text: str) -> NoReturn:
    """Start a bulk update i.e. multiple commands under one
    undo/redo command.
    """
    if self._macro_running:
        raise AssertionError('Cannot start a macro when one is already running')
    com = CommandHolder(text)
    self.history.appendleft(com)
    self._macro_running = True

canRedo()

Can we redo a command?

Source code in src/easyscience/global_object/undo_redo.py
275
276
277
def canRedo(self) -> bool:
    """Can we redo a command?"""
    return len(self._future) > 0 and not self._macro_running

canUndo()

Can the last command be undone?

Source code in src/easyscience/global_object/undo_redo.py
271
272
273
def canUndo(self) -> bool:
    """Can the last command be undone?"""
    return len(self._history) > 0 and not self._macro_running

clear()

Remove any commands on the stack and reset the state.

Source code in src/easyscience/global_object/undo_redo.py
212
213
214
215
216
def clear(self) -> None:  # NoReturn:
    """Remove any commands on the stack and reset the state."""
    self._history = deque(maxlen=self._max_history)
    self._future = deque(maxlen=self._max_history)
    self._macro_running = False

endMacro()

End a bulk update i.e. multiple commands under one undo/redo command.

Source code in src/easyscience/global_object/undo_redo.py
263
264
265
266
267
268
269
def endMacro(self) -> NoReturn:
    """End a bulk update i.e. multiple commands under one undo/redo
    command.
    """
    if not self._macro_running:
        raise AssertionError('Cannot end a macro when one is not running')
    self._macro_running = False

pop()

!! WARNING - TO BE USED WITH EXTREME CAUTION !! !! THIS IS PROBABLY NOT THE FN YOU'RE LOOKING FOR, IT CAN BREAK A LOT OF STUFF !! Sometimes you really don't want the last command. Remove it from the stack

:return: None :rtype: None

Source code in src/easyscience/global_object/undo_redo.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def pop(self) -> T_:
    """
    !! WARNING - TO BE USED WITH EXTREME CAUTION !!
    !! THIS IS PROBABLY NOT THE FN YOU'RE LOOKING FOR, IT CAN BREAK A LOT OF STUFF !!
    Sometimes you really don't want the last command. Remove it from the stack

    :return: None
    :rtype: None
    """
    pop_it = self._history.popleft()
    popped = pop_it.pop()
    if len(pop_it) > 0:
        self.history.appendleft(pop_it)
    return popped

push(command)

Add a command to the history stack.

Source code in src/easyscience/global_object/undo_redo.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def push(self, command: T_) -> NoReturn:
    """Add a command to the history stack."""
    # If we're not enabled, then what are we doing!
    if not self.enabled or self._command_running:
        # Do the command and leave.
        command.redo()
        return
    # If there's a macro add the command to the command holder
    if self._macro_running:
        self.history[0].append(command)
    else:
        # Else create the command holder and add it to the stack
        com = CommandHolder()
        com.append(command)
        self.history.appendleft(com)
    # Actually do the command
    command.redo()
    # Reset the future
    self._future = deque(maxlen=self._max_history)

redo()

Redo the last undo command on the stack.

Source code in src/easyscience/global_object/undo_redo.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def redo(self) -> NoReturn:
    """Redo the last `undo` command on the stack."""
    if self.canRedo():
        # Move from the future to the past
        this_command_stack = self._future.popleft()
        self._history.appendleft(this_command_stack)
        # Need to go from right to left
        this_command_stack = list(this_command_stack)
        this_command_stack.reverse()
        for command in this_command_stack:
            try:
                self._command_running = True
                command.redo()
            except Exception as e:
                print(e)
            finally:
                self._command_running = False

redoText()

Text associated with a redo item.

Source code in src/easyscience/global_object/undo_redo.py
279
280
281
282
283
284
def redoText(self) -> str:
    """Text associated with a redo item."""
    text = ''
    if self.canRedo():
        text = self.future[0].text
    return text

undo()

Undo the last change to the stack.

Source code in src/easyscience/global_object/undo_redo.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def undo(self) -> NoReturn:
    """Undo the last change to the stack."""
    if self.canUndo():
        # Move the command from the past to the future
        this_command_stack = self._history.popleft()
        self._future.appendleft(this_command_stack)

        # Execute all undo commands
        for command in this_command_stack:
            try:
                self._command_running = True
                command.undo()
            except Exception as e:
                print(e)
            finally:
                self._command_running = False

undoText()

Text associated with a undo item.

Source code in src/easyscience/global_object/undo_redo.py
286
287
288
289
290
291
def undoText(self) -> str:
    """Text associated with a undo item."""
    text = ''
    if self.canUndo():
        text = self.history[0].text
    return text

property_stack(arg, begin_macro=False)

Decorate a property setter with undo/redo functionality This decorator can be used as:

@property_stack def func() ....

or

@property_stack("This is the undo/redo text) def func() ....

In the latter case the argument is a string which might be evaluated. The possible markups for this string are;

obj - The thing being operated on func - The function being called name - The name of the function being called. old_value - The pre-set value new_value - The post-set value

An example would be Function {name}: Set from {old_value} to {new_value}

Source code in src/easyscience/global_object/undo_redo.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def property_stack(arg: Union[str, Callable], begin_macro: bool = False) -> Callable:
    """
    Decorate a `property` setter with undo/redo functionality
    This decorator can be used as:

    @property_stack
    def func()
    ....

    or

    @property_stack("This is the undo/redo text)
    def func()
    ....

    In the latter case the argument is a string which might be evaluated.
    The possible markups for this string are;

    `obj` - The thing being operated on
    `func` - The function being called
    `name` - The name of the function being called.
    `old_value` - The pre-set value
    `new_value` - The post-set value

    An example would be `Function {name}: Set from {old_value} to {new_value}`

    """

    def make_wrapper(func: Callable, name: str, **kwargs) -> Callable:
        def wrapper(obj, *args) -> NoReturn:
            from easyscience import global_object  # Local import to avoid circular dependency

            old_value = getattr(obj, name)
            new_value = args[0]
            if issubclass(type(old_value), Iterable) or issubclass(type(new_value), Iterable):
                ret = np.all(old_value == new_value)
            else:
                ret = old_value == new_value
            if ret:
                return

            if global_object.debug:
                print(f"I'm {obj} and have been set from {old_value} to {new_value}!")

            global_object.stack.push(PropertyStack(obj, func, old_value, new_value, **kwargs))

        return functools.update_wrapper(wrapper, func)

    if isinstance(arg, Callable):
        func = arg
        name = func.__name__
        wrapper = make_wrapper(func, name)
        setattr(wrapper, 'func', func)
    else:
        txt = arg

        def wrapper(func: Callable) -> Callable:
            name = func.__name__
            inner_wrapper = make_wrapper(func, name, text=txt.format(**locals()))
            setattr(inner_wrapper, 'func', func)
            return inner_wrapper

    return wrapper