vincentbernat.i3wm-configur.../bin/i3-companion

512 lines
15 KiB
Text
Raw Normal View History

#!/usr/bin/python3
"""Personal i3 companion."""
import argparse
import logging
import logging.handlers
import sys
import re
2021-07-07 00:24:23 +02:00
import asyncio
import shlex
import subprocess
import html
import functools
import collections
2021-07-07 00:24:23 +02:00
from i3ipc.aio import Connection
from i3ipc import Event
2021-07-10 13:17:07 +02:00
from systemd import journal
import ravel
import dbussy
2021-07-10 13:17:07 +02:00
# Logger shared by every handler in this file.
logger = logging.getLogger("i3-companion")

# Description of a D-Bus signal a handler wants to receive; instances are
# passed to the ``on`` decorator alongside i3 events and binding names.
DBusSignal = collections.namedtuple(
    "DBusSignal", "path interface member signature"
)
def on(*events):
    """Tag a function with the events it should be called for.

    The undecorated function is recorded in the ``on.functions`` registry,
    mapped to the tuple of events given here. The decorator returns a thin
    pass-through wrapper around the function.
    """

    def decorator(fn):
        # The registry lives as an attribute of ``on`` and is created on
        # first use.
        registry = getattr(on, "functions", None)
        if registry is None:
            registry = {}
        on.functions = registry
        registry[fn] = events

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)

        return wrapper

    return decorator
async def notify(i3, **kwargs):
    """Send a desktop notification through the session bus.

    ``i3.session_bus`` is used to reach ``org.freedesktop.Notifications``.
    Keyword arguments override the defaults below and are passed as-is to
    the ``Notify`` method, whose result is returned.
    """
    service = i3.session_bus["org.freedesktop.Notifications"]
    proxy = service["/org/freedesktop/Notifications"]
    interface = await proxy.get_async_interface(
        "org.freedesktop.Notifications"
    )
    parameters = {
        "app_name": logger.name,
        "replaces_id": 0,
        "app_icon": "dialog-information",
        "summary": "",
        "actions": [],
        "hints": {},
        "expire_timeout": 5000,
        # Caller-provided values win over the defaults above.
        **kwargs,
    }
    return await interface.Notify(**parameters)
# Icons used to label workspaces. Each key is matched, case-insensitively,
# against the beginning of a window's instance or class (see
# workspace_rename); the value is the icon put in the workspace name.
# NOTE(review): the icon values read as empty strings here; presumably they
# are Font Awesome private-use glyphs that do not render in this view --
# confirm before editing them.
# See https://fontawesome.com/v5.15/icons
application_icons = {
    "chromium": "",
    "discord": "",
    "emacs": "",
    "firefox": "",
    "gimp": "",
    "gitg": "",
    "google-chrome": "",
    "inkscape": "",
    "libreoffice": "",
    "mpv": "",
    "pavucontrol": "",
    "signal": "",
    "snes9x-gtk": "",
    "spotify": "",
    "steam": "",
    "vbeterm": "",
    "zathura": "",
    "zoom": "",
}
# Icon used for windows matching none of the keys above.
application_icons_nomatch = ""
# Icons only displayed when a workspace contains no other kind of icon.
application_icons_alone = {application_icons[k] for k in {"vbeterm"}}
@on(Event.WINDOW_MOVE, Event.WINDOW_NEW, Event.WINDOW_CLOSE)
async def workspace_rename(i3, event):
    """Rename workspaces using icons to match what's inside it.

    Each workspace is named ``<num>:<icon>|<icon>...``, or just ``<num>``
    when it has no icon to show. Icons listed in ``application_icons_alone``
    are only kept when they are the sole kind of icon on the workspace.
    """
    tree = await i3.get_tree()

    def application_icon(window):
        """Get application icon for a window."""
        for attr in ("window_instance", "window_class"):
            name = getattr(window, attr, None)
            if name is None:
                continue
            for k, v in application_icons.items():
                if re.match(rf"^{k}\b", name, re.IGNORECASE):
                    logger.debug(f"in {attr}, found '{name}', matching {k}")
                    return v
        return application_icons_nomatch

    commands = []
    for workspace in tree.workspaces():
        icons = set()
        for window in workspace.leaves():
            icon = application_icon(window)
            if icon is not None:
                icons.add(icon)
        if any(i not in application_icons_alone for i in icons):
            icons -= application_icons_alone
        # Sort the icons: set iteration order is arbitrary and would
        # otherwise make the name change without the content changing.
        label = "|".join(sorted(icons))
        new_name = f"{workspace.num}:{label}".rstrip(":")
        if workspace.name != new_name:
            logger.debug(f"rename workspace {workspace.num}")
            command = f'rename workspace "{workspace.name}" to "{new_name}"'
            commands.append(command)

    # Do not bother i3 with an empty command when every name is up-to-date.
    if commands:
        await i3.command(";".join(commands))
