mirror of
https://github.com/vincentbernat/i3wm-configuration.git
synced 2025-07-15 12:44:21 +02:00
Zoom is trying very hard to break all existing X11 conventions. Let's try to make it work anyway.
1133 lines
37 KiB
Python
Executable file
1133 lines
37 KiB
Python
Executable file
#!/usr/bin/python3
|
||
|
||
"""Personal i3 companion."""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import collections
|
||
import contextlib
|
||
import errno
|
||
import functools
|
||
import glob
|
||
import html
|
||
import logging
|
||
import logging.handlers
|
||
import os
|
||
import shlex
|
||
import socket
|
||
import struct
|
||
import subprocess
|
||
import sys
|
||
import types
|
||
|
||
import dbussy
|
||
import i3ipc
|
||
import ravel
|
||
import xcffib
|
||
import xcffib.randr
|
||
import xcffib.xproto
|
||
from i3ipc.aio import Connection
|
||
from systemd import daemon, journal
|
||
|
||
|
||
def icon(font_number, char):
    """Wrap a glyph in the Polybar tags selecting the font to draw it with.

    ``font_number`` is the index of a font in the Polybar configuration and
    ``char`` the glyph to display. The font is reset right after the glyph.
    """
    # Font number is from Polybar configuration.
    # 2: https://fontawesome.com/v6.0/icons?s=solid
    # 3: https://fontawesome.com/v6.0/icons?s=brands
    select_font = "%%{T%d}" % (font_number,)
    reset_font = "%{T-}"
    return select_font + "%s" % (char,) + reset_font
|
||
|
||
|
||
# Configuration
|
||
|
||
# Map a lowercased window class to the Polybar-formatted icon used in the
# workspace name. Each row is (window class, Polybar font number, glyph).
# NOTE(review): many glyphs appear as empty strings in this view; they are
# presumably private-use code points from the icon font -- confirm against
# the original file before applying this edit.
application_icons = {
    window_class: icon(font_number, glyph)
    for window_class, font_number, glyph in (
        ("calibre", 2, "📚"),
        ("chromium", 3, ""),
        ("d-feet", 2, ""),
        ("darktable", 2, ""),
        ("discord", 3, ""),
        ("draw.io", 2, ""),
        ("easyeffects", 2, ""),
        ("emacs", 2, ""),
        ("file-roller", 2, ""),
        ("firefox", 3, ""),
        ("gaupol", 2, ""),
        ("gimp", 2, ""),
        ("gitg", 2, ""),
        ("gnome-boxes", 2, ""),
        ("google-chrome", 3, ""),
        ("grisbi", 2, ""),
        ("inkscape", 2, ""),
        ("jitsi meet", 2, ""),
        ("libreoffice", 2, "📄"),
        ("mednafen", 2, ""),
        ("mpv", 2, ""),
        ("nestopia", 2, ""),
        ("org.gnome.maps", 2, ""),
        ("pavucontrol", 2, ""),
        ("qalculate", 2, ""),
        ("qemu", 2, ""),
        ("retroarch", 2, ""),
        ("scummvm", 2, ""),
        ("signal", 2, ""),
        ("snes9x", 2, ""),
        ("spot", 3, ""),
        ("spotify", 3, ""),
        ("steam", 3, ""),
        ("sxiv", 2, ""),
        ("system-config-printer.py", 2, "⎙"),
        ("thunar", 2, ""),
        ("thunderbird", 2, ""),
        ("vbeterm", 2, ""),
        ("virt-manager", 2, ""),
        ("webex", 2, ""),
        ("wireshark", 2, ""),
        ("zathura", 2, ""),
        ("zoom", 2, ""),
        ("zoom.real", 2, ""),
    )
}
|
||
# Map a symbolic name to the Polybar-formatted icon shown in the status
# modules. Every icon uses Polybar font number 2.
# NOTE(review): many glyphs appear as empty strings in this view; they are
# presumably private-use code points from the icon font -- confirm against
# the original file before applying this edit.
icons = {
    icon_name: icon(2, glyph)
    for icon_name, glyph in (
        ("access-point", ""),
        ("battery-100", ""),
        ("battery-75", ""),
        ("battery-50", ""),
        ("battery-25", ""),
        ("battery-0", ""),
        ("bluetooth", ""),
        ("camera", "⎙"),
        ("car", "🚘"),
        ("gamepad", "🎮"),
        ("headphones", "🎧"),
        ("headset", ""),
        ("keyboard", "⌨"),
        ("laptop", "💻"),
        ("loudspeaker", ""),
        ("microphone", ""),
        ("mouse", ""),
        ("notifications-disabled", "🔕"),
        ("notifications-enabled", ""),
        ("nowifi", ""),
        ("phone", "📞"),
        ("printer", "⎙"),
        ("scanner", ""),
        ("unknown", ""),
        ("vpn", ""),
        ("webcam", ""),
        ("wifi-high", ""),
        ("wifi-low", ""),
        ("wifi-medium", ""),
        ("wired", ""),
    )
}
|
||
# Icon used for a window whose class has no entry in application_icons.
application_icons_nomatch = icon(2, "")
# Window classes that never contribute an icon to a workspace name.
application_icons_ignore = {"dimmer"}
# Icons dropped from a workspace name as soon as another icon is present.
application_icons_alone = {application_icons["vbeterm"]}
# Window classes that should not share their workspace with other classes.
exclusive_apps = {"chromium-browser", "emacs", "firefox", "google-chrome"}
# Window classes allowed to open next to an exclusive application.
intrusive_apps = {"vbeterm"}
|
||
|
||
logger = logging.getLogger("i3-companion")

