mirror of
https://github.com/vincentbernat/i3wm-configuration.git
synced 2025-07-01 05:44:20 +02:00
This is a good example of using PropertiesChanged. I don't know if it's possible to filter the interface we want to receive.
718 lines
22 KiB
Python
Executable file
718 lines
22 KiB
Python
Executable file
#!/usr/bin/python3
|
||
|
||
"""Personal i3 companion."""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import collections
|
||
import errno
|
||
import functools
|
||
import glob
|
||
import html
|
||
import logging
|
||
import logging.handlers
|
||
import os
|
||
import re
|
||
import shlex
|
||
import subprocess
|
||
import sys
|
||
|
||
import i3ipc
|
||
from i3ipc.aio import Connection
|
||
from systemd import journal
|
||
import ravel
|
||
import dbussy
|
||
|
||
|
||
class Icon(str):
    """String holding a glyph wrapped in font-selection tags.

    ``Icon(font, char)`` evaluates to the text ``%{T<font>}<char>%{T-}``,
    where ``font`` is the font number from the polybar configuration.
    """

    def __new__(cls, font, char):
        # Build the tagged text piece by piece, then let str allocate it.
        markup = "%{T" + str(font) + "}" + str(char) + "%{T-}"
        return super().__new__(cls, markup)
|
||
|
||
|
||
# See https://fontawesome.com/v6.0/icons, number is the font number in
# polybar configuration.
#
# NOTE(review): every glyph literal below is an empty string in this copy of
# the file -- presumably private-use Font Awesome code points that were lost
# in extraction. Confirm against the upstream repository before relying on
# them (with empty glyphs, all icons using the same font compare equal).
#
# Keys of `application_icons` are used by workspace_rename() as
# case-insensitive regular-expression prefixes matched against a window's
# instance, then its class.
application_icons = {
    "chromium": Icon(2, ""),
    "discord": Icon(2, ""),
    "emacs": Icon(1, ""),
    "firefox": Icon(2, ""),
    "gimp": Icon(1, ""),
    "gitg": Icon(1, ""),
    "google-chrome": Icon(2, ""),
    "inkscape": Icon(1, ""),
    "libreoffice": Icon(1, ""),
    "mpv": Icon(1, ""),
    "pavucontrol": Icon(1, ""),
    "signal": Icon(1, ""),
    "snes9x-gtk": Icon(1, ""),
    "spotify": Icon(2, ""),
    "steam": Icon(2, ""),
    "vbeterm": Icon(1, ""),
    "zathura": Icon(1, ""),
    "zoom": Icon(1, ""),
}
# Icons used by network_manager_status() to build the network status line.
icons = {
    "nowifi": Icon(1, ""),
    "vpn": Icon(1, ""),
    "wifi-low": Icon(1, ""),
    "wifi-medium": Icon(1, ""),
    "wifi-high": Icon(1, ""),
    "wired": Icon(1, ""),
}
# Icon returned by workspace_rename() when no key of `application_icons`
# matches a window.
application_icons_nomatch = Icon(1, "")
# Icons removed from a workspace name as soon as the workspace also contains
# an icon that is not in this set.
application_icons_alone = {application_icons[k] for k in {"vbeterm"}}
# Lowercase identifiers (name, class or instance) of applications that new
# windows should not share a workspace with (see worksplace_exclusive()).
exclusive_apps = {"emacs", "firefox"}
# Lowercase identifiers of applications allowed to open on any workspace.
intrusive_apps = {"vbeterm"}
|
||
|
||
# Module-wide logger; its name is also used as the notification application
# name and as the journal SYSLOG_IDENTIFIER.
logger = logging.getLogger("i3-companion")

# Events for @on decorator
# A D-Bus signal to subscribe to: object path, interface, member (signal
# name) and argument signature.
DBusSignal = collections.namedtuple(
    "DBusSignal", ["path", "interface", "member", "signature"]
)
# Unique sentinel: handlers tagged with it are run once when main() starts.
StartEvent = object()
# i3 IPC events, re-exported from i3ipc.
I3Event = i3ipc.Event
# A command received through an i3 "nop <name>[:payload]" binding.
CommandEvent = collections.namedtuple("CommandEvent", ["name"])

