vincentbernat.i3wm-configur.../bin/i3-companion

409 lines
12 KiB
Text
Raw Normal View History

#!/usr/bin/python3
"""Personal i3 companion."""
import argparse
import logging
import logging.handlers
import os
import sys
import re
2021-07-07 00:24:23 +02:00
import asyncio
import shlex
import subprocess
import html
import functools
2021-07-07 00:24:23 +02:00
from i3ipc.aio import Connection
from i3ipc import Event
# The logger is named after the running script: argv[0] without its
# directory and without its extension.
logger = logging.getLogger(
    os.path.basename(os.path.splitext(sys.argv[0])[0])
)
def on(*events):
    """Register the decorated function as a handler for ``events``.

    Every decorated function is recorded in the ``on.functions`` mapping:
    the key is the undecorated function, the value is the tuple of events
    given to the decorator. The decorator returns a thin wrapper that
    forwards all arguments to the original function.
    """

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)

        # The registry lives as an attribute of `on` itself and is created
        # the first time a function is decorated.
        if not hasattr(on, "functions"):
            on.functions = {}
        on.functions[fn] = events
        return wrapper

    return decorator
# Icons are taken from Font Awesome: https://fontawesome.com/v5.15/icons
# Keys are matched (case-insensitively, as a prefix) against the window
# instance and class by workspace_rename().
# NOTE(review): the glyph strings read as empty here; if they are meant to
# hold Font Awesome private-use code points, confirm they survived the copy.
application_icons = {
    "chromium": "",
    "discord": "",
    "emacs": "",
    "firefox": "",
    "gimp": "",
    "gitg": "",
    "google-chrome": "",
    "inkscape": "",
    "libreoffice": "",
    "mpv": "",
    "pavucontrol": "",
    "signal": "",
    "snes9x-gtk": "",
    "spotify": "",
    "steam": "",
    "vbeterm": "",
    "zathura": "",
    "zoom": ""
}

# Icon used for windows that match none of the keys above.
application_icons_nomatch = ""

# Icons that are only kept in a workspace name when no other icon is present.
application_icons_alone = {
    application_icons[name] for name in ("vbeterm",)
}
@on(Event.WINDOW_MOVE, Event.WINDOW_NEW, Event.WINDOW_CLOSE)
async def workspace_rename(i3, event):
    """Rename workspaces using icons to match what's inside it."""
    # i3:    connection used to read the layout tree and to send commands.
    # event: the window event that triggered the handler (not inspected).
    tree = await i3.get_tree()
    commands = []

    def application_icon(window):
        """Get application icon for a window."""
        # The window instance is tried first, then the window class. The
        # first key of application_icons that prefixes the value wins.
        for attr in ("window_instance", "window_class"):
            name = getattr(window, attr, None)
            if name is None:
                continue
            for k, v in application_icons.items():
                if re.match(rf"^{k}\b", name, re.IGNORECASE):
                    logger.debug(f"in {attr}, found '{name}', matching {k}")
                    return v
        return application_icons_nomatch

    for workspace in tree.workspaces():
        icons = {application_icon(window) for window in workspace.leaves()}
        # "Alone" icons are only shown when they are the sole content.
        if any(i not in application_icons_alone for i in icons):
            icons -= application_icons_alone
        # An empty workspace keeps just its number (trailing ":" removed).
        new_name = f"{workspace.num}:{'|'.join(icons)}".rstrip(":")
        if workspace.name != new_name:
            logger.debug(f"rename workspace {workspace.num}")
            command = f'rename workspace "{workspace.name}" to "{new_name}"'
            commands.append(command)

    # Most window events do not change any name: do not send an empty
    # command to i3 in that case.
    if commands:
        await i3.command(';'.join(commands))
async def _new_workspace(i3):
    """Switch to the lowest unused workspace number and return it.

    The candidate numbers are 1 up to one past the highest number in use,
    so there is always at least one free candidate.
    """
    workspaces = await i3.get_workspaces()
    workspace_nums = {w.num for w in workspaces}
    # i3 documents `num` as -1 for workspaces without a number. Clamp the
    # upper bound so that the candidate range is never empty.
    max_num = max(max(workspace_nums, default=0), 0)
    # min() picks the lowest free number; set.pop() would return an
    # arbitrary one.
    available = min(set(range(1, max_num + 2)) - workspace_nums)
    logger.info(f'create new workspace number {available}')
    await i3.command(f'workspace number "{available}"')
    return available
@on("new-workspace", "move-to-new-workspace")
async def new_workspace(i3, event):
    """Create a new workspace and optionally move a window to it."""
    moving = event == "move-to-new-workspace"
    focused = None
    if moving:
        # Remember the focused container before switching workspace. With
        # nothing focused there is nothing to move and no workspace is
        # created.
        focused = (await i3.get_tree()).find_focused()
        if not focused:
            return

    num = await _new_workspace(i3)

    if moving:
        await focused.command(f'move container to workspace number "{num}"')