# Events for @on decorator

# A DBus signal to listen to. "system" selects the system bus (True) or the
# session bus (False). "onlyif" is an optional predicate receiving the signal
# arguments; the handler is skipped when it returns a false value.
DBusSignal = collections.namedtuple(
    "DBusSignal",
    "interface member signature system path onlyif",
    defaults=(True, "/", None),
)
# Sentinel event: the handler is run once when the companion starts.
StartEvent = object()
I3Event = i3ipc.Event
# A command received through an i3 "nop" binding or a tick payload.
CommandEvent = collections.namedtuple("CommandEvent", "name")
|
||
|
||
# Numeric values compared against the state and type properties read from
# NetworkManager over DBus.
# Active connection state
NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2
# Device types
NM_DEVICE_TYPE_ETHERNET, NM_DEVICE_TYPE_WIFI, NM_DEVICE_TYPE_MODEM = 1, 2, 8
# Device states
NM_DEVICE_STATE_UNMANAGED, NM_DEVICE_STATE_ACTIVATED = 10, 100
|
||
|
||
|
||
# Event helpers
|
||
|
||
|
||
def static(**kwargs):
    """Attach the keyword arguments as attributes of the decorated function.

    The decorator returns the very same function object, so the attributes
    can serve as state kept between calls of an event handler.
    """

    def attach(target):
        vars(target).update(kwargs)
        return target

    return attach
|
||
|
||
|
||
@static(functions={})
def on(*events):
    """Tag events that should be provided to the function.

    The decorated function is recorded in ``on.functions`` together with
    the events it wants to receive. The decorator returns a thin wrapper
    that only forwards the call to that function.
    """

    def register(handler):
        on.functions[handler] = events

        @functools.wraps(handler)
        def forward(*args, **kwargs):
            return handler(*args, **kwargs)

        return forward

    return register
|
||
|
||
|
||
def retry(max_retries):
    """Retry an async function.

    The wrapped coroutine is awaited again each time it raises, up to
    ``max_retries`` additional times. Once the retries are exhausted, the
    exception is logged with its traceback and the wrapper returns ``None``.
    """

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            remaining = max_retries
            while True:
                try:
                    logger.debug("execute %s (remaining tries: %s)", fn, remaining)
                    return await fn(*args, **kwargs)
                except Exception as exc:
                    if remaining <= 0:
                        # Out of retries: report and give up silently.
                        logger.exception(f"while executing {fn}: %s", exc)
                        return None
                    remaining -= 1
                    logger.warning(
                        f"while executing {fn} (remaining tries: %d): %s",
                        remaining,
                        exc,
                    )

        return wrapper

    return decorator
|
||
|
||
|
||
def debounce(sleep, *, unless=None):
    """Debounce a function call (batch successive calls into only one).

    Optional immediate execution. Ensure only one instance is
    executed. It is assumed the arguments provided to the debounced
    function have no effect on its execution.

    ``sleep`` is the delay, in seconds, to wait before executing the
    function. ``unless`` is an optional predicate called with the arguments
    of each call: when it returns a true value, the pending work is executed
    without waiting for the delay to elapse.

    All callers arriving while a worker exists await the same worker task. A
    call arriving while the function is already executing schedules one more
    execution once the current one is done. An exception raised by the
    function is propagated to every caller awaiting the worker.
    """
    # One slot per decorated function: None when idle, otherwise a namespace
    # holding the worker task, the "urgent" event and the queued arguments.
    workers = {}

    def decorator(fn):
        async def worker():
            while True:
                with contextlib.suppress(asyncio.TimeoutError):
                    # Wait for an urgent work or until sleep is elapsed
                    await asyncio.wait_for(workers[fn].urgent.wait(), timeout=sleep)
                    logger.debug("urgent work received for %s", fn)
                args, kwargs = workers[fn].queue
                workers[fn].queue = None
                workers[fn].urgent.clear()

                # Execute the work
                logger.debug("execute work for %s", fn)
                try:
                    await fn(*args, **kwargs)
                except Exception as e:
                    logger.debug("while running %s, worker got %s", fn, e)
                    workers[fn] = None
                    raise

                # Do we still have something to do?
                if workers[fn].queue is None:
                    break

            # No more work
            logger.debug("no more work for %s", fn)
            workers[fn] = None

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            if workers[fn] is None:
                logger.debug("create new worker for %s", fn)
                workers[fn] = types.SimpleNamespace()
                workers[fn].task = asyncio.create_task(worker())
                workers[fn].urgent = asyncio.Event()
                workers[fn].queue = (args, kwargs)
            else:
                logger.debug("enqueue new work for %s", fn)
                if workers[fn].queue is None:
                    # The worker already consumed its work and is currently
                    # executing the function: queue this call so the worker
                    # runs once more instead of dropping the event.
                    workers[fn].queue = (args, kwargs)
            if unless is not None and unless(*args, **kwargs):
                logger.debug("wake up now for %s", fn)
                workers[fn].urgent.set()
            return await workers[fn].task

        workers[fn] = None
        return wrapper

    return decorator
