Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for router firmware #85

Merged
merged 8 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ Options:
--bootloader-baudrate NUMBERS [default: 115200]
--cpc-baudrate NUMBERS [default: 460800, 115200, 230400]
--ezsp-baudrate NUMBERS [default: 115200]
--spinel-baudrate NUMBERS [default: 460800]
--probe-method TEXT [default: bootloader, cpc, ezsp, spinel]
--router-baudrate NUMBERS [default: 115200]
--spinel-baudrate NUMBERS [default: 460800]
--probe-method TEXT [default: bootloader, cpc, ezsp, spinel,
router]
--bootloader-reset [yellow|ihost|slzb07|sonoff]
--help Show this message and exit.

Expand Down
2 changes: 1 addition & 1 deletion universal_silabs_flasher/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class VersionComponent:

@functools.total_ordering
class Version:
_SEPARATORS = {".", "-", "/", "_", " build "}
_SEPARATORS = {".", "-", "/", "_", " build ", " GA build "}
_SEPARATORS_REGEX = re.compile(
"(" + "|".join(re.escape(s) for s in _SEPARATORS) + ")"
)
Expand Down
4 changes: 4 additions & 0 deletions universal_silabs_flasher/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

class FirmwareImageType(enum.Enum):
ZIGBEE_NCP = "zigbee_ncp"
ZIGBEE_ROUTER = "zigbee_router"
OPENTHREAD_RCP = "openthread_rcp"
ZWAVE_NCP = "zwave_ncp"
BOOTLOADER = "bootloader"
Expand All @@ -26,13 +27,15 @@ class ApplicationType(enum.Enum):
CPC = "cpc"
EZSP = "ezsp"
SPINEL = "spinel"
ROUTER = "router"


FW_IMAGE_TYPE_TO_APPLICATION_TYPE = {
FirmwareImageType.ZIGBEE_NCP: ApplicationType.EZSP,
FirmwareImageType.MULTIPAN: ApplicationType.CPC,
FirmwareImageType.OPENTHREAD_RCP: ApplicationType.SPINEL,
FirmwareImageType.BOOTLOADER: ApplicationType.GECKO_BOOTLOADER,
FirmwareImageType.ZIGBEE_ROUTER: ApplicationType.ROUTER,
}


Expand All @@ -41,6 +44,7 @@ class ApplicationType(enum.Enum):
ApplicationType.CPC: [460800, 115200, 230400],
ApplicationType.EZSP: [115200],
ApplicationType.SPINEL: [460800],
ApplicationType.ROUTER: [115200],
}


