Source code for srslib

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
#                                                                             #
#    Copyright (C) 2017 Chuan Ji <jichu4n@gmail.com>                          #
#                                                                             #
#    Licensed under the Apache License, Version 2.0 (the "License");          #
#    you may not use this file except in compliance with the License.         #
#    You may obtain a copy of the License at                                  #
#                                                                             #
#     http://www.apache.org/licenses/LICENSE-2.0                              #
#                                                                             #
#    Unless required by applicable law or agreed to in writing, software      #
#    distributed under the License is distributed on an "AS IS" BASIS,        #
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
#    See the License for the specific language governing permissions and      #
#    limitations under the License.                                           #
#                                                                             #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
"""Sender Rewriting Scheme (SRS) library for Python."""

from typing import List, Tuple, Union
import base64
import datetime
import hashlib
import hmac
import re
import string
import time


[docs]class Error(Exception): """Base class for SRS errors."""
[docs]class InvalidAddressError(Error, ValueError): """Invalid email address."""
[docs]class InvalidHashError(Error): """Invalid hash in an SRS address."""
[docs]class InvalidTimestampError(Error): """Invalid timestamp in an SRS address."""
[docs]class SRS(object): """A Sender Rewriting Scheme (SRS) instance. This class implements the Guarded scheme described in the original SRS paper at http://www.libsrs2.org/srs/srs.pdf, with sensible defaults derived from the the canonical libsrs2 C implementation. Example usage: .. code-block:: python srs = SRS('secret_key') # Rewrites an email from alice@A.com to B.com rewritten_addr = srs.forward('alice@A.com', 'B.com') # Reverse it to get the address to bounce to. bounce_addr = srs.reverse(rewritten_addr) """ # Regular expression matching SRS0 and SRS1 addresses. _SRS0_OPAQUE = re.compile(r""" SRS0 # Prefix tag ([-+=].+) # Opaque part w/ leading separator, generated by 1st hop """, re.IGNORECASE | re.VERBOSE) _SRS0 = re.compile(r""" SRS0 # Prefix tag [-+=] ([^=]+) # Hash = ([^=]+) # Timestamp = ([^=]+) # Envelope sender host = (.+) # Envelope sender local part """, re.IGNORECASE | re.VERBOSE) _SRS1 = re.compile(r""" SRS1 # Prefix tag [-+=] ([^=]+) # Hash = ([^=]+) # 1st hop host = ([-+=].+) # Opaque part w/ leading separator, generated by 1st hop """, re.IGNORECASE | re.VERBOSE) # 5-bit / base 32 alphabet for timestamp encoding as described in the spec. # Note that this is NOT the same as RFC4648 or RFC3548 Base32 encoding, which # are both 8-bit / base 256 encodings. _TS_ALPHABET = string.ascii_uppercase + '234567' # Reverse lookup table for _TS_ALPHABET. Defined in __init__() as list # comprehensions in class scope cannot access class scope. _TS_REVERSE = {} _SECONDS_IN_DAY = datetime.timedelta(days=1).total_seconds()
[docs] def __init__( self, secret, prev_secrets=[], validity_days=21, hash_length=4): # type: (Union[str, bytes], List[Union[str, bytes]], int, int) """Creates a new SRS configuration instance. Args: secret (str|bytes): Cryptographic secret for creating / rwversing rewritten addresses. prev_secrets (list(str|bytes)): Previously used secrets that are still considered valid for reversing rewritten addresses. validity_days (int): Number of days after which rewritten addresses cannot be reversed. hash_length (int): Length to truncate hash digest to. """ self._TS_REVERSE = { self._TS_ALPHABET[i]: i for i in range(len(self._TS_ALPHABET)) } self._secret = self._to_bytes(secret) self._prev_secrets = [self._to_bytes(secret) for secret in prev_secrets] self._validity_days = validity_days self._hash_length = hash_length # Cached list of all valid timestamps. The first element is the current # timestamp. self._valid_ts_cache = None # Used for testing timestamp checks. self._time_fn = time.time
[docs] def forward(self, from_addr, alias_host): # type: (str, str) -> str """Rewrites sender address `from_addr` to `alias_host`. As described in the SRS specification, the algorithm is: - If the envelope sender address (`from_addr`) is an SRS1 address rewritten by 1stHop.com to SRS0 and later by nthHop.com to SRS1, rewrite to a new SRS1 address such that bounces will go to us then 1stHop.com. - If `from_addr` is an SRS0 address rewritten by 1stHop.com, rewrite to an SRS1 address such that bounces will go to us then back to 1stHop.com. - If `from_addr` is neither an SRS0 address nor an SRS1 address, rewrite to an SRS0 address such that bounces will go to us then back to `from_addr`. Args: from_addr (str): The original envelope sender address. alias_host (str): The host to rewrite to (current host). Returns: str: The envelope sender address rewritten to `alias_host`. Raises: :obj:`srslib.InvalidAddressError`: `from_addr` is not a valid email address. """ from_local_part, from_host = self._split_addr(from_addr) # Case 1: Address is an SRS1 address. We are hop > 2, and we replace the # hash with our own to generate a new SRS1 address that also bounces to the # 1st hop. m = self._SRS1.match(from_local_part) if m: return self.generate_srs1_address(m.group(2), m.group(3), alias_host) # Case 2: Address is an SRS0 address. We are the 2nd hop, and we will return # an SRS1 address that bounces to the 1st hop. m = self._SRS0_OPAQUE.match(from_local_part) if m: return self.generate_srs1_address(from_host, m.group(1), alias_host) # Case 3: We are the 1st hop. We will return an SRS0 address. return self.generate_srs0_address(from_host, from_local_part, alias_host)
[docs] def reverse(self, srs_addr): # type: (str) -> str """Reverses a rewritten address. As described in the SRS specification, the algorithm is: - If `srs_addr` is an SRS0 address rewritten by us, bounce to the original envelope sender address. - If `srs_addr` is an SRS1 address rewritten by 1stHop.com and then us, bounce to the SRS0 address rewritten by 1stHop.com. Args: srs_addr (str): An SRS0 or SRS1 address. Returns: str: The address to bounce to. Raises: :obj:`srslib.InvalidAddressError`: `srs_addr` is not a valid email address. :obj:`srslib.InvalidHashError`: The hash string in `srs_addr` is invalid. :obj:`srslib.InvalidTimestampError`: The timestamp string in `srs_addr` is invalid or expired. """ from_local_part, from_host = self._split_addr(srs_addr) # Case 1: Address is an SRS1 address. We were hop n >= 2 in the forwarding # chain, and we will bounce back to hop 1. m = self._SRS1.match(from_local_part) if m: self.check_hash(m.group(1), m.group(2) + m.group(3), srs_addr) return 'SRS0%s@%s' % (m.group(3), m.group(2)) # Case 2: Address is an SRS0 address. We were the first hop in the # forwarding chain, and we will bounce back to the original envelope sender. m = self._SRS0.match(from_local_part) if m: self.check_hash( m.group(1), m.group(2) + m.group(3) + m.group(4), srs_addr) self.check_ts(m.group(2), srs_addr) return '%s@%s' % (m.group(4), m.group(3)) raise InvalidAddressError('Unrecognized SRS address: "%s"' % srs_addr)
[docs] @classmethod def is_srs_address(cls, addr, strict=True): # type: (str, bool) -> bool """Checks if an address is a valid SRS address. If strict is True, this function will only consider SRS0 addresses formatted according to the Guarded scheme as valid. If strict is False, any address with an SRS0 prefix and separator is considered valid. Args: addr (str): An email address, e.g. `foo@example.com`. strict (bool): Whether to check SRS0 addresses in strict mode. Raises: :obj:`srslib.InvalidAddressError`: `addr` is not a valid email address. """ local_part, host = cls._split_addr(addr) srs0 = cls._SRS0 if strict else cls._SRS0_OPAQUE return bool(srs0.match(addr) or cls._SRS1.match(addr))
[docs] def generate_srs0_address( self, orig_host, orig_local_part, alias_host): # type: (str, str, str) -> str """Produces an SRS0 address. Args: orig_host (str): Host part of the original envelope sender address. orig_local_part (str): Local part of the original envelope sender address. alias_host (str): The host to rewrite to (current host). Returns: str: The rewritten SRS0 address. """ ts = self.generate_ts() return 'SRS0=%s=%s=%s=%s@%s' % ( self.generate_hash( ts + orig_host + orig_local_part, self._secret, self._hash_length), ts, orig_host, orig_local_part, alias_host)
[docs] def generate_srs1_address( self, first_hop_host, first_hop_local_part, alias_host): # type: (str, str, str) -> str """Produces an SRS1 address. Args: first_hop_host (str): Address of the 1st hop (SRS0) host. first_hop_local_part (str): Local part generated by 1st hop host (w/o the "SRS0" prefix) alias_host (str): The host to rewrite to (current host). Returns: str: The rewritten SRS1 address. """ return 'SRS1=%s=%s=%s@%s' % ( self.generate_hash( first_hop_host + first_hop_local_part, self._secret, self._hash_length), first_hop_host, first_hop_local_part, alias_host)
@classmethod def _split_addr(cls, addr): # type: (str) -> Tuple[str, str] """Splits an email address to (local_part, host).""" try: local_part, host = addr.split('@') except ValueError: raise InvalidAddressError('Invalid from_addr address: "%s"' % addr) else: return (local_part, host)
[docs] def generate_hash(self, s, secret, hash_length): # type: (str, bytes, int) -> str """Produces a hash string for use in an SRS address. As recommended in the specification, this function yields a base64-encoded hash of the provided string in lower case using the HMAC-SHA1 algorithm, and truncates it to hash_length characters. Args: s (str): Input string to hash. secret (bytes): The cryptographic secret to use. hash_length (int): Length to truncate the generated hash digest to. Returns: str: SRS hash string, truncated to `hash_length`. """ return ( base64.b64encode( hmac.new(secret, s.lower().encode('utf-8'), hashlib.sha1).digest()) [:hash_length] .decode('utf-8'))
[docs] def check_hash(self, h, s, addr): # type: (str, str, str) -> None """Checks a hash (`h`) against an input string (`s`). Following the canonical implementation (libsrs2), hashes are compared case-insensively. Args: h (str): A hash string possibly generated by the algorithm described in `generate_hash`. s (str): Original hashed string. addr (str): The full address being reversed. Raises: :obj:`srslib.InvalidHashError`: Hash is invalid. """ if not any( h.lower() == self.generate_hash(s, secret, len(h)).lower() for secret in [self._secret] + self._prev_secrets): raise InvalidHashError('Invalid hash in SRS address: "%s"' % addr)
[docs] def generate_ts(self, t=None): # type: (float) -> str """Produces a timestamp for use in an SRS0 address. Following the algorithm in the original paper, this function yields the UNIX timestamp of the current date modded by 2^10, encoded in base32. Args: t (float): If not None, specifies the UNIX timestamp to use instead of the current time. """ t = int((t or self._time_fn()) // self._SECONDS_IN_DAY) return ''.join( self._TS_ALPHABET[x] for x in ( (t >> 5) & 0b11111, t & 0b11111, ))
[docs] def check_ts(self, ts, addr): # type: (str, str) -> None """Checks an encoded timestamp string against current time. Args: ts (str): A timestamp possibly generated by the algorithm described in `generate_ts`. addr (str): The full address being reversed. Raises: :obj:`srslib.InvalidTimestampError`: timestamp is invalid. """ if (self._valid_ts_cache is None or self._valid_ts_cache[0] != self.generate_ts()): now = self._time_fn() self._valid_ts_cache = [ self.generate_ts(now - i * self._SECONDS_IN_DAY) for i in range(self._validity_days) ] if ts.upper() not in self._valid_ts_cache: raise InvalidTimestampError( 'Invalid timestamp in SRS address: "%s"' % addr)
def _to_bytes(self, secret): # type: (Union[str, bytes]) -> bytes """Ensures that a client-provided secret is in bytes.""" if isinstance(secret, bytes): return secret elif isinstance(secret, str): return secret.encode('utf-8') else: raise Error('SRS secret must be bytes or str, got %s' % type(secret))