vincentbernat.i3wm-configur.../bin/i3-companion

789 lines
24 KiB
Text
Raw Normal View History

#!/usr/bin/python3
"""Personal i3 companion."""
import argparse
import asyncio
import collections
import errno
import functools
import glob
import html
import logging
import logging.handlers
import os
import re
import shlex
import subprocess
import sys
import types
import i3ipc
2021-07-07 00:24:23 +02:00
from i3ipc.aio import Connection
2021-07-10 13:17:07 +02:00
from systemd import journal
import ravel
import dbussy
def icon(font_number, char):
    """Wrap a glyph in Polybar font tags.

    Returns ``%{T<font_number>}<char>%{T-}`` where ``font_number`` is
    formatted as an integer and ``char`` is the glyph to display.
    """
    # Font number is from Polybar configuration.
    # 2: https://fontawesome.com/v6.0/icons?s=solid
    # 3: https://fontawesome.com/v6.0/icons?s=brands
    opening_tag = "%%{T%d}" % (font_number,)
    closing_tag = "%{T-}"
    return opening_tag + "%s" % (char,) + closing_tag
# Configuration
application_icons = {
"chromium": icon(3, ""),
"discord": icon(3, ""),
"emacs": icon(2, ""),
"firefox": icon(3, ""),
"gimp": icon(2, ""),
"gitg": icon(2, ""),
"google-chrome": icon(3, ""),
"inkscape": icon(2, ""),
"libreoffice": icon(2, ""),
"mpv": icon(2, ""),
"pavucontrol": icon(2, ""),
"signal": icon(2, ""),
"snes9x-gtk": icon(2, ""),
"spotify": icon(3, ""),
"steam": icon(3, ""),
"vbeterm": icon(2, ""),
"zathura": icon(2, ""),
"zoom": icon(2, ""),
}
icons = {
"nowifi": icon(2, ""),
"vpn": icon(2, ""),
"wifi-low": icon(2, ""),
"wifi-medium": icon(2, ""),
"wifi-high": icon(2, ""),
"wired": icon(2, ""),
}
application_icons_nomatch = icon(2, "")
# Icons that are dropped from a workspace name as soon as the workspace
# also contains a window with any other icon (see workspace_rename).
application_icons_alone = {application_icons[k] for k in {"vbeterm"}}
# Applications that do not share their workspace: a new window landing
# next to one of them is moved to a fresh workspace, unless it may
# intrude (see worksplace_exclusive). Compared against the lowercased
# window name, class and instance.
exclusive_apps = {"emacs", "firefox"}
# Applications whose new windows are always left where they open.
intrusive_apps = {"vbeterm"}
2021-07-10 13:17:07 +02:00
# Logger shared by the whole script; its name is also used as the
# notification application name and as the journal identifier.
logger = logging.getLogger("i3-companion")

# Events for @on decorator
# D-Bus signal to subscribe to, described by object path, interface,
# member (signal name) and argument signature.
DBusSignal = collections.namedtuple(
    "DBusSignal", ["path", "interface", "member", "signature"]
)
# Sentinel: handlers tagged with it are run once when main() starts.
StartEvent = object()
# Events emitted by i3 itself.
I3Event = i3ipc.Event
# Command received through an i3 "nop" binding; `name` is the part of
# the payload before the first ":".
CommandEvent = collections.namedtuple("CommandEvent", ["name"])

# Numeric values compared against NetworkManager D-Bus properties
# (connection state, device type, device state) by the handlers below.
NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2
NM_DEVICE_TYPE_ETHERNET = 1
NM_DEVICE_TYPE_WIFI = 2
NM_DEVICE_TYPE_MODEM = 8
NM_DEVICE_STATE_UNMANAGED = 10
NM_DEVICE_STATE_ACTIVATED = 100
# Event helpers
def static(**kwargs):
    """Return a decorator that sets each keyword argument as an
    attribute of the decorated object.

    The decorated object itself is returned (it is not wrapped), so the
    attributes behave like static variables attached to the function.
    """

    def attach(target):
        for attribute in kwargs:
            setattr(target, attribute, kwargs[attribute])
        return target

    return attach
