Skip to content

Commit

Permalink
Wifi: move wpa implementation to subfolder
Browse files Browse the repository at this point in the history
  • Loading branch information
Williangalvani committed Dec 20, 2024
1 parent 5083dd7 commit a4afc26
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 182 deletions.
205 changes: 57 additions & 148 deletions core/services/wifi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import argparse
import asyncio
import logging
import os
import stat
import sys
from pathlib import Path
from typing import Any, List, Optional

Expand All @@ -29,7 +26,9 @@
ScannedWifiNetwork,
WifiCredentials,
)
from WifiManager import WifiManager
from wifi_handlers.AbstractWifiHandler import AbstractWifiManager
from wifi_handlers.networkmanager.networkmanager import NetworkManagerWifi
from wifi_handlers.wpa_supplicant.WifiManager import WifiManager

FRONTEND_FOLDER = Path.joinpath(Path(__file__).parent.absolute(), "frontend")
SERVICE_NAME = "wifi-manager"
Expand All @@ -38,7 +37,9 @@
init_logger(SERVICE_NAME)

logger.info("Starting Wifi Manager.")
wifi_manager = WifiManager()
wpa_manager = WifiManager()
network_manager = NetworkManagerWifi()
wifi_manager: Optional[AbstractWifiManager] = None


app = FastAPI(
Expand All @@ -52,22 +53,19 @@
@app.get("/status", summary="Retrieve status of wifi manager.")
@version(1, 0)
async def network_status() -> Any:
assert wifi_manager is not None
wifi_status = await wifi_manager.status()
logger.info("Status:")
for line in tabulate(list(wifi_status.items())).splitlines():
for line in tabulate(list(vars(wifi_status).items())).splitlines():
logger.info(line)
return wifi_status


@app.get("/scan", response_model=List[ScannedWifiNetwork], summary="Retrieve available wifi networks.")
@version(1, 0)
async def scan() -> Any:
logger.info("Trying to perform network scan.")
assert wifi_manager is not None
try:
available_networks = await wifi_manager.get_wifi_available()
logger.info("Available networks:")
for line in tabulate([network.dict() for network in available_networks], headers="keys").splitlines():
logger.info(line)
return available_networks
except BusyError as error:
raise StackedHTTPException(status_code=status.HTTP_425_TOO_EARLY, error=error) from error
Expand All @@ -76,86 +74,25 @@ async def scan() -> Any:
@app.get("/saved", response_model=List[SavedWifiNetwork], summary="Retrieve saved wifi networks.")
@version(1, 0)
async def saved() -> Any:
logger.info("Trying to fetch saved networks.")
assert wifi_manager is not None
saved_networks = await wifi_manager.get_saved_wifi_network()
logger.info("Saved networks:")
for line in tabulate([network.dict() for network in saved_networks], headers="keys").splitlines():
logger.info(line)
return saved_networks


@app.post("/connect", summary="Connect to wifi network.")
@version(1, 0)
async def connect(credentials: WifiCredentials, hidden: bool = False) -> Any:
logger.info(f"Trying to connect to '{credentials.ssid}'.")

network_id: Optional[int] = None
is_new_network = False
try:
saved_networks = await wifi_manager.get_saved_wifi_network()
match_network = next(filter(lambda network: network.ssid == credentials.ssid, saved_networks))
network_id = match_network.networkid
logger.info(f"Network is already known, id={network_id}.")
except StopIteration:
logger.info("Network is not known.")
is_new_network = True

is_secure = False
try:
available_networks = await wifi_manager.get_wifi_available()
scanned_network = next(filter(lambda network: network.ssid == credentials.ssid, available_networks))
flags_for_passwords = ["WPA", "WEP", "WSN"]
for candidate in flags_for_passwords:
if candidate in scanned_network.flags:
is_secure = True
break
except StopIteration:
logger.info("Could not find wifi network around.")

if credentials.password == "" and network_id is None and is_secure:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No password received and network not found among saved ones.",
)

try:
# Update known network if password is not necessary anymore
if network_id is not None and not is_secure and credentials.password == "":
logger.info(f"Removing old entry for known network, id={network_id}.")
await wifi_manager.remove_network(network_id)
network_id = await wifi_manager.add_network(credentials, hidden)
logger.info(f"Network entry updated, id={network_id}.")

