Revert "i3-companion: revert back to using rfcomm to get battery"

This reverts commit 4ac86d1632. Let's
try again with Pipewire this time!
This commit is contained in:
Vincent Bernat 2022-04-30 15:17:23 +02:00
parent 24219e2382
commit 75f5a9cdda

View file

@ -739,18 +739,16 @@ async def bluetooth_notifications(i3, event, path, interface, changed, invalid):
and "Connected" in args[1] and "Connected" in args[1]
or args[0] == "org.bluez.Adapter1" or args[0] == "org.bluez.Adapter1"
and "Powered" in args[1] and "Powered" in args[1]
or args[0] == "org.bluez.Battery1"
and "Percentage" in args[1]
), ),
), ),
) )
@static(scheduled=None)
@retry(2) @retry(2)
@debounce(0.2) @debounce(4)
@polybar("bluetooth") @polybar("bluetooth")
async def bluetooth_status(i3, event, *args): async def bluetooth_status(i3, event, *args):
"""Update bluetooth status for Polybar.""" """Update bluetooth status for Polybar."""
if bluetooth_status.scheduled:
bluetooth_status.scheduled.cancel()
bluetooth_status.scheduled = None
if event is StartEvent: if event is StartEvent:
# Do we have a bluetooth device? # Do we have a bluetooth device?
if not os.path.exists("/sys/class/bluetooth"): if not os.path.exists("/sys/class/bluetooth"):
@ -772,17 +770,18 @@ async def bluetooth_status(i3, event, *args):
powered = True powered = True
elif "org.bluez.Device1" in interfaces: elif "org.bluez.Device1" in interfaces:
# We have a device! # We have a device!
device = interfaces["org.bluez.Device1"] device = types.SimpleNamespace(major=0, minor=0, battery=None)
if not device["Connected"][1]: interface = interfaces["org.bluez.Device1"]
if not interface["Connected"][1]:
continue continue
try: try:
device_class = device["Class"][1] device_class = interface["Class"][1]
major = (device_class & 0x1F00) >> 8 device.major = (device_class & 0x1F00) >> 8
minor = (device_class & 0xFC) >> 2 device.minor = (device_class & 0xFC) >> 2
mac = device["Address"][1] device.battery = interfaces["org.bluez.Battery1"]["Percentage"][1]
devices.append((major, minor, mac))
except KeyError: except KeyError:
devices.append((0, 0, "")) pass
devices.append(device)
# Choose appropriate icons for output # Choose appropriate icons for output
# See: https://btprodspecificationrefs.blob.core.windows.net/assigned-numbers # See: https://btprodspecificationrefs.blob.core.windows.net/assigned-numbers
@ -791,7 +790,7 @@ async def bluetooth_status(i3, event, *args):
output = "" output = ""
else: else:
output = ["bluetooth"] output = ["bluetooth"]
for major, minor, mac in devices: for device in devices:
classes = { classes = {
1: "laptop", 1: "laptop",
2: "phone", 2: "phone",
@ -817,54 +816,20 @@ async def bluetooth_status(i3, event, *args):
(lambda x: x & 0x20, "printer"), (lambda x: x & 0x20, "printer"),
], ],
} }
icon = classes.get((major, minor)) or classes.get(major, "unknown") icon = classes.get((device.major, device.minor)) or classes.get(
device.major, "unknown"
)
if type(icon) is list: if type(icon) is list:
for matcher, name in icon: for matcher, name in icon:
if matcher(minor): if matcher(device.minor):
icon = name icon = name
break break
else: else:
icon = "unknown" icon = "unknown"
output.append(icon) output.append(icon)
if mac.startswith("2C:41:A1"): if device.battery is not None:
# Get battery status for BOSE QC 35 II. This is hacky. icon = f"battery-{(device.battery+12)//25*25}"
# For other headsets, the information could be
# available through upower DBus. See
# https://github.com/Denton-L/based-connect. Starting
# from PA 16, it may be exposed by PA, then to BlueZ.
# See
# https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/482
loop = asyncio.get_event_loop()
sock = socket.socket(
socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM
)
try:
sock.setblocking(False)
# Workaround a bug in asyncio: https://bugs.python.org/issue27929
fut = loop.create_future()
loop._sock_connect(fut, sock, (mac, 8))
await fut
# Get battery
for send, ack, size in (
(b"\0\1\1\0", b"\0\1\3\5", 5), # get firmware
(b"\2\2\1\0", b"\2\2\3\1", 1), # get battery
):
await loop.sock_sendall(sock, send)
rack = await loop.sock_recv(sock, len(ack))
assert rack == ack, "incorrect ack received"
result = await loop.sock_recv(sock, size)
battery = result[0]
finally:
sock.close()
# Choose an icon
icon = f"battery-{(battery+12)//25*25}"
output[-1] = (output[-1], icon) output[-1] = (output[-1], icon)
# Schedule a refresh in 5 minutes
bluetooth_status.scheduled = loop.call_later(
600,
lambda: asyncio.create_task(bluetooth_status(i3, StartEvent)),
)
return "|".join( return "|".join(
(" ".join(icons[oo] for oo in o) if type(o) is tuple else icons[o]) (" ".join(icons[oo] for oo in o) if type(o) is tuple else icons[o])
for o in output for o in output