# Applications that do not share their workspace: a new window opening next
# to one of them is moved to a new workspace by worksplace_exclusive().
exclusive_apps = {"emacs", "firefox"}

# Applications whose new windows are allowed to open on any workspace.
intrusive_apps = {"vbeterm"}
@on(Event.WINDOW_NEW)
async def worksplace_exclusive(i3, event):
    """Move new windows on a new workspace instead of sharing a workspace
    with an exclusive app."""
    w = event.container

    def identifiers(con):
        """Lower-cased name, class and instance of a container."""
        # Missing or empty values are folded to None, which never matches
        # an application name.
        return {s.lower() if s else None
                for s in (con.name, con.window_class, con.window_instance)}

    def can_intrude(con):
        """Can this new window intrude any workspace?"""
        if con.floating in {"auto_on", "user_on"}:
            return True
        if con.ipc_data['window_type'] not in {"normal", "splash", "unknown"}:
            return True
        if con.sticky:
            return True
        return bool(identifiers(con).intersection(intrusive_apps))

    # Can the new window just intrude?
    if can_intrude(w):
        logger.debug(f"window {w.name} can intrude")
        return

    # Does the current workspace contains an exclusive app?
    tree = await i3.get_tree()
    focused = tree.find_focused()
    workspace = focused.workspace() if focused else None
    if not workspace:
        return
    # The new window itself is excluded from the comparison.
    ids = set()
    for ow in workspace.leaves():
        if ow.id != w.id:
            ids |= identifiers(ow)
    exclusives = ids.intersection(exclusive_apps)
    if not exclusives:
        logger.debug(f"no exclusive app, {w.name} can go there")
        return

    # Create a new workspace and move the window here
    num = await _new_workspace(i3)
    logger.info(f'move window {w.name} to workspace {num}')
    await w.command(f'move container to workspace number "{num}"')
@on("quake-console")
async def quake_console(i3, event):
    """Spawn a quake console or toggle an existing one."""
    # The payload is "quake-console:<command>:<instance name>:<height>",
    # where <height> is a fraction of the focused workspace height.
    try:
        _, term_exec, term_name, height = event.split(":")
        height = float(height)
    except ValueError as exc:
        # Wrong number of fields or a height that is not a number.
        logger.warning(f"unable to parse payload {event}: {exc}")
        return

    # Look for the terminal; spawn it when it does not exist yet.
    tree = await i3.get_tree()
    term = tree.find_instanced(term_name)
    if not term:
        await i3.command(f'exec {term_exec} --name {term_name}')
        # Poll the tree a few times until the new window shows up.
        tries = 5
        while not term and tries:
            tree = await i3.get_tree()
            term = tree.find_instanced(term_name)
            await asyncio.sleep(0.2)
            tries -= 1
        if not term:
            raise RuntimeError("unable to spawn terminal")
    term = term[0]

    # Size the console relative to the focused workspace.
    workspaces = await i3.get_workspaces()
    focused = [ws for ws in workspaces if ws.focused]
    if not focused:
        return
    rect = focused[0].rect
    width = rect.width
    height = int(rect.height * height)
    posx = rect.x
    posy = rect.y

    # Full width, anchored at the top-left corner of the workspace.
    command = (f'[instance={term_name}] '
               'border none,'
               f'resize set {width} px {height} px,'
               'scratchpad show,'
               f'move absolute position {posx}px {posy}px')
    logger.debug(f"QuakeConsole: {command}")
    await i3.command(command)
async def notify(*args):
    """Run ``notify-send`` with the given arguments and wait for it."""
    argv = ("notify-send", *args)
    process = await asyncio.create_subprocess_exec(*argv)
    await process.communicate()
@on("container-info")
async def container_info(i3, event):
    """Show information about the focused container."""
    tree = await i3.get_tree()
    window = tree.find_focused()
    if not window:
        return
    logger.info(f"window raw information: {window.ipc_data}")
    summary = "About focused container"
    r = window.rect
    w = window
    # Label → value, in display order. Entries whose value is None are
    # left out of the notification.
    info = {
        "name": w.name,
        "title": w.window_title,
        "class": w.window_class,
        "instance": w.window_instance,
        "role": w.window_role,
        "type": w.ipc_data['window_type'],
        "sticky": w.sticky,
        "floating": w.floating,
        "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
        "layout": w.layout,
        "percent": w.percent,
        "marks": ", ".join(w.marks) or "(none)"
    }
    # The body is markup: labels are padded to a fixed width inside <tt>
    # and values are escaped.
    body = "\n".join(f"<tt>{k:10}</tt> {html.escape(str(v))}"
                     for k, v in info.items()
                     if v is not None)
    await notify(
        "-i", "system-search",
        "-t", "10000",
        summary,
        body)
