246 lines
8.5 KiB
Python
246 lines
8.5 KiB
Python
import argparse
|
||
import json
|
||
import os
|
||
import socket
|
||
import sys
|
||
import time
|
||
import urllib.error
|
||
import urllib.request
|
||
import uuid
|
||
from pathlib import Path
|
||
|
||
|
||
APP_NAME = "InvWebClientAgent"
|
||
|
||
|
||
def is_windows():
|
||
return os.name == "nt"
|
||
|
||
|
||
def config_dir():
|
||
if is_windows():
|
||
base = os.environ.get("APPDATA") or str(Path.home())
|
||
return Path(base) / APP_NAME
|
||
return Path.home() / f".{APP_NAME.lower()}"
|
||
|
||
|
||
def config_path():
|
||
return config_dir() / "config.json"
|
||
|
||
|
||
def load_config():
|
||
path = config_path()
|
||
if not path.exists():
|
||
return {}
|
||
try:
|
||
return json.loads(path.read_text(encoding="utf-8"))
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def save_config(config):
|
||
path = config_path()
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
path.write_text(json.dumps(config, ensure_ascii=False, indent=2), encoding="utf-8")
|
||
|
||
|
||
def prompt_value(prompt_text, current_value=""):
|
||
suffix = f" [{current_value}]" if current_value else ""
|
||
raw = input(f"{prompt_text}{suffix}: ").strip()
|
||
return raw or current_value
|
||
|
||
|
||
def fetch_json(url, headers=None, timeout=10):
|
||
request = urllib.request.Request(url, headers=headers or {}, method="GET")
|
||
with urllib.request.urlopen(request, timeout=timeout) as response:
|
||
return json.loads(response.read().decode("utf-8"))
|
||
|
||
|
||
def choose_computer(server, token, current_inventory_number=""):
|
||
response = fetch_json(
|
||
server.rstrip("/") + "/api/client_computers",
|
||
headers={"X-Agent-Token": token},
|
||
)
|
||
computers = response.get("computers") or []
|
||
if not computers:
|
||
return current_inventory_number
|
||
|
||
print("Выберите компьютер из списка:")
|
||
default_index = None
|
||
for index, item in enumerate(computers, start=1):
|
||
if current_inventory_number and item.get("inventory_number") == current_inventory_number:
|
||
default_index = index
|
||
label = f'{index}. {item.get("inventory_number", "")} | {item.get("name", "")} | {item.get("type_label", "")}'
|
||
cabinet = item.get("cabinet", "")
|
||
if cabinet:
|
||
label += f" | {cabinet}"
|
||
print(label)
|
||
|
||
while True:
|
||
default_text = str(default_index) if default_index else ""
|
||
raw = prompt_value("Номер компьютера", default_text)
|
||
if not raw:
|
||
return current_inventory_number
|
||
if raw.isdigit():
|
||
selected_index = int(raw)
|
||
if 1 <= selected_index <= len(computers):
|
||
return computers[selected_index - 1].get("inventory_number", "")
|
||
print("Некорректный выбор. Введите номер из списка.")
|
||
|
||
|
||
def startup_script_path():
|
||
startup_dir = Path(os.environ.get("APPDATA", "")) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
|
||
return startup_dir / f"{APP_NAME}.cmd"
|
||
|
||
|
||
def enable_windows_startup():
|
||
script_path = startup_script_path()
|
||
script_path.parent.mkdir(parents=True, exist_ok=True)
|
||
python_exe = Path(sys.executable)
|
||
script_file = Path(__file__).resolve()
|
||
command = f'@echo off\r\n"{python_exe}" "{script_file}"\r\n'
|
||
script_path.write_text(command, encoding="utf-8")
|
||
|
||
|
||
def maybe_setup_windows_config(config):
|
||
print("Первичная настройка клиента Windows")
|
||
config["server"] = prompt_value("Адрес сервера", config.get("server", "http://127.0.0.1:5000"))
|
||
config["token"] = prompt_value("Токен клиента", config.get("token", "change-me-client-token"))
|
||
try:
|
||
config["inventory_number"] = choose_computer(config["server"], config["token"], config.get("inventory_number", ""))
|
||
except Exception as error:
|
||
print(f"Не удалось получить список компьютеров: {error}")
|
||
config["inventory_number"] = prompt_value("Инвентарный номер компьютера", config.get("inventory_number", ""))
|
||
startup_answer = prompt_value("Добавить в автозапуск Windows? (y/n)", "y").lower()
|
||
config["autostart"] = startup_answer in ("y", "yes", "д", "да")
|
||
save_config(config)
|
||
if config["autostart"]:
|
||
enable_windows_startup()
|
||
return config
|
||
|
||
|
||
def maybe_setup_config(config):
|
||
print("Первичная настройка клиента")
|
||
config["server"] = prompt_value("Адрес сервера", config.get("server", "http://127.0.0.1:5000"))
|
||
config["token"] = prompt_value("Токен клиента", config.get("token", "change-me-client-token"))
|
||
try:
|
||
config["inventory_number"] = choose_computer(config["server"], config["token"], config.get("inventory_number", ""))
|
||
except Exception as error:
|
||
print(f"Не удалось получить список компьютеров: {error}")
|
||
config["inventory_number"] = prompt_value("Инвентарный номер компьютера", config.get("inventory_number", ""))
|
||
save_config(config)
|
||
return config
|
||
|
||
|
||
def detect_hostname():
|
||
return socket.gethostname()
|
||
|
||
|
||
def detect_ip_address():
|
||
try:
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
sock.connect(("8.8.8.8", 80))
|
||
ip_address = sock.getsockname()[0]
|
||
sock.close()
|
||
return ip_address
|
||
except OSError:
|
||
return "127.0.0.1"
|
||
|
||
|
||
def detect_mac_address():
|
||
mac_value = uuid.getnode()
|
||
return ":".join(f"{(mac_value >> ele) & 0xff:02x}" for ele in range(40, -1, -8))
|
||
|
||
|
||
def detect_online(ip_address):
|
||
return bool(ip_address and ip_address != "127.0.0.1")
|
||
|
||
|
||
def build_payload(inventory_number):
|
||
ip_address = detect_ip_address()
|
||
return {
|
||
"inventory_number": inventory_number,
|
||
"hostname": detect_hostname(),
|
||
"ip_address": ip_address,
|
||
"mac_address": detect_mac_address(),
|
||
"is_online": detect_online(ip_address),
|
||
}
|
||
|
||
|
||
def send_heartbeat(server_url, agent_token, inventory_number, timeout=10):
|
||
payload = build_payload(inventory_number)
|
||
request = urllib.request.Request(
|
||
server_url.rstrip("/") + "/api/client_heartbeat",
|
||
data=json.dumps(payload).encode("utf-8"),
|
||
headers={
|
||
"Content-Type": "application/json",
|
||
"X-Agent-Token": agent_token,
|
||
},
|
||
method="POST",
|
||
)
|
||
with urllib.request.urlopen(request, timeout=timeout) as response:
|
||
return response.read().decode("utf-8")
|
||
|
||
|
||
def resolve_settings(args):
|
||
config = load_config()
|
||
if not config.get("server") or not config.get("inventory_number") or not config.get("token"):
|
||
if is_windows():
|
||
config = maybe_setup_windows_config(config)
|
||
else:
|
||
config = maybe_setup_config(config)
|
||
|
||
server = args.server or config.get("server")
|
||
token = args.token or config.get("token")
|
||
inventory_number = args.inventory_number or config.get("inventory_number")
|
||
|
||
if is_windows():
|
||
updated = False
|
||
if server and config.get("server") != server:
|
||
config["server"] = server
|
||
updated = True
|
||
if token and config.get("token") != token:
|
||
config["token"] = token
|
||
updated = True
|
||
if inventory_number and config.get("inventory_number") != inventory_number:
|
||
config["inventory_number"] = inventory_number
|
||
updated = True
|
||
if updated:
|
||
save_config(config)
|
||
|
||
if not server or not token or not inventory_number:
|
||
raise SystemExit("server, token and inventory-number are required")
|
||
|
||
return server, token, inventory_number
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="Client agent for inventory heartbeat")
|
||
parser.add_argument("--server", help="Base URL of inventory server, e.g. http://192.168.1.92:5000")
|
||
parser.add_argument("--token", help="Shared client agent token")
|
||
parser.add_argument("--inventory-number", help="Inventory number from computers table")
|
||
parser.add_argument("--interval", type=int, default=60, help="Heartbeat interval in seconds")
|
||
parser.add_argument("--once", action="store_true", help="Send one heartbeat and exit")
|
||
args = parser.parse_args()
|
||
|
||
server, token, inventory_number = resolve_settings(args)
|
||
|
||
while True:
|
||
try:
|
||
result = send_heartbeat(server, token, inventory_number)
|
||
print(result)
|
||
except urllib.error.HTTPError as error:
|
||
print(f"HTTP error: {error.code} {error.reason}")
|
||
except urllib.error.URLError as error:
|
||
print(f"Connection error: {error.reason}")
|
||
except Exception as error:
|
||
print(f"Unexpected error: {error}")
|
||
|
||
if args.once:
|
||
break
|
||
time.sleep(max(5, args.interval))
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|