@static(functions={})
def on(*events):
    """Register the decorated function as a handler for `events`.

    The function being decorated is stored as a key of `on.functions`,
    mapped to the tuple of events it is interested in. The decorator
    returns a transparent pass-through wrapper around that function.
    """

    def register(handler):
        # The registry is keyed by the function received here, not by
        # the wrapper handed back to the module namespace.
        on.functions[handler] = events

        @functools.wraps(handler)
        def passthrough(*args, **kwargs):
            return handler(*args, **kwargs)

        return passthrough

    return register
def debounce(sleep, *, unless=None, retry=0):
    """Debounce a function call (batch successive calls into only one).

    Optional immediate execution. Optional retry on failure. Ensure
    only one instance is executed. It is assumed the arguments
    provided to the debounced function have no effect on its
    execution.

    Parameters:
      sleep:  delay in seconds to wait for further calls before running
              the function.
      unless: optional predicate called with the call arguments; when
              it returns true, the pending work is run without waiting.
      retry:  number of additional attempts when the function raises.

    The returned decorator turns a coroutine function into one that
    schedules the work on a background task and returns immediately.
    """

    def decorator(fn):
        async def worker():
            state = workers[fn]
            while True:
                try:
                    # Wait for an urgent work or until sleep is elapsed
                    await asyncio.wait_for(state.urgent.wait(), timeout=sleep)
                    logger.debug("urgent work received for %s", fn)
                except asyncio.TimeoutError:
                    pass
                remaining, args, kwargs = state.queue
                state.queue = None
                state.urgent.clear()

                # Execute the work
                logger.debug("execute work for %s", fn)
                try:
                    await fn(*args, **kwargs)
                except Exception as e:
                    if not remaining:
                        # Give up on this work, but keep going: falling
                        # through lets queued work run and, otherwise,
                        # releases the worker slot so later calls are
                        # not silently ignored.
                        logger.exception("while executing %s: %s", fn, e)
                    else:
                        remaining -= 1
                        logger.warning(
                            "while executing %s (remaining tries: %d): %s",
                            fn,
                            remaining,
                            e,
                        )
                        if state.queue is not None:
                            # Something newer is already scheduled: it
                            # replaces the retry and runs right away.
                            logger.debug(
                                "retry now with queued event for %s", fn
                            )
                            state.urgent.set()
                        else:
                            logger.debug("reschedule retry for %s", fn)
                            state.queue = (remaining, args, kwargs)
                            if unless is not None and unless(*args, **kwargs):
                                logger.debug(
                                    "wake up now for retry of %s", fn
                                )
                                state.urgent.set()

                # Do we still have something to do?
                if state.queue is None:
                    break

            # No more work
            logger.debug("no more work for %s", fn)
            workers[fn] = None

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            if workers[fn] is None:
                logger.debug("create new worker for %s", fn)
                workers[fn] = types.SimpleNamespace()
                workers[fn].task = asyncio.create_task(worker())
                workers[fn].urgent = asyncio.Event()
            else:
                logger.debug("enqueue new work for %s", fn)
            # Always record the latest call: when the worker is busy
            # executing, its queue is empty and the call would be lost.
            workers[fn].queue = (retry, args, kwargs)
            if unless is not None and unless(*args, **kwargs):
                logger.debug("wake up now for %s", fn)
                workers[fn].urgent.set()

        workers[fn] = None
        return wrapper

    # One slot per debounced function: None when idle, otherwise a
    # namespace holding the worker task, its wake-up event and the
    # pending (retry, args, kwargs) tuple.
    workers = {}
    return decorator
