-
Notifications
You must be signed in to change notification settings - Fork 343
/
piDSKY.py
executable file
·599 lines (569 loc) · 20 KB
/
piDSKY.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
#!/usr/bin/python3
# Copyright: None, placed in the PUBLIC DOMAIN by its author (Ron Burkey)
# Filename: piDSKY.py
# Purpose: This is an illustration of how to use the skeleton peripheral program
# piPeripheral.py to create a simple simulated DSKY.
# Reference: http://www.ibiblio.org/apollo/developer.html
# Mod history: 2017-11-17 RSB Began.
# 2017-11-21 RSB Updated with some fixes to the PRO and NOUN
# keys that had been identified for piDSKY2.py.
# 2017-12-02 RSB Replaced the entire program with a stripped form
# of piDSKY2.py (in which all hardware-specific stuff
# has been removed), because it was easier than
# back-porting bug-fixes.
# 2018-01-06 MAS Switched the TEMP light to use channel 163 instead
# of channel 11.
#
# Note that certain functionality (I think the code for get_char_keyboard_nonblock)
# might not work under Windows, but everything should
# presumably work in Linux, Raspbian, Mac OS X, etc.
#
# In this skeleton form, the script acts as kind of a console-based DSKY, in which
# you can use keyboard keys (0 1 2 3 4 5 6 7 8 9 + - V N C P K R Enter) as surrogates
# for DSKY pushbuttons, and all DSKY-related outputs from yaAGC are simply parsed and
# displayed in textual form, though the idea is that in general, you'd rip out all of
# that DSKY-specific stuff and replace it with whatever you wanted.
#
# The parts which need to be modified to be target-system specific are the
# outputFromAGC() and inputsForAGC() functions, which are in the section *after* the following
# section. The immediately following section, on the other hand, has some utility functions I use
# for the default outputFromAGC() and inputsForAGC() functions I provide, and
# can be deleted if they're not useful for the specific implementation desired.
#
# To run the program in its present form, you have to use yaAGC, and optionally
# yaDSKY2 (if you want to see the graphical DSKY and piPeripheral.py working in
# parallel). To do that, assuming you had a directory setup in which all of the
# appropriate files could be found, you could run (presumably from different consoles)
#
# yaDSKY2 --cfg=LM.ini --port=19797
# yaAGC --core=Luminary099.bin --port=19797 --cfg=LM.ini
# piDSKY.py
#
# If you didn't want to use yaDSKY2, then this stuff could all be run in a pure
# command-line environment without a GUI desktop.
import time
import os
import signal
import sys
import argparse
import threading
import termios
import fcntl
import socket
# Parse command-line arguments.
cli = argparse.ArgumentParser()
cli.add_argument("--host", help="Host address of yaAGC, defaulting to localhost.")
cli.add_argument("--port", help="Port for yaAGC, defaulting to 19798.", type=int)
cli.add_argument("--slow", help="For use on really slow host systems.")
args = cli.parse_args()
# Responsiveness settings.
if args.slow:
PULSE = 0.25
lampDeadtime = 0.25
else:
PULSE = 0.05
lampDeadtime = 0.1
# Characteristics of the host and port being used for yaAGC communications.
if args.host:
TCP_IP = args.host
else:
TCP_IP = 'localhost'
if args.port:
TCP_PORT = args.port
else:
TCP_PORT = 19798
###################################################################################
# Some utilities I happen to use in my sample hardware abstraction functions, but
# not of value outside of that, unless you happen to be implementing DSKY functionality
# in a similar way.
# Given a 3-tuple (channel,value,mask), creates packet data and sends it to yaAGC.
def packetize(tuple):
outputBuffer = bytearray(4)
# First, create and output the mask command.
outputBuffer[0] = 0x20 | ((tuple[0] >> 3) & 0x0F)
outputBuffer[1] = 0x40 | ((tuple[0] << 3) & 0x38) | ((tuple[2] >> 12) & 0x07)
outputBuffer[2] = 0x80 | ((tuple[2] >> 6) & 0x3F)
outputBuffer[3] = 0xC0 | (tuple[2] & 0x3F)
s.send(outputBuffer)
# Now, the actual data for the channel.
outputBuffer[0] = 0x00 | ((tuple[0] >> 3) & 0x0F)
outputBuffer[1] = 0x40 | ((tuple[0] << 3) & 0x38) | ((tuple[1] >> 12) & 0x07)
outputBuffer[2] = 0x80 | ((tuple[1] >> 6) & 0x3F)
outputBuffer[3] = 0xC0 | (tuple[1] & 0x3F)
s.send(outputBuffer)
# This particular function parses various keystrokes, like '0' or 'V' and creates
# packets as if they were DSKY keypresses. It should be called occasionally as
# parseDskyKey(0) if there are no keystrokes, in order to make sure that the PRO
# key gets released.
# The return value of this function is
# a list ([...]), of which each element is a 3-tuple consisting of an AGC channel
# number, a value for that channel, and a bitmask that tells which bit-positions
# of the value are valid. The returned list can be empty. For example, a
# return value of
# [ ( 0o15, 0o31, 0o37 ) ]
# would indicate that the lowest 5 bits of channel 15 (octal) were valid, and that
# the value of those bits were 11001 (binary), which collectively indicate that
# the KEY REL key on a DSKY is pressed.
resetCount = 0
def parseDskyKey(ch):
global resetCount
if ch == 'R':
resetCount += 1
if resetCount >= 5:
print("Exiting ...")
return ""
elif ch != "":
resetCount = 0
returnValue = []
if ch == '0':
returnValue.append( (0o15, 0o20, 0o37) )
elif ch == '1':
returnValue.append( (0o15, 0o1, 0o37) )
elif ch == '2':
returnValue.append( (0o15, 0o2, 0o37) )
elif ch == '3':
returnValue.append( (0o15, 0o3, 0o37) )
elif ch == '4':
returnValue.append( (0o15, 0o4, 0o37) )
elif ch == '5':
returnValue.append( (0o15, 0o5, 0o37) )
elif ch == '6':
returnValue.append( (0o15, 0o6, 0o37) )
elif ch == '7':
returnValue.append( (0o15, 0o7, 0o37) )
elif ch == '8':
returnValue.append( (0o15, 0o10, 0o37) )
elif ch == '9':
returnValue.append( (0o15, 0o11, 0o37) )
elif ch == '+':
returnValue.append( (0o15, 0o32, 0o37) )
elif ch == '-':
returnValue.append( (0o15, 0o33, 0o37) )
elif ch == 'V':
returnValue.append( (0o15, 0o21, 0o37) )
elif ch == 'N':
returnValue.append( (0o15, 0o37, 0o37) )
elif ch == 'R':
returnValue.append( (0o15, 0o22, 0o37) )
elif ch == 'C':
returnValue.append( (0o15, 0o36, 0o37) )
elif ch == 'P':
returnValue.append( (0o32, 0o00000, 0o20000) )
elif ch == 'p' or ch == 'PR':
returnValue.append( (0o32, 0o20000, 0o20000) )
elif ch == 'K':
returnValue.append( (0o15, 0o31, 0o37) )
elif ch == '\n':
returnValue.append( (0o15, 0o34, 0o37) )
return returnValue
# This function turns keyboard echo on or off.
def echoOn(control):
fd = sys.stdin.fileno()
new = termios.tcgetattr(fd)
if control:
print("Keyboard echo on")
new[3] |= termios.ECHO
else:
print("Keyboard echo off")
new[3] &= ~termios.ECHO
termios.tcsetattr(fd, termios.TCSANOW, new)
echoOn(False)
# This function is a non-blocking read of a single character from the
# keyboard. Returns either the key value (such as '0' or 'V'), or else
# the value "" if no key was pressed. Note: fakes a "key"
# 'PR' 0.75 seconds after a key 'p' or 'P'. This is in lieu of PRO
# press and release events. Is is possible to get keypress and release
# events or other equivalent data from the Python "keyboard" module, but
# I didn't know about it at first, and am too lazy to go back and add
# that support.
pressedPRO = False
timePRO = 0
def get_char_keyboard_nonblock():
global pressedPRO, timePRO
fd = sys.stdin.fileno()
oldterm = termios.tcgetattr(fd)
newattr = termios.tcgetattr(fd)
newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO
termios.tcsetattr(fd, termios.TCSANOW, newattr)
oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
c = ""
try:
c = sys.stdin.read(1)
except IOError: pass
termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)
if c == 'p' or c == 'P':
pressedPRO = True
timePRO = time.time()
if c == "" and pressedPRO and time.time() > timePRO + 0.75:
pressedPRO = False
c = 'PR'
return c
# The following dictionary gives, for each indicator lamp:
# Whether or not it is currently lit.
# This information isn't actually used for anything, but can be useful in a
# specific hardware model as a way to know which lamp statuses have changed.
lampStatuses = {
"UPLINK ACTY" : { "isLit" : False },
"TEMP" : { "isLit" : False },
"NO ATT" : { "isLit" : False },
"GIMBAL LOCK" : { "isLit" : False },
"DSKY STANDBY" : { "isLit" : False },
"PROG" : { "isLit" : False },
"KEY REL" : { "isLit" : False },
"RESTART" : { "isLit" : False },
"OPR ERR" : { "isLit" : False },
"TRACKER" : { "isLit" : False },
"PRIO DSP" : { "isLit" : False },
"ALT" : { "isLit" : False },
"NO DAP" : { "isLit" : False },
"VEL" : { "isLit" : False }
}
# For modifying the lampStatuses[] array.
def updateLampStatuses(key, value):
global lampStatuses
if key in lampStatuses:
lampStatuses[key]["isLit"] = value
# Converts a 5-bit code in channel 010 to " ", "0", ..., "9".
def codeToString(code):
if code == 0:
return " "
elif code == 21:
return "0"
elif code == 3:
return "1"
elif code == 25:
return "2"
elif code == 27:
return "3"
elif code == 15:
return "4"
elif code == 30:
return "5"
elif code == 28:
return "6"
elif code == 19:
return "7"
elif code == 29:
return "8"
elif code == 31:
return "9"
return "?"
###################################################################################
# Hardware abstraction / User-defined functions. Also, any other platform-specific
# initialization.
# This function is automatically called periodically by the event loop to check for
# conditions that will result in sending messages to yaAGC that are interpreted
# as changes to bits on its input channels. For test purposes, it simply polls the
# keyboard, and interprets various keystrokes as DSKY keys if present. The return
# value is supposed to be a list of 3-tuples of the form
# [ (channel0,value0,mask0), (channel1,value1,mask1), ...]
# and may be en empty list.
def inputsForAGC():
ch = get_char_keyboard_nonblock()
ch = ch.upper()
if ch == '_':
ch = '-'
elif ch == '=':
ch = '+'
else:
returnValue = parseDskyKey(ch)
if len(returnValue) > 0:
print("Sending to yaAGC: " + oct(returnValue[0][1]) + "(mask " + oct(returnValue[0][2]) + ") -> channel " + oct(returnValue[0][0]))
return returnValue
def updateLamps():
# If there were actual hardware, this is where you could use
# lampStatus[] to control the lamps.
return
updateLamps()
# This function is called by the event loop only when yaAGC has written
# to an output channel. The function should do whatever it is that needs to be done
# with this output data, which is not processed additionally in any way by the
# generic portion of the program. As a test, I simply display the outputs for
# those channels relevant to the DSKY.
last10 = 1234567
last11 = 1234567
last13 = 1234567
last163 = 1234567
plusMinusState1 = 0
plusMinusState2 = 0
plusMinusState3 = 0
def outputFromAGC(channel, value):
# These lastNN values are just used to cut down on the number of messages printed,
# when the same value is output over and over again to the same channel, because
# that makes debugging harder.
global last10, last11, last13, last163, plusMinusState1, plusMinusState2, plusMinusState3
if (channel == 0o13):
value &= 0o3000
if (channel == 0o10 and value != last10) or (channel == 0o11 and value != last11) or (channel == 0o13 and value != last13) or (channel == 0o163 and value != last163):
if channel == 0o10:
last10 = value
aaaa = (value >> 11) & 0x0F
b = (value >> 10) & 0x01
ccccc = (value >> 5) & 0x1F
ddddd = value & 0x1F
if aaaa != 12:
sc = codeToString(ccccc)
sd = codeToString(ddddd)
if aaaa == 11:
print(sc + " -> M1 " + sd + " -> M2")
elif aaaa == 10:
print(sc + " -> V1 " + sd + " -> V2")
elif aaaa == 9:
print(sc + " -> N1 " + sd + " -> N2")
elif aaaa == 8:
print(" " + sd + " -> 11")
elif aaaa == 7:
plusMinus = " "
if b != 0:
plusMinus = "1+"
plusMinusState1 |= 1
else:
plusMinusState1 &= ~1
print(sc + " -> 12 " + sd + " -> 13 " + plusMinus)
elif aaaa == 6:
plusMinus = " "
if b != 0:
plusMinus = "1-"
plusMinusState1 |= 2
else:
plusMinusState1 &= ~2
print(sc + " -> 14 " + sd + " -> 15 " + plusMinus)
elif aaaa == 5:
plusMinus = " "
if b != 0:
plusMinus = "2+"
plusMinusState2 |= 1
else:
plusMinusState2 &= ~1
print(sc + " -> 21 " + sd + " -> 22 " + plusMinus)
elif aaaa == 4:
plusMinus = " "
if b != 0:
plusMinus = "2-"
plusMinusState2 |= 2
else:
plusMinusState2 &= ~2
print(sc + " -> 23 " + sd + " -> 24 " + plusMinus)
elif aaaa == 3:
print(sc + " -> 25 " + sd + " -> 31")
elif aaaa == 2:
plusMinus = " "
if b != 0:
plusMinus = "3+"
plusMinusState3 |= 1
else:
plusMinusState3 &= ~1
print(sc + " -> 32 " + sd + " -> 33 " + plusMinus)
elif aaaa == 1:
plusMinus = " "
if b != 0:
plusMinus = "3-"
plusMinusState3 |= 2
else:
plusMinusState3 &= ~2
print(sc + " -> 34 " + sd + " -> 35 " + plusMinus)
elif aaaa == 12:
vel = "VEL OFF "
if (value & 0x04) != 0:
vel = "VEL ON "
updateLampStatuses("VEL", True)
else:
updateLampStatuses("VEL", False)
noAtt = "NO ATT OFF "
if (value & 0x08) != 0:
noAtt = "NO ATT ON "
updateLampStatuses("NO ATT", True)
else:
updateLampStatuses("NO ATT", False)
alt = "ALT OFF "
if (value & 0x10) != 0:
alt = "ALT ON "
updateLampStatuses("ALT", True)
else:
updateLampStatuses("ALT", False)
gimbalLock = "GIMBAL LOCK OFF "
if (value & 0x20) != 0:
gimbalLock = "GIMBAL LOCK ON "
updateLampStatuses("GIMBAL LOCK", True)
else:
updateLampStatuses("GIMBAL LOCK", False)
tracker = "TRACKER OFF "
if (value & 0x80) != 0:
tracker = "TRACKER ON "
updateLampStatuses("TRACKER", True)
else:
updateLampStatuses("TRACKER", False)
prog = "PROG OFF "
if (value & 0x100) != 0:
prog = "PROG ON "
updateLampStatuses("PROG", True)
else:
updateLampStatuses("PROG", False)
print(vel + " " + noAtt + " " + alt + " " + gimbalLock + " " + tracker + " " + prog)
updateLamps()
elif channel == 0o11:
last11 = value
compActy = "COMP ACTY OFF "
if (value & 0x02) != 0:
compActy = "COMP ACTY ON "
uplinkActy = "UPLINK ACTY OFF "
if (value & 0x04) != 0:
uplinkActy = "UPLINK ACTY ON "
updateLampStatuses("UPLINK ACTY", True)
else:
updateLampStatuses("UPLINK ACTY", False)
flashing = "V/N NO FLASH "
print(compActy + " " + uplinkActy + " " + " " + flashing)
updateLamps()
elif channel == 0o13:
last13 = value
test = "DSKY TEST "
if (value & 0x200) == 0:
test = "DSKY NO TEST "
print(test)
updateLamps()
elif channel == 0o163:
last163 = value
if (value & 0x08) != 0:
temp = "TEMP ON "
updateLampStatuses("TEMP", True)
else:
temp = "TEMP OFF "
updateLampStatuses("TEMP", False)
if (value & 0o400) != 0:
standby = "DSKY STANDBY ON "
updateLampStatuses("DSKY STANDBY", True)
else:
standby = "DSKY STANDBY OFF"
updateLampStatuses("DSKY STANDBY", False)
if (value & 0o20) != 0:
keyRel = "KEY REL ON "
updateLampStatuses("KEY REL", True)
else:
keyRel = "KEY REL OFF "
updateLampStatuses("KEY REL", False)
if (value & 0o100) != 0:
oprErr = "OPR ERR FLASH "
updateLampStatuses("OPR ERR", True)
else:
oprErr = "OPR ERR OFF "
updateLampStatuses("OPR ERR", False)
if (value & 0o200) != 0:
restart = "RESTART ON "
updateLampStatuses("RESTART", True)
else:
restart = "RESTART OFF "
updateLampStatuses("RESTART", False)
print(temp + " " + standby + " " + keyRel + " " + oprErr + " " + restart)
else:
print("Received from yaAGC: " + oct(value) + " -> channel " + oct(channel))
return
###################################################################################
# Generic initialization (TCP socket setup). Has no target-specific code, and
# shouldn't need to be modified unless there are bugs.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(0)
def connectToAGC():
while True:
try:
s.connect((TCP_IP, TCP_PORT))
print("Connected to yaAGC (" + TCP_IP + ":" + str(TCP_PORT) + ")")
break
except socket.error as msg:
print("Could not connect to yaAGC (" + TCP_IP + ":" + str(TCP_PORT) + "), exiting: " + str(msg))
time.sleep(1)
# The following provides a clean exit from the program by simply
# hitting any key. However if get_char_keyboard_nonblock isn't
# defined, just delete the next 4 lines and use Ctrl-C to exit instead.
ch = get_char_keyboard_nonblock()
if ch != "":
print("Exiting ...")
sys.exit()
connectToAGC()
###################################################################################
# Event loop. Just check periodically for output from yaAGC (in which case the
# user-defined callback function outputFromAGC is executed) or data in the
# user-defined function inputsForAGC (in which case a message is sent to yaAGC).
# But this section has no target-specific code, and shouldn't need to be modified
# unless there are bugs.
def eventLoop():
# Buffer for a packet received from yaAGC.
packetSize = 4
inputBuffer = bytearray(packetSize)
leftToRead = packetSize
view = memoryview(inputBuffer)
didSomething = False
while True:
if not didSomething:
time.sleep(PULSE)
didSomething = False
# Check for packet data received from yaAGC and process it.
# While these packets are always exactly 4
# bytes long, since the socket is non-blocking, any individual read
# operation may yield less bytes than that, so the buffer may accumulate data
# over time until it fills.
try:
numNewBytes = s.recv_into(view, leftToRead)
except:
numNewBytes = 0
if numNewBytes > 0:
view = view[numNewBytes:]
leftToRead -= numNewBytes
if leftToRead == 0:
# Prepare for next read attempt.
view = memoryview(inputBuffer)
leftToRead = packetSize
# Parse the packet just read, and call outputFromAGC().
# Start with a sanity check.
ok = 1
if (inputBuffer[0] & 0xF0) != 0x00:
ok = 0
elif (inputBuffer[1] & 0xC0) != 0x40:
ok = 0
elif (inputBuffer[2] & 0xC0) != 0x80:
ok = 0
elif (inputBuffer[3] & 0xC0) != 0xC0:
ok = 0
# Packet has the various signatures we expect.
if ok == 0:
# Note that, depending on the yaAGC version, it occasionally
# sends either a 1-byte packet (just 0xFF, older versions)
# or a 4-byte packet (0xFF 0xFF 0xFF 0xFF, newer versions)
# just for pinging the client. These packets hold no
# data and need to be ignored, but for other corrupted packets
# we print a message. And try to realign past the corrupted
# bytes.
if inputBuffer[0] != 0xff or inputBuffer[1] != 0xff or inputBuffer[2] != 0xff or inputBuffer[2] != 0xff:
if inputBuffer[0] != 0xff:
print("Illegal packet: " + hex(inputBuffer[0]) + " " + hex(inputBuffer[1]) + " " + hex(inputBuffer[2]) + " " + hex(inputBuffer[3]))
for i in range(1,packetSize):
if (inputBuffer[i] & 0xF0) == 0:
j = 0
for k in range(i,4):
inputBuffer[j] = inputBuffer[k]
j += 1
view = view[j:]
leftToRead = packetSize - j
else:
channel = (inputBuffer[0] & 0x0F) << 3
channel |= (inputBuffer[1] & 0x38) >> 3
value = (inputBuffer[1] & 0x07) << 12
value |= (inputBuffer[2] & 0x3F) << 6
value |= (inputBuffer[3] & 0x3F)
outputFromAGC(channel, value)
didSomething = True
# Check for locally-generated data for which we must generate messages
# to yaAGC over the socket. In theory, the externalData list could contain
# any number of channel operations, but in practice (at least for something
# like a DSKY implementation) it will actually contain only 0 or 1 operations.
externalData = inputsForAGC()
if externalData == "":
echoOn(True)
return
for i in range(0, len(externalData)):
packetize(externalData[i])
didSomething = True
eventLoop()
os._exit(0)