|
||
|
||
|
||
def polybar(module):
    """Use returned string to update polybar module.

    The decorated coroutine is awaited and its result is returned unchanged.
    When the result is a string that differs from the last one sent by this
    decorator, it is written to ``$XDG_RUNTIME_DIR/i3/<module>.txt`` and sent
    to every Polybar socket matching ``$XDG_RUNTIME_DIR/polybar/*.sock``.
    Sockets failing with ENXIO or ECONNREFUSED are skipped; other connection
    errors are raised.
    """
    # Last content sent, keyed by module name.
    cache = {}

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            content = await fn(*args, **kwargs)
            if not isinstance(content, str) or cache.get(module) == content:
                return content

            runtime_dir = os.getenv("XDG_RUNTIME_DIR")

            # Update cache file (for when polybar restarts). The encoding is
            # explicit so the file matches the UTF-8 payload sent over IPC
            # whatever the locale is.
            cache_file = f"{runtime_dir}/i3/{module}.txt"
            with open(cache_file, "w", encoding="utf-8") as out:
                out.write(content)

            # Send it to polybar
            cmd = bytes(f"#{module}.send.{content}", "utf-8")
            data = (
                b"polyipc"  # magic
                + struct.pack("=BIB", 0, len(cmd), 2)  # header: version, length, type
                + cmd
            )
            for name in glob.glob(f"{runtime_dir}/polybar/*.sock"):
                try:
                    reader, writer = await asyncio.open_unix_connection(name)
                except OSError as e:
                    if e.errno not in (errno.ENXIO, errno.ECONNREFUSED):
                        raise
                else:
                    try:
                        writer.write(data)
                        await writer.drain()
                        await reader.read()
                    finally:
                        writer.close()
                        await writer.wait_closed()

            logger.info(f"polybar/{module}: content updated")
            cache[module] = content
            return content

        return wrapper

    return decorator
|
||
|
||
|
||
# Other helpers
|
||
|
||
|
||
async def notify(i3, **kwargs):
    """Send a notification through the org.freedesktop.Notifications service.

    The keyword arguments override the default parameters handed to the
    ``Notify`` DBus method. The result of that method call is returned.
    """
    service = i3.session_bus["org.freedesktop.Notifications"]
    proxy = service["/org/freedesktop/Notifications"]
    interface = await proxy.get_async_interface("org.freedesktop.Notifications")
    parameters = {
        "app_name": logger.name,
        "replaces_id": 0,
        "app_icon": "dialog-information",
        "summary": "",
        "actions": [],
        "hints": {},
        "expire_timeout": 5000,
        **kwargs,
    }
    return await interface.Notify(**parameters)
|
||
|
||
|
||
async def create_new_workspace(i3):
    """Create a new workspace and return its number.

    The number picked is the lowest positive integer not used by any
    existing workspace. i3 is then asked to switch to that workspace number.
    """
    workspaces = await i3.get_workspaces()
    used = {w.num for w in workspaces}
    # Scan upwards from 1 instead of popping from a set: set.pop() returns
    # an arbitrary element, and this also works when no workspace has a
    # positive number.
    available = 1
    while available in used:
        available += 1
    logger.info(f"create new workspace number {available}")
    await i3.command(f'workspace number "{available}"')
    return available
|
||
|
||
|
||
# Event handlers
|
||
|
||
|
||
@on(StartEvent, I3Event.WINDOW_MOVE, I3Event.WINDOW_NEW, I3Event.WINDOW_CLOSE)
@debounce(0.2)
async def workspace_rename(i3, event):
    """Rename workspaces using icons to match what's inside it.

    Each workspace is renamed to ``<num>:<icons>``, the icons being sorted
    and separated with ``|``. Sticky windows and window classes listed in
    ``application_icons_ignore`` are skipped. Nothing is sent to i3 when all
    workspaces already have the expected name.
    """
    tree = await i3.get_tree()
    commands = []

    for workspace in tree.workspaces():
        workspace_icons = set()
        for window in workspace.leaves():
            if window.sticky:
                continue
            cls = (window.window_class or "").lower()
            if cls in application_icons_ignore:
                continue
            # Exact class first, then the part before the first dash.
            app_icon = application_icons.get(
                cls, application_icons.get(cls.split("-")[0])
            )
            if app_icon is None:
                # Zoom (😱)
                title = window.window_title or ""
                instance = window.window_instance or ""
                if title.startswith("Zoom") or instance.startswith(
                    ("start?action=start", "join?action=join")
                ):
                    app_icon = application_icons["zoom"]
            workspace_icons.add(app_icon or application_icons_nomatch)
        # "Alone" icons are only kept when there is nothing else to display.
        if any(i not in application_icons_alone for i in workspace_icons):
            workspace_icons -= application_icons_alone
        new_name = f"{workspace.num}:{'|'.join(sorted(workspace_icons))}".rstrip(":")
        if workspace.name != new_name:
            logger.debug("rename workspace %s", workspace.num)
            command = f'rename workspace "{workspace.name}" to "{new_name}"'
            commands.append(command)

    # Do not bother i3 with an empty command when nothing changed.
    if commands:
        await i3.command(";".join(commands))