# Other helpers
async def notify(i3, **kwargs):
    """Send a desktop notification through the session D-Bus.

    The `Notify` method of `org.freedesktop.Notifications` is called
    with a set of default parameters, each of which can be overridden
    (or completed) by a keyword argument. Returns whatever `Notify`
    returns.
    """
    service = "org.freedesktop.Notifications"
    remote = i3.session_bus[service]["/org/freedesktop/Notifications"]
    interface = await remote.get_async_interface(service)
    parameters = {
        "app_name": logger.name,
        "replaces_id": 0,
        "app_icon": "dialog-information",
        "summary": "",
        "actions": [],
        "hints": {},
        "expire_timeout": 5000,
        **kwargs,
    }
    return await interface.Notify(**parameters)
def polybar(module, content):
    """Publish `content` for the Polybar module named `module`.

    The content is written to `$XDG_RUNTIME_DIR/i3/<module>.txt` and an
    `action:#<module>.send.<content>` message is written to every
    `/tmp/polybar_mqueue.*` file. `OSError` is re-raised unless its
    errno is `ENXIO`.
    """
    # Update cache file (for when polybar restarts)
    cache_path = f"{os.getenv('XDG_RUNTIME_DIR')}/i3/{module}.txt"
    with open(cache_path, "w") as cache:
        cache.write(content)

    # Send it to polybar
    message = f"action:#{module}.send.{content}"
    for queue_path in glob.glob("/tmp/polybar_mqueue.*"):
        try:
            descriptor = os.open(queue_path, os.O_WRONLY | os.O_NONBLOCK)
            with open(descriptor, "w") as queue:
                queue.write(message)
        except OSError as err:
            # presumably a queue nobody reads anymore -- TODO confirm
            if err.errno == errno.ENXIO:
                continue
            raise
# Event handlers
@on(StartEvent, I3Event.WINDOW_MOVE, I3Event.WINDOW_NEW, I3Event.WINDOW_CLOSE)
@debounce(0.2)
async def workspace_rename(i3, event):
    """Rename workspaces using icons to match what's inside it.

    Each workspace is renamed to `<num>:<icon>|<icon>...` (or just
    `<num>` when it has no non-sticky window). Nothing is sent to i3
    when every workspace already has the expected name.
    """
    tree = await i3.get_tree()

    def application_icon(window):
        """Get application icon for a window."""
        for attr in ("window_instance", "window_class"):
            name = getattr(window, attr, None)
            if name is None:
                continue
            for app, glyph in application_icons.items():
                if re.match(rf"^{app}\b", name, re.IGNORECASE):
                    logger.debug(f"in {attr}, found '{name}', matching {app}")
                    return glyph
        return application_icons_nomatch

    commands = []
    for workspace in tree.workspaces():
        glyphs = {
            application_icon(window)
            for window in workspace.leaves()
            if not window.sticky
        }
        # Icons meant to be shown alone are dropped when the workspace
        # has something else to show.
        if any(glyph not in application_icons_alone for glyph in glyphs):
            glyphs -= application_icons_alone
        # Sort the icons: set iteration order is not stable and would
        # make the same workspace content yield different names.
        new_name = f"{workspace.num}:{'|'.join(sorted(glyphs))}".rstrip(":")
        if workspace.name != new_name:
            logger.debug(f"rename workspace {workspace.num}")
            commands.append(
                f'rename workspace "{workspace.name}" to "{new_name}"'
            )

    # Do not bother i3 with an empty command
    if commands:
        await i3.command(";".join(commands))
async def _new_workspace(i3):
workspaces = await i3.get_workspaces()
workspace_nums = {w.num for w in workspaces}
max_num = max(workspace_nums)
available = (set(range(1, max_num + 2)) - workspace_nums).pop()
2021-07-11 12:23:00 +02:00
logger.info(f"create new workspace number {available}")
await i3.command(f'workspace number "{available}"')
return available
@on(CommandEvent("new-workspace"), CommandEvent("move-to-new-workspace"))
async def new_workspace(i3, event):
    """Open a new workspace; with the "move-to-new-workspace" command,
    also send the focused container there.

    Nothing happens when a move is requested and no container is
    focused.
    """
    moving = event == "move-to-new-workspace"
    focused = None
    if moving:
        # Look up the focused container before switching workspace
        tree = await i3.get_tree()
        focused = tree.find_focused()
        if not focused:
            return

    num = await _new_workspace(i3)

    if moving:
        await focused.command(f'move container to workspace number "{num}"')
