mirror of
https://github.com/vincentbernat/i3wm-configuration.git
synced 2025-08-31 15:09:23 +02:00
I think this is more correct. Dampen is used in network world, but debounce seems more common (notably, we debounce push buttons). Another word could be throttle but that does not match as we need to react to the last event of a batch instead of the first (and we sure don't want to drop the last).
757 lines
24 KiB
Python
Executable file
757 lines
24 KiB
Python
Executable file
#!/usr/bin/python3
|
||
|
||
"""Personal i3 companion."""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import collections
|
||
import errno
|
||
import functools
|
||
import glob
|
||
import html
|
||
import logging
|
||
import logging.handlers
|
||
import os
|
||
import re
|
||
import shlex
|
||
import subprocess
|
||
import sys
|
||
import types
|
||
|
||
import i3ipc
|
||
from i3ipc.aio import Connection
|
||
from systemd import journal
|
||
import ravel
|
||
import dbussy
|
||
|
||
|
||
def icon(font_number, char):
    """Wrap a glyph in Polybar font-selection tags.

    ``font_number`` is the font index from the Polybar configuration:

    1: https://fontawesome.com/v6.0/icons?s=solid
    2: https://fontawesome.com/v6.0/icons?s=brands

    The result has the shape ``%{T<font_number>}<char>%{T-}``.
    """
    select_font = "%%{T%d}" % (font_number,)
    glyph = "%s" % (char,)
    reset_font = "%{T-}"
    return select_font + glyph + reset_font
|
||
|
||
|
||
# Polybar-formatted icon for each known application. Keys are used as
# case-insensitive prefixes of the window instance/class when workspaces
# are renamed.
# NOTE(review): the glyph literals are reproduced exactly as they appear in
# this copy of the file (empty); presumably they are Font Awesome
# private-use code points -- confirm against the real file before applying.
application_icons = {
    application: icon(font_number, glyph)
    for application, font_number, glyph in (
        ("chromium", 2, ""),
        ("discord", 2, ""),
        ("emacs", 1, ""),
        ("firefox", 2, ""),
        ("gimp", 1, ""),
        ("gitg", 1, ""),
        ("google-chrome", 2, ""),
        ("inkscape", 1, ""),
        ("libreoffice", 1, ""),
        ("mpv", 1, ""),
        ("pavucontrol", 1, ""),
        ("signal", 1, ""),
        ("snes9x-gtk", 1, ""),
        ("spotify", 2, ""),
        ("steam", 2, ""),
        ("vbeterm", 1, ""),
        ("zathura", 1, ""),
        ("zoom", 1, ""),
    )
}
|
||
# Icons used when building the network status line.
# NOTE(review): glyph literals are reproduced as they appear in this copy
# of the file (empty); presumably Font Awesome private-use code points --
# confirm against the real file before applying.
icons = {
    label: icon(1, glyph)
    for label, glyph in (
        ("nowifi", ""),
        ("vpn", ""),
        ("wifi-low", ""),
        ("wifi-medium", ""),
        ("wifi-high", ""),
        ("wired", ""),
    )
}
# Icon for windows matching no entry of application_icons.
application_icons_nomatch = icon(1, "")
# Icons only displayed when no other icon is present on the workspace.
application_icons_alone = {application_icons["vbeterm"]}
# Applications that do not share their workspace with new windows.
exclusive_apps = {"firefox", "emacs"}
# Applications allowed on any workspace, even next to an exclusive one.
intrusive_apps = {"vbeterm"}
|
||
|
||
logger = logging.getLogger("i3-companion")

# Event descriptors accepted by the @on decorator:
# - DBusSignal: a D-Bus signal identified by path, interface, member and
#   signature;
# - StartEvent: unique sentinel for handlers to run once at startup;
# - I3Event: alias of the i3ipc event enumeration;
# - CommandEvent: a named command received through an i3 "nop" binding.
DBusSignal = collections.namedtuple(
    "DBusSignal", "path interface member signature"
)
StartEvent = object()
I3Event = i3ipc.Event
CommandEvent = collections.namedtuple("CommandEvent", "name")
|
||
|
||
# Numeric values compared against NetworkManager D-Bus properties
# (active connection state, device type and device state).
NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2
(
    NM_DEVICE_TYPE_ETHERNET,
    NM_DEVICE_TYPE_WIFI,
    NM_DEVICE_TYPE_MODEM,
) = (1, 2, 8)
NM_DEVICE_STATE_UNMANAGED, NM_DEVICE_STATE_ACTIVATED = 10, 100
|
||
|
||
|
||
def on(*events):
    """Register the decorated function as a handler for ``events``.

    The association is recorded in ``on.functions``, a dictionary mapping
    each decorated function to the tuple of events it was tagged with.
    The decorator returns a thin wrapper forwarding calls to the function.
    """

    def decorator(fn):
        # The registry lives on the decorator itself and is created on
        # first use.
        if not hasattr(on, "functions"):
            on.functions = {}
        on.functions[fn] = events

        @functools.wraps(fn)
        def forward(*args, **kwargs):
            return fn(*args, **kwargs)

        return forward

    return decorator
|
||
|
||
|
||
def debounce(sleep, *, unless=None, retry=0):
    """Debounce a function call (batch successive calls into only one).

    Optional immediate execution. Optional retry on failure. Ensure
    only one instance is executed. It is assumed the arguments
    provided to the debounced function have no effect on its
    execution.

    :param sleep: delay in seconds to wait before executing the work.
    :param unless: optional predicate called with the arguments of the
        call; when it returns true, the pending work is executed without
        waiting for the delay to elapse.
    :param retry: number of additional attempts when the function raises.

    The decorated coroutine function returns immediately: the work is
    executed by a background task. A call received while the work is
    being executed triggers a new execution afterwards, so the last event
    of a batch is never dropped. When the function fails and no attempt
    remains, the error is logged and the worker stops; the next call
    starts a fresh worker.
    """

    def decorator(fn):
        async def worker():
            try:
                while True:
                    try:
                        # Wait for an urgent work or until sleep is elapsed
                        await asyncio.wait_for(
                            fn.worker.urgent.wait(), timeout=sleep
                        )
                        logger.debug("urgent work received for %s", fn)
                    except asyncio.TimeoutError:
                        pass
                    tries, args, kwargs = fn.worker.queue
                    fn.worker.queue = None
                    fn.worker.urgent.clear()

                    # Execute the work
                    logger.debug("execute work for %s", fn)
                    try:
                        await fn(*args, **kwargs)
                    except Exception as e:
                        if not tries:
                            # Give up on this work, but keep going if
                            # something else was queued in the meantime.
                            logger.exception("while executing %s: %s", fn, e)
                        else:
                            tries -= 1
                            logger.warning(
                                "while executing %s (remaining tries: %d): %s",
                                fn,
                                tries,
                                str(e),
                            )

                            # Retry, unless we have something already
                            # scheduled
                            if fn.worker.queue is not None:
                                logger.debug(
                                    "retry now with queued event for %s", fn
                                )
                                fn.worker.urgent.set()
                                continue
                            logger.debug("reschedule retry for %s", fn)
                            fn.worker.queue = (tries, args, kwargs)
                            if unless is not None and unless(*args, **kwargs):
                                logger.debug("wake up now for retry of %s", fn)
                                fn.worker.urgent.set()

                    # Do we still have something to do?
                    if fn.worker.queue is None:
                        break

                # No more work
                logger.debug("no more work for %s", fn)
            finally:
                # Whatever the reason the worker stops (no more work,
                # cancellation), the next call must create a new worker.
                fn.worker = None

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            if fn.worker is None:
                logger.debug("create new worker for %s", fn)
                fn.worker = types.SimpleNamespace(
                    urgent=asyncio.Event(), queue=None
                )
                fn.worker.task = asyncio.create_task(worker())
            else:
                logger.debug("enqueue new work for %s", fn)
            # Always record the latest call: if the worker is currently
            # executing the function, this schedules another execution.
            fn.worker.queue = (retry, args, kwargs)
            if unless is not None and unless(*args, **kwargs):
                logger.debug("wake up now for %s", fn)
                fn.worker.urgent.set()

        fn.worker = None
        return wrapper

    return decorator
|
||
|
||
|
||
async def notify(i3, **kwargs):
    """Send a desktop notification over the session D-Bus.

    The ``Notify`` method of ``org.freedesktop.Notifications`` is called
    with default parameters, each of which can be overridden through
    ``kwargs``. The reply of the call is returned.
    """
    service = i3.session_bus["org.freedesktop.Notifications"]
    endpoint = service["/org/freedesktop/Notifications"]
    notifications = await endpoint.get_async_interface(
        "org.freedesktop.Notifications"
    )
    # Defaults first, then caller overrides
    parameters = {
        "app_name": logger.name,
        "replaces_id": 0,
        "app_icon": "dialog-information",
        "summary": "",
        "actions": [],
        "hints": {},
        "expire_timeout": 5000,
        **kwargs,
    }
    return await notifications.Notify(**parameters)
|
||
|
||
|
||
@on(StartEvent, I3Event.WINDOW_MOVE, I3Event.WINDOW_NEW, I3Event.WINDOW_CLOSE)
@debounce(0.2)
async def workspace_rename(i3, event):
    """Rename workspaces using icons to match what's inside it.

    Each workspace is renamed to ``<num>:<icon>|<icon>...`` with one icon
    per distinct application, in the order windows are found in the tree.
    Sticky windows are ignored. Icons listed in
    ``application_icons_alone`` are only kept when no other icon is
    present. Nothing is sent to i3 when no workspace needs to be renamed.
    """
    tree = await i3.get_tree()
    commands = []

    def application_icon(window):
        """Get application icon for a window."""
        for attr in ("window_instance", "window_class"):
            name = getattr(window, attr, None)
            if name is None:
                continue
            for app, glyph in application_icons.items():
                if re.match(rf"^{app}\b", name, re.IGNORECASE):
                    logger.debug(f"in {attr}, found '{name}', matching {app}")
                    return glyph
        return application_icons_nomatch

    for workspace in tree.workspaces():
        # Keep the first occurrence of each icon to get a stable order: a
        # set would make the resulting name depend on hashing.
        glyphs = []
        for window in workspace.leaves():
            if window.sticky:
                continue
            glyph = application_icon(window)
            if glyph is not None and glyph not in glyphs:
                glyphs.append(glyph)
        if any(g not in application_icons_alone for g in glyphs):
            glyphs = [g for g in glyphs if g not in application_icons_alone]
        new_name = f"{workspace.num}:{'|'.join(glyphs)}".rstrip(":")
        if workspace.name != new_name:
            logger.debug(f"rename workspace {workspace.num}")
            command = f'rename workspace "{workspace.name}" to "{new_name}"'
            commands.append(command)

    # Do not send an empty command when everything is already up-to-date
    if commands:
        await i3.command(";".join(commands))
|
||
|
||
|
||
async def _new_workspace(i3):
    """Switch to a new workspace and return its number.

    The lowest positive number not used by an existing workspace is
    selected.
    """
    workspaces = await i3.get_workspaces()
    used = {w.num for w in workspaces}
    # Workspaces without a number are expected to report a negative num
    # (-1 per the i3 IPC documentation): ensure the candidate range is
    # never empty in this case.
    highest = max(max(used, default=0), 0)
    # highest + 1 is always free, so there is at least one candidate.
    # Use min() as set.pop() returns an arbitrary element.
    available = min(set(range(1, highest + 2)) - used)
    logger.info(f"create new workspace number {available}")
    await i3.command(f'workspace number "{available}"')
    return available
|
||
|
||
|
||
@on(CommandEvent("new-workspace"), CommandEvent("move-to-new-workspace"))
async def new_workspace(i3, event):
    """Create a new workspace and optionally move a window to it.

    With the ``move-to-new-workspace`` command, the focused container is
    moved to the new workspace. Nothing is done if there is no focused
    container.
    """
    moving = event == "move-to-new-workspace"
    current = None
    if moving:
        # Remember the focused container before switching workspace
        tree = await i3.get_tree()
        current = tree.find_focused()
        if not current:
            return

    num = await _new_workspace(i3)
    if not moving:
        return

    # Bring the remembered container to the new workspace
    await current.command(f'move container to workspace number "{num}"')
|
||
|
||
|
||
@on(I3Event.WINDOW_NEW)
async def worksplace_exclusive(i3, event):
    """Move new windows on a new workspace instead of sharing a workspace
    with an exclusive app.

    A new window stays where it is when it can intrude (floating, sticky,
    special window type or listed in ``intrusive_apps``) or when the
    focused workspace contains no other window listed in
    ``exclusive_apps``.
    """
    w = event.container

    def identifiers(window):
        """Lowercased name, class and instance of a window."""
        candidates = (window.name, window.window_class, window.window_instance)
        return {s.lower() for s in candidates if s}

    def can_intrude(w):
        """Can this new window intrude any workspace?"""
        if w.floating in {"auto_on", "user_on"}:
            return True
        if w.ipc_data["window_type"] not in {"normal", "splash", "unknown"}:
            return True
        if w.sticky:
            return True
        return bool(identifiers(w) & intrusive_apps)

    # Can the new window just intrude?
    if can_intrude(w):
        logger.debug(f"window {w.name} can intrude")
        return

    # Does the current workspace contains an exclusive app?
    tree = await i3.get_tree()
    focused = tree.find_focused()
    workspace = focused.workspace() if focused else None
    if not workspace:
        return
    ids = set()
    for ow in workspace.leaves():
        # The new window itself does not count
        if w.id != ow.id:
            ids |= identifiers(ow)
    exclusives = ids.intersection(exclusive_apps)
    if not exclusives:
        logger.debug(f"no exclusive app, {w.name} can go there")
        return

    # Create a new workspace and move the window here
    num = await _new_workspace(i3)
    logger.info(f"move window {w.name} to workspace {num}")
    await w.command(f'move container to workspace number "{num}"')
|
||
|
||
|
||
@on(CommandEvent("quake-console"))
async def quake_console(i3, event):
    """Spawn a quake console or toggle an existing one.

    The payload is ``<command>:<terminal executable>:<instance name>:
    <height>`` where the height is a ratio of the height of the focused
    workspace. When no window with the given instance name exists, the
    terminal is spawned and looked up for about one second.

    :raises RuntimeError: when the spawned terminal cannot be found.
    """
    try:
        _, term_exec, term_name, height = event.split(":")
        height = float(height)
    except (AttributeError, ValueError) as exc:
        logger.warning(f"unable to parse payload {event}: {exc}")
        return

    tree = await i3.get_tree()
    term = tree.find_instanced(term_name)
    if not term:
        await i3.command(f"exec exec {term_exec} --name {term_name}")
        for _ in range(5):
            # Let the terminal start before looking for its window
            await asyncio.sleep(0.2)
            tree = await i3.get_tree()
            term = tree.find_instanced(term_name)
            if term:
                break
        else:
            raise RuntimeError("unable to spawn terminal")

    workspaces = await i3.get_workspaces()
    focused = [ws for ws in workspaces if ws.focused]
    if not focused:
        logger.warning("no focused workspace to display the console on")
        return
    rect = focused[0].rect

    # The console spans the whole width at the top of the workspace
    width = rect.width
    height = int(rect.height * height)
    posx = rect.x
    posy = rect.y
    command = (
        f"[instance={term_name}] "
        "border none,"
        f"resize set {width} px {height} px,"
        "scratchpad show,"
        f"move absolute position {posx}px {posy}px"
    )
    logger.debug(f"QuakeConsole: {command}")
    await i3.command(command)
|
||
|
||
|
||
@on(CommandEvent("container-info"))
async def container_info(i3, event):
    """Show information about the focused container.

    The information is displayed in a notification replacing the one
    previously sent by this function, if any. Attributes without a value
    are omitted.
    """
    tree = await i3.get_tree()
    window = tree.find_focused()
    if not window:
        return
    logger.info(f"window raw information: {window.ipc_data}")
    summary = "About focused container"
    r = window.rect
    w = window
    info = {
        "name": w.name,
        "title": w.window_title,
        "class": w.window_class,
        "instance": w.window_instance,
        "role": w.window_role,
        "type": w.ipc_data["window_type"],
        "sticky": w.sticky,
        "floating": w.floating,
        "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
        "layout": w.layout,
        "percent": w.percent,
        "marks": ", ".join(w.marks) or "(none)",
    }
    # Values are escaped as the body is interpreted as markup
    body = "\n".join(
        f"<tt>{k:10}</tt> {html.escape(str(v))}"
        for k, v in info.items()
        if v is not None
    )
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=10000,
        summary=summary,
        body=body,
        replaces_id=getattr(container_info, "last_id", 0),
    )
    # Remember the notification ID to replace it next time
    container_info.last_id = result[0]
