mirror of
https://github.com/vincentbernat/i3wm-configuration.git
synced 2025-06-20 17:15:41 +02:00
1221 lines
39 KiB
Python
Executable file
1221 lines
39 KiB
Python
Executable file
#!/usr/bin/python3
|
||
|
||
"""Personal i3 companion."""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import collections
|
||
import contextlib
|
||
import errno
|
||
import functools
|
||
import glob
|
||
import html
|
||
import logging
|
||
import logging.handlers
|
||
import os
|
||
import shlex
|
||
import socket
|
||
import struct
|
||
import subprocess
|
||
import sys
|
||
import types
|
||
|
||
import dbussy
|
||
import i3ipc
|
||
import ravel
|
||
import xcffib
|
||
import xcffib.randr
|
||
import xcffib.dpms
|
||
import xcffib.xproto
|
||
from i3ipc.aio import Connection
|
||
from systemd import daemon, journal
|
||
|
||
|
||
def icon(font_number, char):
    """Wrap a glyph in Polybar font-selection tags.

    ``font_number`` is the font index declared in the Polybar
    configuration:

    - 2: https://fontawesome.com/v6.0/icons?s=solid
    - 3: https://fontawesome.com/v6.0/icons?s=brands

    Returns ``char`` enclosed between ``%{T<font_number>}`` and ``%{T-}``.
    """
    select_font = "%%{T%d}" % (font_number,)
    reset_font = "%{T-}"
    return select_font + "%s" % (char,) + reset_font
|
||
|
||
|
||
# Configuration
|
||
|
||
application_icons = {
|
||
"86box": icon(2, ""),
|
||
"anydesk": icon(2, ""),
|
||
"blender": icon(2, ""),
|
||
"calibre": icon(2, "📚"),
|
||
"cheese": icon(2, ""),
|
||
"chromium": icon(3, ""),
|
||
"ungoogled_chromium": icon(3, ""),
|
||
"d-feet": icon(2, ""),
|
||
"darktable": icon(2, ""),
|
||
"discord": icon(3, ""),
|
||
"draw.io": icon(2, ""),
|
||
"easyeffects": icon(2, ""),
|
||
"emacs": icon(2, ""),
|
||
"file-roller": icon(2, ""),
|
||
"fileroller": icon(2, ""),
|
||
"firefox": icon(3, ""),
|
||
"gaupol": icon(2, ""),
|
||
"geeqie": icon(2, ""),
|
||
"gimp": icon(2, ""),
|
||
"gitg": icon(2, ""),
|
||
"gnome-boxes": icon(2, ""),
|
||
"google-chrome": icon(3, ""),
|
||
"grisbi": icon(2, ""),
|
||
"inkscape": icon(2, ""),
|
||
"jitsi meet": icon(2, ""),
|
||
"libreoffice": icon(2, "📄"),
|
||
"soffice": icon(2, "📄"),
|
||
"onlyoffice desktop editors": icon(2, "📄"),
|
||
"maps": icon(2, ""),
|
||
"mednafen": icon(2, ""),
|
||
"mpv": icon(2, ""),
|
||
"nestopia": icon(2, ""),
|
||
"nsxiv": icon(2, ""),
|
||
"pavucontrol": icon(2, ""),
|
||
"peek": icon(2, ""),
|
||
"pinentry": icon(2, ""),
|
||
"qalculate": icon(2, ""),
|
||
"qemu": icon(2, ""),
|
||
"qgis3": icon(2, ""),
|
||
"retroarch": icon(2, ""),
|
||
"scummvm": icon(2, ""),
|
||
"signal": icon(2, ""),
|
||
"snes9x": icon(2, ""),
|
||
"spot": icon(3, ""),
|
||
"spotify": icon(3, ""),
|
||
"steam": icon(3, ""),
|
||
"sxiv": icon(2, ""),
|
||
"system-config-printer.py": icon(2, "⎙"),
|
||
"thunar": icon(2, ""),
|
||
"thunderbird": icon(2, ""),
|
||
"vbeterm": icon(2, ""),
|
||
"virt-manager": icon(2, ""),
|
||
"webex": icon(2, ""),
|
||
"wireshark": icon(2, ""),
|
||
"zathura": icon(2, ""),
|
||
"zeal": icon(2, ""),
|
||
"zoom": icon(2, ""),
|
||
}
|
||
icons = {
|
||
"access-point": icon(2, ""),
|
||
"battery-100": icon(2, ""),
|
||
"battery-75": icon(2, ""),
|
||
"battery-50": icon(2, ""),
|
||
"battery-25": icon(2, ""),
|
||
"battery-0": icon(2, ""),
|
||
"bluetooth": icon(2, ""),
|
||
"camera": icon(2, "⎙"),
|
||
"gamepad": icon(2, "🎮"),
|
||
"headphones": icon(2, "🎧"),
|
||
"headset": icon(2, ""),
|
||
"keyboard": icon(2, "⌨"),
|
||
"laptop": icon(2, "💻"),
|
||
"loudspeaker": icon(2, ""),
|
||
"microphone": icon(2, ""),
|
||
"mouse": icon(2, ""),
|
||
"notifications-disabled": icon(2, "🔕"),
|
||
"notifications-enabled": icon(2, ""),
|
||
"nowifi": icon(2, ""),
|
||
"phone": icon(2, "📞"),
|
||
"printer": icon(2, "⎙"),
|
||
"scanner": icon(2, ""),
|
||
"unknown": icon(2, ""),
|
||
"vpn": icon(2, ""),
|
||
"webcam": icon(2, ""),
|
||
"wifi-high": icon(2, ""),
|
||
"wifi-low": icon(2, ""),
|
||
"wifi-medium": icon(2, ""),
|
||
"wired": icon(2, ""),
|
||
}
|
||
application_icons_nomatch = icon(2, "")
|
||
# Window classes (lowercase) that workspace_rename() skips entirely when
# computing the icons of a workspace.
application_icons_ignore = {"dimmer"}
# Icons that workspace_rename() drops from a workspace name as soon as the
# workspace also contains at least one icon outside of this set.
application_icons_alone = {application_icons[k] for k in {"vbeterm"}}
# Window classes (lowercase) that workspace_exclusive() treats as owning
# their workspace: new windows of another class are moved elsewhere.
exclusive_apps = {
    "emacs",
    "firefox",
    "chromium-browser",
    "google-chrome",
    "thunderbird",
    "thunderbird-beta",
    "thunderbird-esr",
}
# Window classes (lowercase) that workspace_exclusive() always leaves in
# place, even on a workspace holding an exclusive app.
intrusive_apps = {"vbeterm"}
|
||
|
||
# Logger shared by every helper and handler of this module.
logger = logging.getLogger("i3-companion")

