Dummy MotorΒΆ
This section contains the dummymotor script.
Download file: dummymotor.py
1"""
2Dummy motor driver
3
4This file is part of lab-control-lib
5(c) 2023-2024 Pierre Thibault (pthibault@units.it)
6"""
7
8import time
9
10from lclib import register_driver, proxycall, proxydevice
11from lclib.base import MotorBase, SocketDriverBase, emergency_stop
12
13__all__ = ['Dummymotor', 'Motor', 'DummyControllerInterface']
14
15ADDRESS = ('localhost', 5050) # Address for the proxy driver
16DEVICE_ADDRESS = ('localhost', 10000) # Address of the (fake) controller. In reality that would be another device on the LAN
17
18@register_driver
19@proxydevice(address=ADDRESS)
20class Dummymotor(SocketDriverBase):
21 """
22 Socket driver example for a dummy motor.
23 """
24
25 DEFAULT_DEVICE_ADDRESS = DEVICE_ADDRESS
26 POLL_INTERVAL = 0.01 # temporization for rapid status checks during moves.
27 EOL = b'\n'
28
29 def __init__(self, device_address=None):
30 """
31 Initialize socket driver.
32 """
33 device_address = device_address or self.DEFAULT_DEVICE_ADDRESS
34
35 # Register periodic (heartbeat) calls to avoid disconnect
36 self.periodic_calls = {'status': (self.status, 10.)}
37
38 # Initialize driver
39 super().__init__(device_address=device_address)
40
41 # Declare the motor value as metadata call.
42 self.metacalls.update({'position': self.get_pos})
43
44 def init_device(self):
45 """
46 Device initialization.
47 """
48 reply = self.device_cmd(b'DO_INIT\n')
49 self.logger.info('Do init replied %s' % reply.strip())
50
51 self.logger.info("Dummy initialization complete.")
52 self.initialized = True
53
54 @proxycall()
55 def status(self):
56 """
57 Dummy driver status.
58 """
59 return self.device_cmd(b'STATUS\n').strip()
60
61 @proxycall(interrupt=True)
62 def abort(self):
63 """
64 Emergency stop.
65 """
66 self.logger.info("ABORTING DUMMY!")
67 reply = self.device_cmd(b'ABORT\n')
68 return
69
70 @proxycall()
71 def get_pos(self):
72 """
73 Dummy position
74 """
75 pos = self.device_cmd(b'GET_POSITION\n')
76 return float(pos.strip())
77
78 @proxycall(admin=True, block=False)
79 def set_pos(self, value):
80 """
81 Dummy set position
82 """
83 self.device_cmd('SET_POSITION %f\n' % value)
84 self.check_done()
85 return self.get_pos()
86
87 @proxycall()
88 def check_done(self):
89 """
90 Poll until movement is complete.
91 """
92 if self.status() == b'IDLE':
93 return
94 with emergency_stop(self.abort):
95 while True:
96 # query axis status
97 status = self.status()
98 if status == b'IDLE':
99 break
100 # Temporise
101 time.sleep(self.POLL_INTERVAL)
102 self.logger.info("Finished moving dummy stage.")
103
104 @proxycall()
105 @property
106 def pos(self):
107 """
108 Dummy position
109 """
110 pos = self.device_cmd(b'GET_POSITION\n')
111 return float(pos.strip())
112
113
114@Dummymotor.register_motor('dummy')
115class Motor(MotorBase):
116 def __init__(self, name, driver):
117 super(Motor, self).__init__(name, driver)
118
119 def _get_pos(self):
120 """
121 Return position in degrees
122 """
123 return self.driver.get_pos()
124
125 def _set_abs_pos(self, x):
126 """
127 Set absolute position
128 """
129 return self.driver.set_pos(x)
130
131import socket
132from lclib.base import _recv_all
133
134class DummyControllerInterface:
135
136 def __init__(self, timeout=5., latency=0.):
137 """
138 Bare-bones fake socket device accepting a single connection and processing a few commands.
139 """
140 self.timeout = timeout
141 self.latency = latency
142
143 self.client_sock = socket.socket(
144 socket.AF_INET, socket.SOCK_STREAM) # TCP socket
145 self.client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
146 self.client_sock.settimeout(1)
147 self.client_sock.bind(DEVICE_ADDRESS)
148 self.client_sock.listen(5)
149 self.client = None
150 print('Accepting connections.')
151
152 # Initial internal state
153 self.initialized = False
154 self.pos = 0.
155 self.in_error = False
156 self.in_motion = False
157 self.speed = 1.
158 self.limits = (-20., 20.)
159
160 self.listen()
161
162 def listen(self):
163 """
164 Infinite loop waiting for a connection and serving the connected client.
165 """
166 while True:
167 try:
168 client, address = self.client_sock.accept()
169 print('Client connected')
170 client.settimeout(self.timeout)
171 self.serve(client)
172 except socket.timeout:
173 continue
174
175 def serve(self, client):
176 """
177 Serve the newly connected client
178 """
179 t_start = 0.
180 t_end = 0.
181 start_pos = self.pos
182 end_pos = self.pos
183 direction = 1
184 while True:
185 # Read data
186 try:
187 data = _recv_all(client).strip().decode('ascii')
188 except socket.timeout:
189 continue
190 print(f'Received: {data} | ', end=' ')
191
192 t = time.time()
193 if self.in_motion:
194 # Compute new position if the motor was last set in motion
195 if t < t_end:
196 # Motion is not over
197 self.pos = start_pos + direction*self.speed*(t-t_start)
198 self.in_motion = True
199 else:
200 # Motion is done
201 self.in_motion = False
202 self.pos = end_pos
203
204 # Manage commands
205 if data == 'DO_INIT':
206 self.initialized = True
207 reply = 'OK: INIT'
208 if not self.initialized:
209 reply = 'ERROR: NOT INITIALIZED'
210 elif data == 'STATUS':
211 if self.in_motion:
212 reply = 'MOVING'
213 else:
214 reply = 'IDLE'
215 elif data == 'ABORT':
216 if self.in_motion:
217 self.in_motion = False
218 reply = 'OK: ABORTED'
219 else:
220 reply = 'ERROR: NOTHING TO ABORT'
221 elif data == 'GET_POSITION':
222 reply = '%f' % self.pos
223 elif data.startswith('SET_POSITION'):
224 if self.in_motion:
225 reply = 'ERROR: STILL MOVING'
226 else:
227 try:
228 end_pos = float(data.strip('SET_POSITION'))
229 except:
230 reply = 'ERROR: WRONG SYNTAX'
231 else:
232 if end_pos < self.limits[0]:
233 reply = 'ERROR: LOWER LIMIT'
234 elif end_pos > self.limits[1]:
235 reply = 'ERROR: UPPER LIMIT'
236 else:
237 # Motion is valid
238 start_pos = self.pos
239 t_start = t
240 t_end = t + abs(end_pos - start_pos)/self.speed
241 direction = 1 if end_pos > start_pos else -1
242 self.in_motion = True
243 reply = 'OK'
244 else:
245 reply = 'ERROR: UNKNOWN COMMAND'
246
247 # Return to client
248 time.sleep(self.latency)
249 print(f'Sent: {reply}')
250 client.sendall((reply+'\n').encode())