|
||
|
||
|
||
@on(CommandEvent("previous-workspace"), I3Event.WORKSPACE_FOCUS)
@static(history=collections.defaultdict(list))
async def previous_workspace(i3, event):
    """Go to previous workspace on the same output.

    On a workspace focus event, the number of the workspace being left is
    appended to a per-output history limited to five entries. On the
    "previous-workspace" command, the most recent entry of the history which
    is still a workspace of the focused output, and is not the focused
    workspace, is switched to.
    """
    history = previous_workspace.history

    if isinstance(event, i3ipc.WorkspaceEvent) and event.old:
        left = event.old.ipc_data
        output = left["output"]
        num = left["num"]
        past = history[output]
        # Do not record the same workspace twice in a row.
        if not past or past[-1] != num:
            past.append(num)
            history[output] = past[-5:]
        logger.debug("on %s, history is %s", output, history[output])
        return

    if event != "previous-workspace":
        return

    workspaces = await i3.get_workspaces()
    focused = next((w for w in workspaces if w.focused), None)
    if focused is None:
        return
    output = focused.output
    # Numbers of the other workspaces currently on the focused output.
    reachable = {
        w.num for w in workspaces if w.output == output and w.num != focused.num
    }
    past = history[output]
    while past:
        previous = past.pop()
        if previous in reachable:
            break
    else:
        logger.debug("no previous workspace on %s", output)
        return
    logger.debug("switching to workspace %d on %s", previous, output)
    await i3.command(f"workspace number {previous}")
|
||
|
||
|
||
@on(CommandEvent("new-workspace"), CommandEvent("move-to-new-workspace"))
async def new_workspace(i3, event):
    """Create a new workspace and optionally move a window to it.

    With the "move-to-new-workspace" command, the focused container is moved
    to the new workspace. Nothing is done when no container is focused.
    """
    moving = event == "move-to-new-workspace"
    current = None

    # Get the currently focused window
    if moving:
        tree = await i3.get_tree()
        current = tree.find_focused()
        if not current:
            return

    num = await create_new_workspace(i3)

    # Move the window to this workspace
    if moving:
        await current.command(f'move container to workspace number "{num}"')
|
||
|
||
|
||
@on(I3Event.WORKSPACE_INIT)
@static(lock=asyncio.Lock())
async def workspace_rename_duplicate(i3, event):
    """Rename a workspace when initialized empty with a duplicate number.

    When another workspace already uses the number of the workspace being
    initialized, the new one is renamed to the lowest positive number not
    used by the other workspaces. The lock serializes concurrent
    initializations.
    """
    # This will not be needed once https://github.com/i3/i3/pull/4252 is released.
    async with workspace_rename_duplicate.lock:
        workspace = event.current
        workspaces = await i3.get_workspaces()
        other_nums = {w.num for w in workspaces if workspace.id != w.ipc_data["id"]}
        if workspace.num not in other_nums:
            return
        # Scan upwards from 1 instead of popping from a set: set.pop()
        # returns an arbitrary element, not the lowest free number.
        available = 1
        while available in other_nums:
            available += 1
        await i3.command(f"rename workspace {workspace.num} to {available}")
|
||
|
||
|
||
@on(I3Event.WINDOW_NEW, CommandEvent("inhibit-exclusive"))
@static(inhibited_by=False)
async def workspace_exclusive(i3, event):
    """Move new windows on a new workspace instead of sharing a workspace
    with an exclusive app.

    The "inhibit-exclusive" command disables this behaviour for the next new
    window; the inhibition is cancelled after one second if unused.
    """
    if event == "inhibit-exclusive":
        logger.debug("inhibit exclusive workspace")
        # Unique token: only the latest inhibition request may cancel itself.
        token = object()
        workspace_exclusive.inhibited_by = token
        await asyncio.sleep(1)
        if workspace_exclusive.inhibited_by is token:
            logger.info("cancel exclusive workspace inhibition")
            workspace_exclusive.inhibited_by = None
        return

    if workspace_exclusive.inhibited_by:
        workspace_exclusive.inhibited_by = None
        return

    new_window = event.container
    new_class = new_window.window_class

    # Can the current window intrude the workspace?
    is_floating = new_window.floating in {"auto_on", "user_on"}
    if is_floating or (new_class or "").lower() in intrusive_apps:
        logger.debug("window %s can intrude", new_class)
        return

    tree = await i3.get_tree()

    # Get the window workspace. From an event, w.workspace() is None,
    # so search it in the tree.
    in_tree = tree.find_by_id(new_window.id)
    if in_tree is None:
        logger.debug("cannot find new window in tree?")
        return
    current_workspace = in_tree.workspace()
    if current_workspace is None:
        logger.debug("cannot find new window workspace?")
        return

    other_windows = [leaf for leaf in tree.leaves() if leaf.id != new_window.id]

    def same_class(leaf):
        # A new window without a class never matches another window.
        return bool(new_class) and new_class == leaf.window_class

    # Get the list of workspaces with an exclusive app, excluding the
    # current window and windows of the same class.
    exclusive_workspaces = set()
    for leaf in other_windows:
        if same_class(leaf) or leaf.sticky:
            continue
        if (leaf.window_class or "").lower() in exclusive_apps:
            exclusive_workspaces.add(leaf.workspace().num)

    # If current one is OK, don't move
    if current_workspace.num not in exclusive_workspaces:
        logger.debug("no exclusive app, %s can go there", new_class)
        return

    # Are there other workspaces with the same app but no exclusive apps?
    candidate_workspaces = {
        leaf.workspace().num for leaf in other_windows if same_class(leaf)
    }
    candidate_workspaces -= exclusive_workspaces
    candidate_workspaces.discard(-1)  # scratchpad

    if candidate_workspaces:
        # Use one of the candidates
        num = next(iter(candidate_workspaces))
    else:
        # Create a new workspace
        num = await create_new_workspace(i3)

    logger.info(f"move window {new_window.window_class} to workspace {num}")
    await new_window.command(f'move container to workspace number "{num}", focus')