# Events for @on decorator

# Description of a D-Bus signal to subscribe to. Only "interface", "member"
# and "signature" are mandatory: the defaults apply to the last three
# fields (system=True, path="/", onlyif=None).
DBusSignal = collections.namedtuple(
    "DBusSignal",
    ["interface", "member", "signature", "system", "path", "onlyif"],
    defaults=(True, "/", None),
)
# Unique sentinel; handlers recognise it with "event is StartEvent".
StartEvent = object()
# Alias for the i3ipc event enumeration.
I3Event = i3ipc.Event
# Named command event; handlers compare the event they receive with the
# command name (e.g. event == "previous-workspace").
CommandEvent = collections.namedtuple("CommandEvent", ["name"])

# NOTE(review): these values presumably mirror the NetworkManager D-Bus
# enumerations (active connection state, device type, device state) --
# confirm against the NetworkManager API reference.
NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2
NM_DEVICE_TYPE_ETHERNET = 1
NM_DEVICE_TYPE_WIFI = 2
NM_DEVICE_TYPE_MODEM = 8
NM_DEVICE_STATE_UNMANAGED = 10
NM_DEVICE_STATE_ACTIVATED = 100
|
||
|
||
|
||
# Event helpers
|
||
|
||
|
||
def static(**kwargs):
|
||
"""Define static variables for the event handler."""
|
||
|
||
def decorator(fn):
|
||
fn.__dict__.update(kwargs)
|
||
return fn
|
||
|
||
return decorator
|
||
|
||
|
||
@static(functions={})
def on(*events):
    """Register the decorated function as a handler for ``events``.

    The function received by the decorator is recorded, together with
    the tuple of events, in the ``on.functions`` mapping. The name is
    rebound to a thin wrapper forwarding all its arguments.
    """

    def register(fn):
        @functools.wraps(fn)
        def passthrough(*args, **kwargs):
            return fn(*args, **kwargs)

        # The mapping is keyed by the undecorated function, not the wrapper.
        on.functions[fn] = events
        return passthrough

    return register
|
||
|
||
|
||
def retry(max_retries):
    """Build a decorator retrying an async function when it raises.

    The function is attempted once, then up to ``max_retries`` more
    times if it raises an ``Exception``. Each failure but the last is
    logged as a warning. After the last failure, the exception is logged
    with its traceback and the wrapper returns ``None`` instead of
    propagating it.
    """

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            remaining = max_retries
            while True:
                try:
                    logger.debug("execute %s (remaining tries: %s)", fn, remaining)
                    return await fn(*args, **kwargs)
                except Exception as exc:
                    if remaining <= 0:
                        # Out of attempts: report and give up silently.
                        logger.exception(f"while executing {fn}: %s", exc)
                        return
                    remaining -= 1
                    logger.warning(
                        f"while executing {fn} (remaining tries: %d): %s",
                        remaining,
                        exc,
                    )

        return wrapper

    return decorator
|
||
|
||
|
||
def debounce(sleep, *, unless=None):
    """Debounce a function call (batch successive calls into only one).

    Optional immediate execution. Ensure only one instance is
    executed. It is assumed the arguments provided to the debounced
    function have no effect on its execution.

    ``sleep`` is the timeout, in seconds, given to ``asyncio.wait_for``
    before the work is executed. ``unless`` is an optional predicate
    called with the arguments of each call: when it returns a true
    value, the worker is woken up without waiting for the timeout.

    Every call to the decorated function awaits the worker task, so it
    returns ``None`` once the worker is done, or raises whatever the
    wrapped function raised.
    """

    def decorator(fn):
        async def worker():
            while True:
                # A timeout is the normal way out of the wait: swallow it.
                with contextlib.suppress(asyncio.TimeoutError):
                    # Wait for an urgent work or until sleep is elapsed
                    await asyncio.wait_for(workers[fn].urgent.wait(), timeout=sleep)
                    logger.debug("urgent work received for %s", fn)
                # Take the pending arguments and reset the state before
                # running, so that the check below can see new work.
                args, kwargs = workers[fn].queue
                workers[fn].queue = None
                workers[fn].urgent.clear()

                # Execute the work
                logger.debug("execute work for %s", fn)
                try:
                    await fn(*args, **kwargs)
                except Exception as e:
                    logger.debug("while running %s, worker got %s", fn, e)
                    # Forget this worker so that the next call creates a
                    # fresh one, then let the awaiting callers see the error.
                    workers[fn] = None
                    raise

                # Do we still have something to do?
                if workers[fn].queue is None:
                    break

            # No more work
            logger.debug("no more work for %s", fn)
            workers[fn] = None

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            if workers[fn] is None:
                logger.debug("create new worker for %s", fn)
                # The task does not start running before the first await
                # below, so "urgent" and "queue" are set before it needs them.
                workers[fn] = types.SimpleNamespace()
                workers[fn].task = asyncio.create_task(worker())
                workers[fn].urgent = asyncio.Event()
                workers[fn].queue = (args, kwargs)
            else:
                # NOTE(review): this branch only logs; the new arguments are
                # not stored in the queue, so the worker never finds pending
                # work after a run -- confirm this is intended.
                logger.debug("enqueue new work for %s", fn)
            if unless is not None and unless(*args, **kwargs):
                logger.debug("wake up now for %s", fn)
                workers[fn].urgent.set()
            return await workers[fn].task

        # No worker exists until the first call.
        workers[fn] = None
        return wrapper

    # Worker state per decorated function, shared by the closures above.
    workers = {}
    return decorator