|
||
|
||
|
||
@on(CommandEvent("workspace-info"))
async def workspace_info(i3, event):
    """Show information about the focused workspace.

    The container tree of the focused workspace is displayed in a
    notification replacing the one previously sent by this function, if
    any.
    """
    workspaces = await i3.get_workspaces()
    focused = [w for w in workspaces if w.focused]
    if not focused:
        return
    workspace = focused[0]
    summary = f"Workspace {workspace.num} on {workspace.output}"
    tree = await i3.get_tree()
    matching = [w for w in tree.workspaces() if w.num == workspace.num]
    if not matching:
        return

    def render(container):
        """Return the markup for a container and, recursively, its nodes."""
        if container.focused:
            style = 'foreground="#ffaf00"'
        elif not container.window:
            style = 'foreground="#6c98ee"'
        else:
            style = ""
        if container.window:
            content = (
                f"{(container.window_class or '???').lower()}: "
                f"{(container.window_title or '???')}"
            )
        elif container.type == "workspace" and not container.nodes:
            # Empty workspaces use workspace_layout, but when default,
            # this is layout...
            layout = container.ipc_data["workspace_layout"]
            if layout == "default":
                layout = container.layout
            content = f"({layout})"
        else:
            content = f"({container.layout})"
        # Class and title come from the applications: escape them as
        # the body is interpreted as markup.
        root = f"<span {style}>{html.escape(content.lower())}</span>"
        lines = [root]
        for child in container.nodes:
            if child == container.nodes[-1]:
                first = "└─"
                others = "  "
            else:
                first = "├─"
                others = "│ "
            # Continuation lines of a child are shifted below its branch
            content = render(child).replace("\n", f"\n{others}")
            lines.append(f"<tt>{first}</tt>{content}")
        return "\n".join(lines)

    body = render(matching[0]).lstrip("\n")
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=20000,
        summary=summary,
        body=body,
        replaces_id=getattr(workspace_info, "last_id", 0),
    )
    # Remember the notification ID to replace it next time
    workspace_info.last_id = result[0]