if network_id is None:
network_id = await wifi_manager.add_network(credentials, hidden)
logger.info(f"Saving new network entry, id={network_id}.")

logger.info("Performing network connection.")
if network_id is None:
raise ValueError("Missing 'network_id' for network connection.")
await wifi_manager.connect_to_network(network_id, timeout=40)
except ConnectionError as error:
if is_new_network and network_id is not None:
logger.info("Removing new network entry since connection failed.")
await wifi_manager.remove_network(network_id)
raise error
logger.info(f"Successfully connected to '{credentials.ssid}'.")
assert wifi_manager is not None
await wifi_manager.try_connect_to_network(credentials, hidden)


@app.post("/remove", summary="Remove saved wifi network.")
@version(1, 0)
async def remove(ssid: str) -> Any:
logger.info(f"Trying to remove network '{ssid}'.")
assert wifi_manager is not None
logger.info(f"Processing remove request for SSID: {ssid}")
try:
saved_networks = await wifi_manager.get_saved_wifi_network()
# Here we get all networks that match the ssid
# and get a list where the biggest networkid comes first.
# If we remove the lowest numbers first, it'll change the highest values to -1
# TODO: We should move the entire wifi framestack to work with bssid
match_networks = [network for network in saved_networks if network.ssid == ssid]
match_networks = sorted(match_networks, key=lambda network: network.networkid, reverse=True)
for match_network in match_networks:
await wifi_manager.remove_network(match_network.networkid)
await wifi_manager.remove_network(ssid)
except StopIteration as error:
logger.info(f"Network '{ssid}' is unknown.")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Network '{ssid}' not saved.") from error
Expand All @@ -165,35 +102,40 @@ async def remove(ssid: str) -> Any:
@app.get("/disconnect", summary="Disconnect from wifi network.")
@version(1, 0)
async def disconnect() -> Any:
logger.info("Trying to disconnect from current network.")
assert wifi_manager is not None
await wifi_manager.disconnect()
logger.info("Successfully disconnected from network.")


@app.get("/hotspot", summary="Get hotspot state.")
@version(1, 0)
def hotspot_state() -> Any:
return wifi_manager.hotspot.is_running()
assert wifi_manager is not None
return wifi_manager.hotspot_is_running()


@app.get("/hotspot_extended_status", summary="Get extended hotspot status.")
@version(1, 0)
def hotspot_extended_state() -> HotspotStatus:
return HotspotStatus(supported=wifi_manager.hotspot.supports_hotspot, enabled=wifi_manager.hotspot.is_running())
async def hotspot_extended_state() -> HotspotStatus:
assert wifi_manager is not None
return HotspotStatus(
supported=await wifi_manager.supports_hotspot(), enabled=await wifi_manager.hotspot_is_running()
)


@app.post("/hotspot", summary="Enable/disable hotspot.")
@version(1, 0)
def toggle_hotspot(enable: bool) -> Any:
async def toggle_hotspot(enable: bool) -> Any:
assert wifi_manager is not None
if enable:
wifi_manager.enable_hotspot()
return
wifi_manager.disable_hotspot()
return await wifi_manager.enable_hotspot()
return await wifi_manager.disable_hotspot()


@app.post("/smart_hotspot", summary="Enable/disable smart-hotspot.")
@version(1, 0)
def toggle_smart_hotspot(enable: bool) -> Any:
assert wifi_manager is not None
if enable:
wifi_manager.enable_smart_hotspot()
return
Expand All @@ -203,86 +145,53 @@ def toggle_smart_hotspot(enable: bool) -> Any:
@app.get("/smart_hotspot", summary="Check if smart-hotspot is enabled.")
@version(1, 0)
def check_smart_hotspot() -> Any:
assert wifi_manager is not None
return wifi_manager.is_smart_hotspot_enabled()


@app.post("/hotspot_credentials", summary="Update hotspot credentials.")
@version(1, 0)
def set_hotspot_credentials(credentials: WifiCredentials) -> Any:
wifi_manager.set_hotspot_credentials(credentials)
async def set_hotspot_credentials(credentials: WifiCredentials) -> Any:
assert wifi_manager is not None
await wifi_manager.set_hotspot_credentials(credentials)


@app.get("/hotspot_credentials", summary="Get hotspot credentials.")
@version(1, 0)
def get_hotspot_credentials() -> Any:
assert wifi_manager is not None
return wifi_manager.hotspot_credentials()


