"""
Proxy Device: decorators that expose a class and chose methods/properties
through network, using RpyC backend for communication.
Example code:
::
@proxydevice(address=("127.0.0.1", 5055))
class A:
def __init__(self, x=1):
self.x = x
self.a = "abc"
self.stop = False
# A non-exposed call
def do_something(self, y):
self.x += y
# An exposed call
@proxycall()
def get_multiple(self, y):
return self.x * y
# An exposed call allowed only for the client with admin rights
@proxycall(admin=True)
def set_a(self, a):
self.a = a
# A long task. Must be made non-blocking otherwise the sever will wait for return value
@proxycall(admin=True, block=False)
def long_task(self):
for i in range(10):
print(chr(i + 65))
time.sleep(1)
if self.stop:
self.stop = False
break
return 1
# Declaring the abort call, to be sent when ctrl-C is hit during a long call.
@proxycall(interrupt=True)
def abort(self):
print("Aborting the long call!")
self.stop = True
# An exposed property
@proxycall()
@property
def x_value(self):
return self.x
@x_value.setter
def x_value(self, v):
self.x = v
Create a normal instance:
::
a = A()
Or on one computer:
::
server = A.Server()
On another computer:
::
a = A.Client()
# now a has all methods that have been exposed by the proxycall decorator
a.get_multiple(5)
-> 5
This file is part of lab-control-lib
(c) 2023-2024 Pierre Thibault (pthibault@units.it)
"""
import logging
import rpyc
import atexit
import threading
import inspect
import time
import sys
import traceback
import builtins
import pickle
import enum
from .util import Future
from .logs import logger as rootlogger
__all__ = ['proxydevice', 'proxycall', 'ProxyDeviceError']
logger = logging.getLogger(__name__)
def _m(obj):
"""
Marshaller to avoid rpyc netrefs.
"""
return pickle.dumps(obj)
def _um(s):
"""
Unmarshaller.
"""
return pickle.loads(s)
[docs]
class ProxyDeviceError(Exception):
pass
# Register custom error
rpyc.core.vinegar._generic_exceptions_cache[
"lclib.util.proxydevice.ProxyDeviceError"] = ProxyDeviceError
class ThreadedServer(rpyc.ThreadedServer):
"""
Minimal subclass that adds a callback for disconnect events.
"""
def __init__(self, *args, **kwargs):
self._disconnect_callback = kwargs.pop("disconnect_callback", None)
super().__init__(*args, **kwargs)
def _handle_connection(self, conn):
super()._handle_connection(conn)
if self._disconnect_callback:
self._disconnect_callback(conn)
class WrapServiceBase(rpyc.Service):
"""
Base class (dynamically augmented with other methods for the rpyc service.
"""
# To be replaced by server instance in the dynamically generated subclass
server = None
def on_connect(self, conn):
"""
Manage new client connection
"""
# Store client connection object
self.conn = conn
# Thread id is used as unique id for this service (and this client)
self.id = threading.get_ident()
# Notify server of new connection
self.server.new_client(self.id, self.conn)
# Call parent class connect
super().on_connect(conn)
def exposed_create_instance(self, args, kwargs):
"""
Attempt to create instance
"""
return self.server.create_instance(args=args, kwargs=kwargs)
def exposed_ask_admin(self, admin=None, force=None):
"""
Request to become admin
"""
return self.server.ask_admin(admin=admin, force=force)
def exposed_kill(self):
"""
Kill the server.
"""
self.server.stop()
def exposed_abort(self):
"""
Abort call.
"""
if self.server.interrupt_method is None:
self.server.logger.error("Abort requested but no interrupt method exists!")
return
return self.server.interrupt_method()
@classmethod
def _new_exposed_method(cls, name, admin, block):
"""
Add an "exposed" method to the server service class. The resulting
method can be called by the client.
Parameters:
name (str): The name of the instance method
admin (bool): If True, admin rights are required
block (bool): If False, call instance method on a separate thread and
return immediately
"""
if block:
# Normal case: a method that grabs the lock and run the method
def method(service_self, args, kwargs):
# Check if admin rights are required
if admin and not service_self.server.is_admin:
raise ProxyDeviceError(
f"Non-admin clients cannot run method {name}."
)
# Find the method to call in the object instance
instance_method = getattr(service_self.server.instance, name)
# Call the instance method
with service_self.server.lock:
result = instance_method(*_um(args), **_um(kwargs))
return _m({"result": result})
else:
# Non-blocking call: we need to call the method on a separate thread and return
def method(service_self, args, kwargs):
# Check if admin rights are required
if admin and not service_self.server.is_admin:
raise ProxyDeviceError(
f"Non-admin clients cannot run method {name}."
)
# Find the method to call in the object instance
instance_method = getattr(service_self.server.instance, name)
# Check if another non-blocking call is already running
if service_self.server.awaiting_result is not None:
raise ProxyDeviceError(
f"Current of past non-blocking call still pending."
)
# Grab the lock: better making sure that no-one else is toying
# with the server for the next steps
with service_self.server.lock:
# Grab client connection to send result as a callback
c = service_self.server.this_conn()
# Define callback function
def callback(result, error):
# Send result or error to client
result_and_error = _m({"result": result, "error": error})
try:
c.root.notify_result(result_and_error)
except EOFError:
# This happens if keyboard interrupt killed the client
pass
# Either way we are done with this call so we reset the
# attribute holding the thread.
service_self.server.awaiting_result = None
# Create the thread and start it
service_self.server.awaiting_result = Future(
instance_method,
args=_um(args),
kwargs=_um(kwargs),
callback=callback,
)
return _m({"result": None})
# Attach the method to the service with "exposed_" prefix as per rpyc
setattr(cls, f"exposed_{name}", method)
@classmethod
def _new_exposed_property(cls, name, admin):
"""
Add "exposed" property getter and setter to the server service class.
The resulting method will be called by the client through usual
property interaction.
Parameters:
name (str): The name of the property
admin (bool): If True, admin rights are required
"""
# Getter
def get_method(service_self):
# Call getattr on the instance
with service_self.server.lock:
result = getattr(service_self.server.instance, name)
return _m({"result": result})
# Setter
def set_method(service_self, value):
# Check if admin rights are required
if admin and not service_self.server.is_admin:
raise ProxyDeviceError(f"Non-admin clients cannot set property {name}.")
# Call setattr on the instance
with service_self.server.lock:
setattr(service_self.server.instance, name, _um(value))
return _m({"result": None})
# Attach the two methods to the service.
setattr(cls, f"exposed__get_{name}", get_method)
setattr(cls, f"exposed__set_{name}", set_method)
class ClientServiceBase(rpyc.Service):
"""
The base class for the client service exposed to the server.
"""
# To be replaced by the actual client object when subclassed
client = None
# To be replaced by a real logger when the object is subclassed
exposed_logger = None
def on_connect(self, conn):
"""
Manage new connection
This is the service exposed by the client to the remote server
"""
# Store server connection object
self.conn = conn
# Call parent class connect
super().on_connect(conn)
def exposed_print(self, *objects, sep=' ', end='\n', file=sys.stdout, flush=False):
"""
Print string locally
"""
builtins.print(*objects, sep=sep, end=end, file=file, flush=flush)
def exposed_input(self, prompt=None):
"""
Get input locally
"""
return input(prompt)
def exposed_notify_result(self, result_and_error):
"""
Called by server's "awaiting_result" thread when done.
"""
self.client.logger.debug("Non-blocking call finished.")
self.client.awaited_result = _um(result_and_error)
self.client.result_flag.set()
class ProxyClientBase:
"""
Base class for Proxy Client.
"""
# These class attributes are set in subclasses created by the proxydevice decorator
ADDRESS = None
API = None
SERVE_INTERVAL = 0.0
SLEEP_INTERVAL = 0.1
RECONNECT_INTERVAL = 3.0
def __init__(self, admin=True, name=None, args=None, kwargs=None, clean=True, reconnect='if_successful', address=None):
"""
Base class for client proxy. Subclasses are created dynamically by the
`proxydevice` decorator.
Parameters:
admin (bool): Whether admin rights should be requested
name (str): an optional name for this client
args (tuple): args to pass to the server if the remote object has not
been instantiated
kwargs (dicts): same as args above
clean (bool): If false, non-blocking calls will not "fake block"
awaiting result.
reconnect: one of 'if_successful' (default), 'always', or 'never'
address (str, int): (IP, port) of the server. If None, the class
attribute ADDRESS will be used.
"""
self.name = self.__class__.__name__
self.client_name = name or self.name
self.clean = clean
self.reconnect = reconnect
if address is not None:
self.ADDRESS = address
# Statistics
self.stats = {'startup': time.time(),
'reply_number': 0,
'total_reply_time': 0.,
'total_reply_time2': 0.,
'min_reply_time': 100.,
'max_reply_time': 0.,
'last_reply_time': 0.}
# Create logger
self.logger = rootlogger.getChild(self.__class__.__name__)
# rpyc connection
self.conn = None
self.serving_thread = None
self._connection_failed = False
self.first_connect = True
self._active = False
self._terminate = False
# Try to connect
self._connected = self.connect()
# Result-ready-flag
self.result_flag = threading.Event()
self.awaited_result = None
# Instantiate remote object if needed
if args is not None or kwargs is not None:
if not self._connected:
raise RuntimeError(f'Client {name} is not connected, so cannot instantiate remote instance' )
self.conn.root.create_instance(args, kwargs)
# Ask for admin
if self._connected:
self.conn.root.ask_admin(admin=admin)
# For thread clean up
atexit.register(self._stop)
def connect(self):
"""
Initial connection.
If self.reconnect == 'always', this function returns immediately even if
the connection failed (it will keep trying in the background). Otherwise,
this function will return only when the connection is established
to continue initialization.
Returns:
True if connectect, False if not (yet) connected
"""
def catch_result(r, e):
if e is None:
return
self._connection_failed = True
self.serving_thread = Future(self._serve, callback=catch_result)
# Wait for connection to be established.
while True:
if self.conn is not None:
return True
if self._connection_failed:
raise ProxyDeviceError('Connection failed')
if self.reconnect == 'always':
return False
time.sleep(0.05)
@property
def connected(self):
return self._connected
def disconnect(self):
"""
Disconnect from server
"""
self._stop()
self._terminate = True
def kill_server(self):
try:
self.conn.root.kill()
except EOFError:
# This is normal - the connection was lost mid-way
pass
except:
raise
self.disconnect()
def _serve(self):
"""
Serve rpyc incoming connections. This replaces rpyc.BgServingThread, which
is not robust enough to disconnect events.
All exceptions have to be caught because this is running on a separate thread.
"""
# Wrap everything in a loop for reconnect.
while not self._terminate:
try:
# rpyc connection
self.conn = rpyc.connect(
service=self._create_service(),
host=self.ADDRESS[0],
port=self.ADDRESS[1],
)
except ConnectionRefusedError:
# No server present
if (self.reconnect != 'always') or ((self.reconnect == 'if_successful') and self.first_connect) or (self.reconnect == 'never'):
self.logger.error(f"Connection to {self.ADDRESS} refused. Is the server running?")
raise
# Try reconnecting
if self.first_connect:
self.logger.info(f"Connection for {self.name} to {self.ADDRESS} not yet established. Retrying...")
else:
self.logger.info(f"Connection for {self.name} to {self.ADDRESS} is lost. Reconnecting...")
time.sleep(self.RECONNECT_INTERVAL)
continue
# Connected!
self._connected = True
if self.first_connect:
self.logger.info(f"Connected to {self.ADDRESS}.")
self.first_connect = False
else:
self.logger.info(f"Reconnected.")
# Start serving
self._active = True
try:
while self._active:
self.conn.serve(self.SERVE_INTERVAL)
time.sleep(self.SLEEP_INTERVAL) # to reduce contention
break
except EOFError:
# Connection closed!
self.logger.warning("Connection lost.")
if self.reconnect != 'never':
self._connected = False
continue
raise
try:
self.conn.close()
except Exception:
# We don't really care if closing the connection didn't work, we are
# Heading out anyway.
pass
self._connected = False
self.logger.info("Connection closed.")
def ask_admin(self, admin=None, force=None):
"""
Query or request admin status
ask_admin() -> current admin status (True or False)
ask_admin(admin=True) -> request admin status
ask_admin(admin=False) -> rescind admin status
ask_admin(admin=True, force=True) -> force admin status
"""
return self.conn.root.ask_admin(admin=admin, force=force)
def abort(self):
"""
Call the remote interrupt method.
"""
self.conn.root.abort()
def _stop(self):
"""
Close connections
"""
self._active = False
def _create_service(self):
"""
Create a service subclass with appropriate class attributes.
"""
class ClientService(ClientServiceBase):
client = self
exposed_logger = rpyc.restricted(
self.logger, ["debug", "info", "warning", "error", "critical"]
)
return ClientService
def _update_stats(self, t0, t1):
"""
Update internal timing statistics.
"""
dt = t1 - t0
self.stats['reply_number'] += 1
self.stats['total_reply_time'] += dt
self.stats['total_reply_time2'] += dt * dt
minr = self.stats['min_reply_time']
maxr = self.stats['max_reply_time']
self.stats['min_reply_time'] = min(dt, minr)
self.stats['max_reply_time'] = max(dt, maxr)
self.stats['last_reply_time'] = t0
@classmethod
def _new_property(cls, name, doc):
"""
Add property to subclass, connected to remote object call.
Parameters:
name (str): property name
doc (str): doc string
"""
# Create getter
def fget(client_self):
t0 = time.time()
method = getattr(client_self.conn.root, f"_get_{name}")
reply = _um(method())
client_self._update_stats(t0, time.time())
return reply["result"]
# Create setter
def fset(client_self, value):
t0 = time.time()
method = getattr(client_self.conn.root, f"_set_{name}")
method(_m(value))
client_self._update_stats(t0, time.time())
# Set name
fget.__name__ = name
fset.__name__ = name
# Create property
new_prop = property(fget, fset, None, doc=doc)
setattr(cls, name, new_prop)
@classmethod
def _new_method(cls, name, doc, signature, block=True):
"""
Add method to subclass, connected to remote object call.
Parameters:
name (str): property name
doc (str): doc string
signature (str): method signature. Used for documentation
block (bool): whether the call will be blocking on server side.
"""
# Create method that calls the remote method
if block:
# In blocking mode, we just request the result and wait
def method(client_self, *args, **kwargs):
t0 = time.time()
service_method = getattr(client_self.conn.root, name)
reply = _um(service_method(_m(args), _m(kwargs)))
client_self._update_stats(t0, time.time())
return reply["result"]
else:
# In non-blocking mode, we have to wait for result
# and catch keyboard interrupts to try and abort the command
def method(client_self, *args, **kwargs):
t0 = time.time()
# Find remote method to call
service_method = getattr(client_self.conn.root, name)
# This calls the remote method, but since it is non-blocking it returns immediately
reply = _um(service_method(_m(args), _m(kwargs)))
# Timing statistics are about call delays so we measure time now
client_self._update_stats(t0, time.time())
# Reset awaited result
client_self.awaited_result = None
client_self.result_flag.clear()
if not client_self.clean:
# clean is False, do not "fake-block"
return reply
# Wait for result. This loop catches keyboard interrupts
emergency_stop = False
while True:
if emergency_stop:
# We are here because a keyboard interrupt was set
client_self.abort()
return
try:
# Wait for result_flag to be set
if client_self.result_flag.wait(1):
# result_flag has been set. Clear it
client_self.result_flag.clear()
# Grab the result
reply = client_self.awaited_result
error = reply.pop("error", None)
if error:
raise error
# return the result
return reply["result"]
except KeyboardInterrupt:
# Set switch and continue to call abort.
emergency_stop = True
continue
# Set method name and documentation
method.__name__ = name
doc = f"{name}{signature}\n" + doc
method.__doc__ = doc
# Attach method to subclass
setattr(cls, name, method)
class ProxyServerBase:
"""
Holding the device instance and serving clients.
"""
# These class attributes are set in subclasses created by the proxydevice decorator
ADDRESS = None
API = None
CLS = None
def __init__(self, instantiate=True, instance_args=None, instance_kwargs=None, address=None):
"""
Base class for server proxy. Subclasses are created dynamically by the
proxydevice decorator and attaches the defaults ADDRESS, API and CLS as
class attributes.
Parameters:
instantiate (bool): if True, create immediately the internal instance
of self.cls, using the provided args/kwargs. If
False, instantiation will proceed with the first
client connection.
instance_args/kwargs: args, kwargs to pass for class instantiation.
address (str, int): (IP, port) of the serving address. If None, the
class attribute ADDRESS will be used.
"""
# Create logger
self.logger = rootlogger.getChild(self.__class__.__name__)
self.name = self.__class__.__name__.lower()
if address is not None:
self.ADDRESS = address
# instance of the class cls once we have received the initialization
# parameters from the first client (or provided here at construction)
self.instance = None
# The instance method that gets called when an emergency stop is requested
self.interrupt_method = None
# The non-blocking thread
self.awaiting_result = None
# Dict of connected clients
self.clients = {}
# Lock to ensure instance is accessed synchronously
self.lock = threading.Lock()
# The rpyc server runs itself on a thread
self.serving_thread = None
self.rpyc_server = None
# A variable telling who is admin
self.admin = None
atexit.register(self.stop)
if instantiate:
self.create_instance(args=instance_args, kwargs=instance_kwargs)
# Create the service from API
self.service = self._create_service()
# Start serving
self.activate()
def activate(self):
"""
Start serving
"""
# Create rpyc threaded server
self.rpyc_server = ThreadedServer(
service=self.service,
port=self.ADDRESS[1],
protocol_config={
"allow_all_attrs": True,
"allow_setattr": True,
"allow_delattr": True,
},
disconnect_callback=self.del_client,
)
# Replace print and input
self.logger.info("Rerouting 'print' and 'input'")
mods = [sys.modules[cn.__module__] for cn in self.instance.__class__.__mro__ if cn.__module__ != 'builtins']
for mod in mods:
mod.print = self._proxy_print
mod.input = self._proxy_input
# Start server on separate thread
self.serving_thread = threading.Thread(
target=self.rpyc_server.start, daemon=True
)
self.serving_thread.start()
def wait(self):
"""
Wait until the server stops.
"""
if self.serving_thread is not None and self.serving_thread.is_alive():
self.serving_thread.join()
def stop(self):
"""
Stop serving.
"""
try:
self.rpyc_server.close()
except AttributeError:
# rpyc_server might already be None
pass
# Clean up
self.logger.info("Reseting builtin 'print' and 'input'")
mods = [sys.modules[cn.__module__] for cn in [self.instance.__class__] + list(self.instance.__class__.__bases__) if cn.__module__ != 'builtins']
for mod in mods:
mod.print = builtins.print
mod.input = builtins.input
def _create_service(self):
"""
Factory for rpyc Service class based on API.
This looks complicated because of the dynamically generated
methods.
"""
# Create subclass for the rpyc service
WrapService = type("WrapService", (WrapServiceBase,), {})
# Attach this instance of the server
WrapService.server = self
# Create exposed methods for all elements of the API
for name, api_info in self.API.items():
if api_info["property"]:
WrapService._new_exposed_property(name, api_info["admin"])
else:
WrapService._new_exposed_method(
name, api_info["admin"], block=api_info["block"]
)
return WrapService
@property
def this_id(self):
return threading.get_ident()
def this_conn(self):
"""
Return the connection object for this thread
"""
# The id of the calling client
id = self.this_id
c = self.clients.get(id, None)
if c is None:
raise ProxyDeviceError(f"Thread {id} has no associated connected client!")
return c
def ask_admin(self, admin, force):
"""
Manage admin requests
"""
# This is the proxy for the remote (client) logger
rlogger = self.this_conn().root.logger
id = self.this_id
if admin is None:
if self.admin is None:
rlogger.warning("No client is currently admin")
return False
is_admin = self.admin == id
if is_admin:
rlogger.info("Client is admin")
else:
rlogger.info("Client is not admin")
return is_admin
if admin:
if self.admin is None:
self.admin = id
return True
elif self.admin == id:
rlogger.warning("Client aldeady admin")
return True
elif force:
self.admin = id
rlogger.info("Client now admin (forced)")
return True
else:
return False
else:
if self.admin != id:
rlogger.warning("Client was not admin")
return None
self.admin = None
rlogger.info("Client not admin anymore")
return True
@property
def is_admin(self):
"""
True only if current client is admin.
"""
return self.admin == self.this_id
def _proxy_print(self, *objects, sep=' ', end='\n', file=None, flush=False):
"""
Print locally and on client.
"""
# Print to stdout
builtins.print(*objects, sep=sep, end=end, file=file, flush=flush)
# Print to client stdout
cl_conn = self.clients.get(self.admin, None)
if cl_conn is not None:
try:
cl_conn.root.print(*objects, sep=sep, end=end, file=file, flush=flush)
except:
builtins.print(traceback.format_exc())
self.logger.error('Remote printing failed.')
def _proxy_input(self, prompt=None):
"""
Input through client.
"""
cl_conn = self.clients.get(self.admin, None)
if cl_conn is None:
raise ProxyDeviceError("Cannot use input without admin client!")
return cl_conn.root.input(prompt=prompt)
def new_client(self, id, conn):
"""
Called by a service on a new client connection, from it's own thread. Stores
the rpyc connection object for future interactions.
"""
self.clients[id] = conn
def del_client(self, conn):
"""
Called by the ThreadedServer instance upon disconnect.
"""
id = self.this_id
if not self.clients.pop(id, None):
self.logger.error("Disconnecting client not found!")
elif id == self.admin:
self.logger.info(f"Admin client {id} disconnected")
self.admin = None
else:
self.logger.info(f"Client {id} disconnected")
def create_instance(self, args=None, kwargs=None):
"""
Create the instance of the wrapped class, using args and kwargs as initialization parameters
"""
args = args or ()
kwargs = kwargs or {}
# Raise error if instance already exists
if self.instance is not None:
raise RuntimeError(
f"Instance of class {self.instance.__class__.__name__} already exists."
)
# Instantiate the wrapped object
try:
self.instance = self.CLS(*args, **kwargs)
except BaseException as error:
self.logger.critical("Class instantiation failed!", exc_info=True)
self._stopping = True
self.instance = None
raise
# Look for interrupt call
self.interrupt_method = None
for method_name, api_info in self.API.items():
if api_info.get("interrupt"):
self.interrupt_method = getattr(self.instance, method_name)
self.logger.info(f"Method {method_name} is the abort call.")
self.logger.info("Created instance of wrapped class.")
[docs]
class proxycall:
"""
Decorator to tag a method or property to be exposed for remote access.
"""
def __init__(self, admin=False, block=True, interrupt=False, **kwargs):
"""
Decorator to tag a method or property to be exposed for remote access.
Parameters:
admin (bool): whether admin rights are required to execute command.
block (bool): Wait for the function to return.
interrupt (bool): if True, declare this method as the method to call
when SIG_INT is caught on client side.
kwargs: anything else that might be needed in the future.
"""
self.admin = admin
self.block = block
self.interrupt = interrupt
self.kwargs = kwargs
def __call__(self, f):
"""
Decorator call.
This attaches a dictionary called api_call to methods and properties,
which are then scanned by the proxydevice decorator.
"""
api_info = {
"admin": self.admin,
"block": self.block,
"interrupt": self.interrupt,
}
api_info.update(self.kwargs)
if type(f) is property:
api_info["property"] = True
api_info["doc"] = f.fget.__doc__
api_info["name"] = f.fget.__name__
api_info["signature"] = None
f.fget.api_info = api_info
else:
api_info["property"] = False
api_info["doc"] = f.__doc__
api_info["name"] = f.__name__
api_info["signature"] = str(inspect.signature(f))
f.api_info = api_info
return f
[docs]
class proxydevice:
"""
Decorator that does the main magic.
"""
def __init__(self, address=None, clean=True, stream=True, **kwargs):
"""
Decorator initialization.
Parameters:
address (str, int): (IP, port)of the serving address. If None, will
have to be provided as an argument.
clean (bool): whether the client side should receive replies in the
same format as for the native class. If false, blocking
calls return immediately.
stream (bool): If True, mirror locally device's stdout and stderr
kwargs: kept for compatibility
"""
self.address = address
self.clean = clean
self.stream = stream
def __call__(self, cls):
"""
Decorator call. This creates a ServerBase and a ClientBase subclass.
The latter gets populated with all fake methods and properties that
make calls to the server through the rpyc connection.
"""
# Extract API info
API = {}
# Looping through __dict__ misses all methods of parent class(es)
# So we need to do it with getattr
for k in dir(cls):
try:
v = getattr(cls, k)
if type(v) is property:
api_info = v.fget.api_info
else:
api_info = v.api_info
except AttributeError:
continue
API[k] = api_info
# Define server subclass and set default values
proxyserver_name = f"{cls.__name__}ProxyServer"
Server = type(proxyserver_name, (ProxyServerBase,), {})
Server.ADDRESS = self.address
Server.API = API
Server.CLS = cls
# Define client subclass
proxyclient_name = f"{cls.__name__}ProxyClient"
Client = type(proxyclient_name, (ProxyClientBase,), {})
Client.ADDRESS = self.address
Client.API = API
# Create all fake methods and properties for Client
for name, api_info in API.items():
signature = api_info["signature"] or "(*args, **kwargs)"
doc = api_info["doc"] or ""
try:
if api_info["property"]:
Client._new_property(name, doc)
logger.debug(f"{proxyclient_name}: Inserted property '{name}'.")
else:
Client._new_method(name, doc, signature, block=api_info["block"])
logger.debug(f"{proxyclient_name}: Inserted method '{name}'.")
if api_info["interrupt"]:
logger.debug(f"{proxyclient_name}: Method '{name}' is the abort call.")
except AttributeError:
continue
# Attach server and client objects to decorated class
cls.Server = Server
cls.Client = Client
return cls