# Numeric values compared against NetworkManager D-Bus properties
# (connection State, device DeviceType and device State) below.
# NOTE(review): values are assumed to mirror the NetworkManager D-Bus API
# enumerations of the same names -- confirm against the NM documentation.
NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2
NM_DEVICE_TYPE_ETHERNET = 1
NM_DEVICE_TYPE_WIFI = 2
NM_DEVICE_TYPE_MODEM = 8
NM_DEVICE_STATE_UNMANAGED = 10
NM_DEVICE_STATE_ACTIVATED = 100
|
||
|
||
|
||
def on(*events):
    """Return a decorator that registers a handler for the given events.

    The undecorated function is stored as a key of the ``on.functions``
    dictionary (created on first use), with the tuple of events as its
    value. The decorator itself returns a thin pass-through wrapper.
    """

    def decorator(fn):
        # The registry lives as an attribute of `on`; create it lazily.
        if not hasattr(on, "functions"):
            on.functions = {}
        on.functions[fn] = events

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)

        return wrapper

    return decorator
|
||
|
||
|
||
async def notify(i3, **kwargs):
    """Send a desktop notification over the session D-Bus.

    Calls ``Notify`` on the ``org.freedesktop.Notifications`` interface
    reached through ``i3.session_bus``. Keyword arguments override (or
    extend) the default parameters below. Returns the result of the
    ``Notify`` call.
    """
    service = "org.freedesktop.Notifications"
    target = i3.session_bus[service]["/org/freedesktop/Notifications"]
    notifications = await target.get_async_interface(service)
    # Defaults first, caller-provided values last so they take precedence.
    parameters = {
        "app_name": logger.name,
        "replaces_id": 0,
        "app_icon": "dialog-information",
        "summary": "",
        "actions": [],
        "hints": {},
        "expire_timeout": 5000,
        **kwargs,
    }
    return await notifications.Notify(**parameters)
|
||
|
||
|
||
@on(StartEvent, I3Event.WINDOW_MOVE, I3Event.WINDOW_NEW, I3Event.WINDOW_CLOSE)
async def workspace_rename(i3, event):
    """Rename workspaces using icons to match what's inside it."""
    # Each workspace is renamed to "<num>:<icon>|<icon>..." (or just "<num>"
    # when it has no window). All renames are sent to i3 as one command;
    # nothing is sent when every workspace already has the right name.
    tree = await i3.get_tree()
    workspaces = tree.workspaces()
    commands = []

    def application_icon(window):
        """Get application icon for a window."""
        # Try the window instance first, then the window class. Keys of
        # application_icons are matched as case-insensitive prefixes.
        for attr in ("window_instance", "window_class"):
            name = getattr(window, attr, None)
            if name is None:
                continue
            for k, v in application_icons.items():
                if re.match(rf"^{k}\b", name, re.IGNORECASE):
                    logger.debug(f"in {attr}, found '{name}', matching {k}")
                    return v
        return application_icons_nomatch

    for workspace in workspaces:
        # application_icon() always returns an icon, so no filtering needed.
        found = {application_icon(window) for window in workspace.leaves()}
        # "Alone" icons are only kept when nothing else is on the workspace.
        if any(i not in application_icons_alone for i in found):
            found -= application_icons_alone
        new_name = f"{workspace.num}:{'|'.join(found)}".rstrip(":")
        if workspace.name != new_name:
            logger.debug(f"rename workspace {workspace.num}")
            command = f'rename workspace "{workspace.name}" to "{new_name}"'
            commands.append(command)

    # Avoid a pointless IPC round trip with an empty command.
    if commands:
        await i3.command(";".join(commands))