|
||
|
||
|
||
def polybar(module):
    """Build a decorator publishing the returned string to a Polybar module.

    The decorated coroutine is awaited and its result is always returned
    to the caller. When the result is a ``str`` differing from the last
    published one for ``module``, it is also:

    - written to ``$XDG_RUNTIME_DIR/i3/<module>.txt``;
    - sent to every socket matching ``$XDG_RUNTIME_DIR/polybar/*.sock``.

    Sockets failing with ``ENXIO`` or ``ECONNREFUSED`` are skipped; any
    other ``OSError`` is propagated.
    """

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            content = await fn(*args, **kwargs)
            # Only strings are published, and only when they changed.
            if type(content) is not str:
                return content
            if last_sent.get(module) == content:
                return content

            # Update cache file (for when polybar restarts)
            cache_path = f"{os.getenv('XDG_RUNTIME_DIR')}/i3/{module}.txt"
            with open(cache_path, "w") as out:
                out.write(content)

            # Build the IPC message: magic, then header (version, length,
            # type), then the command itself.
            payload = bytes(f"#{module}.send.{content}", "utf-8")
            header = struct.pack("=BIB", 0, len(payload), 2)
            message = b"".join((b"polyipc", header, payload))

            # Send it to polybar
            sockets = glob.glob(f"{os.getenv('XDG_RUNTIME_DIR')}/polybar/*.sock")
            for path in sockets:
                try:
                    reader, writer = await asyncio.open_unix_connection(path)
                except OSError as exc:
                    if exc.errno in (errno.ENXIO, errno.ECONNREFUSED):
                        continue
                    raise
                try:
                    writer.write(message)
                    await writer.drain()
                    await reader.read()
                finally:
                    writer.close()
                    await writer.wait_closed()

            logger.info(f"polybar/{module}: content updated")
            last_sent[module] = content
            return content

        return wrapper

    # Last content published, keyed by module name.
    last_sent = {}
    return decorator
|
||
|
||
|
||
# Other helpers
|
||
|
||
|
||
async def notify(i3, **kwargs):
    """Send a desktop notification over the session bus.

    The ``Notify`` method of the ``org.freedesktop.Notifications``
    interface is called with default parameters, each of which can be
    overridden (or completed) through ``kwargs``. The reply of ``Notify``
    is returned as is.
    """
    bus_name = "org.freedesktop.Notifications"
    service = i3.session_bus[bus_name]
    proxy = service["/org/freedesktop/Notifications"]
    notifications = await proxy.get_async_interface(bus_name)
    defaults = {
        "app_name": logger.name,
        "replaces_id": 0,
        "app_icon": "dialog-information",
        "summary": "",
        "actions": [],
        "hints": {},
        "expire_timeout": 5000,
    }
    return await notifications.Notify(**{**defaults, **kwargs})
|
||
|
||
|
||
async def create_new_workspace(i3):
    """Switch to a newly created workspace and return its number.

    The number picked is the lowest positive integer not used by any
    existing workspace. The workspace is created by asking i3 to switch
    to it.
    """
    workspaces = await i3.get_workspaces()
    used = {w.num for w in workspaces}
    # Clamp to 0: with no numbered workspace at all (e.g. only named ones,
    # whose number is -1), the candidate range must still contain 1.
    highest = max(max(used, default=0), 0)
    # highest + 1 is never used, so there is always at least one candidate.
    # min() makes the choice deterministic; set.pop() returns an arbitrary
    # element, not necessarily the lowest one.
    available = min(set(range(1, highest + 2)) - used)
    logger.info(f"create new workspace number {available}")
    await i3.command(f'workspace number "{available}"')
    return available
|
||
|
||
|
||
# Event handlers
|
||
|
||
|
||
@on(StartEvent, I3Event.WINDOW_MOVE, I3Event.WINDOW_NEW, I3Event.WINDOW_CLOSE)
@debounce(0.2)
async def workspace_rename(i3, event):
    """Rename workspaces using icons to match what's inside it.

    Each workspace is named ``<num>:<icon>|<icon>|...`` from the classes
    of the windows it contains (just ``<num>`` when it has none). Sticky
    floating windows and classes in ``application_icons_ignore`` are
    skipped. All renames are sent to i3 in a single command, and nothing
    is sent when every workspace already has the expected name.
    """
    tree = await i3.get_tree()
    commands = []

    for workspace in tree.workspaces():
        glyphs = set()
        for window in workspace.leaves():
            if window.sticky and window.floating in {"auto_on", "user_on"}:
                continue
            cls = (window.window_class or "").lower()
            if cls in application_icons_ignore:
                continue
            # Try the full class first, then progressively looser keys:
            # part before the first "-", part before the first "_", part
            # after the last ".".
            candidates = (
                cls,
                cls.split("-")[0],
                cls.split("_")[0],
                cls.split(".")[-1],
            )
            glyph = next(
                (application_icons[k] for k in candidates if k in application_icons),
                None,
            )
            glyphs.add(glyph or application_icons_nomatch)
        # "Alone" icons are only displayed when nothing else is present.
        if any(g not in application_icons_alone for g in glyphs):
            glyphs -= application_icons_alone
        new_name = f"{workspace.num}:{'|'.join(sorted(glyphs))}".rstrip(":")
        if workspace.name != new_name:
            logger.debug("rename workspace %s", workspace.num)
            commands.append(f'rename workspace "{workspace.name}" to "{new_name}"')

    # Do not bother i3 with an empty command when nothing has changed.
    if commands:
        await i3.command(";".join(commands))
