|
| 1 | +#!/usr/bin/env python3 |
| 2 | +import argparse |
| 3 | +import asyncio |
| 4 | +import logging |
| 5 | +import sys |
| 6 | +import threading |
| 7 | +import tkinter |
| 8 | +import tkinter.font |
| 9 | +import tkinter.ttk |
| 10 | + |
| 11 | +from typing_extensions import TypedDict |
| 12 | + |
| 13 | +import kasa |
| 14 | + |
| 15 | +logger = logging.getLogger(__name__) |
| 16 | + |
| 17 | + |
| 18 | +class LightState(TypedDict): |
| 19 | + on_off: int |
| 20 | + mode: str |
| 21 | + hue: int |
| 22 | + saturation: int |
| 23 | + color_temp: int |
| 24 | + brightness: int |
| 25 | + |
| 26 | + |
| 27 | +async def update_bulb(bulb, brightness=None, hue=None, saturation=None): |
| 28 | + if brightness != None: |
| 29 | + await bulb.set_brightness(brightness) |
| 30 | + if hue != None or saturation != None: |
| 31 | + state: LightState = await bulb.get_light_state() |
| 32 | + await bulb.set_hsv( |
| 33 | + hue if hue != None else state["hue"], |
| 34 | + saturation if saturation != None else state["saturation"], |
| 35 | + brightness if brightness != None else state["brightness"], |
| 36 | + ) |
| 37 | + |
| 38 | + |
| 39 | +class ScrollableFrame(tkinter.ttk.Frame): |
| 40 | + def __init__(self, container, *args, **kwargs): |
| 41 | + super().__init__(container, *args, **kwargs) |
| 42 | + self.canvas = tkinter.Canvas(self) |
| 43 | + self.scrollable_frame = tkinter.ttk.Frame(self.canvas) |
| 44 | + self.scrollable_frame.bind( |
| 45 | + "<Configure>", |
| 46 | + lambda event: self.canvas.configure(scrollregion=self.canvas.bbox("all")), |
| 47 | + ) |
| 48 | + self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw") |
| 49 | + self.scrollbar = tkinter.ttk.Scrollbar( |
| 50 | + self, orient="vertical", command=self.canvas.yview |
| 51 | + ) |
| 52 | + self.canvas.configure(yscrollcommand=self.scrollbar.set) |
| 53 | + self.canvas.pack(side="left", fill="both", expand=True) |
| 54 | + self.scrollbar.pack(side="right", fill="y") |
| 55 | + |
| 56 | + # bind mouse scroll only when the mouse is over our element |
| 57 | + self.canvas.bind("<Enter>", self._bind_mouse) |
| 58 | + self.canvas.bind("<Leave>", self._unbind_mouse) |
| 59 | + |
| 60 | + def _bind_mouse(self, event): |
| 61 | + if sys.platform == "linux": |
| 62 | + self.canvas.bind_all("<Button-4>", self._on_mouse_scroll) |
| 63 | + self.canvas.bind_all("<Button-5>", self._on_mouse_scroll) |
| 64 | + else: |
| 65 | + self.canvas.bind_all("<MouseWheel>", self._on_mouse_scroll) |
| 66 | + |
| 67 | + def _unbind_mouse(self, event): |
| 68 | + self.canvas.unbind_all("<Button-4>") |
| 69 | + self.canvas.unbind_all("<Button-5>") |
| 70 | + self.canvas.unbind_all("<MouseWheel>") |
| 71 | + |
| 72 | + def _on_mouse_scroll(self, event): |
| 73 | + logger.info("ScrollableFrame._on_mousewheel()") |
| 74 | + |
| 75 | + # Button 5 is "scroll up" in Linux. It seems that event.delta may just |
| 76 | + # be zero on Linux. |
| 77 | + if sys.platform == "linux" and event.num == 5: |
| 78 | + self.canvas.yview_scroll(1, "units") |
| 79 | + # Button 4 is "scroll down" in Linux. It seems that event.delta may |
| 80 | + # just be zero on Linux. |
| 81 | + elif sys.platform == "linux" and event.num == 4: |
| 82 | + self.canvas.yview_scroll(-1, "units") |
| 83 | + |
| 84 | + if sys.platform != "linux" and event.delta != 0: |
| 85 | + self.canvas.yview_scroll(-1 * (event.delta / 120), "units") |
| 86 | + |
| 87 | + |
| 88 | +class EditableText(tkinter.Frame): |
| 89 | + def __init__(self, container, initial_text_value, callback): |
| 90 | + super(self.__class__, self).__init__(container) |
| 91 | + |
| 92 | + self.callback = callback |
| 93 | + |
| 94 | + self.text = tkinter.StringVar(value=initial_text_value) |
| 95 | + self.child_widgets = [] |
| 96 | + self._render_static_mode() |
| 97 | + |
| 98 | + def _edit_mode_start(self, event=None): |
| 99 | + for widget in self.child_widgets: |
| 100 | + widget.destroy() |
| 101 | + self._render_edit_mode() |
| 102 | + |
| 103 | + def _edit_mode_finish(self, event=None): |
| 104 | + self.callback(self.text.get()) |
| 105 | + for widget in self.child_widgets: |
| 106 | + widget.destroy() |
| 107 | + self._render_static_mode() |
| 108 | + |
| 109 | + def _render_static_mode(self): |
| 110 | + """render non-editable text""" |
| 111 | + label_font = tkinter.font.Font( |
| 112 | + family="Helvetica", name="appHighlightFont", size=12, weight="bold" |
| 113 | + ) |
| 114 | + |
| 115 | + text_label = tkinter.Label(self, text=self.text.get(), font=label_font) |
| 116 | + text_label.pack(side=tkinter.LEFT) |
| 117 | + |
| 118 | + edit_label = tkinter.Label(self, image=self.pencil_icon) |
| 119 | + edit_label.bind("<ButtonRelease-1>", self._edit_mode_start) |
| 120 | + edit_label.pack(side=tkinter.LEFT) |
| 121 | + |
| 122 | + self.child_widgets.append(text_label) |
| 123 | + self.child_widgets.append(edit_label) |
| 124 | + |
| 125 | + def _render_edit_mode(self): |
| 126 | + text_entry = tkinter.Entry(self, textvariable=self.text) |
| 127 | + text_entry.pack(side=tkinter.LEFT) |
| 128 | + text_entry.bind("<Return>", self._edit_mode_finish) |
| 129 | + |
| 130 | + self.child_widgets.append(text_entry) |
| 131 | + |
| 132 | + @property |
| 133 | + def pencil_icon(self): |
| 134 | + """ ... note:: Make sure to store a reference to the result, as the |
| 135 | + BitmapImage may otherwise get garbage collected. Passing it to |
| 136 | + tkinter.Label is not sufficient. |
| 137 | + (https://stackoverflow.com/a/31959529/2796349) |
| 138 | + """ |
| 139 | + if not hasattr(self, "_pencil_icon"): |
| 140 | + self._pencil_icon = tkinter.BitmapImage( |
| 141 | + data=b"#define image_width 16\n#define image_height 16\nstatic char image_bits[] = {\n0x00,0x1c,0x00,0x3e,0x00,0x7f,0x80,0xf7,0xc0,0xf3,0xe0,0x79,0xf0,0x3c,0x78,\n0x1e,0x3c,0x0f,0x9c,0x07,0xcc,0x03,0xfc,0x01,0xfc,0x00,0x7c,0x00,0xff,0xff,\n0xff,0xff\n};" |
| 142 | + ) |
| 143 | + return self._pencil_icon |
| 144 | + |
| 145 | + |
| 146 | +class BulbFrame(tkinter.Frame): |
| 147 | + def _hue_callback(self, event): |
| 148 | + asyncio.run(update_bulb(self.bulb, None, self.hue_slider.get())) |
| 149 | + |
| 150 | + def _saturation_callback(self, event): |
| 151 | + asyncio.run(update_bulb(self.bulb, saturation=self.saturation_slider.get())) |
| 152 | + |
| 153 | + def _brightness_callback(self, event): |
| 154 | + asyncio.run(update_bulb(self.bulb, self.brightness_slider.get())) |
| 155 | + |
| 156 | + @classmethod |
| 157 | + def for_bulb(cls, bulb: kasa.SmartBulb, config, *args, **kwargs): |
| 158 | + """Create a new bulb frame given a SmartBulb |
| 159 | +
|
| 160 | + ... note:: I think using a classmethod here is a better approach than |
| 161 | + breaking the initializer interface of the parent Frame class. |
| 162 | + """ |
| 163 | + self = cls(*args, **kwargs) |
| 164 | + self.bulb = bulb |
| 165 | + |
| 166 | + self.hue_label = tkinter.Label(self, text="hue") |
| 167 | + self.saturation_label = tkinter.Label(self, text="saturation") |
| 168 | + self.brightness_label = tkinter.Label(self, text="brightness") |
| 169 | + |
| 170 | + self.hue_slider = tkinter.Scale( |
| 171 | + self, from_=0, to=360, orient=tkinter.HORIZONTAL |
| 172 | + ) |
| 173 | + self.hue_slider.bind("<ButtonRelease-1>", self._hue_callback) |
| 174 | + self.hue_slider.set(self.bulb.hsv[0]) |
| 175 | + |
| 176 | + self.saturation_slider = tkinter.Scale( |
| 177 | + self, from_=0, to=100, orient=tkinter.HORIZONTAL |
| 178 | + ) |
| 179 | + self.saturation_slider.bind("<ButtonRelease-1>", self._saturation_callback) |
| 180 | + self.saturation_slider.set(self.bulb.hsv[1]) |
| 181 | + |
| 182 | + self.brightness_slider = tkinter.Scale( |
| 183 | + self, from_=0, to=100, orient=tkinter.HORIZONTAL |
| 184 | + ) |
| 185 | + self.brightness_slider.bind("<ButtonRelease-1>", self._brightness_callback) |
| 186 | + self.brightness_slider.set(self.bulb.brightness) |
| 187 | + |
| 188 | + bulb_name = getattr(self.bulb, "alias", None) or self.bulb.mac |
| 189 | + |
| 190 | + # TODO See if we can update the device alias instead of just logging it |
| 191 | + # here |
| 192 | + EditableText( |
| 193 | + self, bulb_name, lambda new_device_name: logger.info(new_device_name) |
| 194 | + ).grid(column=0, row=0, columnspan=3) |
| 195 | + |
| 196 | + self.hue_label.grid(column=0, row=1) |
| 197 | + self.hue_slider.grid(column=0, row=2) |
| 198 | + |
| 199 | + self.saturation_label.grid(column=1, row=1) |
| 200 | + self.saturation_slider.grid(column=1, row=2) |
| 201 | + |
| 202 | + self.brightness_label.grid(column=2, row=1) |
| 203 | + self.brightness_slider.grid(column=2, row=2) |
| 204 | + |
| 205 | + return self |
| 206 | + |
| 207 | + |
| 208 | +class KasaDevices(tkinter.Frame): |
| 209 | + def __init__(self, *args, **kwargs): |
| 210 | + super(self.__class__, self).__init__(*args, **kwargs) |
| 211 | + |
| 212 | + self.device_lock = asyncio.Lock() |
| 213 | + # list of kasa devices |
| 214 | + self.kasa_devices = [] |
| 215 | + # mapping from mac address to widget |
| 216 | + self.device_widgets = {} |
| 217 | + |
| 218 | + self.refresh_button = tkinter.Button( |
| 219 | + self, text="Refresh", command=self.start_refresh |
| 220 | + ) |
| 221 | + self.refresh_button.pack(fill=tkinter.X) |
| 222 | + |
| 223 | + @property |
| 224 | + def _bulbs(self): |
| 225 | + return list( |
| 226 | + filter(lambda d: d.device_type == kasa.DeviceType.Bulb, self.kasa_devices) |
| 227 | + ) |
| 228 | + |
| 229 | + async def update_widgets(self): |
| 230 | + for bulb in self._bulbs: |
| 231 | + if bulb.mac not in self.device_widgets: |
| 232 | + w = BulbFrame.for_bulb(bulb, self.config, master=self) |
| 233 | + w.pack() |
| 234 | + self.device_widgets[bulb.mac] = w |
| 235 | + |
| 236 | + async def add_device(self, device): |
| 237 | + await device.update() |
| 238 | + logger.info("add_device(device={})".format(repr(device))) |
| 239 | + async with self.device_lock: |
| 240 | + mac_addrs = [d.mac for d in self.kasa_devices] |
| 241 | + device_exists = mac_addrs.count(device.mac) > 0 |
| 242 | + if device_exists: |
| 243 | + return |
| 244 | + self.kasa_devices.append(device) |
| 245 | + await self.update_widgets() |
| 246 | + |
| 247 | + async def _do_refresh(self): |
| 248 | + logger.info("KasaDevices._do_refresh() called") |
| 249 | + self.refresh_button["state"] = tkinter.DISABLED |
| 250 | + self.refresh_button["text"] = "Refreshing..." |
| 251 | + await self.clear_devices() |
| 252 | + await kasa.Discover.discover(on_discovered=self.add_device) |
| 253 | + self.refresh_button["state"] = tkinter.NORMAL |
| 254 | + self.refresh_button["text"] = "Refresh" |
| 255 | + |
| 256 | + async def clear_devices(self): |
| 257 | + async with self.device_lock: |
| 258 | + for mac, widget in self.device_widgets.items(): |
| 259 | + widget.destroy() |
| 260 | + self.device_widgets.clear() |
| 261 | + self.kasa_devices.clear() |
| 262 | + |
| 263 | + def start_refresh(self): |
| 264 | + def thread_loop(): |
| 265 | + asyncio.run(self._do_refresh()) |
| 266 | + |
| 267 | + logger.info("KasaDevices.start_refresh() called") |
| 268 | + threading.Thread(target=thread_loop).start() |
| 269 | + logger.info("KasaDevices.start_refresh() completed") |
| 270 | + |
| 271 | + |
| 272 | +def main(): |
| 273 | + root = tkinter.Tk() |
| 274 | + root.title("Kasa Devices") |
| 275 | + root.geometry("500x400") |
| 276 | + scroll_frame = ScrollableFrame(root) |
| 277 | + KasaDevices(scroll_frame.scrollable_frame).pack() |
| 278 | + scroll_frame.pack(fill=tkinter.BOTH, expand=True) |
| 279 | + root.mainloop() |
| 280 | + |
| 281 | + |
| 282 | +if __name__ == "__main__": |
| 283 | + parser = argparse.ArgumentParser() |
| 284 | + parser.add_argument( |
| 285 | + "-v", |
| 286 | + "--verbose", |
| 287 | + action="count", |
| 288 | + dest="verbosity", |
| 289 | + default=0, |
| 290 | + help="show verbose logging (repeat flag for increased verbosity)", |
| 291 | + ) |
| 292 | + args = parser.parse_args() |
| 293 | + log_level = logging.CRITICAL - args.verbosity * 10 |
| 294 | + logging.basicConfig(level=log_level) |
| 295 | + main() |
0 commit comments