Skip to content

Z2MClient

Zigbee2MQTT HTTP ingress client with three-tier auth fallback.

zigporter.z2m_client.Z2MClient(ha_url: str, ha_token: str, z2m_url: str, verify_ssl: bool = True, mqtt_topic: str = 'zigbee2mqtt')

Zigbee2MQTT client.

Auth strategy for ingress requests
  1. Try the request with Authorization: Bearer <token> directly. Works when HA ingress forwards the header (some reverse-proxy setups).
  2. If the response is not JSON, exchange the Bearer token for an ingress session cookie via POST /api/hassio/ingress/session and retry.
  3. If the session exchange fails (no Supervisor, proxy blocks the path, non-admin token), fall back to HA-native APIs:
     • Device listing / lookup: HA device registry via WebSocket
     • permit_join / rename: mqtt.publish service call via WebSocket

The mqtt_topic parameter (default "zigbee2mqtt") must match the base topic configured in Z2M. Override via the Z2M_MQTT_TOPIC env var.

PARAMETER DESCRIPTION
ha_url

Base URL of the Home Assistant instance (e.g. "http://homeassistant.local:8123").

TYPE: str

ha_token

Long-lived HA access token. Used for ingress session exchange and as the fallback HA-native client credential.

TYPE: str

z2m_url

Full ingress URL for the Z2M add-on (e.g. "http://homeassistant.local:8123/api/hassio_ingress/<slug>").

TYPE: str

verify_ssl

Set to False to disable TLS certificate verification for self-signed certificates.

TYPE: bool DEFAULT: True

mqtt_topic

Z2M base MQTT topic. Must match the base_topic setting in your Z2M configuration (default: "zigbee2mqtt").

TYPE: str DEFAULT: 'zigbee2mqtt'

Source code in src/zigporter/z2m_client.py
def __init__(
    self,
    ha_url: str,
    ha_token: str,
    z2m_url: str,
    verify_ssl: bool = True,
    mqtt_topic: str = "zigbee2mqtt",
) -> None:
    """Record the connection settings on the instance.

    Only attribute assignment happens here; nothing is requested or opened.

    Args:
        ha_url: Base URL of the Home Assistant instance
            (e.g. ``"http://homeassistant.local:8123"``). Trailing slashes
            are stripped before it is stored.
        ha_token: Long-lived HA access token. Used for ingress session
            exchange and as the fallback HA-native client credential.
        z2m_url: Full ingress URL for the Z2M add-on
            (e.g. ``"http://homeassistant.local:8123/api/hassio_ingress/<slug>"``).
            Trailing slashes are stripped before it is stored.
        verify_ssl: Set to ``False`` to disable TLS certificate verification
            for self-signed certificates.
        mqtt_topic: Z2M base MQTT topic. Must match the ``base_topic``
            setting in your Z2M configuration (default: ``"zigbee2mqtt"``).
    """
    # Store both URLs without trailing slashes.
    self._ha_url, self._z2m_url = (url.rstrip("/") for url in (ha_url, z2m_url))

    # Settings kept exactly as supplied by the caller.
    self._ha_token, self._verify_ssl, self._mqtt_topic = ha_token, verify_ssl, mqtt_topic

    # Both start unset; presumably filled in later by the auth / fallback
    # paths, which are not visible in this block.
    self._session_token: str | None = None
    self._ha_client_instance: Any = None

disable_permit_join() -> None async

Close the Z2M network to new joiners.

Source code in src/zigporter/z2m_client.py
async def disable_permit_join(self) -> None:
    """Close the Z2M network to new joiners.

    Posts a permit-join request with ``time`` set to ``0``. If the HTTP call
    raises ``RuntimeError``, the equivalent request is published to
    ``<mqtt_topic>/bridge/request/permit_join`` instead.
    """
    http_body = {"time": 0, "device": None}
    try:
        await self._post("/api/permit_join", http_body)
    except RuntimeError:
        # HTTP path unavailable: send the same request over MQTT.
        request_topic = f"{self._mqtt_topic}/bridge/request/permit_join"
        mqtt_body = json.dumps({"time": 0})
        await self._mqtt_publish(request_topic, mqtt_body)