|
||
|
||
|
||
@on(I3Event.OUTPUT)
@debounce(2)
async def output_update(i3, event):
    """React to a XRandR change.

    The commands are executed one after the other. A warning is logged
    for each command exiting with a non-zero status.
    """
    logger.info("XRandR change detected")
    cmds = (
        "systemctl --user reload --no-block xsettingsd.service",
        "systemctl --user start --no-block wallpaper.service",
    )
    for cmd in cmds:
        # Use asyncio subprocesses to not block the event loop while the
        # command is running.
        proc = await asyncio.create_subprocess_exec(*shlex.split(cmd))
        returncode = await proc.wait()
        if returncode != 0:
            logger.warning(f"{cmd} exited with {returncode}")
|
||
|
||
|
||
@on(
    DBusSignal(
        path="/org/bluez",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
    )
)
async def bluetooth_notifications(
    i3, event, path, interface, changed, invalid
):
    """Notify when a Bluetooth device gets connected or disconnected.

    Only changes of the ``Connected`` property of ``org.bluez.Device1``
    objects are considered. The name, icon and connection state shown in
    the notification are read back from the device.
    """
    if interface != "org.bluez.Device1" or "Connected" not in changed:
        return
    logger.info(changed["Connected"])
    device = await i3.system_bus["org.bluez"][path].get_async_interface(
        interface
    )
    name = await device.Name
    device_icon = await device.Icon
    if await device.Connected:
        state = "connected"
    else:
        state = "disconnected"
    await notify(
        i3,
        app_icon=device_icon,
        summary=name,
        body=f"Bluetooth device {state}",
    )