|
||
|
||
|
||
async def _new_workspace(i3):
    """Switch to the lowest unused positive workspace number.

    Queries the existing workspaces through ``i3.get_workspaces()``, picks
    the smallest number >= 1 that none of them uses, issues a
    ``workspace number`` command for it and returns that number.
    """
    workspaces = await i3.get_workspaces()
    used = {w.num for w in workspaces}
    # Include 0 so the candidate range is never empty, even when no
    # workspace has a positive number (i3 reports -1 for named workspaces).
    highest = max(used | {0})
    # highest + 1 is always free, so the difference is never empty; min()
    # picks the lowest gap deterministically (set.pop() is arbitrary).
    available = min(set(range(1, highest + 2)) - used)
    logger.info(f"create new workspace number {available}")
    await i3.command(f'workspace number "{available}"')
    return available
|
||
|
||
|
||
@on(CommandEvent("new-workspace"), CommandEvent("move-to-new-workspace"))
async def new_workspace(i3, event):
    """Create a new workspace and optionally move a window to it."""
    moving = event == "move-to-new-workspace"

    # When moving, grab the focused container before the workspace switch;
    # without a focused container there is nothing to do.
    current = None
    if moving:
        tree = await i3.get_tree()
        current = tree.find_focused()
        if not current:
            return

    num = await _new_workspace(i3)

    # Send the remembered container to the freshly created workspace.
    if moving:
        await current.command(f'move container to workspace number "{num}"')
|
||
|
||
|
||
@on(I3Event.WINDOW_NEW)
async def worksplace_exclusive(i3, event):
    """Move new windows on a new workspace instead of sharing a workspace
    with an exclusive app."""
    # `event.container` is the new window. It is left alone when it may
    # intrude or when the focused workspace holds no application listed in
    # exclusive_apps; otherwise it is moved to a newly created workspace.
    w = event.container

    def identifiers(window):
        """Lowercased name, class and instance (None for unset/empty)."""
        return {
            s.lower() if s else None
            for s in (window.name, window.window_class, window.window_instance)
        }

    def can_intrude(w):
        """Can this new window intrude any workspace?"""
        if w.floating in {"auto_on", "user_on"}:
            return True
        if w.ipc_data["window_type"] not in {"normal", "splash", "unknown"}:
            return True
        if w.sticky:
            return True
        return bool(identifiers(w) & intrusive_apps)

    # Can the new window just intrude?
    if can_intrude(w):
        logger.debug(f"window {w.name} can intrude")
        return

    # Does the current workspace contains an exclusive app?
    tree = await i3.get_tree()
    focused = tree.find_focused()
    workspace = focused.workspace() if focused else None
    if not workspace:
        return
    ids = set()
    for ow in workspace.leaves():
        # The new window itself does not count.
        if ow.id != w.id:
            ids |= identifiers(ow)
    exclusives = ids & exclusive_apps
    if not exclusives:
        logger.debug(f"no exclusive app, {w.name} can go there")
        return

    # Create a new workspace and move the window here
    num = await _new_workspace(i3)
    logger.info(f"move window {w.name} to workspace {num}")
    await w.command(f'move container to workspace number "{num}"')
|
||
|
||
|
||
@on(CommandEvent("quake-console"))
async def quake_console(i3, event):
    """Spawn a quake console or toggle an existing one."""
    # `event` is the binding payload: four ":"-separated fields, of which
    # the last three are the terminal executable, the window instance name
    # and the console height as a fraction of the workspace height.
    try:
        _, term_exec, term_name, height = event.split(":")
        height = float(height)
    except ValueError as exc:
        logger.warning(f"unable to parse payload {event}: {exc}")
        return

    tree = await i3.get_tree()
    if not tree.find_instanced(term_name):
        await i3.command(f"exec exec {term_exec} --name {term_name}")
        # Poll up to 5 times for the new window; stop as soon as it shows
        # up, sleep only between unsuccessful attempts.
        for _ in range(5):
            tree = await i3.get_tree()
            if tree.find_instanced(term_name):
                break
            await asyncio.sleep(0.2)
        else:
            raise RuntimeError("unable to spawn terminal")

    workspaces = await i3.get_workspaces()
    workspace = next((ws for ws in workspaces if ws.focused), None)
    if workspace is None:
        logger.warning("no focused workspace, cannot place quake console")
        return

    # Full workspace width, requested fraction of its height, anchored at
    # the workspace's top-left corner.
    width = workspace.rect.width
    height = int(workspace.rect.height * height)
    posx = workspace.rect.x
    posy = workspace.rect.y
    command = (
        f"[instance={term_name}] "
        "border none,"
        f"resize set {width} px {height} px,"
        "scratchpad show,"
        f"move absolute position {posx}px {posy}px"
    )
    logger.debug(f"QuakeConsole: {command}")
    await i3.command(command)