enable_permit_join(seconds: int = 120) -> None async

Open the Z2M network for new devices to join.

Source code in src/zigporter/z2m_client.py
async def enable_permit_join(self, seconds: int = 120) -> None:
    """Open the Z2M network for new devices to join.

    Args:
        seconds: Value sent as the ``time`` field of the permit-join request.

    The HTTP endpoint is tried first. If it raises ``RuntimeError``, the
    request is published to ``<mqtt_topic>/bridge/request/permit_join``.
    """
    http_body = {"time": seconds, "device": None}
    try:
        await self._post("/api/permit_join", http_body)
    except RuntimeError:
        # HTTP path unavailable: send the same request over MQTT.
        request_topic = f"{self._mqtt_topic}/bridge/request/permit_join"
        mqtt_body = json.dumps({"time": seconds})
        await self._mqtt_publish(request_topic, mqtt_body)

get_device_by_ieee(ieee: str) -> dict[str, Any] | None async

Find a Z2M device by IEEE address. Returns None if not found.

Source code in src/zigporter/z2m_client.py
async def get_device_by_ieee(self, ieee: str) -> dict[str, Any] | None:
    """Find a Z2M device by IEEE address. Returns None if not found."""
    devices = await self.get_devices()
    target = normalize_ieee(ieee)
    for device in devices:
        if normalize_ieee(device.get("ieee_address", "")) == target:
            return device
    return None

get_devices() -> list[dict[str, Any]] async

Return the full Z2M device list.

Source code in src/zigporter/z2m_client.py
async def get_devices(self) -> list[dict[str, Any]]:
    """Return the full Z2M device list.

    The list is requested from ``/api/devices`` first. If that raises
    ``RuntimeError``, ``httpx.HTTPStatusError`` or ``httpx.RequestError``,
    the result of ``_get_devices_via_ha()`` is returned instead.
    """
    try:
        devices = await self._get("/api/devices")
    except (httpx.RequestError, httpx.HTTPStatusError, RuntimeError):
        # HTTP path failed: use the HA-based listing instead.
        devices = await self._get_devices_via_ha()
    return devices

get_network_map(timeout: int = _NETWORK_MAP_TIMEOUT) -> dict[str, Any] async

Return the raw Z2M network map (nodes + links).

Tries the Z2M HTTP REST endpoint first. Z2M 2.x removed that endpoint, so falls back to subscribing to the MQTT response topic and publishing the request via HA's WebSocket call_service API.

timeout controls how long to wait for Z2M to respond via MQTT. Large meshes with many routers need more time — Z2M scans each router sequentially, so allow ~10 s per router.

Source code in src/zigporter/z2m_client.py
async def get_network_map(self, timeout: int = _NETWORK_MAP_TIMEOUT) -> dict[str, Any]:
    """Fetch the raw Z2M network map (nodes + links).

    The Z2M HTTP REST endpoint is attempted first. Z2M 2.x removed that
    endpoint, so on ``RuntimeError``, ``httpx.HTTPStatusError`` or
    ``httpx.RequestError`` the map is obtained through the MQTT fallback:
    subscribe to the MQTT response topic and publish the request via HA's
    WebSocket ``call_service`` API.

    Args:
        timeout: How long to wait for Z2M to respond via MQTT. It is passed
            to the MQTT fallback only. Large meshes with many routers need
            more time — Z2M scans each router sequentially, so allow ~10 s
            per router.

    Returns:
        The network map as returned by whichever path succeeded.
    """
    try:
        network_map = await self._get("/api/networkmap?type=raw")
    except (httpx.RequestError, httpx.HTTPStatusError, RuntimeError):
        # REST endpoint unavailable: request the map over MQTT instead.
        network_map = await self._get_network_map_via_mqtt(timeout=timeout)
    return network_map

