Advertisement
here2share

# sharedctypes.py

Dec 2nd, 2019
338
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.21 KB | None | 0 0
  1. # sharedctypes.py
  2. #
  3. # Module which supports allocation of ctypes objects from shared memory
  4. #
  5. # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
  6. #
  7.  
  8. import sys
  9. import ctypes
  10. import weakref
  11.  
  12. from multiprocessing import heap, RLock
  13. from multiprocessing.forking import assert_spawning, ForkingPickler
  14.  
  15. __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
  16.  
  17. #
  18. #
  19. #
  20.  
  21. typecode_to_type = {
  22.     'c': ctypes.c_char,  'u': ctypes.c_wchar,
  23.     'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
  24.     'h': ctypes.c_short, 'H': ctypes.c_ushort,
  25.     'i': ctypes.c_int,   'I': ctypes.c_uint,
  26.     'l': ctypes.c_long,  'L': ctypes.c_ulong,
  27.     'f': ctypes.c_float, 'd': ctypes.c_double
  28.     }
  29.  
  30. #
  31. #
  32. #
  33.  
  34. def _new_value(type_):
  35.     size = ctypes.sizeof(type_)
  36.     wrapper = heap.BufferWrapper(size)
  37.     return rebuild_ctype(type_, wrapper, None)
  38.  
  39. def RawValue(typecode_or_type, *args):
  40.     '''
  41.    Returns a ctypes object allocated from shared memory
  42.    '''
  43.     type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
  44.     obj = _new_value(type_)
  45.     ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
  46.     obj.__init__(*args)
  47.     return obj
  48.  
  49. def RawArray(typecode_or_type, size_or_initializer):
  50.     '''
  51.    Returns a ctypes array allocated from shared memory
  52.    '''
  53.     type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
  54.     if isinstance(size_or_initializer, int):
  55.         type_ = type_ * size_or_initializer
  56.         return _new_value(type_)
  57.     else:
  58.         type_ = type_ * len(size_or_initializer)
  59.         result = _new_value(type_)
  60.         result.__init__(*size_or_initializer)
  61.         return result
  62.  
  63. def Value(typecode_or_type, *args, **kwds):
  64.     '''
  65.    Return a synchronization wrapper for a Value
  66.    '''
  67.     lock = kwds.pop('lock', None)
  68.     if kwds:
  69.         raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
  70.     obj = RawValue(typecode_or_type, *args)
  71.     if lock is False:
  72.         return obj
  73.     if lock in (True, None):
  74.         lock = RLock()
  75.     if not hasattr(lock, 'acquire'):
  76.         raise AttributeError("'%r' has no method 'acquire'" % lock)
  77.     return synchronized(obj, lock)
  78.  
  79. def Array(typecode_or_type, size_or_initializer, **kwds):
  80.     '''
  81.    Return a synchronization wrapper for a RawArray
  82.    '''
  83.     lock = kwds.pop('lock', None)
  84.     if kwds:
  85.         raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
  86.     obj = RawArray(typecode_or_type, size_or_initializer)
  87.     if lock is False:
  88.         return obj
  89.     if lock in (True, None):
  90.         lock = RLock()
  91.     if not hasattr(lock, 'acquire'):
  92.         raise AttributeError("'%r' has no method 'acquire'" % lock)
  93.     return synchronized(obj, lock)
  94.  
  95. def copy(obj):
  96.     new_obj = _new_value(type(obj))
  97.     ctypes.pointer(new_obj)[0] = obj
  98.     return new_obj
  99.  
  100. def synchronized(obj, lock=None):
  101.     assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
  102.  
  103.     if isinstance(obj, ctypes._SimpleCData):
  104.         return Synchronized(obj, lock)
  105.     elif isinstance(obj, ctypes.Array):
  106.         if obj._type_ is ctypes.c_char:
  107.             return SynchronizedString(obj, lock)
  108.         return SynchronizedArray(obj, lock)
  109.     else:
  110.         cls = type(obj)
  111.         try:
  112.             scls = class_cache[cls]
  113.         except KeyError:
  114.             names = [field[0] for field in cls._fields_]
  115.             d = dict((name, make_property(name)) for name in names)
  116.             classname = 'Synchronized' + cls.__name__
  117.             scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
  118.         return scls(obj, lock)
  119.  
  120. #
  121. # Functions for pickling/unpickling
  122. #
  123.  
  124. def reduce_ctype(obj):
  125.     assert_spawning(obj)
  126.     if isinstance(obj, ctypes.Array):
  127.         return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
  128.     else:
  129.         return rebuild_ctype, (type(obj), obj._wrapper, None)
  130.  
  131. def rebuild_ctype(type_, wrapper, length):
  132.     if length is not None:
  133.         type_ = type_ * length
  134.     ForkingPickler.register(type_, reduce_ctype)
  135.     obj = type_.from_address(wrapper.get_address())
  136.     obj._wrapper = wrapper
  137.     return obj
  138.  
  139. #
  140. # Function to create properties
  141. #
  142.  
  143. def make_property(name):
  144.     try:
  145.         return prop_cache[name]
  146.     except KeyError:
  147.         d = {}
  148.         exec template % ((name,)*7) in d
  149.         prop_cache[name] = d[name]
  150.         return d[name]
  151.  
  152. template = '''
  153. def get%s(self):
  154.    self.acquire()
  155.    try:
  156.        return self._obj.%s
  157.    finally:
  158.        self.release()
  159. def set%s(self, value):
  160.    self.acquire()
  161.    try:
  162.        self._obj.%s = value
  163.    finally:
  164.        self.release()
  165. %s = property(get%s, set%s)
  166. '''
  167.  
  168. prop_cache = {}
  169. class_cache = weakref.WeakKeyDictionary()
  170.  
  171. #
  172. # Synchronized wrappers
  173. #
  174.  
  175. class SynchronizedBase(object):
  176.  
  177.     def __init__(self, obj, lock=None):
  178.         self._obj = obj
  179.         self._lock = lock or RLock()
  180.         self.acquire = self._lock.acquire
  181.         self.release = self._lock.release
  182.  
  183.     def __reduce__(self):
  184.         assert_spawning(self)
  185.         return synchronized, (self._obj, self._lock)
  186.  
  187.     def get_obj(self):
  188.         return self._obj
  189.  
  190.     def get_lock(self):
  191.         return self._lock
  192.  
  193.     def __repr__(self):
  194.         return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
  195.  
  196.  
  197. class Synchronized(SynchronizedBase):
  198.     value = make_property('value')
  199.  
  200.  
  201. class SynchronizedArray(SynchronizedBase):
  202.  
  203.     def __len__(self):
  204.         return len(self._obj)
  205.  
  206.     def __getitem__(self, i):
  207.         self.acquire()
  208.         try:
  209.             return self._obj[i]
  210.         finally:
  211.             self.release()
  212.  
  213.     def __setitem__(self, i, value):
  214.         self.acquire()
  215.         try:
  216.             self._obj[i] = value
  217.         finally:
  218.             self.release()
  219.  
  220.     def __getslice__(self, start, stop):
  221.         self.acquire()
  222.         try:
  223.             return self._obj[start:stop]
  224.         finally:
  225.             self.release()
  226.  
  227.     def __setslice__(self, start, stop, values):
  228.         self.acquire()
  229.         try:
  230.             self._obj[start:stop] = values
  231.         finally:
  232.             self.release()
  233.  
  234.  
  235. class SynchronizedString(SynchronizedArray):
  236.     value = make_property('value')
  237.     raw = make_property('raw')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement