vincentbernat.i3wm-configur.../bin/i3-companion

406 lines
12 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
"""Personal i3 companion."""
import argparse
import logging
import logging.handlers
import os
import sys
import re
import asyncio
import shlex
import subprocess
import html
import functools
from i3ipc.aio import Connection
from i3ipc import Event
# Module-wide logger, named after the running script: the basename of
# sys.argv[0] with its extension removed.
logger = logging.getLogger(os.path.splitext(os.path.basename(sys.argv[0]))[0])
def on(*events):
    """Build a decorator recording which events a function handles.

    The undecorated function is stored as a key of the ``on.functions``
    dictionary, mapped to the ``events`` tuple given here. The dictionary
    is created on the ``on`` function itself the first time a decorator
    is applied. The decorated name is bound to a thin pass-through
    wrapper around the original function.
    """

    def decorator(fn):
        # Lazily create the registry, then record the *original* function
        # (not the wrapper) together with its events.
        if not hasattr(on, "functions"):
            on.functions = {}
        on.functions[fn] = events

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)

        return wrapper

    return decorator
# Icon to display in a workspace name for each known application. Each key
# is interpolated into a case-insensitive regular expression anchored at
# the start of a window's instance or class (see workspace_rename).
# NOTE(review): the values are presumably Font Awesome private-use glyphs;
# they show up as empty strings in this view -- confirm against the
# original file before relying on them.
# See https://fontawesome.com/v5.15/icons
application_icons = {
    "chromium": "",
    "discord": "",
    "emacs": "",
    "firefox": "",
    "gimp": "",
    "google-chrome": "",
    "inkscape": "",
    "libreoffice": "",
    "mpv": "",
    "pavucontrol": "",
    "signal": "",
    "snes9x-gtk": "",
    "spotify": "",
    "steam": "",
    "vbeterm": "",
    "zathura": "",
    "zoom": ""
}
# Icon used for a window that matches none of the keys above.
application_icons_nomatch = ""
# Icons that are dropped from a workspace name as soon as the workspace
# also holds a window with any other icon (see workspace_rename).
application_icons_alone = {
    application_icons[name] for name in ("vbeterm",)
}
@on(Event.WINDOW_MOVE, Event.WINDOW_NEW, Event.WINDOW_CLOSE)
async def workspace_rename(i3, event):
    """Rename workspaces using icons to match what's inside it.

    Every workspace is renamed to ``<num>:<icon>|<icon>...`` (or just
    ``<num>`` when there is no icon to show). Icons listed in
    ``application_icons_alone`` are only kept when no other icon is
    present on the workspace. All renames are sent to i3 as a single
    command; nothing is sent when every name is already up to date.

    :param i3: i3 IPC connection used to read the tree and send commands.
    :param event: the window event that triggered the handler (unused).
    """
    tree = await i3.get_tree()
    commands = []

    def application_icon(window):
        """Get application icon for a window."""
        for attr in ("window_instance", "window_class"):
            name = getattr(window, attr, None)
            if name is None:
                continue
            for k, v in application_icons.items():
                if re.match(rf"^{k}\b", name, re.IGNORECASE):
                    logger.debug(f"in {attr}, found '{name}', matching {k}")
                    return v
        return application_icons_nomatch

    for workspace in tree.workspaces():
        icons = {application_icon(window) for window in workspace.leaves()}
        # Tolerate a "no icon" marker set to None.
        icons.discard(None)
        if any(i not in application_icons_alone for i in icons):
            icons -= application_icons_alone
        # Sort so the same set of icons always yields the same name.
        new_name = f"{workspace.num}:{'|'.join(sorted(icons))}".rstrip(":")
        if workspace.name != new_name:
            logger.debug(f"rename workspace {workspace.num}")
            commands.append(
                f'rename workspace "{workspace.name}" to "{new_name}"')

    # Avoid a pointless IPC round-trip (with an empty command) when every
    # workspace already carries the right name.
    if commands:
        await i3.command(";".join(commands))
async def _new_workspace(i3):
    """Switch to the lowest unused workspace number and return it.

    :param i3: i3 IPC connection.
    :returns: the number (>= 1) of the workspace that was switched to.
    """
    workspaces = await i3.get_workspaces()
    workspace_nums = {w.num for w in workspaces}
    # Named workspaces are reported by i3 with a negative number; clamp the
    # upper bound to 0 so that workspace 1 is always a candidate.
    max_num = max(workspace_nums | {0})
    # max_num + 1 is never in use, so the candidate set cannot be empty.
    # Take the minimum explicitly: set.pop() returns an arbitrary element.
    available = min(set(range(1, max_num + 2)) - workspace_nums)
    logger.info(f'create new workspace number {available}')
    await i3.command(f'workspace number "{available}"')
    return available
@on("new-workspace", "move-to-new-workspace")
async def new_workspace(i3, event):
    """Switch to a fresh workspace, optionally taking the focused window.

    With the ``move-to-new-workspace`` event, the container focused before
    the switch is moved to the newly created workspace; if nothing is
    focused, no workspace is created at all.
    """
    moving = event == "move-to-new-workspace"
    focused = None
    if moving:
        # Remember the focused container before the workspace switch.
        focused = (await i3.get_tree()).find_focused()
        if not focused:
            return

    target = await _new_workspace(i3)

    if moving:
        await focused.command(
            f'move container to workspace number "{target}"')
# Applications that do not share their workspace: when the focused
# workspace holds one of them, a new window is sent to a new workspace
# (see worksplace_exclusive).
exclusive_apps = {"emacs", "firefox"}

# Applications whose new windows are always left where they open, even on
# a workspace holding an exclusive application.
intrusive_apps = {"vbeterm"}
@on(Event.WINDOW_NEW)
async def worksplace_exclusive(i3, event):
    """Move new windows on a new workspace instead of sharing a workspace
    with an exclusive app.

    The new window is left alone when it "can intrude" (floating, sticky,
    of a special window type, or listed in ``intrusive_apps``) or when the
    focused workspace holds no application from ``exclusive_apps``.

    :param i3: i3 IPC connection.
    :param event: window event; ``event.container`` is the new window.
    """
    w = event.container

    def identifiers(window):
        """Return the lowercased name, class and instance of a window."""
        candidates = (window.name, window.window_class,
                      window.window_instance)
        return {s.lower() for s in candidates if s}

    def can_intrude(w):
        """Can this new window intrude any workspace?"""
        if w.floating in {"auto_on", "user_on"}:
            return True
        if w.ipc_data['window_type'] not in {"normal", "splash", "unknown"}:
            return True
        if w.sticky:
            return True
        return bool(identifiers(w) & intrusive_apps)

    # Can the new window just intrude?
    if can_intrude(w):
        logger.debug(f"window {w.name} can intrude")
        return

    # Does the current workspace contains an exclusive app?
    tree = await i3.get_tree()
    focused = tree.find_focused()
    workspace = focused.workspace() if focused else None
    if not workspace:
        return
    ids = set()
    for ow in workspace.leaves():
        # The new window itself does not count as an occupant.
        if ow.id != w.id:
            ids |= identifiers(ow)
    if not ids & exclusive_apps:
        logger.debug(f"no exclusive app, {w.name} can go there")
        return

    # Create a new workspace and move the window here
    num = await _new_workspace(i3)
    logger.info(f'move window {w.name} to workspace {num}')
    await w.command(f'move container to workspace number "{num}"')
