#!/usr/bin/python3

"""Personal i3 companion."""

import argparse
import asyncio
import collections
import errno
import functools
import glob
import html
import logging
import logging.handlers
import os
import re
import shlex
import subprocess
import sys
import types

import i3ipc
from i3ipc.aio import Connection
from systemd import journal
import ravel
import dbussy
from Xlib import display
from Xlib.ext import randr


def icon(font_number, char):
    """Turn an icon into a string for Polybar.

    Wraps ``char`` in ``%{T<n>}…%{T-}`` so that it is rendered with the
    font number ``font_number`` and the font is reset afterwards.
    """
    # Font number is from Polybar configuration.
    # 2: https://fontawesome.com/v6.0/icons?s=solid
    # 3: https://fontawesome.com/v6.0/icons?s=brands
    return "%%{T%d}%s%%{T-}" % (font_number, char)


# Configuration

# NOTE(review): many glyph literals below show up as empty strings in the
# copy of the file this revision was prepared from.  They are presumably
# private-use-area code points of the icon fonts -- confirm that the real
# glyphs are still present before applying this revision verbatim.

# Window instance/class prefix -> icon shown in the workspace name.
application_icons = {
    "chromium": icon(3, ""),
    "discord": icon(3, ""),
    "d-feet": icon(2, ""),
    "emacs": icon(2, ""),
    "firefox": icon(3, ""),
    "gimp": icon(2, ""),
    "gitg": icon(2, ""),
    "google-chrome": icon(3, ""),
    "inkscape": icon(2, ""),
    "libreoffice": icon(2, ""),
    "mpv": icon(2, ""),
    "pavucontrol": icon(2, ""),
    "signal": icon(2, ""),
    "snes9x-gtk": icon(2, ""),
    "spotify": icon(3, ""),
    "steam": icon(3, ""),
    "vbeterm": icon(2, ""),
    "zathura": icon(2, ""),
    "zoom": icon(2, ""),
}
# Status icons used by the Polybar modules (bluetooth, dunst, network).
icons = {
    "access-point": icon(2, ""),
    "bluetooth": icon(2, ""),
    "camera": icon(2, "⎙"),
    "car": icon(2, "🚘"),
    "gamepad": icon(2, "🎮"),
    "headphones": icon(2, "🎧"),
    "headset": icon(2, ""),
    "keyboard": icon(2, "⌨"),
    "laptop": icon(2, "💻"),
    "loudspeaker": icon(2, ""),
    "microphone": icon(2, ""),
    "mouse": icon(2, ""),
    "notifications-disabled": icon(2, "🔕"),
    "notifications-enabled": icon(2, ""),
    "nowifi": icon(2, ""),
    "phone": icon(2, "📞"),
    "printer": icon(2, "⎙"),
    "scanner": icon(2, ""),
    "unknown": icon(2, ""),
    "vpn": icon(2, ""),
    "webcam": icon(2, ""),
    "wifi-high": icon(2, ""),
    "wifi-low": icon(2, ""),
    "wifi-medium": icon(2, ""),
    "wired": icon(2, ""),
}
# Icon used when no entry of application_icons matches a window.
application_icons_nomatch = icon(2, "")
# Icons only displayed when a workspace contains nothing else.
application_icons_alone = {application_icons[k] for k in {"vbeterm"}}
# Applications that do not share their workspace with new windows.
exclusive_apps = {"emacs", "firefox"}
# Applications allowed on any workspace, even next to an exclusive one.
intrusive_apps = {"vbeterm"}

logger = logging.getLogger("i3-companion")

# Events for @on decorator
DBusSignal = collections.namedtuple(
    "DBusSignal",
    ["interface", "member", "signature", "system", "path", "onlyif"],
    # Defaults apply to the last three fields: system, path, onlyif.
    defaults=(True, "/", None),
)
StartEvent = object()
I3Event = i3ipc.Event
CommandEvent = collections.namedtuple("CommandEvent", ["name"])

NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2
NM_DEVICE_TYPE_ETHERNET = 1
NM_DEVICE_TYPE_WIFI = 2
NM_DEVICE_TYPE_MODEM = 8
NM_DEVICE_STATE_UNMANAGED = 10
NM_DEVICE_STATE_ACTIVATED = 100