async def _new_workspace(i3):
    """Switch to a new numbered workspace and return its number.

    The lowest positive number not used by an existing workspace is
    selected, then i3 is asked to switch to ``workspace number <n>``.
    """
    workspaces = await i3.get_workspaces()
    workspace_nums = {w.num for w in workspaces}
    # Count up from 1 instead of popping from a set difference: the result
    # is deterministic and this also works when no workspace has a positive
    # number (i3 reports -1 for workspaces without one).
    available = 1
    while available in workspace_nums:
        available += 1
    logger.info(f"create new workspace number {available}")
    await i3.command(f'workspace number "{available}"')
    return available
@on("new-workspace", "move-to-new-workspace")
async def new_workspace(i3, event):
    """Create a new workspace and optionally move a window to it.

    With the ``move-to-new-workspace`` event, the window focused at the time
    of the call is moved to the new workspace. Nothing happens when no
    window is focused.
    """
    moving = event == "move-to-new-workspace"
    if moving:
        # Remember the focused window before switching to the new workspace.
        tree = await i3.get_tree()
        current = tree.find_focused()
        if not current:
            return

    num = await _new_workspace(i3)

    if moving:
        target = f'number "{num}"'
        await current.command(f"move container to workspace {target}")
2021-07-11 12:23:00 +02:00
# Applications that do not share their workspace: a new window opening on a
# workspace containing one of them is sent to a new workspace (see
# worksplace_exclusive). Compared with the lowercased name, class and
# instance of windows.
exclusive_apps = {"emacs", "firefox"}
# Applications whose new windows are allowed to stay on any workspace, even
# next to an exclusive application.
intrusive_apps = {"vbeterm"}
@on(Event.WINDOW_NEW)
async def worksplace_exclusive(i3, event):
    """Move new windows on a new workspace instead of sharing a workspace
    with an exclusive app.

    Floating or sticky windows, windows whose type is not normal, splash or
    unknown, and applications listed in ``intrusive_apps`` are left alone.
    """
    w = event.container

    def identifiers(window):
        """Lowercased name, class and instance of a window, when set."""
        candidates = (window.name, window.window_class, window.window_instance)
        return {s.lower() for s in candidates if s is not None}

    def can_intrude(window):
        """Can this new window intrude any workspace?"""
        if window.floating in {"auto_on", "user_on"}:
            return True
        if window.ipc_data["window_type"] not in {
            "normal",
            "splash",
            "unknown",
        }:
            return True
        if window.sticky:
            return True
        return bool(identifiers(window) & intrusive_apps)

    # Can the new window just intrude?
    if can_intrude(w):
        logger.debug(f"window {w.name} can intrude")
        return

    # Does the current workspace contains an exclusive app?
    tree = await i3.get_tree()
    focused = tree.find_focused()
    workspace = focused.workspace() if focused else None
    if not workspace:
        return
    ids = set()
    for ow in workspace.leaves():
        # The new window itself does not count.
        if ow.id != w.id:
            ids |= identifiers(ow)
    if not ids & exclusive_apps:
        logger.debug(f"no exclusive app, {w.name} can go there")
        return

    # Create a new workspace and move the window here
    num = await _new_workspace(i3)
    logger.info(f"move window {w.name} to workspace {num}")
    await w.command(f'move container to workspace number "{num}"')
@on("quake-console")
async def quake_console(i3, event):
    """Spawn a quake console or toggle an existing one.

    ``event`` is the binding payload, made of four colon-separated fields:
    the event name, the terminal executable, the instance name to give to
    the terminal and the height of the console as a multiplier of the
    focused workspace height.

    Raises RuntimeError when the terminal does not show up after being
    spawned.
    """
    try:
        _, term_exec, term_name, height = event.split(":")
        height = float(height)
    except ValueError as exc:
        # Wrong number of fields or height not a number.
        logger.warning(f"unable to parse payload {event}: {exc}")
        return

    # Look for the terminal and spawn it if it does not exist yet.
    tree = await i3.get_tree()
    term = tree.find_instanced(term_name)
    if not term:
        await i3.command(f"exec exec {term_exec} --name {term_name}")
        tries = 5
        while not term and tries:
            tree = await i3.get_tree()
            term = tree.find_instanced(term_name)
            await asyncio.sleep(0.2)
            tries -= 1
        if not term:
            raise RuntimeError("unable to spawn terminal")

    # The console spans the whole width of the focused workspace and is
    # anchored to its top-left corner.
    workspaces = await i3.get_workspaces()
    workspace = next((ws for ws in workspaces if ws.focused), None)
    if workspace is None:
        logger.warning("no focused workspace, unable to place terminal")
        return
    rect = workspace.rect
    width = rect.width
    height = int(rect.height * height)
    posx = rect.x
    posy = rect.y
    command = (
        f"[instance={term_name}] "
        "border none,"
        f"resize set {width} px {height} px,"
        "scratchpad show,"
        f"move absolute position {posx}px {posy}px"
    )
    logger.debug(f"QuakeConsole: {command}")
    await i3.command(command)