|
||
|
||
|
||
@on(CommandEvent("previous-workspace"), I3Event.WORKSPACE_FOCUS)
@static(history=collections.defaultdict(list))
async def previous_workspace(i3, event):
    """Go to previous workspace on the same output.

    On a workspace focus event, the number of the workspace being left
    is pushed on a per-output history, limited to its last 5 entries.
    On the "previous-workspace" command, entries are popped from the
    history of the focused output until one designates a workspace still
    present on this output and different from the focused one; i3 is
    then asked to switch to it.
    """
    history = previous_workspace.history
    if isinstance(event, i3ipc.WorkspaceEvent) and event.old:
        left = event.old.ipc_data
        output = left["output"]
        num = left["num"]
        visited = history[output]
        # Do not record the same workspace twice in a row.
        if visited[-1:] != [num]:
            visited.append(num)
            history[output] = visited[-5:]
            logger.debug("on %s, history is %s", output, history[output])
    elif event == "previous-workspace":
        workspaces = await i3.get_workspaces()
        focused = next((w for w in workspaces if w.focused), None)
        if focused is None:
            return
        output = focused.output
        while True:
            if not history[output]:
                logger.debug("no previous workspace on %s", output)
                return
            previous = history[output].pop()
            still_there = any(
                w.num != focused.num and w.num == previous and w.output == output
                for w in workspaces
            )
            if still_there:
                break
        logger.debug("switching to workspace %d on %s", previous, output)
        await i3.command(f"workspace number {previous}")
        await i3.command("focus tiling")
|
||
|
||
|
||
@on(CommandEvent("new-workspace"), CommandEvent("move-to-new-workspace"))
async def new_workspace(i3, event):
    """Create a new workspace and optionally move a window to it.

    With the "move-to-new-workspace" command, the focused container is
    looked up before the workspace is created, then moved to it. Nothing
    happens if no container is focused.
    """
    moving = event == "move-to-new-workspace"
    current = None
    if moving:
        # Must be done before creating the workspace.
        current = (await i3.get_tree()).find_focused()
        if not current:
            return

    num = await create_new_workspace(i3)

    if moving:
        await current.command(f'move container to workspace number "{num}"')
|
||
|
||
|
||
@on(CommandEvent("move-all-workspaces-to-next-output"))
async def move_all_workspaces_to_next_output(i3, event):
    """Move all workspaces from the current output to the next one.

    Nothing is done with fewer than two outputs or when no workspace is
    focused. Each workspace of the focused output is selected, then
    moved, one command at a time.
    """
    outputs = await i3.get_outputs()
    if len(outputs) < 2:
        return
    workspaces = await i3.get_workspaces()
    focused = next((w for w in workspaces if w.focused), None)
    if focused is None:
        return
    source = focused.output
    to_move = [w.num for w in workspaces if w.output == source]
    for num in to_move:
        await i3.command(f"workspace number {num} ; move workspace to output next")
|
||
|
||
|
||
@on(I3Event.WORKSPACE_INIT)
@static(lock=asyncio.Lock())
async def workspace_rename_duplicate(i3, event):
    """Rename a workspace when initialized empty with a duplicate number.

    When the number of the new workspace is already used by another
    workspace, the new one is renamed to the lowest positive number not
    in use. Invocations are serialized with a lock.
    """
    # This will not be needed once https://github.com/i3/i3/pull/4252 is released.
    async with workspace_rename_duplicate.lock:
        workspace = event.current
        workspaces = await i3.get_workspaces()
        # Numbers used by every workspace but the new one.
        others = {w.num for w in workspaces if workspace.id != w.ipc_data["id"]}
        if workspace.num not in others:
            return
        # Clamp to 0 so the candidate range is never empty, even if all
        # the numbers in use are negative.
        highest = max(max(others), 0)
        # min() picks the lowest free number; set.pop() would return an
        # arbitrary one.
        available = min(set(range(1, highest + 2)) - others)
        await i3.command(f"rename workspace {workspace.num} to {available}")
|
||
|
||
|
||
@on(I3Event.WINDOW_NEW, CommandEvent("inhibit-exclusive"))
@static(inhibited_by=False)
async def workspace_exclusive(i3, event):
    """Move new windows on a new workspace instead of sharing a workspace
    with an exclusive app.

    The "inhibit-exclusive" command disables this for the next new
    window; the inhibition is cancelled after one second if unused.
    Floating windows and windows whose class is in ``intrusive_apps``
    are never moved. Otherwise, a window landing on a workspace holding
    an exclusive app of another class is moved to a workspace already
    containing a window of its own class and no exclusive app or, by
    default, to a new workspace.
    """
    if event == "inhibit-exclusive":
        logger.debug("inhibit exclusive workspace")
        # A fresh token tells whether a later inhibition replaced this one.
        workspace_exclusive.inhibited_by = me = object()
        await asyncio.sleep(1)
        if workspace_exclusive.inhibited_by is me:
            logger.info("cancel exclusive workspace inhibition")
            workspace_exclusive.inhibited_by = None
        return
    if workspace_exclusive.inhibited_by:
        # The inhibition is consumed by this window.
        workspace_exclusive.inhibited_by = None
        return

    floating_states = {"auto_on", "user_on"}
    w = event.container

    # Can the current window intrude the workspace?
    own_class = (w.window_class or "").lower()
    if w.floating in floating_states or own_class in intrusive_apps:
        logger.debug("window %s can intrude", w.window_class)
        return

    tree = await i3.get_tree()

    # Get the window workspace. From an event, w.workspace() is None,
    # so search it in the tree.
    current_window = tree.find_by_id(w.id)
    if current_window is None:
        logger.debug("cannot find new window in tree?")
        return
    current_workspace = current_window.workspace()
    if current_workspace is None:
        logger.debug("cannot find new window workspace?")
        return

    # A window without a class never matches the class of another window.
    same_class = w.window_class or object()
    leaves = tree.leaves()

    # Get the list of workspaces with an exclusive app, excluding the
    # current window and windows of the same class.
    exclusive_workspaces = set()
    for other in leaves:
        if other.id == w.id:
            continue
        if same_class == other.window_class:
            continue
        if (other.window_class or "").lower() not in exclusive_apps:
            continue
        if other.sticky and other.floating in floating_states:
            continue
        exclusive_workspaces.add(other.workspace().num)

    # If current one is OK, don't move
    if current_workspace.num not in exclusive_workspaces:
        logger.debug("no exclusive app, %s can go there", w.window_class)
        return

    # Are there other workspaces with the same app but no exclusive apps?
    candidate_workspaces = set()
    for other in leaves:
        if other.id != w.id and same_class == other.window_class:
            candidate_workspaces.add(other.workspace().num)
    candidate_workspaces -= exclusive_workspaces
    candidate_workspaces.discard(-1)  # scratchpad

    if candidate_workspaces:
        # Use one of the candidates
        num = next(iter(candidate_workspaces))
    else:
        # Create a new workspace
        num = await create_new_workspace(i3)

    logger.info(f"move window {w.window_class} to workspace {num}")
    await w.command(f'move container to workspace number "{num}", focus')