# Bluetooth class of device -> key in ``icons``.  Keys are either a
# (major, minor) pair, or a major class alone.  A list value holds
# (predicate on minor, icon key) pairs that are tried in order.
# See: https://btprodspecificationrefs.blob.core.windows.net/assigned-numbers/Assigned%20Number%20Types/Baseband.pdf
_BLUETOOTH_CLASS_ICONS = {
    1: "laptop",
    2: "phone",
    3: "access-point",
    (4, 1): "headset",
    (4, 2): "headset",
    (4, 4): "microphone",
    (4, 5): "loudspeaker",
    (4, 7): "loudspeaker",
    (4, 10): "loudspeaker",
    (4, 6): "headphones",
    (4, 8): "car",
    (4, 12): "webcam",
    (5, 1): "gamepad",
    (5, 2): "gamepad",
    5: [
        (lambda x: x & 0x10, "keyboard"),
        (lambda x: x & 0x20, "mouse"),
    ],
    6: [
        (lambda x: x & 0x8, "camera"),
        (lambda x: x & 0x10, "scanner"),
        (lambda x: x & 0x20, "printer"),
    ],
}


# Event helpers


def static(**kwargs):
    """Define static variables for the event handler.

    Each keyword argument is set as an attribute on the decorated
    function, which is returned unchanged otherwise.
    """

    def decorator(fn):
        for k, v in kwargs.items():
            setattr(fn, k, v)
        return fn

    return decorator


@static(functions={})
def on(*events):
    """Tag events that should be provided to the function.

    The decorated function is recorded in ``on.functions`` together with
    the tuple of events it is interested in.
    """

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)

        # The registry is keyed by the function received here (the result
        # of the decorators applied below @on), not by the wrapper.
        on.functions[fn] = events
        return wrapper

    return decorator


def retry(max_retries):
    """Retry an async function.

    The function is attempted once plus up to ``max_retries`` more times
    when it raises.  When every attempt failed, the last exception is
    logged and ``None`` is returned instead of propagating it.
    """

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            retries = max_retries
            while True:
                try:
                    logger.debug(
                        "execute %s (remaining tries: %s)", fn, retries
                    )
                    return await fn(*args, **kwargs)
                except Exception as e:
                    if retries > 0:
                        retries -= 1
                        # Only lazy %-arguments: the message template must
                        # not embed repr(fn), which is not a safe template.
                        logger.warning(
                            "while executing %s (remaining tries: %d): %s",
                            fn,
                            retries,
                            e,
                        )
                    else:
                        logger.exception("while executing %s: %s", fn, e)
                        return

        return wrapper

    return decorator


def debounce(sleep, *, unless=None):
    """Debounce a function call (batch successive calls into only one).

    Calls are delayed by ``sleep`` seconds and calls received meanwhile
    are merged into a single execution.  When ``unless`` is provided and
    returns a true value for the arguments of a call, the pending
    execution starts immediately.  Only one instance of the function is
    executed at a time; a call received while it is running schedules
    exactly one more execution.  Every caller awaits the same worker
    task, hence gets ``None`` back, or the exception raised by the
    function.

    It is assumed the arguments provided to the debounced function have
    no effect on its execution.
    """

    def decorator(fn):
        async def worker():
            while True:
                try:
                    # Wait for an urgent work or until sleep is elapsed
                    await asyncio.wait_for(
                        workers[fn].urgent.wait(), timeout=sleep
                    )
                    logger.debug("urgent work received for %s", fn)
                except asyncio.TimeoutError:
                    pass
                # Take the pending work; an empty slot from now on means
                # that nothing was requested since this execution started.
                args, kwargs = workers[fn].queue
                workers[fn].queue = None
                workers[fn].urgent.clear()

                # Execute the work
                logger.debug("execute work for %s", fn)
                try:
                    await fn(*args, **kwargs)
                except Exception as e:
                    logger.debug("while running %s, worker got %s", fn, e)
                    workers[fn] = None
                    raise

                # Do we still have something to do?
                if workers[fn].queue is None:
                    break

            # No more work
            logger.debug("no more work for %s", fn)
            workers[fn] = None

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            if workers[fn] is None:
                logger.debug("create new worker for %s", fn)
                workers[fn] = types.SimpleNamespace()
                workers[fn].task = asyncio.create_task(worker())
                workers[fn].urgent = asyncio.Event()
                workers[fn].queue = (args, kwargs)
            else:
                logger.debug("enqueue new work for %s", fn)
                # Previously nothing was stored here, so a call received
                # while fn was running was lost and the worker could never
                # loop.  Work still waiting for its delay keeps its
                # original arguments.
                if workers[fn].queue is None:
                    workers[fn].queue = (args, kwargs)
            if unless is not None and unless(*args, **kwargs):
                logger.debug("wake up now for %s", fn)
                workers[fn].urgent.set()
            return await workers[fn].task

        workers[fn] = None
        return wrapper

    # State shared by the functions decorated with this debounce() call;
    # None means no worker task is currently alive for the function.
    workers = {}
    return decorator