@on("container-info")
async def container_info(i3, event):
    """Show information about the focused container.

    The information is displayed in a notification which replaces the one
    sent by the previous invocation, if any. Nothing happens when no
    container is focused.
    """
    tree = await i3.get_tree()
    window = tree.find_focused()
    if not window:
        return
    logger.info(f"window raw information: {window.ipc_data}")
    summary = "About focused container"
    r = window.rect
    w = window
    info = {
        "name": w.name,
        "title": w.window_title,
        "class": w.window_class,
        "instance": w.window_instance,
        "role": w.window_role,
        "type": w.ipc_data["window_type"],
        "sticky": w.sticky,
        "floating": w.floating,
        "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
        "layout": w.layout,
        "percent": w.percent,
        "marks": ", ".join(w.marks) or "(none)",
    }
    # One line per known property; values are escaped because the body is
    # sent as markup.
    body = "\n".join(
        f"<tt>{k:10}</tt> {html.escape(str(v))}"
        for k, v in info.items()
        if v is not None
    )
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=10000,
        summary=summary,
        body=body,
        replaces_id=getattr(container_info, "last_id", 0),
    )
    # Remember the notification to replace it next time.
    container_info.last_id = result[0]
@on("workspace-info")
async def workspace_info(i3, event):
    """Show information about the focused workspace.

    The layout of the workspace is displayed as a tree in a notification
    which replaces the one sent by the previous invocation, if any. Nothing
    happens when no workspace is focused.
    """
    workspaces = await i3.get_workspaces()
    focused = [w for w in workspaces if w.focused]
    if not focused:
        return
    workspace = focused[0]
    summary = f"Workspace {workspace.num} on {workspace.output}"
    tree = await i3.get_tree()
    matching = [w for w in tree.workspaces() if w.num == workspace.num]
    if not matching:
        # The workspace is gone between the two requests.
        return

    def render(container):
        """Return the markup for a container and, recursively, its nodes."""
        if container.focused:
            style = 'foreground="#ffaf00"'
        elif not container.window:
            style = 'foreground="#6c98ee"'
        else:
            style = ""
        if container.window:
            content = (
                f"{(container.window_class or '???').lower()}: "
                f"{(container.window_title or '???')}"
            )
        elif container.type == "workspace" and not container.nodes:
            # Empty workspaces use workspace_layout, but when default,
            # this is layout...
            layout = container.ipc_data["workspace_layout"]
            if layout == "default":
                layout = container.layout
            content = f"({layout})"
        else:
            content = f"({container.layout})"
        # Window titles are arbitrary text: escape them to not break the
        # markup of the notification.
        root = f"<span {style}>{html.escape(content.lower())}</span>"
        lines = [root]
        last = len(container.nodes) - 1
        for index, child in enumerate(container.nodes):
            if index == last:
                first = "└─"
                # NOTE(review): one character narrower than the other
                # prefixes; confirm this is the intended alignment.
                others = " "
            else:
                first = "├─"
                others = "│ "
            subtree = render(child).replace("\n", f"\n{others}")
            lines.append(f"<tt>{first}</tt>{subtree}")
        return "\n".join(lines)

    body = render(matching[0]).lstrip("\n")
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=20000,
        summary=summary,
        body=body,
        replaces_id=getattr(workspace_info, "last_id", 0),
    )
    # Remember the notification to replace it next time.
    workspace_info.last_id = result[0]
@on(Event.OUTPUT)
async def output_update(i3, event):
    """React to a XRandR change.

    The reaction is delayed by 2 seconds and any reaction still pending
    from a previous event is cancelled, so a burst of events triggers the
    actions only once.
    """
    loop = asyncio.get_event_loop()

    def output_update_now():
        """Execute actions to react to XRandR change."""
        output_update.running = None
        logger.info("XRandR change detected")
        for cmd in (
            "systemctl --user reload --no-block xsettingsd.service",
            "systemctl --user start --no-block wallpaper.service",
        ):
            proc = subprocess.run(shlex.split(cmd))
            if proc.returncode == 0:
                continue
            logger.warning(f"{cmd} exited with {proc.returncode}")

    # Drop the reaction scheduled by a previous event, if any.
    pending = getattr(output_update, "running", None)
    if pending is not None:
        pending.cancel()
        output_update.running = None

    logger.debug("schedule XRandR change")
    output_update.running = loop.call_later(2, output_update_now)