|
||
|
||
|
||
@on(CommandEvent("quake-console"))
async def quake_console(i3, event):
    """Spawn a quake console or toggle an existing one.

    The event payload is ``<command>:<executable>:<instance>:<height>``
    where ``height`` is a fraction of the height of the focused
    workspace. When no window has the requested instance name, the
    terminal is spawned and moved to the scratchpad. The terminal is then
    resized to the width of the focused workspace and toggled with
    "scratchpad show" at its top-left corner.

    Raises ``RuntimeError`` if the terminal window does not show up
    within one second after being spawned.
    """
    try:
        _, term_exec, term_name, height = event.split(":")
        height = float(height)
    except (AttributeError, TypeError, ValueError) as exc:
        logger.warning("unable to parse payload %s: %s", event, exc)
        return

    # Look for the terminal or spawn it
    tree = await i3.get_tree()
    matches = tree.find_instanced(term_name)
    if matches:
        term = matches[0]
    else:
        quake_window = asyncio.get_running_loop().create_future()

        def wait_for_quake(i3, event):
            w = event.container
            if quake_window.done() or w.window_instance != term_name:
                return
            quake_window.set_result(w)

        # Subscribe before spawning to not miss the new window.
        i3.on(I3Event.WINDOW_NEW, wait_for_quake)
        try:
            await i3.command(f"exec exec {term_exec} --name {term_name}")
            done, _ = await asyncio.wait((quake_window,), timeout=1)
        finally:
            i3.off(wait_for_quake)
        if not done:
            raise RuntimeError("unable to spawn terminal")
        term = quake_window.result()
        await term.command("move window to scratchpad")

    workspaces = await i3.get_workspaces()
    focused = [ws for ws in workspaces if ws.focused]
    if not focused:
        # Same guard as the other handlers: nothing to attach the console to.
        logger.debug("no focused workspace for quake console")
        return
    rect = focused[0].rect
    height = int(rect.height * height)
    command = (
        f"[instance={term_name}] "
        "border none,"
        f"resize set {rect.width} px {height} px,"
        "scratchpad show,"
        f"move absolute position {rect.x}px {rect.y}px"
    )
    logger.debug("QuakeConsole: %s", command)
    await i3.command(command)
|
||
|
||
|
||
@on(CommandEvent("container-info"))
@static(last_id=0)
async def container_info(i3, event):
    """Show information about the focused container.

    The properties of the focused container are displayed in a
    notification, one per line, skipping those set to ``None``. The
    notification replaces the one sent by the previous invocation.
    """
    tree = await i3.get_tree()
    window = tree.find_focused()
    if not window:
        return
    logger.info(f"window raw information: {window.ipc_data}")
    summary = "About focused container"
    r = window.rect
    w = window
    info = {
        "name": w.name,
        "title": w.window_title,
        "class": w.window_class,
        "instance": w.window_instance,
        "role": w.window_role,
        # A missing key is treated like a null value: the line is skipped.
        "type": w.ipc_data.get("window_type"),
        "sticky": w.sticky,
        "floating": w.floating,
        "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
        "layout": w.layout,
        "percent": w.percent,
        "marks": ", ".join(w.marks) or "(none)",
    }
    # Values are escaped since the body is interpreted as markup.
    body = "\n".join(
        f"<tt>{k:10}</tt> {html.escape(str(v))}"
        for k, v in info.items()
        if v is not None
    )
    result = await notify(
        i3,
        app_icon="info",
        expire_timeout=10000,
        summary=summary,
        body=body,
        replaces_id=container_info.last_id,
    )
    # Remember the notification ID to replace it next time.
    container_info.last_id = result[0]
|
||
|
||
|
||
@on(CommandEvent("workspace-info"))
@static(last_id=0)
async def workspace_info(i3, event):
    """Show information about the focused workspace.

    The layout tree of the focused workspace is rendered as text, one
    container per line, and displayed in a notification replacing the
    one sent by the previous invocation. The focused container and the
    containers without a window are highlighted with distinct colors.
    """
    workspaces = await i3.get_workspaces()
    focused = next((w for w in workspaces if w.focused), None)
    if focused is None:
        return
    summary = f"Workspace {focused.num} on {focused.output}"
    tree = await i3.get_tree()
    # Match on the name: the number is shared by all named workspaces.
    matching = [w for w in tree.workspaces() if w.name == focused.name]
    if not matching:
        logger.debug("cannot find focused workspace in tree?")
        return

    def render(container):
        """Return the markup for a container and its descendants."""
        if container.focused:
            style = 'foreground="#ffaf00"'
        elif not container.window:
            style = 'foreground="#6c98ee"'
        else:
            style = ""
        if container.window:
            content = (
                f"{(container.window_class or '???').lower()}: "
                f"{(container.window_title or '???')}"
            )
        elif container.type == "workspace" and not container.nodes:
            # Empty workspaces use workspace_layout, but when default,
            # this is layout...
            layout = container.ipc_data["workspace_layout"]
            if layout == "default":
                layout = container.layout
            content = f"({layout})"
        else:
            content = f"({container.layout})"
        # Classes and titles are arbitrary text: escape them for markup.
        lines = [f"<span {style}>{html.escape(content.lower())}</span>"]
        last_index = len(container.nodes) - 1
        for index, child in enumerate(container.nodes):
            if index == last_index:
                first = "└─"
                # Same width as the other prefixes to keep levels aligned.
                others = "  "
            else:
                first = "├─"
                others = "│ "
            subtree = render(child).replace("\n", f"\n<tt>{others}</tt>")
            lines.append(f"<tt>{first}</tt>{subtree}")
        return "\n".join(lines)

    body = render(matching[0]).lstrip("\n")
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=20000,
        summary=summary,
        body=body,
        replaces_id=workspace_info.last_id,
    )
    # Remember the notification ID to replace it next time.
    workspace_info.last_id = result[0]
