mirror of
https://github.com/vincentbernat/i3wm-configuration.git
synced 2025-06-21 01:25:42 +02:00
1008 lines
32 KiB
Python
Executable file
1008 lines
32 KiB
Python
Executable file
#!/usr/bin/python3
|
||
|
||
"""Personal i3 companion."""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import collections
|
||
import errno
|
||
import functools
|
||
import glob
|
||
import html
|
||
import logging
|
||
import logging.handlers
|
||
import os
|
||
import re
|
||
import shlex
|
||
import subprocess
|
||
import sys
|
||
import types
|
||
|
||
import i3ipc
|
||
from i3ipc.aio import Connection
|
||
from systemd import journal
|
||
import ravel
|
||
import dbussy
|
||
from Xlib import display
|
||
from Xlib.ext import randr
|
||
|
||
|
||
def icon(font_number, char):
|
||
"""Turn an icon into a string for Polybar."""
|
||
# Font number is from Polybar configuration.
|
||
# 2: https://fontawesome.com/v6.0/icons?s=solid
|
||
# 3: https://fontawesome.com/v6.0/icons?s=brands
|
||
return "%%{T%d}%s%%{T-}" % (font_number, char)
|
||
|
||
|
||
# Configuration
|
||
|
||
application_icons = {
|
||
"chromium": icon(3, ""),
|
||
"discord": icon(3, ""),
|
||
"d-feet": icon(2, ""),
|
||
"emacs": icon(2, ""),
|
||
"firefox": icon(3, ""),
|
||
"gimp": icon(2, ""),
|
||
"gitg": icon(2, ""),
|
||
"google-chrome": icon(3, ""),
|
||
"inkscape": icon(2, ""),
|
||
"libreoffice": icon(2, ""),
|
||
"mpv": icon(2, ""),
|
||
"pavucontrol": icon(2, ""),
|
||
"signal": icon(2, ""),
|
||
"snes9x-gtk": icon(2, ""),
|
||
"spotify": icon(3, ""),
|
||
"steam": icon(3, ""),
|
||
"vbeterm": icon(2, ""),
|
||
"zathura": icon(2, ""),
|
||
"zoom": icon(2, ""),
|
||
}
|
||
icons = {
|
||
"access-point": icon(2, ""),
|
||
"bluetooth": icon(2, ""),
|
||
"camera": icon(2, "⎙"),
|
||
"car": icon(2, "🚘"),
|
||
"gamepad": icon(2, "🎮"),
|
||
"headphones": icon(2, "🎧"),
|
||
"headset": icon(2, ""),
|
||
"keyboard": icon(2, "⌨"),
|
||
"laptop": icon(2, "💻"),
|
||
"loudspeaker": icon(2, ""),
|
||
"microphone": icon(2, ""),
|
||
"mouse": icon(2, ""),
|
||
"notifications-disabled": icon(2, "🔕"),
|
||
"notifications-enabled": icon(2, ""),
|
||
"nowifi": icon(2, ""),
|
||
"phone": icon(2, "📞"),
|
||
"printer": icon(2, "⎙"),
|
||
"scanner": icon(2, ""),
|
||
"unknown": icon(2, ""),
|
||
"vpn": icon(2, ""),
|
||
"webcam": icon(2, ""),
|
||
"wifi-high": icon(2, ""),
|
||
"wifi-low": icon(2, ""),
|
||
"wifi-medium": icon(2, ""),
|
||
"wired": icon(2, ""),
|
||
}
|
||
application_icons_nomatch = icon(2, "")
|
||
application_icons_alone = {application_icons[k] for k in {"vbeterm"}}
|
||
exclusive_apps = {"emacs", "firefox"}
|
||
intrusive_apps = {"vbeterm"}
|
||
|
||
logger = logging.getLogger("i3-companion")
|
||
|
||
# Events for @on decorator
|
||
DBusSignal = collections.namedtuple(
|
||
"DBusSignal",
|
||
["interface", "member", "signature", "system", "path", "onlyif"],
|
||
defaults=(True, "/", None),
|
||
)
|
||
StartEvent = object()
|
||
I3Event = i3ipc.Event
|
||
CommandEvent = collections.namedtuple("CommandEvent", ["name"])
|
||
|
||
NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2
|
||
NM_DEVICE_TYPE_ETHERNET = 1
|
||
NM_DEVICE_TYPE_WIFI = 2
|
||
NM_DEVICE_TYPE_MODEM = 8
|
||
NM_DEVICE_STATE_UNMANAGED = 10
|
||
NM_DEVICE_STATE_ACTIVATED = 100
|
||
|
||
|
||
# Event helpers
|
||
|
||
|
||
def static(**kwargs):
|
||
"""Define static variables for the event handler."""
|
||
|
||
def decorator(fn):
|
||
for k, v in kwargs.items():
|
||
setattr(fn, k, v)
|
||
return fn
|
||
|
||
return decorator
|
||
|
||
|
||
@static(functions={})
|
||
def on(*events):
|
||
"""Tag events that should be provided to the function."""
|
||
|
||
def decorator(fn):
|
||
@functools.wraps(fn)
|
||
def wrapper(*args, **kwargs):
|
||
return fn(*args, **kwargs)
|
||
|
||
on.functions[fn] = events
|
||
return wrapper
|
||
|
||
return decorator
|
||
|
||
|
||
def retry(max_retries):
|
||
"""Retry an async function."""
|
||
|
||
def decorator(fn):
|
||
@functools.wraps(fn)
|
||
async def wrapper(*args, **kwargs):
|
||
retries = max_retries
|
||
while True:
|
||
try:
|
||
logger.debug("execute %s (remaining tries: %s)", fn, retries)
|
||
return await fn(*args, **kwargs)
|
||
except Exception as e:
|
||
if retries > 0:
|
||
retries -= 1
|
||
logger.warning(
|
||
f"while executing {fn} (remaining tries: %d): %s",
|
||
retries,
|
||
e,
|
||
)
|
||
else:
|
||
logger.exception(f"while executing {fn}: %s", e)
|
||
return
|
||
|
||
return wrapper
|
||
|
||
return decorator
|
||
|
||
|
||
def debounce(sleep, *, unless=None):
|
||
"""Debounce a function call (batch successive calls into only one).
|
||
Optional immediate execution. Ensure only one instance is
|
||
executed. It is assumed the arguments provided to the debounced
|
||
function have no effect on its execution."""
|
||
|
||
def decorator(fn):
|
||
async def worker():
|
||
while True:
|
||
try:
|
||
# Wait for an urgent work or until sleep is elapsed
|
||
await asyncio.wait_for(workers[fn].urgent.wait(), timeout=sleep)
|
||
logger.debug("urgent work received for %s", fn)
|
||
except asyncio.TimeoutError:
|
||
pass
|
||
args, kwargs = workers[fn].queue
|
||
workers[fn].queue = None
|
||
workers[fn].urgent.clear()
|
||
|
||
# Execute the work
|
||
logger.debug("execute work for %s", fn)
|
||
try:
|
||
await fn(*args, **kwargs)
|
||
except Exception as e:
|
||
logger.debug("while running %s, worker got %s", fn, e)
|
||
workers[fn] = None
|
||
raise
|
||
|
||
# Do we still have something to do?
|
||
if workers[fn].queue is None:
|
||
break
|
||
|
||
# No more work
|
||
logger.debug("no more work for %s", fn)
|
||
workers[fn] = None
|
||
|
||
@functools.wraps(fn)
|
||
async def wrapper(*args, **kwargs):
|
||
if workers[fn] is None:
|
||
logger.debug("create new worker for %s", fn)
|
||
workers[fn] = types.SimpleNamespace()
|
||
workers[fn].task = asyncio.create_task(worker())
|
||
workers[fn].urgent = asyncio.Event()
|
||
workers[fn].queue = (args, kwargs)
|
||
else:
|
||
logger.debug("enqueue new work for %s", fn)
|
||
if unless is not None and unless(*args, **kwargs):
|
||
logger.debug("wake up now for %s", fn)
|
||
workers[fn].urgent.set()
|
||
return await workers[fn].task
|
||
|
||
workers[fn] = None
|
||
return wrapper
|
||
|
||
workers = {}
|
||
return decorator
|
||
|
||
|
||
# Other helpers
|
||
|
||
|
||
async def notify(i3, **kwargs):
|
||
"""Send a notification with notify-send."""
|
||
conn = i3.session_bus["org.freedesktop.Notifications"]
|
||
obj = conn["/org/freedesktop/Notifications"]
|
||
notifications = await obj.get_async_interface("org.freedesktop.Notifications")
|
||
parameters = dict(
|
||
app_name=logger.name,
|
||
replaces_id=0,
|
||
app_icon="dialog-information",
|
||
summary="",
|
||
actions=[],
|
||
hints={},
|
||
expire_timeout=5000,
|
||
)
|
||
parameters.update(kwargs)
|
||
return await notifications.Notify(**parameters)
|
||
|
||
|
||
def polybar(module, content):
|
||
"""Update Polybar module with the provided content."""
|
||
# Update cache file (for when polybar restarts)
|
||
with open(f"{os.getenv('XDG_RUNTIME_DIR')}/i3/{module}.txt", "w") as out:
|
||
out.write(content)
|
||
|
||
# Send it to polybar
|
||
for name in glob.glob("/tmp/polybar_mqueue.*"):
|
||
try:
|
||
with open(os.open(name, os.O_WRONLY | os.O_NONBLOCK), "w") as out:
|
||
cmd = f"action:#{module}.send.{content}"
|
||
out.write(cmd)
|
||
except OSError as e:
|
||
if e.errno != errno.ENXIO:
|
||
raise
|
||
|
||
|
||
# Event handlers
|
||
|
||
|
||
@on(StartEvent, I3Event.WINDOW_MOVE, I3Event.WINDOW_NEW, I3Event.WINDOW_CLOSE)
|
||
@debounce(0.2)
|
||
async def workspace_rename(i3, event):
|
||
"""Rename workspaces using icons to match what's inside it."""
|
||
tree = await i3.get_tree()
|
||
workspaces = tree.workspaces()
|
||
commands = []
|
||
|
||
def application_icon(window):
|
||
"""Get application icon for a window."""
|
||
for attr in ("window_instance", "window_class"):
|
||
name = getattr(window, attr, None)
|
||
if name is None:
|
||
continue
|
||
for k, v in application_icons.items():
|
||
if re.match(rf"^{k}\b", name, re.IGNORECASE):
|
||
logger.debug("in %s, found '%s', matching %s", attr, name, k)
|
||
return v
|
||
return application_icons_nomatch
|
||
|
||
for workspace in workspaces:
|
||
icons = set()
|
||
for window in workspace.leaves():
|
||
if window.sticky:
|
||
continue
|
||
icon = application_icon(window)
|
||
if icon is not None:
|
||
icons.add(icon)
|
||
if any([i not in application_icons_alone for i in icons]):
|
||
icons -= application_icons_alone
|
||
new_name = f"{workspace.num}:{'|'.join(icons)}".rstrip(":")
|
||
if workspace.name != new_name:
|
||
logger.debug("rename workspace %s", workspace.num)
|
||
command = f'rename workspace "{workspace.name}" to "{new_name}"'
|
||
commands.append(command)
|
||
await i3.command(";".join(commands))
|
||
|
||
|
||
async def _new_workspace(i3):
|
||
workspaces = await i3.get_workspaces()
|
||
workspace_nums = {w.num for w in workspaces}
|
||
max_num = max(workspace_nums)
|
||
available = (set(range(1, max_num + 2)) - workspace_nums).pop()
|
||
logger.info(f"create new workspace number {available}")
|
||
await i3.command(f'workspace number "{available}"')
|
||
return available
|
||
|
||
|
||
@on(CommandEvent("new-workspace"), CommandEvent("move-to-new-workspace"))
|
||
async def new_workspace(i3, event):
|
||
"""Create a new workspace and optionally move a window to it."""
|
||
# Get the currently focused window
|
||
if event == "move-to-new-workspace":
|
||
tree = await i3.get_tree()
|
||
current = tree.find_focused()
|
||
if not current:
|
||
return
|
||
|
||
num = await _new_workspace(i3)
|
||
|
||
# Move the window to this workspace
|
||
if event == "move-to-new-workspace":
|
||
await current.command(f"move container to workspace " f'number "{num}"')
|
||
|
||
|
||
@on(I3Event.WINDOW_NEW)
|
||
async def worksplace_exclusive(i3, event):
|
||
"""Move new windows on a new workspace instead of sharing a workspace
|
||
with an exclusive app."""
|
||
w = event.container
|
||
|
||
def partof(w, apps):
|
||
"""Provided window is part of the provided apps."""
|
||
names = {
|
||
s is not None and s.lower() or None
|
||
for s in {w.name, w.window_class, w.window_instance}
|
||
}
|
||
return bool(names.intersection(apps))
|
||
|
||
# Can the current window intrude the workspace?
|
||
if (
|
||
w.floating in {"auto_on", "user_on"}
|
||
or w.ipc_data["window_type"] not in {"normal", "splash", "unknown"}
|
||
or w.sticky
|
||
or partof(w, intrusive_apps)
|
||
):
|
||
logger.debug("window %s can intrude", w.name)
|
||
return
|
||
|
||
tree = await i3.get_tree()
|
||
|
||
# Get the window workspace. From an event, w.workspace() is None,
|
||
# so search it in the tree.
|
||
current_workspace = next(
|
||
(ow.workspace().num for ow in tree.leaves() if w.id == ow.id), None
|
||
)
|
||
if not current_workspace:
|
||
logger.info(f"cannot get workspace for {w.window_class}")
|
||
return
|
||
|
||
# Get the list of workspaces with an exclusive app, excluding the
|
||
# current window and windows of the same class.
|
||
exclusive_workspaces = {
|
||
ow.workspace().num
|
||
for ow in tree.leaves()
|
||
if w.id != ow.id
|
||
and (w.window_class or object()) != ow.window_class
|
||
and partof(ow, exclusive_apps)
|
||
}
|
||
|
||
# If current one is OK, don't move
|
||
if current_workspace not in exclusive_workspaces:
|
||
logger.debug("no exclusive app, %s can go there", w.window_class)
|
||
return
|
||
|
||
# Are there other workspaces with the same app but no exclusive apps?
|
||
candidate_workspaces = {
|
||
ow.workspace().num
|
||
for ow in tree.leaves()
|
||
if w.id != ow.id and (w.window_class or object()) == ow.window_class
|
||
}
|
||
candidate_workspaces -= exclusive_workspaces
|
||
|
||
if candidate_workspaces:
|
||
# Use one of the candidates
|
||
num = next(iter(candidate_workspaces))
|
||
else:
|
||
# Create a new workspace
|
||
num = await _new_workspace(i3)
|
||
|
||
logger.info(f"move window {w.window_class} to workspace {num}")
|
||
await w.command(f'move container to workspace number "{num}", focus')
|
||
|
||
|
||
@on(CommandEvent("quake-console"))
|
||
async def quake_console(i3, event):
|
||
"""Spawn a quake console or toggle an existing one."""
|
||
try:
|
||
_, term_exec, term_name, height = event.split(":")
|
||
height = float(height)
|
||
except Exception as exc:
|
||
logger.warn(f"unable to parse payload {event}: {exc}")
|
||
return
|
||
|
||
tree = await i3.get_tree()
|
||
term = tree.find_instanced(term_name)
|
||
if not term:
|
||
await i3.command(f"exec exec {term_exec} --name {term_name}")
|
||
tries = 5
|
||
while not term and tries:
|
||
tree = await i3.get_tree()
|
||
term = tree.find_instanced(term_name)
|
||
await asyncio.sleep(0.2)
|
||
tries -= 1
|
||
if not term:
|
||
raise RuntimeError("unable to spawn terminal")
|
||
term = term[0]
|
||
workspaces = await i3.get_workspaces()
|
||
workspace = [ws for ws in workspaces if ws.focused][0]
|
||
ws_x = workspace.rect.x
|
||
ws_y = workspace.rect.y
|
||
ws_width = workspace.rect.width
|
||
ws_height = workspace.rect.height
|
||
width = ws_width
|
||
height = int(ws_height * height)
|
||
posx = ws_x
|
||
posy = ws_y
|
||
command = (
|
||
f"[instance={term_name}] "
|
||
"border none,"
|
||
f"resize set {width} px {height} px,"
|
||
"scratchpad show,"
|
||
f"move absolute position {posx}px {posy}px"
|
||
)
|
||
logger.debug("QuakeConsole: %s", command)
|
||
await i3.command(command)
|
||
|
||
|
||
@on(CommandEvent("container-info"))
|
||
@static(last_id=0)
|
||
async def container_info(i3, event):
|
||
"""Show information about the focused container."""
|
||
tree = await i3.get_tree()
|
||
window = tree.find_focused()
|
||
if not window:
|
||
return
|
||
logger.info(f"window raw information: {window.ipc_data}")
|
||
summary = "About focused container"
|
||
r = window.rect
|
||
w = window
|
||
info = {
|
||
"name": w.name,
|
||
"title": w.window_title,
|
||
"class": w.window_class,
|
||
"instance": w.window_instance,
|
||
"role": w.window_role,
|
||
"type": w.ipc_data["window_type"],
|
||
"sticky": w.sticky,
|
||
"floating": w.floating,
|
||
"geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
|
||
"layout": w.layout,
|
||
"parcent": w.percent,
|
||
"marks": ", ".join(w.marks) or "(none)",
|
||
}
|
||
body = "\n".join(
|
||
(
|
||
f"<tt>{k:10}</tt> {html.escape(str(v))}"
|
||
for k, v in info.items()
|
||
if v is not None
|
||
)
|
||
)
|
||
result = await notify(
|
||
i3,
|
||
app_icon="system-search",
|
||
expire_timeout=10000,
|
||
summary=summary,
|
||
body=body,
|
||
replaces_id=container_info.last_id,
|
||
)
|
||
container_info.last_id = result[0]
|
||
|
||
|
||
@on(CommandEvent("workspace-info"))
|
||
@static(last_id=0)
|
||
async def workspace_info(i3, event):
|
||
"""Show information about the focused workspace."""
|
||
workspaces = await i3.get_workspaces()
|
||
focused = [w for w in workspaces if w.focused]
|
||
if not focused:
|
||
return
|
||
workspace = focused[0]
|
||
summary = f"Workspace {workspace.num} on {workspace.output}"
|
||
tree = await i3.get_tree()
|
||
workspace = [w for w in tree.workspaces() if w.num == workspace.num]
|
||
|
||
def format(container):
|
||
if container.focused:
|
||
style = 'foreground="#ffaf00"'
|
||
elif not container.window:
|
||
style = 'foreground="#6c98ee"'
|
||
else:
|
||
style = ""
|
||
if container.window:
|
||
content = (
|
||
f"{(container.window_class or '???').lower()}: "
|
||
f"{(container.window_title or '???')}"
|
||
)
|
||
elif container.type == "workspace" and not container.nodes:
|
||
# Empty workspaces use workspace_layout, but when default,
|
||
# this is layout...
|
||
layout = container.ipc_data["workspace_layout"]
|
||
if layout == "default":
|
||
layout = container.layout
|
||
content = f"({layout})"
|
||
else:
|
||
content = f"({container.layout})"
|
||
root = f"<span {style}>{content.lower()}</span>"
|
||
children = []
|
||
for child in container.nodes:
|
||
if child == container.nodes[-1]:
|
||
first = "└─"
|
||
others = " "
|
||
else:
|
||
first = "├─"
|
||
others = "│ "
|
||
content = format(child).replace("\n", f"\n{others}")
|
||
children.append(f"<tt>{first}</tt>{content}")
|
||
children.insert(0, root)
|
||
return "\n".join(children)
|
||
|
||
body = format(workspace[0]).lstrip("\n")
|
||
result = await notify(
|
||
i3,
|
||
app_icon="system-search",
|
||
expire_timeout=20000,
|
||
summary=summary,
|
||
body=body,
|
||
replaces_id=workspace_info.last_id,
|
||
)
|
||
workspace_info.last_id = result[0]
|
||
|
||
|
||
@on(I3Event.OUTPUT, StartEvent)
|
||
@static(last_setup=None)
|
||
@debounce(2)
|
||
async def output_update(i3, event):
|
||
"""React to a XRandR change."""
|
||
|
||
# Grab current setup. Synchronous, but it's short enough
|
||
d = display.Display()
|
||
try:
|
||
screen = d.screen()
|
||
window = screen.root.create_window(0, 0, 1, 1, 1, screen.root_depth)
|
||
screen_resources = randr.get_screen_resources_current(window)
|
||
current_setup = set()
|
||
for output in screen_resources.outputs:
|
||
output_info = randr.get_output_info(
|
||
window, output, screen_resources.timestamp
|
||
)
|
||
if output_info.crtc == 0:
|
||
continue
|
||
crtc_info = randr.get_crtc_info(
|
||
window, output_info.crtc, output_info.timestamp
|
||
)
|
||
current_setup.add(
|
||
(
|
||
output_info.name,
|
||
crtc_info.width,
|
||
crtc_info.height,
|
||
crtc_info.x,
|
||
crtc_info.y,
|
||
)
|
||
)
|
||
finally:
|
||
d.close()
|
||
|
||
# Compare to current setup
|
||
if current_setup == output_update.last_setup:
|
||
logger.debug("current xrandr setup unchanged")
|
||
return
|
||
output_update.last_setup = current_setup
|
||
logger.info("xrandr setup: %s", current_setup)
|
||
if event is StartEvent:
|
||
return
|
||
|
||
# Trigger changes
|
||
logger.info("xrandr change detected")
|
||
cmds = (
|
||
"systemctl --user reload --no-block xsettingsd.service",
|
||
"systemctl --user start --no-block wallpaper.service",
|
||
)
|
||
for cmd in cmds:
|
||
proc = subprocess.run(shlex.split(cmd))
|
||
if proc.returncode != 0:
|
||
logger.warning(f"{cmd} exited with {proc.returncode}")
|
||
|
||
|
||
@on(
|
||
DBusSignal(
|
||
path="/org/bluez",
|
||
interface="org.freedesktop.DBus.Properties",
|
||
member="PropertiesChanged",
|
||
signature="sa{sv}as",
|
||
onlyif=lambda args: args[0] == "org.bluez.Device1" and "Connected" in args[1],
|
||
)
|
||
)
|
||
async def bluetooth_notifications(i3, event, path, interface, changed, invalid):
|
||
"""Display notifications related to Bluetooth state."""
|
||
obj = i3.system_bus["org.bluez"][path]
|
||
obd = await obj.get_async_interface(interface)
|
||
name = await obd.Name
|
||
icon = await obd.Icon
|
||
state = await obd.Connected
|
||
state = "connected" if state else "disconnected"
|
||
await notify(
|
||
i3,
|
||
app_icon=icon,
|
||
summary=name,
|
||
body=f"Bluetooth device {state}",
|
||
)
|
||
|
||
|
||
@on(
|
||
StartEvent,
|
||
DBusSignal(
|
||
path="/org/bluez",
|
||
interface="org.freedesktop.DBus.Properties",
|
||
member="PropertiesChanged",
|
||
signature="sa{sv}as",
|
||
onlyif=lambda args: (
|
||
args[0] == "org.bluez.Device1"
|
||
and "Connected" in args[1]
|
||
or args[0] == "org.bluez.Adapter1"
|
||
and "Powered" in args[1]
|
||
),
|
||
),
|
||
)
|
||
@static(last=None)
|
||
@retry(2)
|
||
@debounce(0.2)
|
||
async def bluetooth_status(i3, event, *args):
|
||
"""Update bluetooth status for polybar."""
|
||
if event is StartEvent:
|
||
# Do we have a bluetooth device?
|
||
if not os.path.exists("/sys/class/bluetooth"):
|
||
logger.info("no bluetooth detected")
|
||
polybar("bluetooth", "")
|
||
return
|
||
|
||
# OK, get the info
|
||
conn = i3.system_bus["org.bluez"]
|
||
om = await conn["/"].get_async_interface("org.freedesktop.DBus.ObjectManager")
|
||
objects = await om.GetManagedObjects()
|
||
objects = objects[0]
|
||
powered = False
|
||
devices = []
|
||
for path, interfaces in objects.items():
|
||
if "org.bluez.Adapter1" in interfaces:
|
||
# We get an adapter!
|
||
adapter = interfaces["org.bluez.Adapter1"]
|
||
if adapter["Powered"][1]:
|
||
powered = True
|
||
elif "org.bluez.Device1" in interfaces:
|
||
# We have a device!
|
||
device = interfaces["org.bluez.Device1"]
|
||
if not device["Connected"][1]:
|
||
continue
|
||
device_class = device["Class"][1]
|
||
major = (device_class & 0x1F00) >> 8
|
||
minor = (device_class & 0xFC) >> 2
|
||
devices.append((major, minor))
|
||
|
||
# Choose appropriate icons for output
|
||
# See: https://btprodspecificationrefs.blob.core.windows.net/assigned-numbers/Assigned%20Number%20Types/Baseband.pdf
|
||
if not powered:
|
||
output = ""
|
||
else:
|
||
output = ["bluetooth"]
|
||
for major, minor in devices:
|
||
classes = {
|
||
1: "laptop",
|
||
2: "phone",
|
||
3: "access-point",
|
||
(4, 1): "headset",
|
||
(4, 2): "headset",
|
||
(4, 4): "microphone",
|
||
(4, 5): "loudspeaker",
|
||
(4, 7): "loudspeaker",
|
||
(4, 10): "loudspeaker",
|
||
(4, 6): "headphones",
|
||
(4, 8): "car",
|
||
(4, 12): "webcam",
|
||
(5, 1): "gamepad",
|
||
(5, 2): "gamepad",
|
||
5: [
|
||
(lambda x: x & 0x10, "keyboard"),
|
||
(lambda x: x & 0x20, "mouse"),
|
||
],
|
||
6: [
|
||
(lambda x: x & 0x8, "camera"),
|
||
(lambda x: x & 0x10, "scanner"),
|
||
(lambda x: x & 0x20, "printer"),
|
||
],
|
||
}
|
||
icon = classes.get((major, minor)) or classes.get(major, "unknown")
|
||
if type(icon) is list:
|
||
for matcher, name in icon:
|
||
if matcher(minor):
|
||
icon = name
|
||
break
|
||
else:
|
||
icon = "unknown"
|
||
output.append(icon)
|
||
output = "|".join(icons[o] for o in output)
|
||
|
||
# Update polybar
|
||
if bluetooth_status.last != output:
|
||
logger.info("updated bluetooth status")
|
||
polybar("bluetooth", output)
|
||
bluetooth_status.last = output
|
||
|
||
|
||
@on(
|
||
DBusSignal(
|
||
system=False,
|
||
path="/org/freedesktop/Notifications",
|
||
interface="org.freedesktop.DBus.Properties",
|
||
member="PropertiesChanged",
|
||
signature="sa{sv}as",
|
||
onlyif=lambda args: args[0] == "org.dunstproject.cmd0" and "paused" in args[1],
|
||
)
|
||
)
|
||
async def dunst_status_update(i3, event, path, interface, changed, invalid):
|
||
"""Update notification status in polybar."""
|
||
if interface != "org.dunstproject.cmd0":
|
||
return
|
||
if "paused" not in changed:
|
||
return
|
||
polybar(
|
||
"dunst",
|
||
icons[
|
||
changed["paused"][1] and "notifications-disabled" or "notifications-enabled"
|
||
],
|
||
)
|
||
|
||
|
||
@on(StartEvent)
|
||
async def dunst_status_check(i3, event):
|
||
"""Display notification status for polybar."""
|
||
conn = i3.session_bus["org.freedesktop.Notifications"]
|
||
obj = conn["/org/freedesktop/Notifications"]
|
||
dunst = await obj.get_async_interface("org.dunstproject.cmd0")
|
||
paused = await dunst.paused
|
||
polybar(
|
||
"dunst",
|
||
icons[paused and "notifications-disabled" or "notifications-enabled"],
|
||
)
|
||
|
||
|
||
@on(
|
||
DBusSignal(
|
||
interface="org.freedesktop.NetworkManager.Connection.Active",
|
||
member="StateChanged",
|
||
signature="uu",
|
||
)
|
||
)
|
||
async def network_manager_notifications(i3, event, path, state, reason):
|
||
"""Display notifications related to Network Manager state."""
|
||
ofnm = "org.freedesktop.NetworkManager"
|
||
logger.debug("from %s state: %s, reason: %s", path, state, reason)
|
||
if state not in {NM_ACTIVE_CONNECTION_STATE_ACTIVATED}:
|
||
# Deactivated state does not contain enough information,
|
||
# unless we maintain state.
|
||
return
|
||
obj = i3.system_bus[ofnm][path]
|
||
try:
|
||
nmca = await obj.get_async_interface(f"{ofnm}.Connection.Active")
|
||
except dbussy.DBusError:
|
||
logger.info(f"interface {path} has vanished")
|
||
return
|
||
kind = await nmca.Type
|
||
id = await nmca.Id
|
||
if kind == "vpn":
|
||
await notify(i3, app_icon="network-vpn", summary=f"{id}", body="VPN connected!")
|
||
elif kind == "802-3-ethernet":
|
||
await notify(
|
||
i3,
|
||
app_icon="network-wired",
|
||
summary=f"{id}",
|
||
body="Ethernet connection established!",
|
||
)
|
||
elif kind == "802-11-wireless":
|
||
await notify(
|
||
i3,
|
||
app_icon="network-wireless",
|
||
summary=f"{id}",
|
||
body="Wireless connection established!",
|
||
)
|
||
|
||
|
||
@on(
|
||
StartEvent,
|
||
DBusSignal(
|
||
interface="org.freedesktop.NetworkManager.Connection.Active",
|
||
member="StateChanged",
|
||
signature="uu",
|
||
),
|
||
DBusSignal(
|
||
interface="org.freedesktop.NetworkManager.AccessPoint",
|
||
member="PropertiesChanged",
|
||
signature="a{sv}",
|
||
),
|
||
)
|
||
@static(last=None)
|
||
@retry(2)
|
||
@debounce(
|
||
1,
|
||
unless=lambda i3, event, *args: (
|
||
isinstance(event, DBusSignal) and event.interface.endswith(".Active")
|
||
),
|
||
)
|
||
async def network_manager_status(i3, event, *args):
|
||
"""Compute network manager status."""
|
||
ofnm = "org.freedesktop.NetworkManager"
|
||
status = []
|
||
|
||
# Build status from devices
|
||
conn = i3.system_bus[ofnm]
|
||
nm = await conn["/org/freedesktop/NetworkManager"].get_async_interface(ofnm)
|
||
devices = await nm.AllDevices
|
||
for device in devices:
|
||
nmd = await conn[device].get_async_interface(f"{ofnm}.Device")
|
||
kind = await nmd.DeviceType
|
||
state = await nmd.State
|
||
if state == NM_DEVICE_STATE_UNMANAGED:
|
||
continue
|
||
if kind == NM_DEVICE_TYPE_WIFI:
|
||
if state != NM_DEVICE_STATE_ACTIVATED:
|
||
status.append(icons["nowifi"])
|
||
continue
|
||
nmw = await conn[device].get_async_interface(f"{ofnm}.Device.Wireless")
|
||
ap = await nmw.ActiveAccessPoint
|
||
if not ap:
|
||
status.append(icons["nowifi"])
|
||
continue
|
||
network_manager_status.active_ap = ap
|
||
nmap = await conn[ap].get_async_interface(f"{ofnm}.AccessPoint")
|
||
name = await nmap.Ssid
|
||
strength = int(await nmap.Strength)
|
||
status.append(
|
||
[
|
||
icons["wifi-low"],
|
||
icons["wifi-medium"],
|
||
icons["wifi-high"],
|
||
][strength // 34]
|
||
)
|
||
status.append(
|
||
bytes(name).decode("utf-8", errors="replace").replace("%", "%%")
|
||
)
|
||
elif kind == NM_DEVICE_TYPE_ETHERNET and state == NM_DEVICE_STATE_ACTIVATED:
|
||
status.append(icons["wired"])
|
||
|
||
# Build status for VPN connection
|
||
connections = await nm.ActiveConnections
|
||
for connection in connections:
|
||
nma = await conn[connection].get_async_interface(f"{ofnm}.Connection.Active")
|
||
vpn = await nma.Vpn
|
||
if vpn:
|
||
state = await nma.State
|
||
if state == NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
|
||
status.append(icons["vpn"])
|
||
status.append(await nma.Id)
|
||
|
||
# Final status line
|
||
status = " ".join(status)
|
||
|
||
if status != network_manager_status.last:
|
||
logger.info("updated network status")
|
||
polybar("network", status)
|
||
network_manager_status.last = status
|
||
|
||
|
||
# Main function
|
||
|
||
|
||
async def main(options):
|
||
i3 = await Connection(auto_reconnect=True).connect()
|
||
i3.session_bus = await ravel.session_bus_async()
|
||
i3.system_bus = await ravel.system_bus_async()
|
||
|
||
# Regular events
|
||
for fn, events in on.functions.items():
|
||
for event in events:
|
||
if isinstance(event, I3Event):
|
||
|
||
def wrapping(fn, event):
|
||
async def wrapped(i3, event):
|
||
logger.debug("received i3 event %s for %s", event, fn)
|
||
return await fn(i3, event)
|
||
|
||
return wrapped
|
||
|
||
i3.on(event, wrapping(fn, event))
|
||
|
||
# React to some bindings
|
||
async def binding_event(i3, event):
|
||
"""Process a binding event."""
|
||
# We only processes it when it is a nop command and we use
|
||
# this mechanism as an IPC mechanism. The alternative would be
|
||
# to use ticks but we would need to spawn an i3-msg process
|
||
# for that.
|
||
cmd = event.binding.command
|
||
if not cmd.startswith("nop "):
|
||
return
|
||
cmd = cmd[4:].strip("\"'")
|
||
if not cmd:
|
||
return
|
||
kind = cmd.split(":")[0]
|
||
for fn, events in on.functions.items():
|
||
for e in events:
|
||
if isinstance(e, CommandEvent) and e.name == kind:
|
||
logger.debug("received command event %s for %s", event, fn)
|
||
await fn(i3, cmd)
|
||
|
||
i3.on(I3Event.BINDING, binding_event)
|
||
|
||
# Listen to DBus events
|
||
for fn, events in on.functions.items():
|
||
for event in events:
|
||
if isinstance(event, DBusSignal):
|
||
bus = i3.system_bus if event.system else i3.session_bus
|
||
|
||
def wrapping(fn, event):
|
||
@ravel.signal(
|
||
name=event.member,
|
||
in_signature=event.signature,
|
||
path_keyword="path",
|
||
args_keyword="args",
|
||
)
|
||
async def wrapped(path, args):
|
||
if event.onlyif is not None and not event.onlyif(args):
|
||
logger.debug(
|
||
"received DBus event for %s, not interested", fn
|
||
)
|
||
return
|
||
logger.debug("received DBus event %s for %s", event, fn)
|
||
return await fn(i3, event, path, *args)
|
||
|
||
return wrapped
|
||
|
||
bus.listen_signal(
|
||
path=event.path,
|
||
fallback=True,
|
||
interface=event.interface,
|
||
name=event.member,
|
||
func=wrapping(fn, event),
|
||
)
|
||
|
||
# Run events that should run on start
|
||
start_tasks = []
|
||
for fn, events in on.functions.items():
|
||
for event in events:
|
||
if event is StartEvent:
|
||
start_tasks.append(asyncio.create_task(fn(i3, event)))
|
||
|
||
await i3.main()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# Parse
|
||
description = sys.modules[__name__].__doc__
|
||
for fn, events in on.functions.items():
|
||
description += f" {fn.__doc__}"
|
||
parser = argparse.ArgumentParser(description=description)
|
||
parser.add_argument(
|
||
"--debug",
|
||
"-d",
|
||
action="store_true",
|
||
default=False,
|
||
help="enable debugging",
|
||
)
|
||
options = parser.parse_args()
|
||
|
||
# Logging
|
||
root = logging.getLogger("")
|
||
root.setLevel(logging.WARNING)
|
||
logger.setLevel(options.debug and logging.DEBUG or logging.INFO)
|
||
if sys.stderr.isatty():
|
||
ch = logging.StreamHandler()
|
||
ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
|
||
root.addHandler(ch)
|
||
else:
|
||
root.addHandler(journal.JournalHandler(SYSLOG_IDENTIFIER=logger.name))
|
||
|
||
try:
|
||
asyncio.run(main(options))
|
||
except Exception as e:
|
||
logger.exception("%s", e)
|
||
sys.exit(1)
|
||
sys.exit(0)
|