# Other helpers


def _log_task_failure(task):
    """Log the exception of a finished background task, if any."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("background task failed: %s", exc, exc_info=exc)


def _window_identifiers(*windows):
    """Return the lowercased name, class and instance of the windows."""
    return {
        s.lower()
        for w in windows
        for s in (w.name, w.window_class, w.window_instance)
        if s
    }


def _bluetooth_icon_name(major, minor):
    """Return the key in ``icons`` for a Bluetooth device class."""
    match = _BLUETOOTH_CLASS_ICONS.get(
        (major, minor)
    ) or _BLUETOOTH_CLASS_ICONS.get(major, "unknown")
    if isinstance(match, list):
        for matcher, name in match:
            if matcher(minor):
                return name
        return "unknown"
    return match


async def notify(i3, **kwargs):
    """Send a notification through org.freedesktop.Notifications.

    Keyword arguments override the default parameters of the ``Notify``
    call defined below.  Returns what ``Notify`` returns; callers in this
    file use element 0 of it as the notification identifier.
    """
    conn = i3.session_bus["org.freedesktop.Notifications"]
    obj = conn["/org/freedesktop/Notifications"]
    notifications = await obj.get_async_interface(
        "org.freedesktop.Notifications"
    )
    parameters = dict(
        app_name=logger.name,
        replaces_id=0,
        app_icon="dialog-information",
        summary="",
        actions=[],
        hints={},
        expire_timeout=5000,
    )
    parameters.update(kwargs)
    return await notifications.Notify(**parameters)


def polybar(module, content):
    """Update Polybar module with the provided content.

    The content is written to ``$XDG_RUNTIME_DIR/i3/<module>.txt`` and
    sent to every ``/tmp/polybar_mqueue.*`` queue.  An ``OSError`` other
    than ENXIO while writing to a queue is propagated.
    """
    # Update cache file (for when polybar restarts)
    with open(f"{os.getenv('XDG_RUNTIME_DIR')}/i3/{module}.txt", "w") as out:
        out.write(content)

    # Send it to polybar
    for name in glob.glob("/tmp/polybar_mqueue.*"):
        try:
            with open(os.open(name, os.O_WRONLY | os.O_NONBLOCK), "w") as out:
                cmd = f"action:#{module}.send.{content}"
                out.write(cmd)
        except OSError as e:
            # ENXIO is tolerated: presumably a queue nobody reads anymore.
            if e.errno != errno.ENXIO:
                raise


# Event handlers
#
# The docstrings of the handlers registered with @on are appended to the
# --help description, so they are kept short; details go in comments.


@on(StartEvent, I3Event.WINDOW_MOVE, I3Event.WINDOW_NEW, I3Event.WINDOW_CLOSE)
@debounce(0.2)
async def workspace_rename(i3, event):
    """Rename workspaces using icons to match what's inside it."""
    tree = await i3.get_tree()
    workspaces = tree.workspaces()
    commands = []

    def application_icon(window):
        """Get application icon for a window."""
        for attr in ("window_instance", "window_class"):
            name = getattr(window, attr, None)
            if name is None:
                continue
            for k, v in application_icons.items():
                if re.match(rf"^{k}\b", name, re.IGNORECASE):
                    logger.debug(
                        "in %s, found '%s', matching %s", attr, name, k
                    )
                    return v
        return application_icons_nomatch

    for workspace in workspaces:
        ws_icons = {
            application_icon(window)
            for window in workspace.leaves()
            if not window.sticky
        }
        # "Alone" icons are dropped as soon as another icon is present.
        if any(i not in application_icons_alone for i in ws_icons):
            ws_icons -= application_icons_alone
        # Sorted, because the iteration order of a set of strings may
        # differ from one run to the next.
        new_name = f"{workspace.num}:{'|'.join(sorted(ws_icons))}".rstrip(":")
        if workspace.name != new_name:
            logger.debug("rename workspace %s", workspace.num)
            command = f'rename workspace "{workspace.name}" to "{new_name}"'
            commands.append(command)
    # Do not send an empty command when every name is already right.
    if commands:
        await i3.command(";".join(commands))