|
||
|
||
|
||
@on(I3Event.OUTPUT, StartEvent)
@static(last_setup=None)
@debounce(2)
async def output_update(i3, event):
    """React to a XRandR change.

    The set of monitors (name, size and position) is compared with the
    one recorded by the previous invocation. When it differs, except on
    the start event, two systemd user units are reloaded/started; a
    warning is logged for each command exiting with a non-zero status.
    """

    # Grab current setup. Synchronous, but it's short enough
    x11 = i3.x11
    randr = x11(xcffib.randr.key)
    screen = x11.get_setup().roots[0]
    current_setup = set()
    for monitor in randr.GetMonitors(screen.root, 1).reply().monitors:
        name = x11.core.GetAtomName(monitor.name).reply().name.to_string()
        current_setup.add(
            (name, monitor.width, monitor.height, monitor.x, monitor.y)
        )

    # Compare to current setup
    if current_setup == output_update.last_setup:
        logger.debug("current xrandr setup unchanged")
        return
    output_update.last_setup = current_setup
    logger.info("xrandr setup: %s", current_setup)
    # On start, only record the setup.
    if event is StartEvent:
        return

    # Trigger changes
    logger.info("xrandr change detected")
    for cmd in (
        "systemctl --user reload --no-block xsettingsd.service",
        "systemctl --user start --no-block wallpaper.service",
    ):
        proc = subprocess.run(shlex.split(cmd))
        if proc.returncode != 0:
            logger.warning(f"{cmd} exited with {proc.returncode}")
|
||
|
||
|
||
@on(
    DBusSignal(
        path="/org/bluez",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: args[0] == "org.bluez.Device1" and "Connected" in args[1],
    )
)
async def bluetooth_notifications(i3, event, path, interface, changed, invalid):
    """Display notifications related to Bluetooth state.

    The name, icon and connection state of the device whose properties
    changed are read from the system bus, then a notification tells
    whether the device is connected or disconnected.
    """
    device = i3.system_bus["org.bluez"][path]
    properties = await device.get_async_interface(interface)
    name = await properties.Name
    device_icon = await properties.Icon
    connected = await properties.Connected
    state = "connected" if connected else "disconnected"
    await notify(
        i3,
        app_icon=device_icon,
        summary=name,
        body=f"Bluetooth device {state}",
    )
|
||
|
||
|
||
@on(
    StartEvent,
    DBusSignal(
        path="/org/bluez",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: (
            args[0] == "org.bluez.Device1"
            and "Connected" in args[1]
            or args[0] == "org.bluez.Device1"
            and "ServicesResolved" in args[1]
            or args[0] == "org.bluez.Adapter1"
            and "Powered" in args[1]
            or args[0] == "org.bluez.Battery1"
            and "Percentage" in args[1]
        ),
    ),
    DBusSignal(
        path="/",
        interface="org.freedesktop.DBus.ObjectManager",
        member="InterfacesAdded",
        signature="oa{sa{sv}}",
        onlyif=lambda args: ("org.bluez.Battery1" in args[1]),
    ),
)
@retry(2)
@debounce(1)
@polybar("bluetooth")
async def bluetooth_status(i3, event, *args):
    """Update bluetooth status for Polybar.

    Returns an empty string when no adapter is powered (or, on start,
    when ``/sys/class/bluetooth`` does not exist). Otherwise, returns the
    bluetooth icon followed by one entry per connected device, separated
    by ``|``. Each entry is the icon of the device, followed by a battery
    icon when the device reports a battery level.
    """
    # Do we have a bluetooth device?
    if event is StartEvent and not os.path.exists("/sys/class/bluetooth"):
        logger.info("no bluetooth detected")
        return ""

    # OK, get the info
    conn = i3.system_bus["org.bluez"]
    om = await conn["/"].get_async_interface("org.freedesktop.DBus.ObjectManager")
    managed = (await om.GetManagedObjects())[0]
    powered = False
    devices = []
    for interfaces in managed.values():
        if "org.bluez.Adapter1" in interfaces:
            # We get an adapter!
            if interfaces["org.bluez.Adapter1"]["Powered"][1]:
                powered = True
        elif "org.bluez.Device1" in interfaces:
            # We have a device!
            properties = interfaces["org.bluez.Device1"]
            if not properties["Connected"][1]:
                continue
            device = types.SimpleNamespace(battery=None, icon=None)
            # Battery level and icon are both optional.
            with contextlib.suppress(KeyError):
                device.battery = interfaces["org.bluez.Battery1"]["Percentage"][1]
            with contextlib.suppress(KeyError):
                device.icon = properties["Icon"][1]
            devices.append(device)

    if not powered:
        return ""

    # Map the icon name reported for a device to a key of "icons".
    kinds = {
        "audio-card": "loudspeaker",
        "audio-headphones": "headphones",
        "audio-headset": "headset",
        "camera-photo": "camera",
        "camera-video": "webcam",
        "computer": "laptop",
        "input-gaming": "gamepad",
        "input-keyboard": "keyboard",
        "input-mouse": "mouse",
        "network-wireless": "access-point",
        "phone": "phone",
        "printer": "printer",
        "scanner": "scanner",
    }
    segments = [icons["bluetooth"]]
    for device in devices:
        parts = [icons[kinds.get(device.icon, "unknown")]]
        if device.battery is not None:
            # Round the level to the nearest multiple of 25.
            parts.append(icons[f"battery-{(device.battery+12)//25*25}"])
        segments.append(" ".join(parts))
    return "|".join(segments)
|
||
|
||
|
||
@on(
    DBusSignal(
        system=False,
        path="/org/freedesktop/Notifications",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: args[0] == "org.dunstproject.cmd0" and "paused" in args[1],
    )
)
@polybar("dunst")
async def dunst_status_update(i3, event, path, interface, changed, invalid):
    """Update notification status in Polybar.

    Returns the "disabled" icon when the changed ``paused`` property is
    true, the "enabled" icon otherwise.
    """
    if changed["paused"][1]:
        key = "notifications-disabled"
    else:
        key = "notifications-enabled"
    return icons[key]