|
||
|
||
|
||
@on(CommandEvent("quake-console"))
async def quake_console(i3, event):
    """Spawn a quake console or toggle an existing one.

    ``event`` is the command payload. It is made of four fields separated
    by colons: the command name, the terminal executable, the instance name
    of the terminal and its height as a fraction of the workspace height.

    Raises RuntimeError when the spawned terminal does not show up within
    one second.
    """
    try:
        _, term_exec, term_name, height = event.split(":")
        height = float(height)
    except Exception as exc:
        # Logger.warn() is a deprecated alias of Logger.warning().
        logger.warning(f"unable to parse payload {event}: {exc}")
        return

    # Look for the terminal or spawn it
    tree = await i3.get_tree()
    try:
        term = tree.find_instanced(term_name)[0]
    except IndexError:
        quake_window = asyncio.get_running_loop().create_future()

        def wait_for_quake(i3, event):
            # Resolve the future with the first new window of our terminal.
            w = event.container
            if quake_window.done() or w.window_instance != term_name:
                return
            quake_window.set_result(w)

        i3.on(I3Event.WINDOW_NEW, wait_for_quake)
        try:
            await i3.command(f"exec exec {term_exec} --name {term_name}")
            done, _ = await asyncio.wait((quake_window,), timeout=1)
        finally:
            i3.off(wait_for_quake)
        if not done:
            raise RuntimeError("unable to spawn terminal")
        term = quake_window.result()
        await term.command("move window to scratchpad")

    workspaces = await i3.get_workspaces()
    workspace = [ws for ws in workspaces if ws.focused][0]
    ws_x, ws_y = workspace.rect.x, workspace.rect.y
    ws_width, ws_height = workspace.rect.width, workspace.rect.height
    height = int(ws_height * height)
    command = (
        f"[instance={term_name}] "
        "border none,"
        f"resize set {ws_width} px {height} px,"
        "scratchpad show,"
        f"move absolute position {ws_x}px {ws_y}px"
    )
    logger.debug("QuakeConsole: %s", command)
    await i3.command(command)
|
||
|
||
|
||
@on(CommandEvent("container-info"))
@static(last_id=0)
async def container_info(i3, event):
    """Show information about the focused container.

    The information is displayed in a notification replacing the one sent
    by the previous call. Properties without a value are omitted.
    """
    tree = await i3.get_tree()
    window = tree.find_focused()
    if not window:
        return
    logger.info(f"window raw information: {window.ipc_data}")
    summary = "About focused container"
    r = window.rect
    w = window
    info = {
        "name": w.name,
        "title": w.window_title,
        "class": w.window_class,
        "instance": w.window_instance,
        "role": w.window_role,
        "type": w.ipc_data["window_type"],
        "sticky": w.sticky,
        "floating": w.floating,
        "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
        "layout": w.layout,
        "percent": w.percent,
        "marks": ", ".join(w.marks) or "(none)",
    }
    # Values are escaped as the body is interpreted as markup.
    body = "\n".join(
        f"<tt>{k:10}</tt> {html.escape(str(v))}"
        for k, v in info.items()
        if v is not None
    )
    result = await notify(
        i3,
        app_icon="info",
        expire_timeout=10000,
        summary=summary,
        body=body,
        replaces_id=container_info.last_id,
    )
    # Remember the notification to replace it on the next call.
    container_info.last_id = result[0]
|
||
|
||
|
||
@on(CommandEvent("workspace-info"))
@static(last_id=0)
async def workspace_info(i3, event):
    """Show information about the focused workspace.

    The tree of containers of the focused workspace is displayed in a
    notification replacing the one sent by the previous call. The focused
    container and the containers without a window each get their own color.
    """
    workspaces = await i3.get_workspaces()
    focused = [w for w in workspaces if w.focused]
    if not focused:
        return
    workspace = focused[0]
    summary = f"Workspace {workspace.num} on {workspace.output}"
    tree = await i3.get_tree()
    matching = [w for w in tree.workspaces() if w.num == workspace.num]
    if not matching:
        logger.debug("cannot find workspace %s in tree?", workspace.num)
        return

    def render(container):
        """Return the markup describing a container and its children."""
        if container.focused:
            style = 'foreground="#ffaf00"'
        elif not container.window:
            style = 'foreground="#6c98ee"'
        else:
            style = ""
        if container.window:
            content = (
                f"{(container.window_class or '???').lower()}: "
                f"{(container.window_title or '???')}"
            )
        elif container.type == "workspace" and not container.nodes:
            # Empty workspaces use workspace_layout, but when default,
            # this is layout...
            layout = container.ipc_data["workspace_layout"]
            if layout == "default":
                layout = container.layout
            content = f"({layout})"
        else:
            content = f"({container.layout})"
        # Window classes and titles are arbitrary text: escape them as the
        # body is interpreted as markup.
        root = f"<span {style}>{html.escape(content.lower())}</span>"
        lines = [root]
        last = len(container.nodes) - 1
        for position, child in enumerate(container.nodes):
            if position == last:
                first = "└─"
                others = " "
            else:
                first = "├─"
                others = "│ "
            # Prefix the lines of nested children to draw the tree.
            content = render(child).replace("\n", f"\n<tt>{others}</tt>")
            lines.append(f"<tt>{first}</tt>{content}")
        return "\n".join(lines)

    body = render(matching[0]).lstrip("\n")
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=20000,
        summary=summary,
        body=body,
        replaces_id=workspace_info.last_id,
    )
    # Remember the notification to replace it on the next call.
    workspace_info.last_id = result[0]
