1- from typing import Any , Dict , List , Optional , TypeVar
1+ from __future__ import annotations
2+
3+ from typing import Any , Dict , List , Optional , Tuple , TypeVar
24
35from cbor2 import CBORTag , loads
46
@@ -98,6 +100,20 @@ def __init__(self, type: COSETypes, msg: List[Any]):
98100 raise ValueError (f"Invalid COSETypes({ type } ) for COSE message." )
99101 return
100102
103+ def __eq__ (self : COSEMessage , other : object ) -> bool :
104+ if not isinstance (other , COSEMessage ):
105+ return NotImplemented
106+ return (
107+ self ._type == other ._type
108+ and self ._protected == other ._protected
109+ and self ._unprotected == other ._unprotected
110+ and self ._payload == other ._payload
111+ and self ._other_fields == other ._other_fields
112+ )
113+
114+ def __ne__ (self : COSEMessage , other : object ) -> bool :
115+ return not self .__eq__ (other )
116+
101117 @classmethod
102118 def loads (cls , msg : bytes ):
103119 tagged = loads (msg )
@@ -266,8 +282,8 @@ def _validate_cose_message(self, msg: List[Any]):
266282 raise ValueError ("The protected headers should be bytes." )
267283 if not isinstance (msg [1 ], dict ):
268284 raise ValueError ("The unprotected headers should be Dict[int, Any]." )
269- if not isinstance (msg [2 ], bytes ):
270- raise ValueError ("The payload should be bytes." )
285+ if not isinstance (msg [2 ], bytes ) and msg [ 2 ] is not None :
286+ raise ValueError ("The payload should be bytes or null ." )
271287
272288 countersignatures = msg [1 ].get (11 , None )
273289 if countersignatures is None :
@@ -287,3 +303,36 @@ def _validate_cose_message(self, msg: List[Any]):
287303 def _get_kid (self , sig : list ) -> Optional [bytes ]:
288304 kid = sig [1 ].get (4 , None )
289305 return kid if kid else self ._loads (sig [0 ]).get (4 , None )
306+
307+ def detach_payload (self : Self ) -> Tuple [COSEMessage , bytes ]:
308+ """
309+ Detach a payload from the COSE message
310+
311+ Returns:
312+ Tuple[COSEMessage, bytes]: A byte string of the encoded COSE or a
313+ cbor2.CBORTag object, and a byte string of the detached payload.
314+ Raises:
315+ ValueError: The payload does not exist.
316+ """
317+
318+ if not isinstance (self ._payload , bytes ):
319+ raise ValueError ("The payload does not exist." )
320+
321+ return COSEMessage (self ._type , [self ._msg [0 ], self ._msg [1 ], None , * self ._msg [3 :]]), self ._payload
322+
323+ def attach_payload (self : Self , payload : bytes ) -> COSEMessage :
324+ """
325+ Attach a detached content to the COSE message
326+
327+ Args:
328+ payload (bytes): A byte string of detached payload.
329+ Returns:
330+ COSEMessage: The COSE message (self).
331+ Raises:
332+ ValueError: The payload already exist.
333+ """
334+
335+ if self ._payload is not None :
336+ raise ValueError ("The payload already exist." )
337+
338+ return COSEMessage (self ._type , [self ._msg [0 ], self ._msg [1 ], payload , * self ._msg [3 :]])
0 commit comments