|
||
|
||
|
||
@on(CommandEvent("container-info"))
async def container_info(i3, event):
    """Show information about the focused container."""
    # Builds one "<label> <value>" line per non-None property and shows the
    # result as a notification, replacing the previous one from this
    # handler (its id is remembered in container_info.last_id).
    tree = await i3.get_tree()
    window = tree.find_focused()
    if not window:
        return
    logger.info(f"window raw information: {window.ipc_data}")
    summary = "About focused container"
    r = window.rect
    w = window
    info = {
        "name": w.name,
        "title": w.window_title,
        "class": w.window_class,
        "instance": w.window_instance,
        "role": w.window_role,
        "type": w.ipc_data["window_type"],
        "sticky": w.sticky,
        "floating": w.floating,
        "geometry": f"{r.width}×{r.height}+{r.x}+{r.y}",
        "layout": w.layout,
        "percent": w.percent,
        "marks": ", ".join(w.marks) or "(none)",
    }
    # Values are escaped because the body is rendered as markup.
    body = "\n".join(
        f"<tt>{k:10}</tt> {html.escape(str(v))}"
        for k, v in info.items()
        if v is not None
    )
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=10000,
        summary=summary,
        body=body,
        replaces_id=getattr(container_info, "last_id", 0),
    )
    container_info.last_id = result[0]
|
||
|
||
|
||
@on(CommandEvent("workspace-info"))
async def workspace_info(i3, event):
    """Show information about the focused workspace."""
    # Renders the container tree of the focused workspace as an indented
    # text tree and shows it as a notification, replacing the previous one
    # from this handler (its id is remembered in workspace_info.last_id).
    workspaces = await i3.get_workspaces()
    focused = [w for w in workspaces if w.focused]
    if not focused:
        return
    workspace = focused[0]
    summary = f"Workspace {workspace.num} on {workspace.output}"
    tree = await i3.get_tree()
    matches = [w for w in tree.workspaces() if w.num == workspace.num]
    if not matches:
        return

    def render(container):
        """Return the markup lines for a container and its children."""
        if container.focused:
            style = 'foreground="#ffaf00"'
        elif not container.window:
            style = 'foreground="#6c98ee"'
        else:
            style = ""
        if container.window:
            content = (
                f"{(container.window_class or '???').lower()}: "
                f"{(container.window_title or '???')}"
            )
        elif container.type == "workspace" and not container.nodes:
            # Empty workspaces use workspace_layout, but when default,
            # this is layout...
            layout = container.ipc_data["workspace_layout"]
            if layout == "default":
                layout = container.layout
            content = f"({layout})"
        else:
            content = f"({container.layout})"
        # Titles are arbitrary text: escape them so "<" or "&" cannot break
        # the notification markup (container_info escapes its values too).
        lines = [f"<span {style}>{html.escape(content.lower())}</span>"]
        last_index = len(container.nodes) - 1
        for index, child in enumerate(container.nodes):
            if index == last_index:
                first = "└─"
                # Same width as the other prefixes to keep children aligned.
                others = "  "
            else:
                first = "├─"
                others = "│ "
            rendered = render(child).replace("\n", f"\n{others}")
            lines.append(f"<tt>{first}</tt>{rendered}")
        return "\n".join(lines)

    body = render(matches[0]).lstrip("\n")
    result = await notify(
        i3,
        app_icon="system-search",
        expire_timeout=20000,
        summary=summary,
        body=body,
        replaces_id=getattr(workspace_info, "last_id", 0),
    )
    workspace_info.last_id = result[0]
