2626
2727try :
2828 from typing import Union , Sequence , Optional , List , Tuple
29- from typing_extensions import Literal
29+ from typing_extensions import Literal # type: ignore
3030except ImportError :
3131 pass
3232from micropython import const
@@ -90,7 +90,11 @@ def __init__(
9090 # non-plus variants
9191 self ._open_pipes , self ._is_plus_variant = (0 , False ) # close all RX pipes
9292 self ._features = self ._reg_read (TX_FEATURE )
93- self ._reg_write (0x50 , 0x73 ) # derelict command toggles TX_FEATURE register
93+ # invoke derelict command toggles TX_FEATURE register (test for plus variant)
94+ self ._out [0 ] = 0x50
95+ self ._out [1 ] = 0x73
96+ with self ._spi as _spi :
97+ _spi .write_readinto (self ._out , self ._in , in_end = 2 , out_end = 2 )
9498 after_toggle = self ._reg_read (TX_FEATURE )
9599 if self ._features == after_toggle :
96100 self ._is_plus_variant = True
@@ -102,7 +106,7 @@ def __init__(
102106 self ._features = 5
103107 # init shadow copy of last RX_ADDR_P0 written to pipe 0 needed as
104108 # open_tx_pipe() appropriates pipe 0 for ACK packet
105- self ._pipe0_read_addr : Optional [ Union [ bytes , bytearray ]] = None
109+ self ._is_p0_rx : bool = False
106110 # shadow copy of the TX_ADDRESS
107111 self ._tx_address = self ._reg_read_bytes (TX_ADDRESS )
108112 # pre-configure the SETUP_RETR register
@@ -158,13 +162,19 @@ def ce_pin(self) -> bool:
158162 def ce_pin (self , val : bool ):
159163 self ._ce_pin .value = val
160164
161- def _reg_read (self , reg : int ) -> int :
165+ def _reg_read (self , reg : int , command : bool = False ) -> int :
162166 self ._out [0 ] = reg
167+ len = int (not command ) + 1
163168 with self ._spi as spi :
164169 # time.sleep(0.000005)
165- spi .write_readinto (self ._out , self ._in , out_end = 2 , in_end = 2 )
166- # print("SPI read 1 byte from", ("%02X" % reg), ("%02X" % self._in[1]))
167- return self ._in [1 ]
170+ spi .write_readinto (self ._out , self ._in , out_end = len , in_end = len )
171+ # if command:
172+ # print("SPI command", ("%02X" % reg))
173+ # else:
174+ # print(
175+ # "SPI read", len, "byte from", ("%02X" % reg), ("%02X" % self._in[1])
176+ # )
177+ return self ._in [not command ]
168178
169179 def _reg_read_bytes (self , reg : int , buf_len : int = 5 ) -> bytearray :
170180 self ._out [0 ] = reg
@@ -188,21 +198,15 @@ def _reg_write_bytes(self, reg: int, out_buf: Union[bytes, bytearray]):
188198 # buf_len - 1, ("%02X" % reg), address_repr(self._out[1 : buf_len], 0)
189199 # ))
190200
191- def _reg_write (self , reg : int , value : Optional [int ] = None ):
192- self ._out [0 ] = reg
193- buf_len = 1
194- if value is not None :
195- self ._out [0 ] = (0x20 if reg != 0x50 else 0 ) | reg
196- self ._out [1 ] = value
197- buf_len += 1
201+ def _reg_write (self , reg : int , value : int ):
202+ self ._out [0 ] = 0x20 | reg
203+ self ._out [1 ] = value
204+ buf_len = 2
198205 with self ._spi as spi :
199206 # time.sleep(0.000005)
200207 spi .write_readinto (self ._out , self ._in , out_end = buf_len , in_end = buf_len )
201208 # if reg != 0xFF:
202- # print(
203- # "SPI write", "command" if value is None else "1 byte to",
204- # ("%02X" % reg), "" if value is None else ("%02X" % value)
205- # )
209+ # print("SPI write 1 byte to", ("%02X" % reg), ("%02X" % value))
206210
207211 @property
208212 def address_length (self ) -> int :
@@ -217,21 +221,20 @@ def address_length(self, length: int):
217221
218222 def open_tx_pipe (self , address : Union [bytes , bytearray ]) -> None :
219223 """Open a data pipe for TX transmissions."""
220- if self ._pipe0_read_addr != address and self ._aa & 1 :
221- for i , val in enumerate (address ):
222- self ._pipes [0 ][i ] = val # type: ignore[assignment, index]
223- self ._reg_write_bytes (RX_ADDR_P0 , address )
224- for i , val in enumerate (address ):
225- self ._tx_address [i ] = val
226- self ._reg_write_bytes (TX_ADDRESS , address )
224+ addr_len = min (len (address ), self ._addr_len )
225+ addr = address [:addr_len ]
226+ self ._tx_address [:addr_len ] = addr
227+ if self ._config & 1 == 0 and self ._aa & 1 :
228+ self ._reg_write_bytes (RX_ADDR_P0 , addr )
229+ self ._reg_write_bytes (TX_ADDRESS , addr )
227230
228231 def close_rx_pipe (self , pipe_number : int ) -> None :
229232 """Close a specific data pipe from RX transmissions."""
230233 if pipe_number < 0 or pipe_number > 5 :
231234 raise IndexError ("pipe number must be in range [0, 5]" )
232235 self ._open_pipes = self ._reg_read (OPEN_PIPES ) & ~ (1 << pipe_number )
233236 if not pipe_number :
234- self ._pipe0_read_addr = None
237+ self ._is_p0_rx = False
235238 self ._reg_write (OPEN_PIPES , self ._open_pipes )
236239
237240 def open_rx_pipe (self , pipe_number : int , address : Union [bytes , bytearray ]) -> None :
@@ -240,22 +243,24 @@ def open_rx_pipe(self, pipe_number: int, address: Union[bytes, bytearray]) -> No
240243 raise IndexError ("pipe number must be in range [0, 5]" )
241244 if not address :
242245 raise ValueError ("address length cannot be 0" )
246+ addr_len = min (len (address ), self ._addr_len )
247+ addr = address [:addr_len ]
243248 if pipe_number < 2 :
244249 if not pipe_number :
245- self ._pipe0_read_addr = address
246- for i , val in enumerate ( address ):
247- self ._pipes [ pipe_number ][ i ] = val # type: ignore[assignment, index]
248- self ._reg_write_bytes (RX_ADDR_P0 + pipe_number , address )
250+ self ._is_p0_rx = True
251+ self . _pipes [ pipe_number ][: addr_len ] = addr # type: ignore[assignment, index]
252+ if self ._config & 1 or pipe_number != 0 :
253+ self ._reg_write_bytes (RX_ADDR_P0 + pipe_number , addr )
249254 else :
250- self ._pipes [pipe_number ] = address [0 ]
255+ self ._pipes [pipe_number ] = addr [0 ]
251256 self ._reg_write (RX_ADDR_P0 + pipe_number , address [0 ])
252257 self ._open_pipes = self ._reg_read (OPEN_PIPES ) | (1 << pipe_number )
253258 self ._reg_write (OPEN_PIPES , self ._open_pipes )
254259
255260 @property
256261 def listen (self ) -> bool :
257262 """This attribute is the primary role as a radio."""
258- return self .power and bool ( self . _config & 1 )
263+ return self ._config & 3 == 3
259264
260265 @listen .setter
261266 def listen (self , is_rx : bool ):
@@ -265,22 +270,19 @@ def listen(self, is_rx: bool):
265270 start_timer = time .monotonic_ns ()
266271 if is_rx :
267272 self ._ce_pin .value = True
268- if (
269- self ._pipe0_read_addr is not None
270- and self ._pipe0_read_addr != self .address (0 )
271- ):
272- for i , val in enumerate (self ._pipe0_read_addr ):
273- self ._pipes [0 ][i ] = val # type: ignore[index]
274- self ._reg_write_bytes (RX_ADDR_P0 , self ._pipe0_read_addr )
275- elif self ._pipe0_read_addr is None and self ._open_pipes & 1 :
273+ if self ._is_p0_rx :
274+ self ._reg_write_bytes (RX_ADDR_P0 , self ._pipes [0 ][: self ._addr_len ]) # type: ignore[index]
275+ elif self ._open_pipes & 1 :
276276 self ._open_pipes &= 0x3E # close_rx_pipe(0) is slower
277277 self ._reg_write (OPEN_PIPES , self ._open_pipes )
278278 else :
279- if self ._features & 6 == 6 and (( self . _aa & self . _dyn_pl ) & 1 ) :
279+ if self ._features & 6 == 6 :
280280 self .flush_tx ()
281- if self ._aa & 1 and not self ._open_pipes & 1 :
282- self ._open_pipes |= 1
283- self ._reg_write (OPEN_PIPES , self ._open_pipes )
281+ if self ._is_p0_rx and self ._aa & 1 :
282+ self ._reg_write_bytes (RX_ADDR_P0 , self ._tx_address [: self ._addr_len ])
283+ if not self ._open_pipes & 1 :
284+ self ._open_pipes |= 1
285+ self ._reg_write (OPEN_PIPES , self ._open_pipes )
284286 # mandatory wait time is 130 µs
285287 delta_time = time .monotonic_ns () - start_timer
286288 if delta_time < 150000 :
@@ -372,7 +374,7 @@ def irq_df(self) -> bool:
372374
373375 def update (self ) -> Literal [True ]:
374376 """This function gets an updated status byte over SPI."""
375- self ._reg_write (0xFF )
377+ self ._reg_read (0xFF , command = True )
376378 return True
377379
378380 def clear_status_flags (
@@ -800,7 +802,7 @@ def resend(self, send_only: bool = False):
800802 if not send_only and (self ._in [0 ] >> 1 ) < 6 :
801803 self .flush_rx ()
802804 self .clear_status_flags ()
803- # self._reg_write (0xE3)
805+ # self._reg_read (0xE3, command=True )
804806 up_cnt = 0
805807 self ._ce_pin .value = True
806808 while not self ._in [0 ] & 0x30 :
@@ -839,11 +841,11 @@ def write(
839841
840842 def flush_rx (self ):
841843 """Flush all 3 levels of the RX FIFO."""
842- self ._reg_write (0xE2 )
844+ self ._reg_read (0xE2 , command = True )
843845
844846 def flush_tx (self ):
845847 """Flush all 3 levels of the TX FIFO."""
846- self ._reg_write (0xE1 )
848+ self ._reg_read (0xE1 , command = True )
847849
848850 def fifo (self , about_tx : bool = False , check_empty : Optional [bool ] = None ):
849851 """This provides the status of the TX/RX FIFO buffers. (read-only)"""
0 commit comments