async def _new_workspace(i3):
    """Switch to the lowest unused workspace number and return it."""
    workspaces = await i3.get_workspaces()
    workspace_nums = {w.num for w in workspaces}
    max_num = max(workspace_nums, default=0)
    # max(max_num, 0) + 1 is never in use, so candidates is never empty,
    # even when no workspace has a positive number.
    candidates = set(range(1, max(max_num, 0) + 2)) - workspace_nums
    # min() rather than set.pop(), which returns an arbitrary element.
    available = min(candidates)
    logger.info("create new workspace number %s", available)
    await i3.command(f'workspace number "{available}"')
    return available


@on(CommandEvent("new-workspace"), CommandEvent("move-to-new-workspace"))
async def new_workspace(i3, event):
    """Create a new workspace and optionally move a window to it."""
    move_window = event == "move-to-new-workspace"

    # Get the currently focused window
    current = None
    if move_window:
        tree = await i3.get_tree()
        current = tree.find_focused()
        if not current:
            return

    num = await _new_workspace(i3)

    # Move the window to this workspace
    if move_window:
        await current.command(
            f"move container to workspace " f'number "{num}"'
        )


@on(I3Event.WINDOW_NEW)
async def worksplace_exclusive(i3, event):
    """Move new windows on a new workspace instead of sharing a workspace
    with an exclusive app."""
    w = event.container

    def can_intrude(w):
        """Can this new window intrude any workspace?"""
        if w.floating in {"auto_on", "user_on"}:
            return True
        if w.ipc_data["window_type"] not in {"normal", "splash", "unknown"}:
            return True
        if w.sticky:
            return True
        return bool(_window_identifiers(w) & intrusive_apps)

    # Can the new window just intrude?
    if can_intrude(w):
        logger.debug("window %s can intrude", w.name)
        return

    # Get the workspace. From an event, w.workspace() is None, so
    # search it in the tree.
    tree = await i3.get_tree()
    workspace = next(
        (ow.workspace() for ow in tree.leaves() if w.id == ow.id), None
    )
    if not workspace:
        return

    # Does the target workspace contains an exclusive app?
    others = (ow for ow in workspace.leaves() if w.id != ow.id)
    exclusives = _window_identifiers(*others) & exclusive_apps
    if not exclusives:
        logger.debug("no exclusive app, %s can go there", w.name)
        return

    # Create a new workspace and move the window here
    num = await _new_workspace(i3)
    logger.info("move window %s to workspace %s", w.window_class, num)
    await w.command(f'move container to workspace number "{num}"')


