summaryrefslogtreecommitdiff
path: root/scripts/cli.py
diff options
context:
space:
mode:
authordmlunar <root@lunar.sh>2025-01-22 16:47:21 +0200
committerdmlunar <root@lunar.sh>2025-10-15 23:42:50 +0200
commit729f2a2c3ebfb2612d873caf453a1d7ca02180d9 (patch)
tree7bab2fcc0c7f50eab3013348697bc06ddd71d551 /scripts/cli.py
downloadvarpa-729f2a2c3ebfb2612d873caf453a1d7ca02180d9.tar.gz
varpa-729f2a2c3ebfb2612d873caf453a1d7ca02180d9.zip
varpa: initial public commit
Diffstat (limited to 'scripts/cli.py')
-rw-r--r--scripts/cli.py289
1 files changed, 289 insertions, 0 deletions
diff --git a/scripts/cli.py b/scripts/cli.py
new file mode 100644
index 0000000..e401239
--- /dev/null
+++ b/scripts/cli.py
@@ -0,0 +1,289 @@
+# Author: Dylan Muller
+# Copyright (c) 2025
+# All rights reserved.
+#
+# - Commercial/IP use prohibited.
+# - Attribution required.
+# See License.txt
+
+import TermTk as ttk
+import os
+import sys
+import msvcrt
+import ctypes
+import time
+import serial
+import serial.tools.list_ports
+from threading import Thread, Lock
+import signal
+
+from setup import *
+from device import *
+
+GWL_STYLE = -16
+WS_SIZEBOX = 0x00040000
+WS_MAXIMIZEBOX = 0x00010000
+CLI_TITLE = "RFCLI"
+CLI_VER = "1.00.01"
+
+class gate_status:
+ status = ""
+ q_volt = 0
+ q_amp = 0
+ q_time = 0
+ v_set = 0
+ q_drain = 0
+
+class drain_status:
+ status = ""
+ v_adc = 0
+ v_set = 0
+ amp = 0
+
+class system_status:
+ status = ""
+ v_fwd = 0
+ p_fwd = 0
+ p_set = 0
+ t_delta = 0
+
+class global_status:
+ gate = gate_status()
+ drain = drain_status()
+ system = system_status()
+
+status = global_status()
+root = ttk.TTk(title= CLI_TITLE + " " + CLI_VER,
+ sigmask=())
+serial_port = None
+update_thread = None
+lock = Lock()
+
+def cli_inputs_focused():
+ if (cli_LEDrainOverride.hasFocus() or
+ cli_LEGateQDrain.hasFocus() or
+ cli_LESystemPSet.hasFocus() or
+ cli_LESystemSWRLim.hasFocus()):
+ return True
+ return False
+
+def cli_update_thread():
+ global serial_port
+
+ while(True):
+ with lock:
+ if(serial_port.is_open):
+ reply = sys_reply(serial_port)
+ if (SETUP_CMD_BUSY_RESPONSE in reply):
+ serial_port.close()
+ continue
+ if (cli_inputs_focused() == False):
+ rv = cli_update_status(serial_port)
+ if(rv == 0):
+ serial_port.close()
+ if(cli_FSystemStatus.color() != le_green):
+ cli_FSystemStatus.setColor(le_green)
+ else:
+ if(cli_FSystemStatus.color() != le_red):
+ cli_FSystemStatus.setText("USER INPUT")
+ cli_FSystemStatus.setColor(le_red)
+
+ if (cli_BSystemConnect.text() != "Disconnect"):
+ cli_BSystemConnect.setText("Disconnect")
+ else:
+ if (cli_inputs_focused() == False):
+ cli_FSystemStatus.setText("NO-LINK")
+ cli_BSystemConnect.setText("Connect")
+ time.sleep(SETUP_CLI_UPDATE_RATE)
+
+def cli_BSystemConnect_clicked():
+ global serial_port
+ global update_thread
+ with lock:
+ if (serial_port is None or serial_port.is_open == False):
+ target = cli_CBSystemPort.currentText()
+ if (target == SETUP_STR_EMPTY):
+ display_message("No valid COM port")
+ return
+ else:
+ init_uart_link(target)
+ if (serial_port and serial_port.is_open):
+ reply = sys_reply(serial_port)
+ if (SETUP_CMD_BUSY_RESPONSE in reply):
+ display_message("Target is busy")
+ serial_port.close()
+ return
+ elif (SETUP_CMD_REPLY_RESPONSE not in reply):
+ display_message("Target does not respond")
+ serial_port.close()
+ return
+ else:
+ display_message("Failed establishing link")
+ serial_port.close()
+ return
+
+ if (update_thread == None):
+ update_thread = Thread(target=cli_update_thread)
+ update_thread.start()
+ else:
+ serial_port.close()
+
+def cli_update_gate_frame(line, param):
+ global status
+ if (SETUP_REPORT_STATUS in line):
+ status.gate.status = param
+ cli_FGateStatus.setText(param)
+ elif (SETUP_REPORT_Q_VOLT in line):
+ status.gate.q_volt = int(param)
+ cli_FGateQV.setText(param)
+ elif (SETUP_REPORT_Q_AMP in line):
+ status.gate.q_amp = int(param)
+ cli_FGateQA.setText(param)
+ elif (SETUP_REPORT_Q_TIME in line):
+ status.gate.q_time = int(param)
+ cli_FGateQTime.setText(param)
+ elif (SETUP_REPORT_V_SET in line):
+ status.gate.v_set = int(param)
+ cli_FGateVSet.setText(param)
+ elif (SETUP_REPORT_Q_DRAIN in line):
+ status.gate.q_drain = int(param)
+
+def cli_update_drain_frame(line, param):
+ global status
+ if (SETUP_REPORT_STATUS in line):
+ status.drain.status = param
+ cli_FDrainStatus.setText(param)
+ elif (SETUP_REPORT_V_ADC in line):
+ status.drain.v_adc = int(param)
+ cli_FDrainVADC.setText(param)
+ elif (SETUP_REPORT_V_SET in line):
+ status.drain.amp = int(param)
+ cli_FDrainVSet.setText(param)
+
+def cli_update_system_frame(line, param):
+ global status
+ if (SETUP_REPORT_STATUS in line):
+ status.system.status = param
+ cli_FSystemStatus.setText(param)
+ elif (SETUP_REPORT_V_FWD in line):
+ status.system.v_fwd = int(param)
+ cli_FSystemVFwd.setText(param)
+ elif (SETUP_REPORT_P_FWD in line):
+ status.system.p_fwd = int(param)
+ cli_FSystemPFwd.setText(param)
+ elif (SETUP_REPORT_P_SET in line):
+ status.system.p_set = int(param)
+ elif (SETUP_REPORT_T_DELTA in line):
+ status.system.t_delta = int(param)
+ cli_FSystemTSet.setText(param)
+
+def cli_update_status(serial):
+ serial.flush()
+ report = sys_report(serial)
+ if (report == SETUP_STR_EMPTY or
+ SETUP_CMD_BUSY_RESPONSE in report):
+ return 0
+ current_frame = 0
+ fields_updated = 0
+ for line in report:
+ if (SETUP_REPORT_FRAME_GATE in line):
+ current_frame = SETUP_REPORT_FRAME_GATE
+ continue
+ elif (SETUP_REPORT_FRAME_DRAIN in line):
+ current_frame = SETUP_REPORT_FRAME_DRAIN
+ continue
+ elif (SETUP_REPORT_FRAME_SYSTEM in line):
+ current_frame = SETUP_REPORT_FRAME_SYSTEM
+ continue
+ if (current_frame != 0 and serial_port):
+ param = extract_param(line)
+ if (param == False):
+ return 0
+ fields_updated += 1
+ if (current_frame == SETUP_REPORT_FRAME_GATE):
+ cli_update_gate_frame(line, param)
+ elif (current_frame == SETUP_REPORT_FRAME_DRAIN):
+ cli_update_drain_frame(line, param)
+ elif (current_frame == SETUP_REPORT_FRAME_SYSTEM):
+ cli_update_system_frame(line, param)
+ return fields_updated
+
+def signal_handler(sig, frame):
+ display_message("test")
+ exit_terminal()
+
+def extract_param(line):
+ split = line.split(':')
+ if (len(split) != 2):
+ return False
+ return split[1].strip()
+
+def uart_established(serial):
+ if (serial and serial.is_open):
+ return True
+ return False
+
+def init_uart_link(port):
+ global serial_port
+ try:
+ if (serial_port is None):
+ serial_port = serial.Serial(
+ port=port,
+ baudrate=SETUP_SERIAL_BAUD,
+ timeout=SETUP_SERIAL_TIMEOUT)
+ serial_port.open()
+ else:
+ serial_port.open()
+ except serial.SerialException as e:
+ return None
+
+def list_com_ports():
+ com = []
+ ports = list(serial.tools.list_ports.comports())
+ for port in ports:
+ com.append(port.device)
+ return com
+
+def set_win_attributes():
+ hwnd = ctypes.windll.kernel32.GetConsoleWindow()
+ current_style = ctypes.windll.user32.GetWindowLongW(hwnd, GWL_STYLE)
+ ctypes.windll.user32.SetWindowLongW(hwnd, GWL_STYLE, current_style &
+ ~WS_SIZEBOX & ~WS_MAXIMIZEBOX)
+
+def cleanup_terminal():
+ os.system('cls')
+ os.system('echo \x1b[?25h')
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ while msvcrt.kbhit():
+ msvcrt.getch()
+ os._exit(0)
+
+def exit_terminal():
+ root.quit()
+ cleanup_terminal()
+
+def display_message(text):
+ messageBox = ttk.TTkMessageBox(text=text)
+ ttk.TTkHelper.overlay(None, messageBox, 25, 10, True)
+
+cli_ctrl_c = ttk.TTkK.CTRL | ttk.TTkK.Key_C
+root.layout().addWidget(main_window)
+ttk.TTkShortcut(cli_ctrl_c).activated.connect(exit_terminal)
+cli_BSystemConnect.clicked.connect(cli_BSystemConnect_clicked)
+cli_CBSystemPort.addItems(list_com_ports())
+
+cli_LEGateQDrain.setInputType(ttk.TTkK.Input_Number)
+cli_LESystemSWRLim.setInputType(ttk.TTkK.Input_Number)
+cli_LEDrainOverride.setInputType(ttk.TTkK.Input_Number)
+cli_LESystemPSet.setInputType(ttk.TTkK.Input_Number)
+
+cli_FSystemStatus.setColor(le_green)
+
+#BSystemExit.clicked.connect(exit_terminal)
+#set_win_attributes()
+root.mainloop()
+
+#currentIndexChanged