#!/usr/bin/python3 """Personal i3 companion This script will listen to i3 events and react accordingly. """ import argparse import logging import logging.handlers import os import sys import re import asyncio import shlex import subprocess import html import functools from i3ipc.aio import Connection from i3ipc import Event logger = logging.getLogger(os.path.splitext(os.path.basename(sys.argv[0]))[0]) class CustomFormatter(argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): pass def on(*events): """Tag events that should be provided to the function.""" def decorator(fn): @functools.wraps(fn) def wrapper(*args, **kwargs): return fn(*args, **kwargs) on.functions = getattr(on, "functions", {}) on.functions[fn] = events return wrapper return decorator # See https://fontawesome.com/v5.15/icons application_icons = { "chromium": "", "discord": "", "emacs": "", "firefox": "", "gimp": "", "google-chrome": "", "inkscape": "", "signal": "", "snes9x-gtk": "", "spotify": "", "steam": "", "vbeterm": "", "zathura": "", "zoom": "", "NOMATCH": "" } application_icons_alone = { application_icons[k] for k in { "vbeterm" } } @on(Event.WINDOW_MOVE, Event.WINDOW_NEW, Event.WINDOW_CLOSE) async def workspace_rename(i3, event): """Rename workspaces using icons to match what's inside it.""" tree = await i3.get_tree() workspaces = tree.workspaces() commands = [] def application_icon(window): """Get application icon for a window.""" for attr in ('name', 'window_instance', 'window_class'): name = getattr(window, attr, None) if name is None: continue for k, v in application_icons.items(): if re.match(k, name, re.IGNORECASE): logger.debug(f"in {attr}, found '{name}', matching {k}") return v return application_icons["NOMATCH"] for workspace in workspaces: icons = set() for window in workspace.leaves(): icon = application_icon(window) if icon is not None: icons.add(icon) if any([i not in application_icons_alone for i in icons]): icons -= application_icons_alone new_name = f"{workspace.num}:{'|'.join(icons)}".rstrip(":") if workspace.name != new_name: logger.debug(f"rename workspace {workspace.num}") command = f'rename workspace "{workspace.name}" to "{new_name}"' commands.append(command) await i3.command(';'.join(commands)) @on("new-workspace", "move-to-new-workspace") async def new_workspace(i3, event): """Create a new workspace and optionally move a window to it.""" # Get the currently focused window if event.payload == "move-to-new-workspace": tree = await i3.get_tree() current = tree.find_focused() if not current: return # Create a new workspace workspaces = await i3.get_workspaces() workspace_nums = {w.num for w in workspaces} max_num = max(workspace_nums) available = (set(range(1, max_num + 2)) - workspace_nums).pop() logger.info(f'create new workspace number {available}') await i3.command(f'workspace number "{available}"') # Move the window to this workspace if event.payload == "move-to-new-workspace": await current.command(f'move container to workspace ' f'number "{available}"') @on("quake-console") async def quake_console(i3, event): """Spawn a quake console or toggle an existing one.""" try: _, term_exec, term_name, height = event.payload.split(":") height = float(height) except Exception as exc: logger.warn(f"unable to parse payload {event.payload}: {exc}") return tree = await i3.get_tree() term = tree.find_instanced(term_name) if not term: await i3.command(f'exec {term_exec} --name {term_name}') tries = 5 while not term and tries: tree = await i3.get_tree() term = tree.find_instanced(term_name) await asyncio.sleep(0.2) tries -= 1 if not term: raise RuntimeError("unable to spawn terminal") term = term[0] workspaces = await i3.get_workspaces() workspace = [ws for ws in workspaces if ws.focused][0] ws_x = workspace.rect.x ws_y = workspace.rect.y ws_width = workspace.rect.width ws_height = workspace.rect.height width = ws_width height = int(ws_height * height) posx = ws_x posy = ws_y command = (f'[instance={term_name}] ' 'border none,' f'resize set {width} px {height} px,' 'scratchpad show,' f'move absolute position {posx}px {posy}px') logger.debug(f"QuakeConsole: {command}") await i3.command(command) @on("info") async def window_info(i3, event): """Show information about the focused window.""" tree = await i3.get_tree() window = tree.find_focused() if not window: return logger.info(f"window raw information: {window.ipc_data}") summary = f"Information about window {window.window}" r = window.rect w = window info = { "name": w.name, "title": w.window_title, "class": w.window_class, "instance": w.window_instance, "role": w.window_role, "sticky": w.sticky, "floating": w.floating, "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}", "layout": w.layout, "marks": ", ".join(w.marks) or "(none)" } body = "\n".join((f"{k:10} {html.escape(str(v))}" for k, v in info.items() if v is not None)) proc = await asyncio.create_subprocess_exec( "notify-send", "-i", "system-search", summary, body) await proc.communicate() output_update_running = None async def output_update(i3, event): """React to a XRandR change.""" global output_update_running if output_update_running is not None: output_update_running.cancel() def output_update_now(): """Execute actions to react to XRandR change.""" global output_update_running output_update_running = None logger.info("XRandR change detected") cmds = ( "systemctl --user reload --no-block xsettingsd.service", "systemctl --user start --no-block wallpaper.service", ) for cmd in cmds: proc = subprocess.run(shlex.split(cmd)) if proc.returncode != 0: logger.warning(f"{cmd} exited with {proc.returncode}") logger.debug("schedule XRandR change") output_update_running = asyncio.get_event_loop().call_later( 1, output_update_now) async def main(options): i3 = await Connection().connect() # Regular events for fn, events in on.functions.items(): for event in events: if isinstance(event, Event): i3.on(event, fn) # Ticks async def tick_event(i3, event): """Process a TICK event.""" if type(event.payload) is not str: return kind = event.payload.split(":")[0] for fn, events in on.functions.items(): for e in events: if e == kind: await fn(i3, event) i3.on(Event.TICK, tick_event) await i3.main() if __name__ == "__main__": # Parse parser = argparse.ArgumentParser( description=sys.modules[__name__].__doc__) parser.add_argument("--debug", "-d", action="store_true", default=False, help="enable debugging") options = parser.parse_args() # Logging root = logging.getLogger("") root.setLevel(logging.WARNING) logger.setLevel(options.debug and logging.DEBUG or logging.INFO) ch = logging.StreamHandler() ch.setFormatter(logging.Formatter( "%(levelname)s[%(name)s] %(message)s")) root.addHandler(ch) try: asyncio.get_event_loop().run_until_complete(main(options)) except Exception as e: logger.exception("%s", e) sys.exit(1) sys.exit(0)