|
||
|
||
|
||
@on(I3Event.OUTPUT)
async def output_update(i3, event):
    """React to a XRandR change."""
    # Debounced: each event cancels the pending run (kept in
    # output_update.running) and schedules a new one 2 seconds later.
    running = getattr(output_update, "running", None)
    if running is not None:
        running.cancel()
        output_update.running = None

    async def output_update_now():
        try:
            await asyncio.sleep(2)
        except asyncio.CancelledError:
            return
        # Past this point the run can no longer be cancelled by new events.
        output_update.running = None

        logger.info("XRandR change detected")
        cmds = (
            "systemctl --user reload --no-block xsettingsd.service",
            "systemctl --user start --no-block wallpaper.service",
        )
        for cmd in cmds:
            # Run through asyncio so the event loop is not blocked while
            # the command executes.
            try:
                proc = await asyncio.create_subprocess_exec(*shlex.split(cmd))
            except OSError as exc:
                logger.warning(f"unable to run {cmd}: {exc}")
                continue
            returncode = await proc.wait()
            if returncode != 0:
                logger.warning(f"{cmd} exited with {returncode}")

    logger.debug("schedule XRandR change")
    output_update.running = asyncio.create_task(output_update_now())
|
||
|
||
|
||
@on(
    DBusSignal(
        path="/org/bluez",
        interface="org.freedesktop.DBus.Properties",
        member="PropertiesChanged",
        signature="sa{sv}as",
    )
)
async def bluetooth_notifications(
    i3, event, path, interface, changed, invalid
):
    """Display notifications related to Bluetooth state."""
    # Only a change of the "Connected" property of a device is of interest.
    if interface != "org.bluez.Device1" or "Connected" not in changed:
        return
    logger.info(changed["Connected"])

    # Read name, icon and current state from the device object itself.
    device = i3.system_bus["org.bluez"][path]
    obd = await device.get_async_interface(interface)
    name = await obd.Name
    icon = await obd.Icon
    connected = await obd.Connected
    if connected:
        state = "connected"
    else:
        state = "disconnected"
    await notify(
        i3,
        app_icon=icon,
        summary=name,
        body=f"Bluetooth device {state}",
    )
|
||
|
||
|
||
@on(
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    )
)
async def network_manager_notifications(i3, event, path, state, reason):
    """Display notifications related to Network Manager state."""
    ofnm = "org.freedesktop.NetworkManager"
    logger.debug(f"from {path} state: {state}, reason: {reason}")
    if state != NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
        # We cannot get proper state unless the connection is
        # activated, unless we maintain state.
        return

    try:
        nmca = await i3.system_bus[ofnm][path].get_async_interface(
            f"{ofnm}.Connection.Active"
        )
    except dbussy.DBusError:
        logger.info(f"interface {path} has vanished")
        return
    kind = await nmca.Type
    conn_id = await nmca.Id

    # Connection type -> (notification icon, notification body). Other
    # connection types produce no notification.
    messages = {
        "vpn": ("network-vpn", "VPN connected!"),
        "802-3-ethernet": (
            "network-wired",
            "Ethernet connection established!",
        ),
        "802-11-wireless": (
            "network-wireless",
            "Wireless connection established!",
        ),
    }
    message = messages.get(kind)
    if message is None:
        return
    app_icon, body = message
    await notify(i3, app_icon=app_icon, summary=f"{conn_id}", body=body)