@on(
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    )
)
async def network_manager_update(i3, event, path, state, reason):
    """Send a notification when a network connection gets activated."""
    logger.debug("from %s state: %d, reason: %d", path, state, reason)
    NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2
    if state != NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
        return

    # Ask NetworkManager about the connection which emitted the signal.
    peer = i3.system_bus["org.freedesktop.NetworkManager"][path]
    try:
        interface = await peer.get_async_interface(
            "org.freedesktop.NetworkManager.Connection.Active"
        )
    except dbussy.DBusError:
        logger.info("interface %s has vanished", path)
        return
    kind = await interface.Type
    connection_id = await interface.Id

    # Icon and message for each connection type we notify about.
    notifications = {
        "vpn": ("network-vpn", "VPN connected!"),
        "802-3-ethernet": (
            "network-wired",
            "Ethernet connection established!",
        ),
        "802-11-wireless": (
            "network-wireless",
            "Wireless connection established!",
        ),
    }
    if kind not in notifications:
        return
    icon, message = notifications[kind]
    await notify(
        i3, app_icon=icon, summary=f"{connection_id}", body=message
    )
async def main(options):
    """Connect to i3 and D-Bus, register tagged handlers and process events.

    Functions tagged with ``on`` are dispatched depending on the kind of
    each of their events: i3 events are subscribed to directly, strings are
    matched against ``nop`` bindings and ``DBusSignal`` are listened to on
    both the session and the system bus.
    """
    i3 = await Connection().connect()
    i3.session_bus = await ravel.session_bus_async()
    i3.system_bus = await ravel.system_bus_async()
    handlers = on.functions

    # Regular events
    for handler, triggers in handlers.items():
        for trigger in triggers:
            if isinstance(trigger, Event):
                i3.on(trigger, handler)

    # React to some bindings
    async def binding_event(i3, event):
        """Process a binding event."""
        # We only processes it when it is a nop command and we use
        # this mechanism as an IPC mechanism. The alternative would be
        # to use ticks but we would need to spawn an i3-msg process
        # for that.
        prefix = "nop "
        cmd = event.binding.command
        if not cmd.startswith(prefix):
            return
        cmd = cmd[len(prefix):].strip("\"'")
        if not cmd:
            return
        # The text before the first colon selects the handlers, which
        # receive the whole payload.
        kind, _, _ = cmd.partition(":")
        for handler, triggers in handlers.items():
            for trigger in triggers:
                if trigger == kind:
                    await handler(i3, cmd)

    i3.on(Event.BINDING, binding_event)

    # Listen to DBus events
    def wrapping(fn, signal):
        """Build the signal callback invoking fn like other handlers."""

        @ravel.signal(
            name=signal.member,
            in_signature=signal.signature,
            path_keyword="path",
            args_keyword="args",
        )
        async def wrapped(path, args):
            return await fn(i3, signal, path, *args)

        return wrapped

    for handler, triggers in handlers.items():
        for trigger in triggers:
            if not isinstance(trigger, DBusSignal):
                continue
            for bus in {i3.session_bus, i3.system_bus}:
                bus.listen_signal(
                    path=trigger.path,
                    fallback=True,
                    interface=trigger.interface,
                    name=trigger.member,
                    func=wrapping(handler, trigger),
                )

    await i3.main()
2021-07-07 00:24:23 +02:00
if __name__ == "__main__":
    # Parse. The help text is the module docstring followed by the
    # docstring of each registered handler.
    description = sys.modules[__name__].__doc__
    for fn in on.functions:
        # Handlers without a docstring would add a literal "None".
        if fn.__doc__:
            description += f" {fn.__doc__}"
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--debug",
        "-d",
        action="store_true",
        default=False,
        help="enable debugging",
    )
    options = parser.parse_args()

    # Logging: to the terminal when run interactively, to the journal
    # otherwise. Only our own logger gets more verbose than warnings.
    root = logging.getLogger("")
    root.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
    if sys.stdin.isatty():
        ch = logging.StreamHandler()
        ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
        # Without attaching the handler, messages below the warning level
        # are never displayed, even with --debug.
        root.addHandler(ch)
    else:
        root.addHandler(journal.JournalHandler(SYSLOG_IDENTIFIER=logger.name))

    # Run until the connection to i3 ends; any error is logged and
    # reported through the exit status.
    try:
        asyncio.get_event_loop().run_until_complete(main(options))
    except Exception as e:
        logger.exception("%s", e)
        sys.exit(1)
    sys.exit(0)