@on(I3Event.WINDOW_NEW)
async def worksplace_exclusive(i3, event):
    """Move new windows on a new workspace instead of sharing a workspace
    with an exclusive app.

    Floating windows, sticky windows, windows whose type is not normal,
    splash or unknown, and windows of intrusive apps are left alone.
    """
    w = event.container

    def identifiers(window):
        """Lowercased name, class and instance of a window (None for
        missing or empty values)."""
        return {
            s.lower() if s else None
            for s in {window.name, window.window_class, window.window_instance}
        }

    def can_intrude(w):
        """Can this new window intrude any workspace?"""
        if w.floating in {"auto_on", "user_on"}:
            return True
        if w.ipc_data["window_type"] not in {"normal", "splash", "unknown"}:
            return True
        if w.sticky:
            return True
        return bool(identifiers(w).intersection(intrusive_apps))

    # Can the new window just intrude?
    if can_intrude(w):
        logger.debug(f"window {w.name} can intrude")
        return

    # Get the workspace. From an event, w.workspace() is None, so
    # search it in the tree.
    tree = await i3.get_tree()
    workspace = next(
        (ow.workspace() for ow in tree.leaves() if w.id == ow.id), None
    )
    if not workspace:
        return

    # Does the target workspace contains an exclusive app?
    ids = set()
    for ow in workspace.leaves():
        if ow.id != w.id:
            ids |= identifiers(ow)
    exclusives = ids.intersection(exclusive_apps)
    if not exclusives:
        logger.debug(f"no exclusive app, {w.name} can go there")
        return

    # Create a new workspace and move the window here
    num = await _new_workspace(i3)
    logger.info(f"move window {w.name} to workspace {num}")
    await w.command(f'move container to workspace number "{num}"')
@on(CommandEvent("quake-console"))
async def quake_console(i3, event):
    """Spawn a quake console or toggle an existing one.

    The payload is `quake-console:<exec>:<instance name>:<height>`,
    where height is the fraction of the focused workspace height the
    terminal should use. Raises RuntimeError when the terminal does not
    show up after being spawned.
    """
    try:
        _, term_exec, term_name, height = event.split(":")
        height = float(height)
    except ValueError as exc:
        logger.warning(f"unable to parse payload {event}: {exc}")
        return

    # Look for the terminal or spawn it
    tree = await i3.get_tree()
    term = tree.find_instanced(term_name)
    if not term:
        await i3.command(f"exec exec {term_exec} --name {term_name}")
        for _ in range(5):
            tree = await i3.get_tree()
            term = tree.find_instanced(term_name)
            if term:
                break
            # Not there yet, give it some time
            await asyncio.sleep(0.2)
        if not term:
            raise RuntimeError("unable to spawn terminal")

    # Make the terminal span the top of the focused workspace
    workspaces = await i3.get_workspaces()
    workspace = [ws for ws in workspaces if ws.focused][0]
    rect = workspace.rect
    width = rect.width
    height = int(rect.height * height)
    posx = rect.x
    posy = rect.y
    command = (
        f"[instance={term_name}] "
        "border none,"
        f"resize set {width} px {height} px,"
        "scratchpad show,"
        f"move absolute position {posx}px {posy}px"
    )
    logger.debug(f"QuakeConsole: {command}")
    await i3.command(command)