Expand Down
10 changes: 10 additions & 0 deletions universal_silabs_flasher/flash.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ def convert(self, value: tuple | str, param: click.Parameter, ctx: click.Context
type=CommaSeparatedNumbers(),
show_default=True,
)
@click.option(
"--router-baudrate",
default=DEFAULT_BAUDRATES[ApplicationType.ROUTER],
type=CommaSeparatedNumbers(),
show_default=True,
)
@click.option(
"--spinel-baudrate",
default=DEFAULT_BAUDRATES[ApplicationType.SPINEL],
Expand All @@ -149,6 +155,7 @@ def main(
bootloader_baudrate: list[int],
cpc_baudrate: list[int],
ezsp_baudrate: list[int],
router_baudrate: list[int],
spinel_baudrate: list[int],
probe_method: list[ApplicationType],
bootloader_reset: str | None,
Expand Down Expand Up @@ -190,6 +197,7 @@ def main(
ApplicationType.GECKO_BOOTLOADER: bootloader_baudrate,
ApplicationType.CPC: cpc_baudrate,
ApplicationType.EZSP: ezsp_baudrate,
ApplicationType.ROUTER: router_baudrate,
ApplicationType.SPINEL: spinel_baudrate,
},
probe_methods=probe_method,
Expand Down Expand Up @@ -341,6 +349,8 @@ async def flash(

if flasher.app_type == ApplicationType.EZSP:
running_image_type = FirmwareImageType.ZIGBEE_NCP
elif flasher.app_type == ApplicationType.ROUTER:
running_image_type = FirmwareImageType.ZIGBEE_ROUTER
elif flasher.app_type == ApplicationType.SPINEL:
running_image_type = FirmwareImageType.OPENTHREAD_RCP
elif flasher.app_type == ApplicationType.CPC:
Expand Down
20 changes: 20 additions & 0 deletions universal_silabs_flasher/flasher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .firmware import FirmwareImage
from .gecko_bootloader import GeckoBootloaderProtocol, NoFirmwareError
from .gpio import find_gpiochip_by_label, send_gpio_pattern
from .router import RouterProtocol
from .spinel import SpinelProtocol
from .xmodemcrc import BLOCK_SIZE as XMODEM_BLOCK_SIZE

Expand All @@ -48,6 +49,7 @@ def __init__(
ApplicationType.GECKO_BOOTLOADER,
ApplicationType.CPC,
ApplicationType.EZSP,
ApplicationType.ROUTER,
ApplicationType.SPINEL,
),
device: str,
Expand Down Expand Up @@ -103,6 +105,9 @@ def _connect_cpc(self, baudrate: int):
def _connect_ezsp(self, baudrate: int):
return connect_ezsp(self._device, baudrate)

def _connect_router(self, baudrate: int):
return connect_protocol(self._device, baudrate, RouterProtocol)

def _connect_spinel(self, baudrate: int):
return connect_protocol(self._device, baudrate, SpinelProtocol)

Expand Down Expand Up @@ -150,6 +155,16 @@ async def probe_ezsp(self, baudrate: int) -> ProbeResult:
continue_probing=False,
)

async def probe_router(self, baudrate: int) -> ProbeResult:
async with self._connect_router(baudrate) as router:
version = await router.probe()

return ProbeResult(
version=version,
baudrate=baudrate,
continue_probing=False,
)

async def probe_spinel(self, baudrate: int) -> ProbeResult:
async with self._connect_spinel(baudrate) as spinel:
version = await spinel.probe()
Expand Down Expand Up @@ -194,6 +209,7 @@ async def probe_app_type(
ApplicationType.CPC: self.probe_cpc,
ApplicationType.EZSP: self.probe_ezsp,
ApplicationType.SPINEL: self.probe_spinel,
ApplicationType.ROUTER: self.probe_router,
}

for probe_method, baudrate in (
Expand Down Expand Up @@ -266,6 +282,10 @@ async def enter_bootloader(self) -> None:
async with self._connect_spinel(self.app_baudrate) as spinel:
async with asyncio_timeout(PROBE_TIMEOUT):
await spinel.enter_bootloader()
elif self.app_type is ApplicationType.ROUTER:
async with self._connect_router(self.app_baudrate) as router:
async with asyncio_timeout(PROBE_TIMEOUT):
await router.enter_bootloader()
elif self.app_type is ApplicationType.EZSP:
async with self._connect_ezsp(self.app_baudrate) as ezsp:
try:
Expand Down
107 changes: 107 additions & 0 deletions universal_silabs_flasher/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from __future__ import annotations

import asyncio
import enum
import logging
import re

from zigpy.serial import SerialProtocol

from .common import PROBE_TIMEOUT, StateMachine, Version, asyncio_timeout

_LOGGER = logging.getLogger(__name__)

ROUTER_INFO_REGEX = re.compile(rb"stack ver\. \[(?P<version>.*?)\]\r\n")


class State(str, enum.Enum):
STARTUP = "startup"
BOOTWAIT = "bootwait"
INFO = "info"
READY = "ready"


class RouterCommand(bytes, enum.Enum):
INFO = b"version\r\n"
BL_REBOOT = b"bootloader reboot\r\n"


class RouterProtocol(SerialProtocol):
def __init__(self) -> None:
super().__init__()
self._state_machine = StateMachine(
states=list(State),
initial=State.STARTUP,
)
self._version: str | None = None

async def probe(self) -> Version:
"""Attempt to communicate with the router."""
async with asyncio_timeout(PROBE_TIMEOUT):
return await self.router_info()

async def router_info(self) -> Version:
"""Get the router version."""
await self.activate_prompt()
self._state_machine.state = State.INFO
self.send_data(RouterCommand.INFO)

await self._state_machine.wait_for_state(State.READY)

assert self._version is not None
return Version(self._version)

async def activate_prompt(self) -> None:
"""Send enter key to activate CLI prompt."""
if self._state_machine.state == State.STARTUP:
await asyncio.sleep(0.5)
self.send_data(b"\r\n")
await self._state_machine.wait_for_state(State.READY)

def send_data(self, data: bytes) -> None:
assert self._transport is not None
_LOGGER.debug("Sending data %s", data)
self._transport.write(data)

def data_received(self, data: bytes) -> None:
super().data_received(data)

while self._buffer:
_LOGGER.debug("Parsing %s: %r", self._state_machine.state, self._buffer)
if self._state_machine.state == State.STARTUP:
if b"\n>" not in self._buffer:
return

self._buffer.clear()
self._state_machine.state = State.READY

if self._state_machine.state == State.INFO:
match = ROUTER_INFO_REGEX.search(self._buffer)

if match is None:
return

self._version = match.group("version").decode("ascii")
_LOGGER.debug("Detected version string %r", self._version)

self._buffer.clear()
self._state_machine.state = State.READY

elif self._state_machine.state == State.BOOTWAIT:
if b"Gecko Bootloader" not in self._buffer:
return

_LOGGER.debug("Bootloader started")

self._buffer.clear()
self._state_machine.state = State.READY

elif self._state_machine.state == State.READY:
self._buffer.clear()

async def enter_bootloader(self) -> None:
await self.activate_prompt()
self._state_machine.state = State.BOOTWAIT

self.send_data(RouterCommand.BL_REBOOT)
await self._state_machine.wait_for_state(State.READY)