@on("workspace-info")
async def workspace_info(i3, event):
    """Show information about the focused workspace."""
    workspaces = await i3.get_workspaces()
    focused = [w for w in workspaces if w.focused]
    if not focused:
        return
    workspace = focused[0]
    summary = f"Workspace {workspace.num} on {workspace.output}"

    # The workspace list does not carry the containers: find the matching
    # workspace in the layout tree.
    tree = await i3.get_tree()
    matching = [w for w in tree.workspaces() if w.num == workspace.num]
    if not matching:
        return

    def render(container):
        """Render a container and its children as a markup tree."""
        if container.focused:
            style = 'foreground="#ffaf00"'
        elif not container.window:
            style = 'foreground="#6c98ee"'
        else:
            style = ''
        if container.window:
            # Class and title come from the application: escape them so
            # that characters like "<" or "&" do not break the markup.
            content = html.escape(
                f"{(container.window_class or '???').lower()}: "
                f"{(container.window_title or '???')}")
        elif container.type == 'workspace' and not container.nodes:
            # Empty workspaces use workspace_layout, but when default,
            # this is layout...
            layout = container.ipc_data['workspace_layout']
            if layout == "default":
                layout = container.layout
            content = f"({layout})"
        else:
            content = f"({container.layout})"
        lines = [f"<span {style}>"
                 f"{content.lower()}"
                 "</span>"]
        last = len(container.nodes) - 1
        for index, child in enumerate(container.nodes):
            if index == last:
                first = "└─"
                others = " "
            else:
                first = "├─"
                others = "│ "
            # Continuation lines of a subtree are prefixed to keep the
            # branch drawing connected.
            subtree = render(child).replace("\n", f"\n{others}")
            lines.append(f"<tt>{first}</tt>{subtree}")
        return "\n".join(lines)

    body = render(matching[0])
    await notify(
        "-i", "system-search",
        "-t", "15000",
        summary,
        body.lstrip("\n"))
# Handle of the pending delayed reaction to an output change, or None when
# nothing is scheduled.
output_update_running = None


@on(Event.OUTPUT)
async def output_update(i3, event):
    """React to a XRandR change."""
    global output_update_running

    def output_update_now():
        """Execute actions to react to XRandR change."""
        global output_update_running
        output_update_running = None
        logger.info("XRandR change detected")
        for cmd in (
            "systemctl --user reload --no-block xsettingsd.service",
            "systemctl --user start --no-block wallpaper.service",
        ):
            status = subprocess.run(shlex.split(cmd)).returncode
            if status != 0:
                logger.warning(f"{cmd} exited with {status}")

    # Each new event cancels the pending call and schedules a fresh one,
    # one second from now.
    pending = output_update_running
    if pending is not None:
        pending.cancel()
    logger.debug("schedule XRandR change")
    output_update_running = asyncio.get_event_loop().call_later(
        1, output_update_now)
async def main(options):
    """Connect to i3, register every handler and run the connection loop.

    ``options`` is the parsed command line; it is not used here.
    """
    i3 = await Connection().connect()

    # Regular events: handlers tagged with an i3 Event are subscribed
    # directly on the connection.
    subscriptions = [(event, handler)
                     for handler, events in on.functions.items()
                     for event in events
                     if isinstance(event, Event)]
    for event, handler in subscriptions:
        i3.on(event, handler)

    # React to some bindings
    async def binding_event(i3, event):
        """Process a binding event."""
        # Only "nop" commands are handled: they are used as an IPC
        # mechanism. The alternative would be to use ticks but an i3-msg
        # process would have to be spawned for that.
        prefix = "nop "
        command = event.binding.command
        if not command.startswith(prefix):
            return
        payload = command[len(prefix):].strip('"\'')
        if not payload:
            return
        # The text before the first ":" selects the handlers; they receive
        # the whole payload.
        kind = payload.split(":")[0]
        for handler, events in on.functions.items():
            for registered in events:
                if registered == kind:
                    await handler(i3, payload)

    i3.on(Event.BINDING, binding_event)

    await i3.main()
2021-07-07 00:24:23 +02:00
if __name__ == "__main__":
    # Parse. The help text is the module docstring followed by the
    # docstring of every registered handler.
    description = __doc__
    for fn in on.functions:
        description += f" {fn.__doc__}"
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("--debug", "-d", action="store_true",
                        default=False,
                        help="enable debugging")
    options = parser.parse_args()

    # Logging: other loggers stay at WARNING, only this program's logger
    # is made more verbose.
    root = logging.getLogger("")
    root.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter(
        "%(levelname)s[%(name)s] %(message)s"))
    root.addHandler(ch)

    try:
        # asyncio.run() creates and closes the event loop itself; calling
        # get_event_loop() without a running loop is deprecated.
        asyncio.run(main(options))
    except Exception as e:
        logger.exception("%s", e)
        sys.exit(1)
    sys.exit(0)