diff options
| author | Camille Mougey <camille.mougey@cea.fr> | 2015-02-23 17:21:05 +0100 |
|---|---|---|
| committer | Camille Mougey <camille.mougey@cea.fr> | 2015-02-23 17:54:59 +0100 |
| commit | 9303454bbfb934017c1054f668f3083c75b2d61c (patch) | |
| tree | 2d1cbeaf42338603556e00241b36afa2842283e8 | |
| parent | 4713279e25625b33b8867c35d7da686781b8b9a7 (diff) | |
| download | miasm-9303454bbfb934017c1054f668f3083c75b2d61c.tar.gz miasm-9303454bbfb934017c1054f668f3083c75b2d61c.zip | |
Core: Introduce BoundedDict and its regression test
| -rw-r--r-- | miasm2/core/utils.py | 73 | ||||
| -rw-r--r-- | test/core/utils.py | 41 | ||||
| -rw-r--r-- | test/test_all.py | 1 |
3 files changed, 115 insertions, 0 deletions
diff --git a/miasm2/core/utils.py b/miasm2/core/utils.py index b2ddff2b..782c21d6 100644 --- a/miasm2/core/utils.py +++ b/miasm2/core/utils.py @@ -1,5 +1,6 @@ import struct import inspect +import UserDict upck8 = lambda x: struct.unpack('B', x)[0] upck16 = lambda x: struct.unpack('H', x)[0] @@ -48,3 +49,75 @@ class keydefaultdict(collections.defaultdict): def whoami(): return inspect.stack()[2][3] + + +class BoundedDict(UserDict.DictMixin): + """Limited in size dictionnary. + + To reduce combinatory cost, once an upper limit @max_size is reached, + @max_size - @min_size elements are suppressed. + The targeted elements are the less accessed. + + One can define a callback called when an element is removed + """ + + def __init__(self, max_size, min_size=None, initialdata=None, + delete_cb=None): + """Create a BoundedDict + @max_size: maximum size of the dictionnary + @min_size: (optional) number of most used element to keep when resizing + @initialdata: (optional) dict instance with initial data + @delete_cb: (optional) callback called when an element is removed + """ + self._data = initialdata.copy() if initialdata else {} + self._min_size = min_size if min_size else max_size / 3 + self._max_size = max_size + self._size = len(self._data) + self._counter = collections.Counter(self._data.keys()) + self._delete_cb = delete_cb + + def __setitem__(self, asked_key, value): + if asked_key not in self._data: + # Update internal size and use's counter + self._size += 1 + + # Bound can only be reached on a new element + if (self._size >= self._max_size): + most_commons = [key for key, _ in self._counter.most_common()] + + # Handle callback + if self._delete_cb is not None: + for key in most_commons[self._min_size - 1:]: + self._delete_cb(key) + + # Keep only the most @_min_size used + self._data = {key:self._data[key] + for key in most_commons[:self._min_size - 1]} + self._size = self._min_size + + # Reset use's counter + self._counter = collections.Counter(self._data.keys()) + + self._data[asked_key] = value + self._counter.update([asked_key]) + + def keys(self): + "Return the list of dict's keys" + return self._data.keys() + + def __getitem__(self, key): + self._counter.update([key]) + return self._data[key] + + def __delitem__(self, key): + if self._delete_cb is not None: + self._delete_cb(key) + del self._data[key] + self._size -= 1 + del self._counter[key] + + def __del__(self): + """Ensure the callback is called when last reference is lost""" + if self._delete_cb: + for key in self._data: + self._delete_cb(key) diff --git a/test/core/utils.py b/test/core/utils.py new file mode 100644 index 00000000..bf14df68 --- /dev/null +++ b/test/core/utils.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +import unittest + + +class TestUtils(unittest.TestCase): + + def test_boundedDict(self): + from miasm2.core.utils import BoundedDict + + # Use a callback + def logger(key): + print "DELETE", key + + # Create a 5/2 dictionnary + bd = BoundedDict(5, 2, initialdata={"element": "value"}, + delete_cb=logger) + bd["element2"] = "value2" + assert("element" in bd) + assert("element2" in bd) + self.assertEqual(bd["element"], "value") + self.assertEqual(bd["element2"], "value2") + + # Increase 'element2' use + _ = bd["element2"] + + for i in xrange(6): + bd[i] = i + print "Insert %d -> %s" % (i, bd) + + assert(len(bd) == 2) + + assert("element2" in bd) + self.assertEqual(bd["element2"], "value2") + + +if __name__ == '__main__': + testsuite = unittest.TestLoader().loadTestsFromTestCase(TestUtils) + report = unittest.TextTestRunner(verbosity=2).run(testsuite) + exit(len(report.errors + report.failures)) diff --git a/test/test_all.py b/test/test_all.py index 0ecc677f..8d759053 100644 --- a/test/test_all.py +++ b/test/test_all.py @@ -100,6 +100,7 @@ testset += SemanticTestExec("x86_32", "PE", 0x401000, ["bsr_bsf"], for script in ["interval.py", "graph.py", "parse_asm.py", + "utils.py", ]: testset += RegressionTest([script], base_dir="core") ## Expression |