|
||
|
||
|
||
@on(
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    )
)
async def network_manager_notifications(i3, event, path, state, reason):
    """Notify when a VPN, Ethernet or wireless connection gets activated.

    Other state changes and other kinds of connections are ignored.
    """
    ofnm = "org.freedesktop.NetworkManager"
    logger.debug(f"from {path} state: {state}, reason: {reason}")
    if state not in {NM_ACTIVE_CONNECTION_STATE_ACTIVATED}:
        # Deactivated state does not contain enough information,
        # unless we maintain state.
        return
    peer = i3.system_bus[ofnm][path]
    try:
        nmca = await peer.get_async_interface(f"{ofnm}.Connection.Active")
    except dbussy.DBusError:
        logger.info(f"interface {path} has vanished")
        return
    kind = await nmca.Type
    connection_id = await nmca.Id

    # Notification icon and body for each kind of connection
    announcements = {
        "vpn": ("network-vpn", "VPN connected!"),
        "802-3-ethernet": (
            "network-wired",
            "Ethernet connection established!",
        ),
        "802-11-wireless": (
            "network-wireless",
            "Wireless connection established!",
        ),
    }
    if kind not in announcements:
        return
    app_icon, body = announcements[kind]
    await notify(
        i3, app_icon=app_icon, summary=f"{connection_id}", body=body
    )