@on(CommandEvent("quake-console"))
async def quake_console(i3, event):
    """Spawn a quake console or toggle an existing one."""
    # Payload: "quake-console:<executable>:<instance name>:<height>" where
    # height is a fraction of the height of the focused workspace.
    try:
        _, term_exec, term_name, height = event.split(":")
        height = float(height)
    except ValueError as exc:
        # Logger.warn() is a deprecated alias of Logger.warning().
        logger.warning("unable to parse payload %s: %s", event, exc)
        return

    tree = await i3.get_tree()
    term = tree.find_instanced(term_name)
    if not term:
        await i3.command(f"exec exec {term_exec} --name {term_name}")
        tries = 5
        while not term and tries:
            tree = await i3.get_tree()
            term = tree.find_instanced(term_name)
            await asyncio.sleep(0.2)
            tries -= 1
        if not term:
            raise RuntimeError("unable to spawn terminal")

    workspaces = await i3.get_workspaces()
    workspace = next((ws for ws in workspaces if ws.focused), None)
    if workspace is None:
        logger.warning("no focused workspace, cannot place quake console")
        return
    ws_x = workspace.rect.x
    ws_y = workspace.rect.y
    ws_width = workspace.rect.width
    ws_height = workspace.rect.height
    width = ws_width
    height = int(ws_height * height)
    posx = ws_x
    posy = ws_y
    command = (
        f"[instance={term_name}] "
        "border none,"
        f"resize set {width} px {height} px,"
        "scratchpad show,"
        f"move absolute position {posx}px {posy}px"
    )
    logger.debug("QuakeConsole: %s", command)
    await i3.command(command)


@on(CommandEvent("container-info"))
@static(last_id=0)
async def container_info(i3, event):
    """Show information about the focused container."""
    tree = await i3.get_tree()
    window = tree.find_focused()
    if not window:
        return
    logger.info("window raw information: %s", window.ipc_data)
    summary = "About focused container"
    r = window.rect
    w = window
    info = {
        "name": w.name,
        "title": w.window_title,
        "class": w.window_class,
        "instance": w.window_instance,
        "role": w.window_role,
        "type": w.ipc_data["window_type"],
        "sticky": w.sticky,
        "floating": w.floating,
        "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
        "layout": w.layout,
        "percent": w.percent,
        "marks": ", ".join(w.marks) or "(none)",
    }
    body = "\n".join(
        f"{k:10} {html.escape(str(v))}"
        for k, v in info.items()
        if v is not None
    )
    # Reuse the previous notification instead of stacking a new one.
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=10000,
        summary=summary,
        body=body,
        replaces_id=container_info.last_id,
    )
    container_info.last_id = result[0]


@on(CommandEvent("workspace-info"))
@static(last_id=0)
async def workspace_info(i3, event):
    """Show information about the focused workspace."""
    workspaces = await i3.get_workspaces()
    focused = [w for w in workspaces if w.focused]
    if not focused:
        return
    workspace = focused[0]
    summary = f"Workspace {workspace.num} on {workspace.output}"
    tree = await i3.get_tree()
    matching = [w for w in tree.workspaces() if w.num == workspace.num]
    if not matching:
        return

    def render(container):
        """Render a container and its children as an indented tree."""
        if container.focused:
            style = 'foreground="#ffaf00"'
        elif not container.window:
            style = 'foreground="#6c98ee"'
        else:
            style = ""
        if container.window:
            content = (
                f"{(container.window_class or '???').lower()}: "
                f"{(container.window_title or '???')}"
            )
        elif container.type == "workspace" and not container.nodes:
            # Empty workspaces use workspace_layout, but when default,
            # this is layout...
            layout = container.ipc_data["workspace_layout"]
            if layout == "default":
                layout = container.layout
            content = f"({layout})"
        else:
            content = f"({container.layout})"
        # The style was computed but never applied, and titles were put
        # unescaped in the body.
        # NOTE(review): assumes the notification daemon renders Pango
        # markup in bodies (container_info already escapes its body) --
        # confirm.
        root = f"<span {style}>{html.escape(content.lower())}</span>"
        children = []
        for child in container.nodes:
            if child == container.nodes[-1]:
                first = "└─"
                # Same width as the "│ " prefix of the other branch.
                others = "  "
            else:
                first = "├─"
                others = "│ "
            content = render(child).replace("\n", f"\n{others}")
            children.append(f"{first}{content}")
        children.insert(0, root)
        return "\n".join(children)

    body = render(matching[0]).lstrip("\n")
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=20000,
        summary=summary,
        body=body,
        replaces_id=workspace_info.last_id,
    )
    workspace_info.last_id = result[0]


