Dummy MotorΒΆ

This section contains the dummymotor script.

Download file: dummymotor.py

  1"""
  2Dummy motor driver
  3
  4This file is part of lab-control-lib
  5(c) 2023-2024 Pierre Thibault (pthibault@units.it)
  6"""
  7
  8import time
  9
 10from lclib import register_driver, proxycall, proxydevice
 11from lclib.base import MotorBase, SocketDriverBase, emergency_stop
 12
 13__all__ = ['Dummymotor', 'Motor', 'DummyControllerInterface']
 14
 15ADDRESS = ('localhost', 5050)  # Address for the proxy driver
 16DEVICE_ADDRESS = ('localhost', 10000)  # Address of the (fake) controller. In reality that would be another device on the LAN
 17
 18@register_driver
 19@proxydevice(address=ADDRESS)
 20class Dummymotor(SocketDriverBase):
 21    """
 22    Socket driver example for a dummy motor.
 23    """
 24
 25    DEFAULT_DEVICE_ADDRESS = DEVICE_ADDRESS
 26    POLL_INTERVAL = 0.01     # temporization for rapid status checks during moves.
 27    EOL = b'\n'
 28
 29    def __init__(self, device_address=None):
 30        """
 31        Initialize socket driver.
 32        """
 33        device_address = device_address or self.DEFAULT_DEVICE_ADDRESS
 34
 35        # Register periodic (heartbeat) calls to avoid disconnect
 36        self.periodic_calls = {'status': (self.status, 10.)}
 37
 38        # Initialize driver
 39        super().__init__(device_address=device_address)
 40
 41        # Declare the motor value as metadata call.
 42        self.metacalls.update({'position': self.get_pos})
 43
 44    def init_device(self):
 45        """
 46        Device initialization.
 47        """
 48        reply = self.device_cmd(b'DO_INIT\n')
 49        self.logger.info('Do init replied %s' % reply.strip())
 50
 51        self.logger.info("Dummy initialization complete.")
 52        self.initialized = True
 53
 54    @proxycall()
 55    def status(self):
 56        """
 57        Dummy driver status.
 58        """
 59        return self.device_cmd(b'STATUS\n').strip()
 60
 61    @proxycall(interrupt=True)
 62    def abort(self):
 63        """
 64        Emergency stop.
 65        """
 66        self.logger.info("ABORTING DUMMY!")
 67        reply = self.device_cmd(b'ABORT\n')
 68        return
 69
 70    @proxycall()
 71    def get_pos(self):
 72        """
 73        Dummy position
 74        """
 75        pos = self.device_cmd(b'GET_POSITION\n')
 76        return float(pos.strip())
 77
 78    @proxycall(admin=True, block=False)
 79    def set_pos(self, value):
 80        """
 81        Dummy set position
 82        """
 83        self.device_cmd('SET_POSITION %f\n' % value)
 84        self.check_done()
 85        return self.get_pos()
 86
 87    @proxycall()
 88    def check_done(self):
 89        """
 90        Poll until movement is complete.
 91        """
 92        if self.status() == b'IDLE':
 93            return
 94        with emergency_stop(self.abort):
 95            while True:
 96                # query axis status
 97                status = self.status()
 98                if status == b'IDLE':
 99                    break
100                # Temporise
101                time.sleep(self.POLL_INTERVAL)
102        self.logger.info("Finished moving dummy stage.")
103
104    @proxycall()
105    @property
106    def pos(self):
107        """
108        Dummy position
109        """
110        pos = self.device_cmd(b'GET_POSITION\n')
111        return float(pos.strip())
112
113
114@Dummymotor.register_motor('dummy')
115class Motor(MotorBase):
116    def __init__(self, name, driver):
117        super(Motor, self).__init__(name, driver)
118
119    def _get_pos(self):
120        """
121        Return position in degrees
122        """
123        return self.driver.get_pos()
124
125    def _set_abs_pos(self, x):
126        """
127        Set absolute position
128        """
129        return self.driver.set_pos(x)
130
131import socket
132from lclib.base import _recv_all
133
134class DummyControllerInterface:
135
136    def __init__(self, timeout=5., latency=0.):
137        """
138        Bare-bones fake socket device accepting a single connection and processing a few commands.
139        """
140        self.timeout = timeout
141        self.latency = latency
142
143        self.client_sock = socket.socket(
144            socket.AF_INET, socket.SOCK_STREAM)  # TCP socket
145        self.client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
146        self.client_sock.settimeout(1)
147        self.client_sock.bind(DEVICE_ADDRESS)
148        self.client_sock.listen(5)
149        self.client = None
150        print('Accepting connections.')
151
152        # Initial internal state
153        self.initialized = False
154        self.pos = 0.
155        self.in_error = False
156        self.in_motion = False
157        self.speed =  1.
158        self.limits = (-20., 20.)
159
160        self.listen()
161
162    def listen(self):
163        """
164        Infinite loop waiting for a connection and serving the connected client.
165        """
166        while True:
167            try:
168                client, address = self.client_sock.accept()
169                print('Client connected')
170                client.settimeout(self.timeout)
171                self.serve(client)
172            except socket.timeout:
173                continue
174
175    def serve(self, client):
176        """
177        Serve the newly connected client
178        """
179        t_start = 0.
180        t_end = 0.
181        start_pos = self.pos
182        end_pos = self.pos
183        direction = 1
184        while True:
185            # Read data
186            try:
187                data = _recv_all(client).strip().decode('ascii')
188            except socket.timeout:
189                continue
190            print(f'Received: {data} | ', end=' ')
191
192            t = time.time()
193            if self.in_motion:
194                # Compute new position if the motor was last set in motion
195                if t < t_end:
196                    # Motion is not over
197                    self.pos = start_pos + direction*self.speed*(t-t_start)
198                    self.in_motion = True
199                else:
200                    # Motion is done
201                    self.in_motion = False
202                    self.pos = end_pos
203
204            # Manage commands
205            if data == 'DO_INIT':
206                self.initialized = True
207                reply = 'OK: INIT'
208            if not self.initialized:
209                reply = 'ERROR: NOT INITIALIZED'
210            elif data == 'STATUS':
211                if self.in_motion:
212                    reply = 'MOVING'
213                else:
214                    reply = 'IDLE'
215            elif data == 'ABORT':
216                if self.in_motion:
217                    self.in_motion = False
218                    reply = 'OK: ABORTED'
219                else:
220                    reply = 'ERROR: NOTHING TO ABORT'
221            elif data == 'GET_POSITION':
222                reply = '%f' % self.pos
223            elif data.startswith('SET_POSITION'):
224                if self.in_motion:
225                    reply = 'ERROR: STILL MOVING'
226                else:
227                    try:
228                        end_pos = float(data.strip('SET_POSITION'))
229                    except:
230                        reply = 'ERROR: WRONG SYNTAX'
231                    else:
232                        if end_pos < self.limits[0]:
233                            reply = 'ERROR: LOWER LIMIT'
234                        elif end_pos > self.limits[1]:
235                            reply = 'ERROR: UPPER LIMIT'
236                        else:
237                            # Motion is valid
238                            start_pos = self.pos
239                            t_start = t
240                            t_end = t + abs(end_pos - start_pos)/self.speed
241                            direction = 1 if end_pos > start_pos else -1
242                            self.in_motion = True
243                            reply = 'OK'
244            else:
245                reply = 'ERROR: UNKNOWN COMMAND'
246
247            # Return to client
248            time.sleep(self.latency)
249            print(f'Sent: {reply}')
250            client.sendall((reply+'\n').encode())