|
||
|
||
|
||
@on(StartEvent)
@polybar("dunst")
async def dunst_status_check(i3, event):
    """Display notification status for Polybar.

    The ``paused`` property of dunst is read from the session bus.
    Returns the "disabled" icon when it is true, the "enabled" icon
    otherwise.
    """
    service = i3.session_bus["org.freedesktop.Notifications"]
    proxy = service["/org/freedesktop/Notifications"]
    dunst = await proxy.get_async_interface("org.dunstproject.cmd0")
    if await dunst.paused:
        key = "notifications-disabled"
    else:
        key = "notifications-enabled"
    return icons[key]
|
||
|
||
|
||
@on(
    DBusSignal(
        interface="org.usbguard.Devices1",
        member="DevicePolicyApplied",
        signature="uusua{ss}",
    )
)
@static(history=collections.defaultdict(list))
async def usbguard_notifications(
    i3, event, path, id, target_new, device_rule, rule_id, attributes
):
    """Display notifications related when a USBGuard policy is applied.

    A notification is sent when the new target is 0 (accepted),
    1 (blocked) or 2 (rejected); other targets are ignored. For a blocked
    device, the expiration timeout is set to 0. The notification replaces
    the previous one sent for the same device ID, if any.
    """
    if attributes["name"]:
        human = f"\"{attributes['name']}\" ({attributes['id']})"
    else:
        human = f"{attributes['id']}"

    actions = {0: "accepted", 1: "blocked", 2: "rejected"}
    action = actions.get(target_new)
    if action is None:
        return
    extra_args = {"expire_timeout": 0} if action == "blocked" else {}

    history = usbguard_notifications.history
    body_lines = [
        f"USB device {human} {action}",
        f"ID: <tt>{id}</tt>, Port: <tt>{attributes['via-port']}</tt>",
    ]
    nid = await notify(
        i3,
        app_icon="media-removable",
        summary=f"New {action} USB device",
        replaces_id=history.get(id, 0),
        body="\n".join(body_lines),
        **extra_args,
    )
    if nid:
        # Remember the notification ID to replace it on the next change.
        history[id] = nid[0]
|
||
|
||
|
||
@on(
    DBusSignal(
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    )
)
async def network_manager_notifications(i3, event, path, state, reason):
    """Display notifications related to Network Manager state.

    Only the transition to the activated state is notified, and only for
    VPN (including WireGuard), Ethernet and wireless connections. The ID
    of the connection is used as summary.
    """
    ofnm = "org.freedesktop.NetworkManager"
    logger.debug("from %s state: %s, reason: %s", path, state, reason)
    if state != NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
        # Deactivated state does not contain enough information,
        # unless we maintain state.
        return
    obj = i3.system_bus[ofnm][path]
    try:
        nmca = await obj.get_async_interface(f"{ofnm}.Connection.Active")
    except dbussy.DBusError:
        logger.info(f"interface {path} has vanished")
        return
    kind = await nmca.Type
    connection_id = await nmca.Id

    # Icon and message for each connection type we care about.
    announcements = {
        "vpn": ("network-vpn", "VPN connected!"),
        "wireguard": ("network-vpn", "VPN connected!"),
        "802-3-ethernet": ("network-wired", "Ethernet connection established!"),
        "802-11-wireless": ("network-wireless", "Wireless connection established!"),
    }
    announcement = announcements.get(kind)
    if announcement is None:
        return
    app_icon, body = announcement
    await notify(i3, app_icon=app_icon, summary=f"{connection_id}", body=body)