@on(I3Event.OUTPUT, StartEvent)
@static(last_setup=None)
@debounce(2)
async def output_update(i3, event):
    """React to a XRandR change."""

    # Grab current setup. Synchronous, but it's short enough
    d = display.Display()
    try:
        screen = d.screen()
        window = screen.root.create_window(0, 0, 1, 1, 1, screen.root_depth)
        screen_resources = randr.get_screen_resources_current(window)
        current_setup = set()
        for output in screen_resources.outputs:
            output_info = randr.get_output_info(
                window, output, screen_resources.timestamp
            )
            # Outputs without a CRTC are not part of the setup.
            if output_info.crtc == 0:
                continue
            crtc_info = randr.get_crtc_info(
                window, output_info.crtc, output_info.timestamp
            )
            current_setup.add(
                (
                    output_info.name,
                    crtc_info.width,
                    crtc_info.height,
                    crtc_info.x,
                    crtc_info.y,
                )
            )
    finally:
        d.close()

    # Compare to current setup
    if current_setup == output_update.last_setup:
        logger.debug("current xrandr setup unchanged")
        return
    output_update.last_setup = current_setup
    logger.info("xrandr setup: %s", current_setup)
    # On start, only record the setup.
    if event is StartEvent:
        return

    # Trigger changes
    logger.info("xrandr change detected")
    cmds = (
        "systemctl --user reload --no-block xsettingsd.service",
        "systemctl --user start --no-block wallpaper.service",
    )
    for cmd in cmds:
        proc = subprocess.run(shlex.split(cmd))
        if proc.returncode != 0:
            logger.warning("%s exited with %s", cmd, proc.returncode)


@on(
    DBusSignal(
        path="/org/bluez",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: (
            args[0] == "org.bluez.Device1" and "Connected" in args[1]
        ),
    )
)
async def bluetooth_notifications(
    i3, event, path, interface, changed, invalid
):
    """Display notifications related to Bluetooth state."""
    obj = i3.system_bus["org.bluez"][path]
    obd = await obj.get_async_interface(interface)
    name = await obd.Name
    device_icon = await obd.Icon
    connected = await obd.Connected
    state = "connected" if connected else "disconnected"
    await notify(
        i3,
        app_icon=device_icon,
        summary=name,
        body=f"Bluetooth device {state}",
    )


@on(
    StartEvent,
    DBusSignal(
        path="/org/bluez",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: (
            args[0] == "org.bluez.Device1"
            and "Connected" in args[1]
            or args[0] == "org.bluez.Adapter1"
            and "Powered" in args[1]
        ),
    ),
)
@static(last=None)
@retry(2)
@debounce(0.2)
async def bluetooth_status(i3, event, *args):
    """Update bluetooth status for polybar."""
    if event is StartEvent:
        # Do we have a bluetooth device?
        if not os.path.exists("/sys/class/bluetooth"):
            logger.info("no bluetooth detected")
            polybar("bluetooth", "")
            return

    # OK, get the info
    conn = i3.system_bus["org.bluez"]
    om = await conn["/"].get_async_interface(
        "org.freedesktop.DBus.ObjectManager"
    )
    objects = await om.GetManagedObjects()
    objects = objects[0]
    powered = False
    devices = []
    # Property values are indexed with [1] to get the value itself.
    for path, interfaces in objects.items():
        if "org.bluez.Adapter1" in interfaces:
            # We get an adapter!
            adapter = interfaces["org.bluez.Adapter1"]
            if adapter["Powered"][1]:
                powered = True
        elif "org.bluez.Device1" in interfaces:
            # We have a device!
            device = interfaces["org.bluez.Device1"]
            if not device["Connected"][1]:
                continue
            # A device may come without a class: fall back on 0, which
            # ends up with the "unknown" icon instead of a KeyError.
            device_class = device["Class"][1] if "Class" in device else 0
            major = (device_class & 0x1F00) >> 8
            minor = (device_class & 0xFC) >> 2
            devices.append((major, minor))

    # Choose appropriate icons for output
    if not powered:
        output = ""
    else:
        names = ["bluetooth"]
        names.extend(
            _bluetooth_icon_name(major, minor) for major, minor in devices
        )
        output = "|".join(icons[name] for name in names)

    # Update polybar
    if bluetooth_status.last != output:
        logger.info("updated bluetooth status")
        polybar("bluetooth", output)
        bluetooth_status.last = output


