i3-companion: also listen to DBus for NetworkManager

As a proof of concept add notifications for events.
This commit is contained in:
Vincent Bernat 2021-07-11 22:05:36 +02:00
parent e8b7487512
commit 83ba6b5a23
2 changed files with 73 additions and 1 deletions

View file

@ -12,14 +12,19 @@ import shlex
import subprocess
import html
import functools
import collections
from i3ipc.aio import Connection
from i3ipc import Event
from systemd import journal
import ravel
import dbussy
logger = logging.getLogger("i3-companion")
DBusSignal = collections.namedtuple(
"DBusSignal", ["path", "interface", "member", "signature"]
)
def on(*events):
@ -370,9 +375,53 @@ async def output_update(i3, event):
)
@on(
DBusSignal(
path="/",
interface="org.freedesktop.NetworkManager.Connection.Active",
member="StateChanged",
signature="uu",
)
)
async def network_manager_update(i3, event, path, state, reason):
logger.debug("from %s state: %d, reason: %d", path, state, reason)
NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2
if state not in {NM_ACTIVE_CONNECTION_STATE_ACTIVATED}:
return
peer = i3.system_bus["org.freedesktop.NetworkManager"][path]
try:
interface = await peer.get_async_interface(
"org.freedesktop.NetworkManager.Connection.Active"
)
except dbussy.DBusError:
logger.info("interface %s has vanished", path)
return
kind = await interface.Type
id = await interface.Id
if kind == "vpn":
await notify(
i3, app_icon="network-vpn", summary=f"{id}", body="VPN connected!"
)
elif kind == "802-3-ethernet":
await notify(
i3,
app_icon="network-wired",
summary=f"{id}",
body="Ethernet connection established!",
)
elif kind == "802-11-wireless":
await notify(
i3,
app_icon="network-wireless",
summary=f"{id}",
body="Wireless connection established!",
)
async def main(options):
i3 = await Connection().connect()
i3.session_bus = await ravel.session_bus_async()
i3.system_bus = await ravel.system_bus_async()
# Regular events
for fn, events in on.functions.items():
@ -401,6 +450,29 @@ async def main(options):
i3.on(Event.BINDING, binding_event)
# Listen to DBus events
for fn, events in on.functions.items():
for event in events:
if isinstance(event, DBusSignal):
for bus in {i3.session_bus, i3.system_bus}:
@ravel.signal(
name=event.member,
in_signature=event.signature,
path_keyword="path",
args_keyword="args",
)
async def wrapped(path, args):
return await fn(i3, event, path, *args)
bus.listen_signal(
path=event.path,
fallback=True,
interface=event.interface,
name=event.member,
func=wrapped,
)
await i3.main()

View file

@ -52,7 +52,7 @@
icon_position = left
min_icon_size = 32
max_icon_size = 32
icon_path = /usr/share/icons/Adwaita/64x64/status/:/usr/share/icons/Adwaita/64x64/devices/:/usr/share/icons/Adwaita/64x64/apps/:/usr/share/icons/Adwaita/256x256/legacy/:/usr/share/icons/hicolor/scalable/apps/:/usr/share/icons/hicolor/64x64/apps/:/usr/share/icons/gnome/256x256/actions/
icon_path = /usr/share/icons/Adwaita/64x64/status/:/usr/share/icons/Adwaita/64x64/devices/:/usr/share/icons/Adwaita/64x64/apps/:/usr/share/icons/Adwaita/256x256/legacy/:/usr/share/icons/hicolor/scalable/apps/:/usr/share/icons/hicolor/64x64/apps/:/usr/share/icons/gnome/256x256/actions/:/usr/share/icons/Adwaita/48x48/legacy/
# History
sticky_history = yes