|
||
|
||
|
||
@on(
    StartEvent,
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.Connection.Active",
        member="StateChanged",
        signature="uu",
    ),
    DBusSignal(
        path="/",
        interface="org.freedesktop.NetworkManager.AccessPoint",
        member="PropertiesChanged",
        signature="a{sv}",
    ),
)
async def network_manager_status(i3, event, *args):
    """Compute network manager status."""
    # Builds a one-line status (icons plus SSID / VPN name) from
    # NetworkManager over the system bus and, when it changed, writes it to
    # a cache file and to polybar's message queues. State is kept as
    # attributes of this function: `running` (pending update task), `last`
    # (last published status) and `active_ap` (last active access point).
    ofnm = "org.freedesktop.NetworkManager"

    # Dampen updates
    running = getattr(network_manager_status, "running", None)
    if running is not None:
        running.cancel()
        network_manager_status.running = None

    async def network_manager_status_now(sleep=1):
        # Wait `sleep` seconds (cancellable), then refresh the status.
        try:
            await asyncio.sleep(sleep)
        except asyncio.CancelledError:
            return
        network_manager_status.running = None
        try:
            await _network_manager_status_now()
        except Exception as e:
            logger.warning("while updating network status: %s", str(e))
            # Retry in 5 seconds unless another update got scheduled.
            # NOTE(review): indentation was lost in the reviewed copy; the
            # retry is assumed to belong to this except branch -- confirm.
            if network_manager_status.running is None:
                network_manager_status.running = asyncio.create_task(
                    network_manager_status_now(5)
                )

    async def _network_manager_status_now():
        status = []

        # Build status from devices
        bus = i3.system_bus[ofnm]
        nm = await bus["/org/freedesktop/NetworkManager"].get_async_interface(
            ofnm
        )
        devices = await nm.AllDevices
        for device in devices:
            nmd = await bus[device].get_async_interface(f"{ofnm}.Device")
            kind = await nmd.DeviceType
            state = await nmd.State
            if state == NM_DEVICE_STATE_UNMANAGED:
                continue
            if kind == NM_DEVICE_TYPE_WIFI:
                if state != NM_DEVICE_STATE_ACTIVATED:
                    status.append(icons["nowifi"])
                    continue
                nmw = await bus[device].get_async_interface(
                    f"{ofnm}.Device.Wireless"
                )
                ap = await nmw.ActiveAccessPoint
                if not ap:
                    status.append(icons["nowifi"])
                    continue
                network_manager_status.active_ap = ap
                nmap = await bus[ap].get_async_interface(f"{ofnm}.AccessPoint")
                name = await nmap.Ssid
                strength = int(await nmap.Strength)
                # Integer division by 34 selects index 0, 1 or 2 for
                # strengths 0-33, 34-67 and 68-101 respectively.
                status.append(
                    [
                        icons["wifi-low"],
                        icons["wifi-medium"],
                        icons["wifi-high"],
                    ][strength // 34]
                )
                # The SSID is converted to bytes and decoded leniently;
                # "%" is doubled, presumably to escape it for polybar
                # (TODO confirm).
                status.append(
                    bytes(name)
                    .decode("utf-8", errors="replace")
                    .replace("%", "%%")
                )
            elif (
                kind == NM_DEVICE_TYPE_ETHERNET
                and state == NM_DEVICE_STATE_ACTIVATED
            ):
                status.append(icons["wired"])

        # Build status for VPN connection
        connections = await nm.ActiveConnections
        for conn in connections:
            nma = await bus[conn].get_async_interface(
                f"{ofnm}.Connection.Active"
            )
            vpn = await nma.Vpn
            if vpn:
                state = await nma.State
                if state == NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
                    status.append(icons["vpn"])
                    status.append(await nma.Id)

        # Final status line
        status = " ".join(status)
        last = getattr(network_manager_status, "last", None)

        # Only publish when the status line actually changed.
        if status != last:
            logger.info(f"network status: {status}")

            # Update cache file (for when polybar restarts)
            with open(
                f"{os.getenv('XDG_RUNTIME_DIR')}/i3/network.txt", "w"
            ) as out:
                out.write(status)

            # Send it to polybar's module/network
            for name in glob.glob("/tmp/polybar_mqueue.*"):
                try:
                    # Non-blocking open so a queue cannot stall the loop.
                    with open(
                        os.open(name, os.O_WRONLY | os.O_NONBLOCK), "w"
                    ) as out:
                        cmd = f"action:#network.send.{status}"
                        out.write(cmd)
                except OSError as e:
                    # ENXIO is ignored -- presumably a queue nobody reads
                    # anymore (TODO confirm); other errors propagate.
                    if e.errno != errno.ENXIO:
                        raise

            network_manager_status.last = status

    # A connection state change is processed right away; anything else
    # (start-up, access point property changes) goes through the dampened,
    # cancellable task.
    if (
        isinstance(event, DBusSignal)
        and event.interface == f"{ofnm}.Connection.Active"
    ):
        await network_manager_status_now(0)
    else:
        network_manager_status.running = asyncio.create_task(
            network_manager_status_now()
        )
||
|
||
|
||
async def main(options):
    """Connect to i3 and D-Bus, wire every ``@on`` handler, then run.

    The i3 connection object also carries the session and system bus
    connections (``i3.session_bus`` / ``i3.system_bus``) so handlers can
    reach them. ``options`` is the parsed command line; it is not used
    here. Returns when ``i3.main()`` returns.
    """
    i3 = await Connection().connect()
    i3.session_bus = await ravel.session_bus_async()
    i3.system_bus = await ravel.system_bus_async()

    # Regular events
    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, I3Event):
                i3.on(event, fn)

    # React to some bindings
    async def binding_event(i3, event):
        """Process a binding event."""
        # We only processes it when it is a nop command and we use
        # this mechanism as an IPC mechanism. The alternative would be
        # to use ticks but we would need to spawn an i3-msg process
        # for that.
        cmd = event.binding.command
        if not cmd.startswith("nop "):
            return
        cmd = cmd[4:].strip("\"'")
        if not cmd:
            return
        # The part before the first ":" selects the handler; the handler
        # receives the whole payload.
        kind = cmd.split(":")[0]
        for fn, events in on.functions.items():
            for e in events:
                if isinstance(e, CommandEvent) and e.name == kind:
                    await fn(i3, cmd)

    i3.on(I3Event.BINDING, binding_event)

    # Listen to DBus events
    def wrapping(fn, event):
        """Adapt a handler to the callback signature expected by ravel."""

        @ravel.signal(
            name=event.member,
            in_signature=event.signature,
            path_keyword="path",
            args_keyword="args",
        )
        async def wrapped(path, args):
            return await fn(i3, event, path, *args)

        return wrapped

    for fn, events in on.functions.items():
        for event in events:
            if isinstance(event, DBusSignal):
                # Each signal is subscribed on both buses.
                for bus in (i3.session_bus, i3.system_bus):
                    bus.listen_signal(
                        path=event.path,
                        fallback=True,
                        interface=event.interface,
                        name=event.member,
                        func=wrapping(fn, event),
                    )

    # Run events that should run on start
    # Keep references to the tasks: the event loop only holds weak
    # references, so an unreferenced task may be garbage-collected before
    # it completes.
    start_tasks = [  # noqa: F841 (kept alive on purpose)
        asyncio.create_task(fn(i3, event))
        for fn, events in on.functions.items()
        for event in events
        if event is StartEvent
    ]

    await i3.main()
|
||
|
||
|
||
if __name__ == "__main__":
    # Parse
    # The help text is the module docstring followed by the docstring of
    # every registered handler.
    description = __doc__
    for fn, events in on.functions.items():
        description += f" {fn.__doc__}"
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--debug",
        "-d",
        action="store_true",
        default=False,
        help="enable debugging",
    )
    options = parser.parse_args()

    # Logging
    # Log to the terminal when run interactively, to the journal otherwise.
    root = logging.getLogger("")
    root.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
    if sys.stdin.isatty():
        ch = logging.StreamHandler()
        ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
        # Without this the handler (and its format) is never used and
        # INFO/DEBUG messages are dropped.
        root.addHandler(ch)
    else:
        root.addHandler(journal.JournalHandler(SYSLOG_IDENTIFIER=logger.name))

    try:
        asyncio.run(main(options))
    except Exception as e:
        logger.exception("%s", e)
        sys.exit(1)
    sys.exit(0)
|