rename_device(current_name: str, new_name: str) -> None async

Rename a Z2M device by its current friendly name.

Source code in src/zigporter/z2m_client.py
async def rename_device(self, current_name: str, new_name: str) -> None:
    """Rename a Z2M device, identified by its current friendly name.

    Args:
        current_name: The device's present friendly name.
        new_name: The friendly name to assign.

    The HTTP endpoint ``/api/device`` is tried first. If it raises
    ``RuntimeError``, a rename request is published to
    ``<mqtt_topic>/bridge/request/device/rename``.
    """
    try:
        await self._post("/api/device", {"id": current_name, "rename": new_name})
    except RuntimeError:
        # homeassistant_rename=True also updates HA entity IDs
        rename_request = {
            "from": current_name,
            "to": new_name,
            "homeassistant_rename": True,
        }
        request_topic = f"{self._mqtt_topic}/bridge/request/device/rename"
        await self._mqtt_publish(request_topic, json.dumps(rename_request))

wait_for_interview(ieee: str, timeout: int = 300, on_event: Any = None) -> tuple[str, dict[str, Any] | None] async

Subscribe to Z2M bridge events and wait for device interview completion.

Subscribes to <mqtt_topic>/bridge/event (zigbee2mqtt/bridge/event with the default base topic) via HA WebSocket and streams events until the target device's interview completes, fails, or the timeout expires.

on_event(event_type, data) is called for every bridge event received so the caller can display status updates or detect unexpected joiners.

Returns a tuple (status, data):

  • ("successful", data) — interview completed (or device re-announced)
  • ("failed", data) — interview failed
  • ("timeout", None) — timed out without a result
Source code in src/zigporter/z2m_client.py
async def wait_for_interview(
    self,
    ieee: str,
    timeout: int = 300,
    on_event: Any = None,
) -> tuple[str, dict[str, Any] | None]:
    """Subscribe to Z2M bridge events and wait for device interview completion.

    Subscribes to ``zigbee2mqtt/bridge/event`` via HA WebSocket and streams
    events until the target device's interview completes, fails, or the
    timeout expires.

    ``on_event(event_type, data)`` is called for every bridge event received
    so the caller can display status updates or detect unexpected joiners.

    Returns a tuple ``(status, data)``:

    - ``("successful", data)`` — interview completed (or device re-announced)
    - ``("failed", data)``    — interview failed
    - ``("timeout", None)``   — timed out without a result
    """
    target = normalize_ieee(ieee)
    event_topic = f"{self._mqtt_topic}/bridge/event"
    ha = self._ha_client()
    deadline = time.monotonic() + timeout

    async with ha._ws_session() as ws:
        await ws.send(json.dumps({"id": 1, "type": "mqtt/subscribe", "topic": event_topic}))

        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return "timeout", None

            try:
                raw = await asyncio.wait_for(ws.recv(), timeout=min(remaining, 2.0))
            except asyncio.TimeoutError:
                continue

            try:
                msg = json.loads(raw)
            except (json.JSONDecodeError, ValueError):
                continue

            # Validate subscribe ACK
            if msg.get("id") == 1 and msg.get("type") == "result":
                if not msg.get("success"):
                    raise RuntimeError(f"mqtt/subscribe to bridge/event failed: {msg}")
                continue

            if msg.get("type") != "event" or msg.get("id") != 1:
                continue

            try:
                payload = json.loads(msg.get("event", {}).get("payload", "{}"))
            except (json.JSONDecodeError, ValueError):
                continue

            event_type = payload.get("type")
            data = payload.get("data") or {}

            if on_event is not None:
                on_event(event_type, data)

            event_ieee = normalize_ieee(data.get("ieee_address", ""))
            if event_ieee != target:
                continue

            # A device_announce means the device is already paired and working.
            if event_type == "device_announce":
                return "successful", data

            if event_type == "device_interview":
                interview_status = data.get("status")
                if interview_status == "successful":
                    return "successful", data
                if interview_status == "failed":
                    return "failed", data