|
||
|
||
|
||
@on(
    StartEvent,
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    ),
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.AccessPoint",
        member="PropertiesChanged",
        signature="a{sv}",
    ),
)
@debounce(
    1,
    unless=lambda i3, event, *args: (
        isinstance(event, DBusSignal) and event.interface.endswith(".Active")
    ),
    retry=2,
)
async def network_manager_status(i3, event, *args):
    """Compute network manager status.

    The status line is built from the state of the devices managed by
    NetworkManager (wireless and Ethernet) and from the active VPN
    connections. When it differs from the previous one, it is written to
    a cache file and sent to each Polybar message queue.
    """
    ofnm = "org.freedesktop.NetworkManager"
    status = []

    def polybar_escape(text):
        """Escape a free-form name: % introduces Polybar format tags."""
        return str(text).replace("%", "%%")

    # Build status from devices
    bus = i3.system_bus[ofnm]
    nm = await bus["/org/freedesktop/NetworkManager"].get_async_interface(ofnm)
    devices = await nm.AllDevices
    for device in devices:
        nmd = await bus[device].get_async_interface(f"{ofnm}.Device")
        kind = await nmd.DeviceType
        state = await nmd.State
        if state == NM_DEVICE_STATE_UNMANAGED:
            continue
        if kind == NM_DEVICE_TYPE_WIFI:
            if state != NM_DEVICE_STATE_ACTIVATED:
                status.append(icons["nowifi"])
                continue
            nmw = await bus[device].get_async_interface(
                f"{ofnm}.Device.Wireless"
            )
            ap = await nmw.ActiveAccessPoint
            if not ap:
                status.append(icons["nowifi"])
                continue
            network_manager_status.active_ap = ap
            nmap = await bus[ap].get_async_interface(f"{ofnm}.AccessPoint")
            name = await nmap.Ssid
            strength = int(await nmap.Strength)
            levels = (
                icons["wifi-low"],
                icons["wifi-medium"],
                icons["wifi-high"],
            )
            # One level for each third of the scale; clamp the index to
            # not fail on an out-of-range value.
            level = min(max(strength, 0) // 34, len(levels) - 1)
            status.append(levels[level])
            status.append(
                polybar_escape(bytes(name).decode("utf-8", errors="replace"))
            )
        elif (
            kind == NM_DEVICE_TYPE_ETHERNET
            and state == NM_DEVICE_STATE_ACTIVATED
        ):
            status.append(icons["wired"])

    # Build status for VPN connection
    connections = await nm.ActiveConnections
    for conn in connections:
        nma = await bus[conn].get_async_interface(f"{ofnm}.Connection.Active")
        vpn = await nma.Vpn
        if vpn:
            state = await nma.State
            if state == NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
                status.append(icons["vpn"])
                # Like the SSID, the connection name is free-form
                status.append(polybar_escape(await nma.Id))

    # Final status line
    status = " ".join(status)
    last = getattr(network_manager_status, "last", None)

    if status != last:
        logger.info("updated network status")

        # Update cache file (for when polybar restarts)
        with open(
            f"{os.getenv('XDG_RUNTIME_DIR')}/i3/network.txt", "w"
        ) as out:
            out.write(status)

        # Send it to polybar's module/network
        for name in glob.glob("/tmp/polybar_mqueue.*"):
            try:
                with open(
                    os.open(name, os.O_WRONLY | os.O_NONBLOCK), "w"
                ) as out:
                    cmd = f"action:#network.send.{status}"
                    out.write(cmd)
            except OSError as e:
                # ENXIO: nothing is reading this queue, skip it
                if e.errno != errno.ENXIO:
                    raise

        network_manager_status.last = status
|
||
|
||
|
||
async def main(options):
    """Connect to i3 and D-Bus, register the handlers and process events.

    Handlers are the functions tagged with the ``on`` decorator. i3
    events are registered with i3, command events are dispatched from
    "nop" bindings, D-Bus signals are listened on both the session and
    the system bus, and start events are executed right away. This
    function returns when the i3 main loop exits.
    """
    i3 = await Connection().connect()
    i3.session_bus = await ravel.session_bus_async()
    i3.system_bus = await ravel.system_bus_async()

    # Regular events
    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, I3Event):
                i3.on(event, fn)

    # React to some bindings
    async def binding_event(i3, event):
        """Process a binding event."""
        # We only processes it when it is a nop command and we use
        # this mechanism as an IPC mechanism. The alternative would be
        # to use ticks but we would need to spawn an i3-msg process
        # for that.
        cmd = event.binding.command
        if not cmd.startswith("nop "):
            return
        cmd = cmd[4:].strip("\"'")
        if not cmd:
            return
        kind = cmd.split(":")[0]
        for fn, events in on.functions.items():
            for e in events:
                if isinstance(e, CommandEvent) and e.name == kind:
                    await fn(i3, cmd)

    i3.on(I3Event.BINDING, binding_event)

    # Listen to DBus events
    def wrapping(fn, event):
        """Adapt a handler to the signature expected for a D-Bus signal."""

        @ravel.signal(
            name=event.member,
            in_signature=event.signature,
            path_keyword="path",
            args_keyword="args",
        )
        async def wrapped(path, args):
            return await fn(i3, event, path, *args)

        return wrapped

    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, DBusSignal):
                for bus in {i3.session_bus, i3.system_bus}:
                    bus.listen_signal(
                        path=event.path,
                        fallback=True,
                        interface=event.interface,
                        name=event.member,
                        func=wrapping(fn, event),
                    )

    # Run events that should run on start. Keep a reference on tasks
    # until they are done: the event loop only keeps weak references and
    # an unreferenced task may be garbage-collected before completion.
    start_tasks = set()
    for fn, events in on.functions.items():
        for event in events:
            if event is StartEvent:
                task = asyncio.create_task(fn(i3, event))
                start_tasks.add(task)
                task.add_done_callback(start_tasks.discard)

    await i3.main()
|
||
|
||
|
||
if __name__ == "__main__":
    # Command-line parsing. The help text is the module docstring
    # followed by the docstring of each registered handler.
    description = sys.modules[__name__].__doc__
    for handler in on.functions:
        description += f" {handler.__doc__}"
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--debug",
        "-d",
        action="store_true",
        default=False,
        help="enable debugging",
    )
    options = parser.parse_args()

    # Logging: to the terminal when there is one, to the journal
    # otherwise. Only our own logger goes below the warning level.
    root = logging.getLogger("")
    root.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
    if sys.stderr.isatty():
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
    else:
        handler = journal.JournalHandler(SYSLOG_IDENTIFIER=logger.name)
    root.addHandler(handler)

    # Run until the main loop exits; any error is logged and reported
    # through the exit status.
    try:
        asyncio.run(main(options))
    except Exception as e:
        logger.exception("%s", e)
        sys.exit(1)
    sys.exit(0)
|