|
| 1 | +__all__ = ['verify_signature', 'decrypt_content'] |
| 2 | +import json |
| 3 | +import logging |
| 4 | +import re |
| 5 | +from time import time |
| 6 | +from urllib.parse import urlparse |
| 7 | + |
| 8 | +from nacl.encoding import Base64Encoder |
| 9 | +from nacl.exceptions import BadSignatureError |
| 10 | +from nacl.public import Box |
| 11 | +from nacl.public import PrivateKey |
| 12 | +from nacl.public import PublicKey |
| 13 | +from nacl.signing import VerifyKey |
| 14 | + |
| 15 | +logger = logging.getLogger(__name__) |
| 16 | + |
| 17 | +FORMSG_WEBHOOK_PUBLIC_KEY = VerifyKey('3Tt8VduXsjjd4IrpdCd7BAkdZl/vUCstu9UvTX84FWw=', encoder=Base64Encoder) |
| 18 | +ENCRYPTED_CONTENT_REGEX = re.compile(r'^(?P<submission_public_key>[\w\+\/\=]*)\;(?P<nonce>[\w\+\/\=]*)\:(?P<encrypted_message>[\w\+\/\=]*)$') |
| 19 | + |
| 20 | + |
| 21 | +def verify_signature(webhook_uri, signature_header, signature_expiry_seconds=60): |
| 22 | + # v1 is signature, s is submissionId, f is formId, t is submission epoch |
| 23 | + logger.debug(f'X-FormSG-Signature is <{signature_header}>.') |
| 24 | + formsg_signature = dict(part.split('=', 1) for part in signature_header.split(',')) |
| 25 | + formsg_signature['t'] = int(formsg_signature['t']) |
| 26 | + |
| 27 | + # Javascript url.href adds a trailing `/` to root domain urls |
| 28 | + # https://github.com/opengovsg/formsg-javascript-sdk/blob/master/src/webhooks.ts#L25 |
| 29 | + u = urlparse(webhook_uri) |
| 30 | + if not u.path: |
| 31 | + u = u._replace(path='/') |
| 32 | + webhook_uri = u.geturl() |
| 33 | + |
| 34 | + FORMSG_WEBHOOK_PUBLIC_KEY.verify( |
| 35 | + smessage=f'{webhook_uri}.{formsg_signature["s"]}.{formsg_signature["f"]}.{formsg_signature["t"]}'.encode('ascii'), |
| 36 | + signature=Base64Encoder.decode(formsg_signature['v1']), |
| 37 | + ) |
| 38 | + |
| 39 | + if time() - (formsg_signature['t'] / 1000) > signature_expiry_seconds: |
| 40 | + raise BadSignatureError('FormSG signature has expired.') |
| 41 | + |
| 42 | + return formsg_signature |
| 43 | +#end def |
| 44 | + |
| 45 | + |
| 46 | +def decrypt_content(body_json, secret_key): |
| 47 | + if 'data' in body_json: |
| 48 | + encrypted_content = body_json['data']['encryptedContent'] |
| 49 | + else: |
| 50 | + encrypted_content = body_json['encryptedContent'] # old version POST body |
| 51 | + #end if |
| 52 | + |
| 53 | + submission_public_key, nonce, encrypted_message = ENCRYPTED_CONTENT_REGEX.match(encrypted_content).groups() |
| 54 | + |
| 55 | + box = Box( |
| 56 | + PrivateKey(secret_key, encoder=Base64Encoder), |
| 57 | + PublicKey(submission_public_key, encoder=Base64Encoder), |
| 58 | + ) |
| 59 | + |
| 60 | + plaintext = box.decrypt(encrypted_message, Base64Encoder.decode(nonce), encoder=Base64Encoder) |
| 61 | + |
| 62 | + return json.loads(plaintext) |
| 63 | +#end def |
0 commit comments