@on(
    DBusSignal(
        system=False,
        path="/org/freedesktop/Notifications",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: (
            args[0] == "org.dunstproject.cmd0" and "paused" in args[1]
        ),
    )
)
async def dunst_status_update(i3, event, path, interface, changed, invalid):
    """Update notification status in polybar."""
    if interface != "org.dunstproject.cmd0":
        return
    if "paused" not in changed:
        return
    paused = changed["paused"][1]
    polybar(
        "dunst",
        icons["notifications-disabled" if paused else "notifications-enabled"],
    )


@on(StartEvent)
async def dunst_status_check(i3, event):
    """Display notification status for polybar."""
    conn = i3.session_bus["org.freedesktop.Notifications"]
    obj = conn["/org/freedesktop/Notifications"]
    dunst = await obj.get_async_interface("org.dunstproject.cmd0")
    paused = await dunst.paused
    polybar(
        "dunst",
        icons["notifications-disabled" if paused else "notifications-enabled"],
    )


@on(
    DBusSignal(
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    )
)
async def network_manager_notifications(i3, event, path, state, reason):
    """Display notifications related to Network Manager state."""
    ofnm = "org.freedesktop.NetworkManager"
    logger.debug("from %s state: %s, reason: %s", path, state, reason)
    if state != NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
        # Deactivated state does not contain enough information,
        # unless we maintain state.
        return
    obj = i3.system_bus[ofnm][path]
    try:
        nmca = await obj.get_async_interface(f"{ofnm}.Connection.Active")
    except dbussy.DBusError:
        logger.info("interface %s has vanished", path)
        return
    kind = await nmca.Type
    connection_id = await nmca.Id
    if kind == "vpn":
        await notify(
            i3,
            app_icon="network-vpn",
            summary=f"{connection_id}",
            body="VPN connected!",
        )
    elif kind == "802-3-ethernet":
        await notify(
            i3,
            app_icon="network-wired",
            summary=f"{connection_id}",
            body="Ethernet connection established!",
        )
    elif kind == "802-11-wireless":
        await notify(
            i3,
            app_icon="network-wireless",
            summary=f"{connection_id}",
            body="Wireless connection established!",
        )


