-
Notifications
You must be signed in to change notification settings - Fork 5
/
viconn_util.py
95 lines (76 loc) · 3.28 KB
/
viconn_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
'''
Copyright 2024 philippoo66
Licensed under the GNU GENERAL PUBLIC LICENSE, Version 3 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.gnu.org/licenses/gpl-3.0.html
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import serial
import time
import utils
import optolinkvs2
# Helper for pushing bytes into a fixed-length buffer
def add_to_ringbuffer(buffer, new_bytes):
    """Shift *new_bytes* into *buffer* while keeping the buffer length constant.

    For every incoming byte the oldest element (index 0) is discarded and the
    new byte is appended at the end. The buffer is modified in place; nothing
    is returned.

    Args:
        buffer: mutable sequence acting as the ring buffer (e.g. a bytearray).
        new_bytes: iterable of byte values to push into the buffer.
    """
    for incoming in new_bytes:
        del buffer[0]            # drop the oldest byte
        buffer.append(incoming)  # newest byte goes to the end
def log_vito(data, pre, vitolog):
    """Append one tab-separated record for *data* to the traffic log.

    The record has the form ``<pre>\\t<milliseconds since epoch>\\t<data>``
    followed by a newline, where the data column is produced by
    ``utils.bbbstr``. Nothing happens when no log is given.

    Args:
        data: the bytes to be logged.
        pre: prefix written as the first column of the record.
        vitolog: object with a ``write`` method, or None to disable logging.
    """
    if vitolog is None:
        return
    formatted = utils.bbbstr(data)
    millis = int(time.time() * 1000)
    vitolog.write(f"{pre}\t{millis}\t{formatted}\n")
# VS detection ---------------
def detect_vs2(serVicon:serial.Serial, serOpto:serial.Serial, timeout:float, vitolog_loc) -> bool:
    """Relay bytes between both serial ports while watching for the VS2 protocol.

    On every loop iteration one read is issued on each port. Whatever arrived
    is forwarded to the opposite port and logged through ``log_vito`` (prefix
    "M" for data read from *serVicon*, prefix "S" for data read from
    *serOpto*). Detection succeeds as soon as one of these holds:

    * the last three bytes read from *serVicon* are ``16 00 00`` (the
      initialization sequence) and the data just read from *serOpto* is
      ``06`` (positive answer), or
    * the bytes collected from *serOpto* since the last *serVicon* data show
      ``06 41`` at positions 0-1 and ``01`` at position 3 (an answer in VS2
      format).

    Args:
        serVicon: first serial port of the relay.
        serOpto: second serial port of the relay.
        timeout: maximum detection time in seconds.
        vitolog_loc: log object passed on to ``log_vito``; may be None.

    Returns:
        True when VS2 was detected, False when *timeout* elapsed first.

    Note:
        The deadline is only checked once per iteration, after both reads.
        How long each ``read()`` waits depends on how the ports were opened,
        which is not visible here.
    """
    init_sequence = bytearray([0x16, 0x00, 0x00])
    bufferVicon = bytearray([0xFF, 0xFF, 0xFF])
    bufferOpto = bytearray([0xFF, 0xFF, 0xFF, 0xFF])

    # Use the monotonic clock so that wall-clock adjustments (e.g. an NTP
    # step after boot) can neither cut the detection short nor prolong it.
    deadline = time.monotonic() + timeout

    while True:
        # read from both serial ports
        dataVicon = serVicon.read()
        dataOpto = serOpto.read()

        # data from serVicon is passed on to serOpto
        if dataVicon:
            serOpto.write(dataVicon)
            add_to_ringbuffer(bufferVicon, dataVicon)
            log_vito(dataVicon, "M", vitolog_loc)
            # new data from serVicon: start collecting the answer from scratch
            bufferOpto = bytearray([0xFF, 0xFF, 0xFF, 0xFF])

        # data from serOpto is passed on to serVicon
        if dataOpto:
            serVicon.write(dataOpto)
            add_to_ringbuffer(bufferOpto, dataOpto)
            log_vito(dataOpto, "S", vitolog_loc)

        # check VS2
        if bufferVicon == init_sequence and dataOpto == b'\x06':
            # initialization sequence seen and answered positively
            return True
        if bufferOpto[0] == 0x06 and bufferOpto[1] == 0x41 and bufferOpto[3] == 0x01:
            # answer in VS2 format seen
            return True

        time.sleep(0.001)
        if time.monotonic() > deadline:
            return False
# viconn request mechanism -------------
# Shared slot holding the most recent data that listen_to_Vitoconnect()
# accepted from optolinkvs2.receive_vs2telegr(); get_vicon_request() hands it
# out and resets the slot to an empty bytearray.
vicon_request = bytearray()
def listen_to_Vitoconnect(servicon:serial.Serial, vitolog_loc):
    """Receive telegrams on *servicon* forever and publish them via ``vicon_request``.

    Each iteration calls ``optolinkvs2.receive_vs2telegr(False, True, servicon)``.
    When the first element of its result equals 1, the returned data replaces
    the module-level ``vicon_request`` (a previous value that was not fetched
    yet is overwritten). Any other non-empty data is only logged with prefix
    "X" through ``log_vito``.

    This function never returns.
    NOTE(review): presumably meant to run in its own thread; confirm against the caller.

    Args:
        servicon: serial port passed to ``optolinkvs2.receive_vs2telegr``.
        vitolog_loc: log object passed on to ``log_vito``; may be None.
    """
    global vicon_request
    while True:
        succ, _, data = optolinkvs2.receive_vs2telegr(False, True, servicon)  # contains sleep(0.005)
        if succ == 1:
            vicon_request = data
        elif data:
            # received something that was not accepted as a request: log only
            log_vito(data, "X", vitolog_loc)
def get_vicon_request() -> bytearray:
    """Return the pending content of ``vicon_request`` and clear the slot.

    After the call the module-level ``vicon_request`` is a fresh, empty
    bytearray, so an immediately repeated call yields an empty result unless
    new data has been stored in between.
    """
    global vicon_request
    pending, vicon_request = vicon_request, bytearray()
    return pending