@on(CommandEvent("container-info"))
@static(last_id=0)
async def container_info(i3, event):
    """Show information about the focused container.

    The details are sent as a notification replacing the previous one
    sent by this handler (its id is kept in `container_info.last_id`).
    """
    tree = await i3.get_tree()
    window = tree.find_focused()
    if not window:
        return
    logger.info(f"window raw information: {window.ipc_data}")
    summary = "About focused container"
    r = window.rect
    info = {
        "name": window.name,
        "title": window.window_title,
        "class": window.window_class,
        "instance": window.window_instance,
        "role": window.window_role,
        "type": window.ipc_data["window_type"],
        "sticky": window.sticky,
        "floating": window.floating,
        "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
        "layout": window.layout,
        "percent": window.percent,
        "marks": ", ".join(window.marks) or "(none)",
    }
    # Values are escaped as the body is sent with markup; unset
    # properties are not displayed.
    body = "\n".join(
        f"<tt>{k:10}</tt> {html.escape(str(v))}"
        for k, v in info.items()
        if v is not None
    )
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=10000,
        summary=summary,
        body=body,
        replaces_id=container_info.last_id,
    )
    container_info.last_id = result[0]
@on(CommandEvent("workspace-info"))
@static(last_id=0)
async def workspace_info(i3, event):
    """Show information about the focused workspace.

    The layout tree of the focused workspace is sent as a notification
    replacing the previous one sent by this handler (its id is kept in
    `workspace_info.last_id`).
    """
    workspaces = await i3.get_workspaces()
    focused = next((ws for ws in workspaces if ws.focused), None)
    if focused is None:
        return
    summary = f"Workspace {focused.num} on {focused.output}"

    # The reply of get_workspaces() has no children: find the matching
    # workspace in the tree.
    tree = await i3.get_tree()
    root = next(
        (ws for ws in tree.workspaces() if ws.num == focused.num), None
    )
    if root is None:
        return

    def render(container):
        """Render a container and its children as a markup tree."""
        if container.focused:
            style = 'foreground="#ffaf00"'
        elif not container.window:
            style = 'foreground="#6c98ee"'
        else:
            style = ""
        if container.window:
            content = (
                f"{(container.window_class or '???').lower()}: "
                f"{(container.window_title or '???')}"
            )
        elif container.type == "workspace" and not container.nodes:
            # Empty workspaces use workspace_layout, but when default,
            # this is layout...
            layout = container.ipc_data["workspace_layout"]
            if layout == "default":
                layout = container.layout
            content = f"({layout})"
        else:
            content = f"({container.layout})"
        # Window classes and titles are arbitrary text: escape them so
        # they cannot break the markup of the notification body.
        lines = [f"<span {style}>{html.escape(content.lower())}</span>"]
        for child in container.nodes:
            if child == container.nodes[-1]:
                first = "└─"
                others = " "
            else:
                first = "├─"
                others = "│ "
            content = render(child).replace("\n", f"\n{others}")
            lines.append(f"<tt>{first}</tt>{content}")
        return "\n".join(lines)

    body = render(root).lstrip("\n")
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=20000,
        summary=summary,
        body=body,
        replaces_id=workspace_info.last_id,
    )
    workspace_info.last_id = result[0]
@on(I3Event.OUTPUT)
@debounce(2)
async def output_update(i3, event):
    """React to a XRandR change.

    Each command is run in turn; a non-zero exit status is logged as a
    warning and does not prevent the next command from running.
    """
    logger.info("XRandR change detected")
    cmds = (
        "systemctl --user reload --no-block xsettingsd.service",
        "systemctl --user start --no-block wallpaper.service",
    )
    for cmd in cmds:
        # Use an asyncio subprocess: a blocking call would freeze the
        # event loop (and all other handlers) while the command runs.
        proc = await asyncio.create_subprocess_exec(*shlex.split(cmd))
        returncode = await proc.wait()
        if returncode != 0:
            logger.warning(f"{cmd} exited with {returncode}")