|
||
|
||
|
||
@on(
    StartEvent,
    DBusSignal(
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    ),
    DBusSignal(
        path="/org/freedesktop/NetworkManager/AccessPoint",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: (args[0] == "org.freedesktop.NetworkManager.AccessPoint"),
    ),
)
@retry(2)
@debounce(
    1,
    unless=lambda i3, event, *args: (
        isinstance(event, DBusSignal) and event.interface.endswith(".Active")
    ),
)
@polybar("network")
async def network_manager_status(i3, event, *args):
    """Compute network manager status.

    Returns a space-separated status line made of, for each managed
    device: a clickable wifi icon (with the SSID when connected) for
    wireless devices and a wired icon for activated Ethernet devices;
    then, for each activated VPN or WireGuard connection, the VPN icon
    and the ID of the connection. ``%`` is doubled in SSIDs and
    connection IDs.
    """
    ofnm = "org.freedesktop.NetworkManager"
    status = []

    def clickable(text):
        """Make a click on the text run the wifi selection script."""
        rofi = os.path.expanduser("~/.config/i3/bin/rofi-wifi")
        return "%%{A1:%s:}%s%%{A}" % (rofi, text)

    # Build status from devices
    conn = i3.system_bus[ofnm]
    nm = await conn["/org/freedesktop/NetworkManager"].get_async_interface(ofnm)
    for device in await nm.AllDevices:
        nmd = await conn[device].get_async_interface(f"{ofnm}.Device")
        kind = await nmd.DeviceType
        state = await nmd.State
        if state == NM_DEVICE_STATE_UNMANAGED:
            continue
        if kind == NM_DEVICE_TYPE_WIFI:
            if state != NM_DEVICE_STATE_ACTIVATED:
                status.append(clickable(icons["nowifi"]))
                continue
            nmw = await conn[device].get_async_interface(f"{ofnm}.Device.Wireless")
            ap = await nmw.ActiveAccessPoint
            if not ap:
                status.append(clickable(icons["nowifi"]))
                continue
            network_manager_status.active_ap = ap
            nmap = await conn[ap].get_async_interface(f"{ofnm}.AccessPoint")
            name = await nmap.Ssid
            strength = int(await nmap.Strength)
            # One icon per third of the signal strength.
            signal_icons = (
                icons["wifi-low"],
                icons["wifi-medium"],
                icons["wifi-high"],
            )
            ssid = bytes(name).decode("utf-8", errors="replace").replace("%", "%%")
            status.append(clickable(signal_icons[strength // 34] + " " + ssid))
        elif kind == NM_DEVICE_TYPE_ETHERNET and state == NM_DEVICE_STATE_ACTIVATED:
            status.append(icons["wired"])

    # Build status for VPN connection
    for connection in await nm.ActiveConnections:
        nma = await conn[connection].get_async_interface(f"{ofnm}.Connection.Active")
        vpn = await nma.Vpn
        kind = await nma.Type
        if not (vpn or kind == "wireguard"):
            continue
        state = await nma.State
        if state == NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
            status.append(icons["vpn"])
            status.append((await nma.Id).replace("%", "%%"))

    # Final status line
    return " ".join(status)
|
||
|
||
|
||
@on(
    DBusSignal(
        interface="org.freedesktop.login1.Manager",
        member="PrepareForSleep",
        signature="b",
    ),
)
async def on_resume(i3, event, path, sleeping):
    """Switch monitor on when resuming from sleep.

    logind emits ``PrepareForSleep`` with ``sleeping`` true before
    suspending and false after resuming; only the latter is handled. If
    DPMS reports a power level other than 0, level 0 is forced and the
    request is flushed to the X server.
    """
    if sleeping:
        # Going to sleep: nothing to do.
        return

    logger.info("resume from sleep")
    dpms = i3.x11(xcffib.dpms.key)
    power_level = dpms.Info().reply().power_level
    if power_level == 0:
        # Already at level 0: leave the monitor alone.
        return
    dpms.ForceLevel(0)
    i3.x11.flush()
|
||
|
||
|
||
# Main function
|
||
|
||
|
||
async def main(options):
    """Connect to i3, D-Bus and X11, register all handlers and run.

    ``options`` is the parsed command line; it is not used in this body.

    The session bus, system bus and X11 connection are attached to the
    i3 connection object, so each handler receives them through its
    ``i3`` argument. Handlers registered in ``on.functions`` are then
    wired to i3 events, command events (carried by ``nop`` bindings or
    ticks), D-Bus signals and the start event. Finally systemd is told
    the service is ready and the i3 event loop runs.
    """
    i3 = await Connection(auto_reconnect=True).connect()
    i3.session_bus = await ravel.session_bus_async()
    i3.system_bus = await ravel.system_bus_async()
    i3.x11 = xcffib.connect()

    def i3_callback(handler):
        """Return an i3 callback logging the event, then calling handler."""

        async def wrapped(i3, event):
            logger.debug("received i3 event %s for %s", event, handler)
            return await handler(i3, event)

        return wrapped

    def dbus_callback(handler, signal):
        """Return a ravel signal callback filtering on ``signal.onlyif``."""

        @ravel.signal(
            name=signal.member,
            in_signature=signal.signature,
            path_keyword="path",
            args_keyword="args",
        )
        async def wrapped(path, args):
            interested = signal.onlyif is None or signal.onlyif(args)
            if not interested:
                logger.debug("received DBus event for %s, not interested", handler)
                return
            logger.debug("received DBus event %s for %s", signal, handler)
            return await handler(i3, signal, path, *args)

        return wrapped

    async def run_command(i3, origin, payload):
        """Call each handler whose CommandEvent name prefixes payload.

        The command name is the part of ``payload`` before the first
        colon; ``origin`` is the i3 event, used only for logging.
        """
        name = payload.split(":")[0]
        for handler, triggers in on.functions.items():
            for trigger in triggers:
                if isinstance(trigger, CommandEvent) and trigger.name == name:
                    logger.debug("received command event %s for %s", origin, handler)
                    await handler(i3, payload)

    # Regular events
    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, I3Event):
                i3.on(event, i3_callback(fn))

    # React to some bindings
    async def binding_event(i3, event):
        """Process a binding event."""
        # We only process it when it is a nop command and we use
        # this mechanism as an IPC mechanism. The alternative would be
        # to use ticks but we would need to spawn an i3-msg process
        # for that.
        for chunk in event.binding.command.split(";"):
            chunk = chunk.strip()
            if not chunk.startswith("nop "):
                continue
            # Drop the "nop " prefix and any surrounding quotes.
            payload = chunk[4:].strip(" \"'")
            if payload:
                await run_command(i3, event, payload)

    i3.on(I3Event.BINDING, binding_event)

    # React to ticks
    async def tick_event(i3, event):
        """Process tick events."""
        await run_command(i3, event, event.payload)

    i3.on(I3Event.TICK, tick_event)

    # Listen to DBus events
    for fn, events in on.functions.items():
        for event in events:
            if not isinstance(event, DBusSignal):
                continue
            bus = i3.system_bus if event.system else i3.session_bus
            bus.listen_signal(
                path=event.path,
                fallback=True,
                interface=event.interface,
                name=event.member,
                func=dbus_callback(fn, event),
            )

    # Run events that should run on start. The list keeps a reference to
    # the tasks while the event loop below is running.
    start_tasks = [
        asyncio.create_task(fn(i3, event))
        for fn, events in on.functions.items()
        for event in events
        if event is StartEvent
    ]

    daemon.notify("READY=1")
    await i3.main()
|
||
|
||
|
||
if __name__ == "__main__":
    # Parse. The help text is the module docstring followed by the
    # docstring of every registered handler. Handlers without a docstring
    # are skipped so that the literal text "None" never shows up.
    docs = [sys.modules[__name__].__doc__ or ""]
    docs.extend(fn.__doc__ for fn in on.functions if fn.__doc__)
    description = " ".join(docs)
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--debug",
        "-d",
        action="store_true",
        default=False,
        help="enable debugging",
    )
    options = parser.parse_args()

    # Logging: libraries stay at WARNING, only our own logger gets more
    # verbose.
    root = logging.getLogger("")
    root.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
    if sys.stderr.isatty():
        # Interactive run: log to the terminal.
        ch = logging.StreamHandler()
        ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
        root.addHandler(ch)
    else:
        # Otherwise, log to the systemd journal.
        root.addHandler(journal.JournalHandler(SYSLOG_IDENTIFIER=logger.name))

    try:
        asyncio.run(main(options))
    except Exception as e:
        # Top-level boundary: record the traceback and exit with an error.
        logger.exception("%s", e)
        sys.exit(1)
    sys.exit(0)
|