|
||
|
||
|
||
@on(I3Event.OUTPUT, StartEvent)
@static(last_setup=None)
@debounce(2)
async def output_update(i3, event):
    """React to a XRandR change.

    The name, size and position of each monitor are compared to the setup
    seen by the previous call. On a change, except at start, the commands
    reloading xsettingsd and starting the wallpaper service are run.
    """
    # Grab current setup. Synchronous, but it's short enough
    x11 = i3.x11
    randr = x11(xcffib.randr.key)
    screen = x11.get_setup().roots[0]
    current_setup = set()
    for monitor in randr.GetMonitors(screen.root, 1).reply().monitors:
        name = x11.core.GetAtomName(monitor.name).reply().name.to_string()
        current_setup.add(
            (name, monitor.width, monitor.height, monitor.x, monitor.y)
        )

    # Compare to current setup
    if current_setup == output_update.last_setup:
        logger.debug("current xrandr setup unchanged")
        return
    output_update.last_setup = current_setup
    logger.info("xrandr setup: %s", current_setup)
    if event is StartEvent:
        # At start, only record the setup.
        return

    # Trigger changes
    logger.info("xrandr change detected")
    for cmd in (
        "systemctl --user reload --no-block xsettingsd.service",
        "systemctl --user start --no-block wallpaper.service",
    ):
        proc = subprocess.run(shlex.split(cmd))
        if proc.returncode:
            logger.warning(f"{cmd} exited with {proc.returncode}")
|
||
|
||
|
||
@on(
    DBusSignal(
        path="/org/bluez",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: args[0] == "org.bluez.Device1" and "Connected" in args[1],
    )
)
async def bluetooth_notifications(i3, event, path, interface, changed, invalid):
    """Display notifications related to Bluetooth state.

    The name, icon and connection state are read from the object at
    ``path`` using the interface whose properties changed.
    """
    device = await i3.system_bus["org.bluez"][path].get_async_interface(interface)
    name = await device.Name
    device_icon = await device.Icon
    connected = await device.Connected
    state = "connected" if connected else "disconnected"
    await notify(
        i3,
        app_icon=device_icon,
        summary=name,
        body=f"Bluetooth device {state}",
    )
|
||
|
||
|
||
@on(
    StartEvent,
    DBusSignal(
        path="/org/bluez",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: (
            args[0] == "org.bluez.Device1"
            and "Connected" in args[1]
            or args[0] == "org.bluez.Adapter1"
            and "Powered" in args[1]
            or args[0] == "org.bluez.Battery1"
            and "Percentage" in args[1]
        ),
    ),
)
@retry(2)
@debounce(4)
@polybar("bluetooth")
async def bluetooth_status(i3, event, *args):
    """Update bluetooth status for Polybar.

    The result is an empty string when no adapter is powered. Otherwise, it
    is the Bluetooth icon followed by one icon for each connected device,
    along with a battery icon when the device exposes its battery level.
    """
    # Do we have a bluetooth device?
    if event is StartEvent and not os.path.exists("/sys/class/bluetooth"):
        logger.info("no bluetooth detected")
        return ""

    # OK, get the info
    bluez = i3.system_bus["org.bluez"]
    manager = await bluez["/"].get_async_interface(
        "org.freedesktop.DBus.ObjectManager"
    )
    managed = (await manager.GetManagedObjects())[0]
    powered = False
    devices = []
    for interfaces in managed.values():
        if "org.bluez.Adapter1" in interfaces:
            # We get an adapter!
            if interfaces["org.bluez.Adapter1"]["Powered"][1]:
                powered = True
            continue
        if "org.bluez.Device1" not in interfaces:
            continue
        # We have a device!
        properties = interfaces["org.bluez.Device1"]
        if not properties["Connected"][1]:
            continue
        device = types.SimpleNamespace(major=0, minor=0, battery=None)
        try:
            device_class = properties["Class"][1]
            device.major = (device_class & 0x1F00) >> 8
            device.minor = (device_class & 0xFC) >> 2
            device.battery = interfaces["org.bluez.Battery1"]["Percentage"][1]
        except KeyError:
            # Class or battery not available: keep what we already have.
            pass
        devices.append(device)

    if not powered:
        return ""

    # Choose appropriate icons for output
    # See: https://btprodspecificationrefs.blob.core.windows.net/assigned-numbers
    # /Assigned%20Number%20Types/Baseband.pdf
    # Keys are either (major, minor) or major alone. A list value holds
    # (minor bit mask, icon name) pairs tried in order.
    device_classes = {
        1: "laptop",
        2: "phone",
        3: "access-point",
        (4, 1): "headset",
        (4, 2): "headset",
        (4, 4): "microphone",
        (4, 5): "loudspeaker",
        (4, 7): "loudspeaker",
        (4, 10): "loudspeaker",
        (4, 6): "headphones",
        (4, 8): "car",
        (4, 12): "webcam",
        (5, 1): "gamepad",
        (5, 2): "gamepad",
        5: [(0x10, "keyboard"), (0x20, "mouse")],
        6: [(0x8, "camera"), (0x10, "scanner"), (0x20, "printer")],
    }
    segments = [icons["bluetooth"]]
    for device in devices:
        kind = device_classes.get(
            (device.major, device.minor)
        ) or device_classes.get(device.major, "unknown")
        if isinstance(kind, list):
            kind = next(
                (name for mask, name in kind if device.minor & mask), "unknown"
            )
        segment = icons[kind]
        if device.battery is not None:
            # Round the battery level to the nearest available icon.
            level = icons[f"battery-{(device.battery+12)//25*25}"]
            segment = " ".join((segment, level))
        segments.append(segment)
    return "|".join(segments)