@on(
    StartEvent,
    DBusSignal(
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    ),
    DBusSignal(
        interface="org.freedesktop.NetworkManager.AccessPoint",
        member="PropertiesChanged",
        signature="a{sv}",
    ),
)
@static(last=None)
@retry(2)
@debounce(
    1,
    # Connection state changes are handled without waiting.
    unless=lambda i3, event, *args: (
        isinstance(event, DBusSignal) and event.interface.endswith(".Active")
    ),
)
async def network_manager_status(i3, event, *args):
    """Compute network manager status."""
    ofnm = "org.freedesktop.NetworkManager"
    status = []

    # Build status from devices
    conn = i3.system_bus[ofnm]
    nm = await conn["/org/freedesktop/NetworkManager"].get_async_interface(
        ofnm
    )
    devices = await nm.AllDevices
    for device in devices:
        nmd = await conn[device].get_async_interface(f"{ofnm}.Device")
        kind = await nmd.DeviceType
        state = await nmd.State
        if state == NM_DEVICE_STATE_UNMANAGED:
            continue
        if kind == NM_DEVICE_TYPE_WIFI:
            if state != NM_DEVICE_STATE_ACTIVATED:
                status.append(icons["nowifi"])
                continue
            nmw = await conn[device].get_async_interface(
                f"{ofnm}.Device.Wireless"
            )
            ap = await nmw.ActiveAccessPoint
            if not ap:
                status.append(icons["nowifi"])
                continue
            network_manager_status.active_ap = ap
            nmap = await conn[ap].get_async_interface(f"{ofnm}.AccessPoint")
            name = await nmap.Ssid
            strength = int(await nmap.Strength)
            levels = [
                icons["wifi-low"],
                icons["wifi-medium"],
                icons["wifi-high"],
            ]
            # Three buckets of 34; clamped so that an unexpected value
            # above 101 cannot index past the last icon.
            status.append(levels[min(strength // 34, len(levels) - 1)])
            # "%" is doubled because it introduces Polybar format tags.
            status.append(
                bytes(name)
                .decode("utf-8", errors="replace")
                .replace("%", "%%")
            )
        elif (
            kind == NM_DEVICE_TYPE_ETHERNET
            and state == NM_DEVICE_STATE_ACTIVATED
        ):
            status.append(icons["wired"])

    # Build status for VPN connection
    connections = await nm.ActiveConnections
    for connection in connections:
        nma = await conn[connection].get_async_interface(
            f"{ofnm}.Connection.Active"
        )
        vpn = await nma.Vpn
        if vpn:
            state = await nma.State
            if state == NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
                status.append(icons["vpn"])
                # Escaped like the SSID above.
                status.append((await nma.Id).replace("%", "%%"))

    # Final status line
    status = " ".join(status)
    if status != network_manager_status.last:
        logger.info("updated network status")
        polybar("network", status)
        network_manager_status.last = status


# Main function


async def main(options):
    """Connect to i3 and DBus, register the handlers and run forever.

    ``options`` is the namespace of parsed command-line arguments; it is
    not used here.
    """
    i3 = await Connection(auto_reconnect=True).connect()
    i3.session_bus = await ravel.session_bus_async()
    i3.system_bus = await ravel.system_bus_async()

    # Regular events
    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, I3Event):

                # A factory, so that each callback keeps its own fn.
                def wrapping(fn, event):
                    async def wrapped(i3, event):
                        logger.debug("received i3 event %s for %s", event, fn)
                        return await fn(i3, event)

                    return wrapped

                i3.on(event, wrapping(fn, event))

    # React to some bindings
    async def binding_event(i3, event):
        """Process a binding event."""
        # We only processes it when it is a nop command and we use
        # this mechanism as an IPC mechanism. The alternative would be
        # to use ticks but we would need to spawn an i3-msg process
        # for that.
        cmd = event.binding.command
        if not cmd.startswith("nop "):
            return
        cmd = cmd[4:].strip("\"'")
        if not cmd:
            return
        # The handler is selected with the text before the first ":" but
        # receives the whole payload.
        kind = cmd.split(":")[0]
        for fn, events in on.functions.items():
            for e in events:
                if isinstance(e, CommandEvent) and e.name == kind:
                    logger.debug("received command event %s for %s", event, fn)
                    await fn(i3, cmd)

    i3.on(I3Event.BINDING, binding_event)

    # Listen to DBus events
    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, DBusSignal):
                bus = i3.system_bus if event.system else i3.session_bus

                def wrapping(fn, event):
                    @ravel.signal(
                        name=event.member,
                        in_signature=event.signature,
                        path_keyword="path",
                        args_keyword="args",
                    )
                    async def wrapped(path, args):
                        if event.onlyif is not None and not event.onlyif(args):
                            logger.debug(
                                "received DBus event for %s, not interested",
                                fn,
                            )
                            return
                        logger.debug(
                            "received DBus event %s for %s", event, fn
                        )
                        return await fn(i3, event, path, *args)

                    return wrapped

                bus.listen_signal(
                    path=event.path,
                    fallback=True,
                    interface=event.interface,
                    name=event.member,
                    func=wrapping(fn, event),
                )

    # Run events that should run on start
    start_tasks = []  # keeps a reference to the tasks while they run
    for fn, events in on.functions.items():
        for event in events:
            if event is StartEvent:
                task = asyncio.create_task(fn(i3, event))
                # Nothing awaits these tasks: log their failure instead
                # of losing it.
                task.add_done_callback(_log_task_failure)
                start_tasks.append(task)

    await i3.main()


if __name__ == "__main__":
    # Parse
    description = sys.modules[__name__].__doc__
    for fn in on.functions:
        description += f" {fn.__doc__}"
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--debug",
        "-d",
        action="store_true",
        default=False,
        help="enable debugging",
    )
    options = parser.parse_args()

    # Logging
    root = logging.getLogger("")
    root.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
    if sys.stderr.isatty():
        ch = logging.StreamHandler()
        ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
        root.addHandler(ch)
    else:
        root.addHandler(journal.JournalHandler(SYSLOG_IDENTIFIER=logger.name))

    try:
        asyncio.run(main(options))
    except Exception as e:
        logger.exception("%s", e)
        sys.exit(1)
    sys.exit(0)