@on(
    DBusSignal(
        path="/org/bluez",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
    )
)
async def bluetooth_notifications(
    i3, event, path, interface, changed, invalid
):
    """Notify when a Bluetooth device gets connected or disconnected.

    Only changes of the `Connected` property of `org.bluez.Device1`
    objects are considered. The name, icon and connection state are
    read back from the device at `path`.
    """
    if interface != "org.bluez.Device1" or "Connected" not in changed:
        return

    remote = i3.system_bus["org.bluez"][path]
    device = await remote.get_async_interface(interface)
    device_name = await device.Name
    device_icon = await device.Icon
    if await device.Connected:
        status = "connected"
    else:
        status = "disconnected"
    await notify(
        i3,
        app_icon=device_icon,
        summary=device_name,
        body=f"Bluetooth device {status}",
    )
@on(
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    )
)
async def network_manager_notifications(i3, event, path, state, reason):
    """Notify when a VPN, Ethernet or wireless connection is activated.

    Other state changes and other connection types are ignored, as is
    a connection whose D-Bus interface cannot be fetched anymore.
    """
    ofnm = "org.freedesktop.NetworkManager"
    logger.debug(f"from {path} state: {state}, reason: {reason}")
    if state != NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
        # Deactivated state does not contain enough information,
        # unless we maintain state.
        return

    peer = i3.system_bus[ofnm][path]
    try:
        active = await peer.get_async_interface(f"{ofnm}.Connection.Active")
    except dbussy.DBusError:
        logger.info(f"interface {path} has vanished")
        return

    kind = await active.Type
    connection_id = await active.Id
    # Connection type -> (notification icon, notification body)
    announcements = {
        "vpn": ("network-vpn", "VPN connected!"),
        "802-3-ethernet": (
            "network-wired",
            "Ethernet connection established!",
        ),
        "802-11-wireless": (
            "network-wireless",
            "Wireless connection established!",
        ),
    }
    if kind not in announcements:
        return
    app_icon, body = announcements[kind]
    await notify(
        i3, app_icon=app_icon, summary=f"{connection_id}", body=body
    )