@on("quake-console")
async def quake_console(i3, event):
    """Spawn a quake console or toggle an existing one.

    :param i3: i3 IPC connection.
    :param event: payload of the form ``<kind>:<exec>:<name>:<height>``.
        ``exec`` is the terminal command, ``name`` the X11 instance name
        given to it (through ``--name``) and ``height`` the fraction of the
        focused workspace height the console should use.
    :raises RuntimeError: if the terminal does not show up after being
        spawned.
    """
    try:
        _, term_exec, term_name, height = event.split(":")
        height = float(height)
    except ValueError as exc:
        # Wrong number of fields or non-numeric height.
        logger.warning(f"unable to parse payload {event}: {exc}")
        return

    tree = await i3.get_tree()
    term = tree.find_instanced(term_name)
    if not term:
        await i3.command(f'exec {term_exec} --name {term_name}')
        # Poll for the new window; only sleep between failed attempts.
        for _ in range(5):
            tree = await i3.get_tree()
            term = tree.find_instanced(term_name)
            if term:
                break
            await asyncio.sleep(0.2)
        else:
            raise RuntimeError("unable to spawn terminal")
    term = term[0]

    workspaces = await i3.get_workspaces()
    workspace = next((ws for ws in workspaces if ws.focused), None)
    if workspace is None:
        logger.warning("no focused workspace to display the console on")
        return
    rect = workspace.rect
    # Full workspace width, anchored at its top-left corner.
    width = rect.width
    height = int(rect.height * height)
    posx = rect.x
    posy = rect.y
    command = (f'[instance={term_name}] '
               'border none,'
               f'resize set {width} px {height} px,'
               'scratchpad show,'
               f'move absolute position {posx}px {posy}px')
    logger.debug(f"QuakeConsole: {command}")
    await i3.command(command)
async def notify(*args):
    """Run ``notify-send`` with ``args`` and wait for it to terminate."""
    argv = ("notify-send", *args)
    process = await asyncio.create_subprocess_exec(*argv)
    await process.communicate()
@on("container-info")
async def container_info(i3, event):
    """Show information about the focused container.

    The raw IPC data is logged and a summary of the container properties
    is sent as a desktop notification. Properties whose value is ``None``
    are left out.

    :param i3: i3 IPC connection.
    :param event: binding payload (unused).
    """
    tree = await i3.get_tree()
    window = tree.find_focused()
    if not window:
        return
    logger.info(f"window raw information: {window.ipc_data}")
    summary = "About focused container"
    r = window.rect
    w = window
    info = {
        "name": w.name,
        "title": w.window_title,
        "class": w.window_class,
        "instance": w.window_instance,
        "role": w.window_role,
        "type": w.ipc_data['window_type'],
        "sticky": w.sticky,
        "floating": w.floating,
        "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
        "layout": w.layout,
        "percent": w.percent,
        "marks": ", ".join(w.marks) or "(none)",
    }
    # Labels are padded to 10 characters inside <tt> so values line up.
    body = "\n".join(
        f"<tt>{k:10}</tt> {html.escape(str(v))}"
        for k, v in info.items()
        if v is not None
    )
    await notify(
        "-i", "system-search",
        "-t", "10000",
        summary,
        body)
@on("workspace-info")
async def workspace_info(i3, event):
    """Show information about the focused workspace.

    The container tree of the focused workspace is rendered as an
    indented text tree (one line per container, with its layout and, for
    windows, class and title) and sent as a desktop notification.

    :param i3: i3 IPC connection.
    :param event: binding payload (unused).
    """
    workspaces = await i3.get_workspaces()
    focused = [w for w in workspaces if w.focused]
    if not focused:
        return
    workspace = focused[0]
    summary = f"Workspace {workspace.num} on {workspace.output}"
    tree = await i3.get_tree()
    workspace = [w for w in tree.workspaces()
                 if w.num == workspace.num]
    if not workspace:
        # The workspace vanished between the two IPC requests.
        return

    def render(container):
        """Return the text tree for a container and its descendants."""
        if container.type == "workspace":
            if container.nodes:
                root = ""
            else:
                if container.ipc_data['workspace_layout'] != "default":
                    layout = container.ipc_data['workspace_layout']
                else:
                    layout = container.layout
                root = f"Empty workspace, with {layout} layout"
        else:
            if container.focused:
                style = 'foreground="#ffaf00"'
            else:
                style = 'foreground="#6c98ee"'
            root = (f"<span {style}>"
                    f"({container.layout})"
                    "</span>")
            if container.window_title:
                # A window may have a title but no class.
                window_class = (container.window_class or "").lower()
                root += (f" {html.escape(window_class)}:"
                         f" {html.escape(container.window_title)}")
        children = []
        for child in container.nodes:
            if child == container.nodes[-1]:
                first = "└─"
                others = " "
            else:
                first = "├─"
                others = ""
            # Prefix continuation lines of the child's own subtree.
            content = render(child).replace("\n", f"\n{others}")
            children.append(f"<tt>{first}</tt>{content}")
        children.insert(0, root)
        return "\n".join(children)

    body = render(workspace[0])
    await notify(
        "-i", "system-search",
        "-t", "15000",
        summary,
        body.lstrip("\n"))