|
||
|
||
|
||
@on(
    DBusSignal(
        system=False,
        path="/org/freedesktop/Notifications",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: args[0] == "org.dunstproject.cmd0" and "paused" in args[1],
    )
)
@polybar("dunst")
async def dunst_status_update(i3, event, path, interface, changed, invalid):
    """Update notification status in Polybar.

    The icon is chosen from the "paused" property of the changed properties.
    """
    if changed["paused"][1]:
        return icons["notifications-disabled"]
    return icons["notifications-enabled"]
|
||
|
||
|
||
@on(StartEvent)
@polybar("dunst")
async def dunst_status_check(i3, event):
    """Display notification status for Polybar.

    The icon is chosen from the "paused" property read from the
    org.dunstproject.cmd0 interface on the session bus.
    """
    service = i3.session_bus["org.freedesktop.Notifications"]
    proxy = service["/org/freedesktop/Notifications"]
    dunst = await proxy.get_async_interface("org.dunstproject.cmd0")
    if await dunst.paused:
        return icons["notifications-disabled"]
    return icons["notifications-enabled"]
|
||
|
||
|
||
@on(
    DBusSignal(
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    )
)
async def network_manager_notifications(i3, event, path, state, reason):
    """Display notifications related to Network Manager state.

    A notification is sent when a VPN, WireGuard, Ethernet or wireless
    connection gets activated. Other states and connection types are
    ignored.
    """
    ofnm = "org.freedesktop.NetworkManager"
    logger.debug("from %s state: %s, reason: %s", path, state, reason)
    if state != NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
        # Deactivated state does not contain enough information,
        # unless we maintain state.
        return
    try:
        active = await i3.system_bus[ofnm][path].get_async_interface(
            f"{ofnm}.Connection.Active"
        )
    except dbussy.DBusError:
        logger.info(f"interface {path} has vanished")
        return
    kind = await active.Type
    connection_id = await active.Id
    # Connection type -> (notification icon, notification body)
    messages = {
        "vpn": ("network-vpn", "VPN connected!"),
        "wireguard": ("network-vpn", "VPN connected!"),
        "802-3-ethernet": ("network-wired", "Ethernet connection established!"),
        "802-11-wireless": ("network-wireless", "Wireless connection established!"),
    }
    if kind not in messages:
        return
    app_icon, body = messages[kind]
    await notify(i3, app_icon=app_icon, summary=f"{connection_id}", body=body)