app = VersionedFastAPI(app, version="1.0.0", prefix_format="/v{major}.{minor}", enable_latest=True)
app.mount("/", StaticFiles(directory=str(FRONTEND_FOLDER), html=True))


if __name__ == "__main__":
if os.geteuid() != 0:
logger.error("You need root privileges to run this script.\nPlease try again using **sudo**. Exiting.")
sys.exit(1)

async def async_start() -> None:
# pylint: disable=global-statement
global wifi_manager
parser = argparse.ArgumentParser(description="Abstraction CLI for WifiManager configuration.")
parser.add_argument(
"--socket",
dest="socket_name",
type=str,
help="Name of the WPA Supplicant socket. Usually 'wlan0' or 'wlp4s0'.",
)
args = parser.parse_args()
candidates = [wpa_manager, network_manager]
for implementation in candidates:
implementation.add_arguments(parser)
# we need to configure all arguments before parsing them, hence two loops
for implementation in candidates:
implementation.configure(parser.parse_args())
async_loop = asyncio.get_event_loop()
# Running uvicorn with log disabled so loguru can handle it
config = Config(app=app, loop=async_loop, host="0.0.0.0", port=9000, log_config=None)
server = Server(config)
for implementation in candidates:
can_work = await implementation.can_work()
logger.info(f"{implementation} can work: {can_work}")
if can_work:
logger.info(f"Using {implementation} as wifi manager.")
await implementation.start()
wifi_manager = implementation
break
await server.serve()

wpa_socket_folder = "/var/run/wpa_supplicant/"
try:
if args.socket_name:
logger.info("Connecting via provided socket.")
socket_name = args.socket_name
else:
logger.info("Connecting via default socket.")

def is_socket(file_path: str) -> bool:
try:
mode = os.stat(file_path).st_mode
return stat.S_ISSOCK(mode)
except Exception as error:
logger.warning(f"Could not check if '{file_path}' is a socket: {error}")
return False

# We are going to sort and get the latest file, since this in theory will be an external interface
# added by the user
entries = os.scandir(wpa_socket_folder)
available_sockets = sorted(
[
entry.path
for entry in entries
if entry.name.startswith(("wlan", "wifi", "wlp")) and is_socket(entry.path)
]
)
if not available_sockets:
raise RuntimeError("No wifi sockets available.")
socket_name = available_sockets[-1]
logger.info(f"Going to use {socket_name} file")
WLAN_SOCKET = os.path.join(wpa_socket_folder, socket_name)
wifi_manager.connect(WLAN_SOCKET)
except Exception as socket_connection_error:
logger.warning(f"Could not connect with wifi socket. {socket_connection_error}")
logger.info("Connecting via internet wifi socket.")
try:
wifi_manager.connect(("127.0.0.1", 6664))
except Exception as udp_connection_error:
logger.error(f"Could not connect with internet socket: {udp_connection_error}. Exiting.")
sys.exit(1)

if __name__ == "__main__":
loop = asyncio.new_event_loop()

# # Running uvicorn with log disabled so loguru can handle it
config = Config(app=app, loop=loop, host="0.0.0.0", port=9000, log_config=None)
server = Server(config)

loop.create_task(wifi_manager.auto_reconnect(60))
loop.create_task(wifi_manager.start_hotspot_watchdog())
loop.run_until_complete(server.serve())
loop.run_until_complete(async_start())
20 changes: 20 additions & 0 deletions core/services/wifi/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@ class HotspotStatus(BaseModel):
enabled: bool


class WifiStatus(BaseModel):
bssid: Optional[str]
freq: Optional[str]
ssid: Optional[str]
id: Optional[str]
mode: Optional[str]
wifi_generation: Optional[str]
pairwise_cipher: Optional[str]
group_cipher: Optional[str]
key_mgmt: Optional[str]
wpa_state: Optional[str]
ip_address: Optional[str]
p2p_device_address: Optional[str]
address: Optional[str]
uuid: Optional[str]
ieee80211ac: Optional[str]
state: Optional[str]
disabled: Optional[str]


class ScannedWifiNetwork(BaseModel):
ssid: Optional[str]
bssid: str
Expand Down
Loading

0 comments on commit a4afc26

Please sign in to comment.