@on(
    StartEvent,
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    ),
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.AccessPoint",
        member="PropertiesChanged",
        signature="a{sv}",
    ),
)
@static(last=None)
@debounce(
    1,
    unless=lambda i3, event, *args: (
        isinstance(event, DBusSignal) and event.interface.endswith(".Active")
    ),
    retry=2,
)
async def network_manager_status(i3, event, *args):
    """Compute network manager status.

    The status line is made of one entry per managed wifi device (icon
    and SSID, or a "no wifi" icon), one icon per active Ethernet
    device, and one icon and name per active VPN connection. It is
    sent to the "network" Polybar module only when it differs from the
    last one sent.
    """
    ofnm = "org.freedesktop.NetworkManager"

    def escape(text):
        # Names come from outside (SSID, connection id): double "%" so
        # they cannot be mistaken for a format tag.
        # NOTE(review): assumes Polybar reads "%%" as a literal "%".
        return text.replace("%", "%%")

    status = []

    # Build status from devices
    bus = i3.system_bus[ofnm]
    nm = await bus["/org/freedesktop/NetworkManager"].get_async_interface(ofnm)
    devices = await nm.AllDevices
    for device in devices:
        nmd = await bus[device].get_async_interface(f"{ofnm}.Device")
        kind = await nmd.DeviceType
        state = await nmd.State
        if state == NM_DEVICE_STATE_UNMANAGED:
            continue
        if kind == NM_DEVICE_TYPE_WIFI:
            if state != NM_DEVICE_STATE_ACTIVATED:
                status.append(icons["nowifi"])
                continue
            nmw = await bus[device].get_async_interface(
                f"{ofnm}.Device.Wireless"
            )
            ap = await nmw.ActiveAccessPoint
            if not ap:
                status.append(icons["nowifi"])
                continue
            network_manager_status.active_ap = ap
            nmap = await bus[ap].get_async_interface(f"{ofnm}.AccessPoint")
            name = await nmap.Ssid
            strength = int(await nmap.Strength)
            levels = [
                icons["wifi-low"],
                icons["wifi-medium"],
                icons["wifi-high"],
            ]
            # Three buckets of 34; clamp so an out-of-range strength
            # cannot index outside of the list.
            level = min(max(strength // 34, 0), len(levels) - 1)
            status.append(levels[level])
            status.append(
                escape(bytes(name).decode("utf-8", errors="replace"))
            )
        elif (
            kind == NM_DEVICE_TYPE_ETHERNET
            and state == NM_DEVICE_STATE_ACTIVATED
        ):
            status.append(icons["wired"])

    # Build status for VPN connection
    connections = await nm.ActiveConnections
    for conn in connections:
        nma = await bus[conn].get_async_interface(f"{ofnm}.Connection.Active")
        vpn = await nma.Vpn
        if vpn:
            state = await nma.State
            if state == NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
                status.append(icons["vpn"])
                status.append(escape(await nma.Id))

    # Final status line
    status = " ".join(status)
    if status != network_manager_status.last:
        logger.info("updated network status")
        polybar("network", status)
        network_manager_status.last = status
# Main function
async def main(options):
    """Connect to i3 and D-Bus, register all handlers tagged with @on
    and process events until the i3 connection ends.

    `options` holds the parsed command-line arguments; it is currently
    not used here.
    """
    i3 = await Connection().connect()
    i3.session_bus = await ravel.session_bus_async()
    i3.system_bus = await ravel.system_bus_async()

    # Regular events
    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, I3Event):
                i3.on(event, fn)

    # React to some bindings
    async def binding_event(i3, event):
        """Process a binding event."""
        # We only processes it when it is a nop command and we use
        # this mechanism as an IPC mechanism. The alternative would be
        # to use ticks but we would need to spawn an i3-msg process
        # for that.
        cmd = event.binding.command
        if not cmd.startswith("nop "):
            return
        cmd = cmd[4:].strip("\"'")
        if not cmd:
            return
        kind = cmd.split(":")[0]
        for fn, events in on.functions.items():
            for e in events:
                if isinstance(e, CommandEvent) and e.name == kind:
                    await fn(i3, cmd)

    i3.on(I3Event.BINDING, binding_event)

    # Listen to DBus events
    def wrapping(fn, event):
        """Adapt a handler to the signature expected for a signal."""

        @ravel.signal(
            name=event.member,
            in_signature=event.signature,
            path_keyword="path",
            args_keyword="args",
        )
        async def wrapped(path, args):
            return await fn(i3, event, path, *args)

        return wrapped

    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, DBusSignal):
                for bus in {i3.session_bus, i3.system_bus}:
                    bus.listen_signal(
                        path=event.path,
                        fallback=True,
                        interface=event.interface,
                        name=event.member,
                        func=wrapping(fn, event),
                    )

    # Run events that should run on start. The event loop only keeps
    # weak references to tasks: hold them here so they cannot be
    # garbage-collected before completion.
    start_tasks = [
        asyncio.create_task(fn(i3, event))
        for fn, events in on.functions.items()
        for event in events
        if event is StartEvent
    ]
    logger.debug("scheduled %d start tasks", len(start_tasks))

    await i3.main()
2021-07-07 00:24:23 +02:00
if __name__ == "__main__":
    # Command line: the help text is the module docstring followed by
    # the docstring of every registered handler.
    description = sys.modules[__name__].__doc__
    for handler in on.functions:
        description = description + " " + str(handler.__doc__)
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--debug",
        "-d",
        action="store_true",
        default=False,
        help="enable debugging",
    )
    options = parser.parse_args()

    # Logging: warnings and above for everything, more for ourselves.
    # Log to the terminal when there is one, to the journal otherwise.
    root = logging.getLogger("")
    root.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
    if not sys.stderr.isatty():
        root.addHandler(journal.JournalHandler(SYSLOG_IDENTIFIER=logger.name))
    else:
        console = logging.StreamHandler()
        console.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
        root.addHandler(console)

    # Run until the end; any uncaught error is logged and reported
    # through the exit status.
    exit_code = 0
    try:
        asyncio.run(main(options))
    except Exception as e:
        logger.exception("%s", e)
        exit_code = 1
    sys.exit(exit_code)