i3-companion: move polybar handling into its own function

This commit is contained in:
Vincent Bernat 2021-07-13 08:10:58 +02:00
parent 71dcb5d6bc
commit 71a3d1d4a4

View file

@ -33,6 +33,8 @@ def icon(font_number, char):
return "%%{T%d}%s%%{T-}" % (font_number, char)
# Configuration
application_icons = {
"chromium": icon(2, ""),
"discord": icon(2, ""),
@ -84,6 +86,9 @@ NM_DEVICE_STATE_UNMANAGED = 10
NM_DEVICE_STATE_ACTIVATED = 100
# Event helpers
def on(*events):
"""Tag events that should be provided to the function."""
@ -176,6 +181,9 @@ def debounce(sleep, *, unless=None, retry=0):
return decorator
# Other helpers
async def notify(i3, **kwargs):
"""Send a notification with notify-send."""
peer = i3.session_bus["org.freedesktop.Notifications"]
@ -196,6 +204,26 @@ async def notify(i3, **kwargs):
return await notifications.Notify(**parameters)
def polybar(module, content):
"""Update Polybar module with the provided content."""
# Update cache file (for when polybar restarts)
with open(f"{os.getenv('XDG_RUNTIME_DIR')}/i3/{module}.txt", "w") as out:
out.write(content)
# Send it to polybar
for name in glob.glob("/tmp/polybar_mqueue.*"):
try:
with open(os.open(name, os.O_WRONLY | os.O_NONBLOCK), "w") as out:
cmd = f"action:#{module}.send.{content}"
out.write(cmd)
except OSError as e:
if e.errno != errno.ENXIO:
raise
# Event handlers
@on(StartEvent, I3Event.WINDOW_MOVE, I3Event.WINDOW_NEW, I3Event.WINDOW_CLOSE)
@debounce(0.2)
async def workspace_rename(i3, event):
@ -634,28 +662,13 @@ async def network_manager_status(i3, event, *args):
if status != last:
logger.info("updated network status")
# Update cache file (for when polybar restarts)
with open(
f"{os.getenv('XDG_RUNTIME_DIR')}/i3/network.txt", "w"
) as out:
out.write(status)
# Send it to polybar's module/network
for name in glob.glob("/tmp/polybar_mqueue.*"):
try:
with open(
os.open(name, os.O_WRONLY | os.O_NONBLOCK), "w"
) as out:
cmd = f"action:#network.send.{status}"
out.write(cmd)
except OSError as e:
if e.errno != errno.ENXIO:
raise
polybar("network", status)
network_manager_status.last = status
# Main function
async def main(options):
i3 = await Connection().connect()
i3.session_bus = await ravel.session_bus_async()