# Handle of the pending delayed reaction to an output change, or None when
# nothing is scheduled.
output_update_running = None


@on(Event.OUTPUT)
async def output_update(i3, event):
    """React to a XRandR change.

    The actual work is delayed by one second; a new output event arriving
    in the meantime cancels the pending run and schedules a new one.
    """
    global output_update_running

    def run_actions():
        """Run the commands reacting to the XRandR change."""
        global output_update_running
        output_update_running = None
        logger.info("XRandR change detected")
        for command_line in (
            "systemctl --user reload --no-block xsettingsd.service",
            "systemctl --user start --no-block wallpaper.service",
        ):
            result = subprocess.run(shlex.split(command_line))
            if result.returncode != 0:
                logger.warning(
                    f"{command_line} exited with {result.returncode}")

    pending = output_update_running
    if pending is not None:
        pending.cancel()
    logger.debug("schedule XRandR change")
    loop = asyncio.get_event_loop()
    output_update_running = loop.call_later(1, run_actions)
async def main(options):
    """Connect to i3, register every tagged handler and run forever.

    Handlers tagged with an ``Event`` member are subscribed directly.
    Handlers tagged with a string are triggered from i3 bindings whose
    command is ``nop <string>[:...]``.
    """
    i3 = await Connection().connect()

    async def binding_event(i3, event):
        """Process a binding event."""
        # Only "nop" commands are handled: they are used as an IPC
        # mechanism. The alternative would be ticks, but that would
        # require spawning an i3-msg process.
        prefix = "nop "
        command = event.binding.command
        if not command.startswith(prefix):
            return
        payload = command[len(prefix):].strip('"\'')
        if not payload:
            return
        kind = payload.partition(":")[0]
        for handler, tags in on.functions.items():
            for tag in tags:
                if tag == kind:
                    await handler(i3, payload)

    # Regular events
    for handler, tags in on.functions.items():
        for tag in tags:
            if isinstance(tag, Event):
                i3.on(tag, handler)

    # React to some bindings
    i3.on(Event.BINDING, binding_event)

    await i3.main()
if __name__ == "__main__":
    # Parse. The help text is the module docstring followed by the
    # docstring of every registered handler.
    description = sys.modules[__name__].__doc__ or ""
    for fn in on.functions:
        if fn.__doc__:
            description += f" {fn.__doc__}"
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("--debug", "-d", action="store_true",
                        default=False,
                        help="enable debugging")
    options = parser.parse_args()

    # Logging: third-party loggers stay at WARNING, ours follows --debug.
    root = logging.getLogger("")
    root.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter(
        "%(levelname)s[%(name)s] %(message)s"))
    root.addHandler(ch)

    try:
        # asyncio.run() creates and closes the event loop itself; calling
        # asyncio.get_event_loop() outside of a running loop is deprecated.
        asyncio.run(main(options))
    except Exception as e:
        logger.exception("%s", e)
        sys.exit(1)
    sys.exit(0)