i3-companion: decorator to declare static variables

This seems cleaner than using `getattr()` to initialize the variable
inside the function.

Also, the debouncer function does not need this and can work with a
dictionary mapping functions to their worker.
This commit is contained in:
Vincent Bernat 2021-07-14 11:08:39 +02:00
parent 4b1af45c1b
commit 0e95635691

View file

@ -89,6 +89,18 @@ NM_DEVICE_STATE_ACTIVATED = 100
# Event helpers
def static(**kwargs):
"""Define static variables for the event handler."""
def decorator(fn):
for k, v in kwargs.items():
setattr(fn, k, v)
return fn
return decorator
@static(functions={})
def on(*events):
"""Tag events that should be provided to the function."""
@ -97,7 +109,6 @@ def on(*events):
def wrapper(*args, **kwargs):
return fn(*args, **kwargs)
on.functions = getattr(on, "functions", {})
on.functions[fn] = events
return wrapper
@ -119,14 +130,14 @@ def debounce(sleep, *, unless=None, retry=0):
try:
# Wait for an urgent work or until sleep is elapsed
await asyncio.wait_for(
fn.worker.urgent.wait(), timeout=sleep
workers[fn].urgent.wait(), timeout=sleep
)
logger.debug(f"urgent work received for {fn}")
except asyncio.TimeoutError:
pass
retry, args, kwargs = fn.worker.queue
fn.worker.queue = None
fn.worker.urgent.clear()
retry, args, kwargs = workers[fn].queue
workers[fn].queue = None
workers[fn].urgent.clear()
# Execute the work
logger.debug(f"execute work for {fn}")
@ -144,40 +155,41 @@ def debounce(sleep, *, unless=None, retry=0):
)
# Retry, unless we have something already scheduled
if fn.worker.queue is not None:
if workers[fn].queue is not None:
logger.debug(f"retry now with queued event for {fn}")
fn.worker.urgent.set()
workers[fn].urgent.set()
continue
logger.debug(f"reschedule retry for {fn}")
fn.worker.queue = (retry, args, kwargs)
workers[fn].queue = (retry, args, kwargs)
if unless is not None and unless(*args, **kwargs):
logger.debug(f"wake up now for retry of {fn}")
fn.worker.urgent.set()
workers[fn].urgent.set()
# Do we still have something to do?
if fn.worker.queue is None:
if workers[fn].queue is None:
break
# No more work
logger.debug(f"no more work for {fn}")
fn.worker = None
workers[fn] = None
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
if fn.worker is None:
if workers[fn] is None:
logger.debug(f"create new worker for {fn}")
fn.worker = types.SimpleNamespace()
fn.worker.task = asyncio.create_task(worker())
fn.worker.urgent = asyncio.Event()
fn.worker.queue = (retry, args, kwargs)
workers[fn] = types.SimpleNamespace()
workers[fn].task = asyncio.create_task(worker())
workers[fn].urgent = asyncio.Event()
workers[fn].queue = (retry, args, kwargs)
else:
logger.debug(f"enqueue new work for {fn}")
if unless is not None and unless(*args, **kwargs):
logger.debug(f"wake up now for {fn}")
fn.worker.urgent.set()
workers[fn].urgent.set()
fn.worker = None
workers[fn] = None
return wrapper
workers = {}
return decorator
@ -389,6 +401,7 @@ async def quake_console(i3, event):
@on(CommandEvent("container-info"))
@static(last_id=0)
async def container_info(i3, event):
"""Show information about the focused container."""
tree = await i3.get_tree()
@ -426,12 +439,13 @@ async def container_info(i3, event):
expire_timeout=10000,
summary=summary,
body=body,
replaces_id=getattr(container_info, "last_id", 0),
replaces_id=container_info.last_id,
)
container_info.last_id = result[0]
@on(CommandEvent("workspace-info"))
@static(last_id=0)
async def workspace_info(i3, event):
"""Show information about the focused workspace."""
workspaces = await i3.get_workspaces()
@ -485,7 +499,7 @@ async def workspace_info(i3, event):
expire_timeout=20000,
summary=summary,
body=body,
replaces_id=getattr(workspace_info, "last_id", 0),
replaces_id=workspace_info.last_id,
)
workspace_info.last_id = result[0]
@ -594,6 +608,7 @@ async def network_manager_notifications(i3, event, path, state, reason):
signature="a{sv}",
),
)
@static(last=None)
@debounce(
1,
unless=lambda i3, event, *args: (
@ -662,9 +677,8 @@ async def network_manager_status(i3, event, *args):
# Final status line
status = " ".join(status)
last = getattr(network_manager_status, "last", None)
if status != last:
if status != network_manager_status.last:
logger.info("updated network status")
polybar("network", status)
network_manager_status.last = status