|
||
|
||
|
||
@on(
    StartEvent,
    DBusSignal(
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    ),
    DBusSignal(
        path="/org/freedesktop/NetworkManager/AccessPoint",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
        onlyif=lambda args: (args[0] == "org.freedesktop.NetworkManager.AccessPoint"),
    ),
)
@retry(2)
@debounce(
    1,
    unless=lambda i3, event, *args: (
        isinstance(event, DBusSignal) and event.interface.endswith(".Active")
    ),
)
@polybar("network")
async def network_manager_status(i3, event, *args):
    """Compute network manager status.

    The status line has one entry for each managed wireless device, one for
    each activated Ethernet device, and the icon and name of each activated
    VPN or WireGuard connection.
    """
    ofnm = "org.freedesktop.NetworkManager"
    status = []

    def wifi(label):
        # Wrap the label in a Polybar action running rofi-wifi.
        rofi = os.path.expanduser("~/.config/i3/bin/rofi-wifi")
        return "%%{A1:%s:}%s%%{A}" % (rofi, label)

    # Build status from devices
    conn = i3.system_bus[ofnm]
    nm = await conn["/org/freedesktop/NetworkManager"].get_async_interface(ofnm)
    for device in await nm.AllDevices:
        nmd = await conn[device].get_async_interface(f"{ofnm}.Device")
        kind = await nmd.DeviceType
        state = await nmd.State
        if state == NM_DEVICE_STATE_UNMANAGED:
            continue
        activated = state == NM_DEVICE_STATE_ACTIVATED
        if kind == NM_DEVICE_TYPE_ETHERNET:
            if activated:
                status.append(icons["wired"])
            continue
        if kind != NM_DEVICE_TYPE_WIFI:
            continue
        if not activated:
            status.append(wifi(icons["nowifi"]))
            continue
        nmw = await conn[device].get_async_interface(f"{ofnm}.Device.Wireless")
        ap = await nmw.ActiveAccessPoint
        if not ap:
            status.append(wifi(icons["nowifi"]))
            continue
        network_manager_status.active_ap = ap
        nmap = await conn[ap].get_async_interface(f"{ofnm}.AccessPoint")
        name = await nmap.Ssid
        strength = int(await nmap.Strength)
        levels = [icons["wifi-low"], icons["wifi-medium"], icons["wifi-high"]]
        # "%" is doubled in the network name as it is special for Polybar.
        ssid = bytes(name).decode("utf-8", errors="replace").replace("%", "%%")
        status.append(wifi(levels[strength // 34] + " " + ssid))

    # Build status for VPN connection
    for connection in await nm.ActiveConnections:
        nma = await conn[connection].get_async_interface(f"{ofnm}.Connection.Active")
        vpn = await nma.Vpn
        kind = await nma.Type
        if not (vpn or kind == "wireguard"):
            continue
        state = await nma.State
        if state == NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
            status.append(icons["vpn"])
            status.append((await nma.Id).replace("%", "%%"))

    # Final status line
    return " ".join(status)
|
||
|
||
|
||
# Main function
|
||
|
||
|
||
async def main(options):
    """Connect to i3, DBus and X11, then dispatch events to the handlers.

    The handlers are the functions registered in ``on.functions``. i3 events
    and DBus signals are forwarded to them. Command events come either from
    bindings running a "nop" command or from ticks. Handlers registered for
    the start event are scheduled right away. ``options`` is not used.
    """
    i3 = await Connection(auto_reconnect=True).connect()
    i3.session_bus = await ravel.session_bus_async()
    i3.system_bus = await ravel.system_bus_async()
    i3.x11 = xcffib.connect()

    def i3_handler(fn):
        """Build the i3 callback forwarding an event to a handler."""

        async def wrapped(i3, event):
            logger.debug("received i3 event %s for %s", event, fn)
            return await fn(i3, event)

        return wrapped

    # Regular events
    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, I3Event):
                i3.on(event, i3_handler(fn))

    async def run_command(i3, event, payload):
        """Run the handlers registered for the command named in payload."""
        kind = payload.split(":")[0]
        for fn, events in on.functions.items():
            for e in events:
                if isinstance(e, CommandEvent) and e.name == kind:
                    logger.debug("received command event %s for %s", event, fn)
                    await fn(i3, payload)

    # React to some bindings
    async def binding_event(i3, event):
        """Process a binding event."""
        # We only processes it when it is a nop command and we use
        # this mechanism as an IPC mechanism. The alternative would be
        # to use ticks but we would need to spawn an i3-msg process
        # for that.
        for cmd in event.binding.command.split(";"):
            cmd = cmd.strip()
            if not cmd.startswith("nop "):
                continue
            cmd = cmd[4:].strip(" \"'")
            if cmd:
                await run_command(i3, event, cmd)

    i3.on(I3Event.BINDING, binding_event)

    # React to ticks
    async def tick_event(i3, event):
        """Process tick events."""
        await run_command(i3, event, event.payload)

    i3.on(I3Event.TICK, tick_event)

    def dbus_handler(fn, signal):
        """Build the DBus callback forwarding a signal to a handler."""

        @ravel.signal(
            name=signal.member,
            in_signature=signal.signature,
            path_keyword="path",
            args_keyword="args",
        )
        async def wrapped(path, args):
            if signal.onlyif is not None and not signal.onlyif(args):
                logger.debug("received DBus event for %s, not interested", fn)
                return
            logger.debug("received DBus event %s for %s", signal, fn)
            return await fn(i3, signal, path, *args)

        return wrapped

    # Listen to DBus events
    for fn, events in on.functions.items():
        for event in events:
            if not isinstance(event, DBusSignal):
                continue
            bus = i3.system_bus if event.system else i3.session_bus
            bus.listen_signal(
                path=event.path,
                fallback=True,
                interface=event.interface,
                name=event.member,
                func=dbus_handler(fn, event),
            )

    # Run events that should run on start
    start_tasks = [
        asyncio.create_task(fn(i3, event))
        for fn, events in on.functions.items()
        for event in events
        if event is StartEvent
    ]

    daemon.notify("READY=1")
    await i3.main()
|
||
|
||
|
||
if __name__ == "__main__":
    # Parse. The help text is the module docstring followed by the
    # docstring of each registered handler.
    description = " ".join([__doc__] + [f"{fn.__doc__}" for fn in on.functions])
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--debug",
        "-d",
        action="store_true",
        default=False,
        help="enable debugging",
    )
    options = parser.parse_args()

    # Logging: to the terminal when there is one, to the journal otherwise.
    root = logging.getLogger("")
    root.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
    if sys.stderr.isatty():
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
    else:
        handler = journal.JournalHandler(SYSLOG_IDENTIFIER=logger.name)
    root.addHandler(handler)

    try:
        asyncio.run(main(options))
    except Exception as e:
        logger.exception("%s", e)
        sys.exit(1)
    else:
        sys.exit(0)
|