summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/_utils.py421
-rw-r--r--tests/api.py252
-rw-r--r--tests/conftest.py115
-rw-r--r--tests/integration.rs73
-rw-r--r--tests/smtp.py27
-rw-r--r--tests/test_auth_account.py348
-rw-r--r--tests/test_auth_device.py434
-rw-r--r--tests/test_auth_email.py96
-rw-r--r--tests/test_auth_oauth.py369
-rw-r--r--tests/test_auth_password.py211
-rw-r--r--tests/test_auth_session.py69
-rw-r--r--tests/test_oauth.py97
-rw-r--r--tests/test_profile.py134
-rw-r--r--tests/test_push.py147
14 files changed, 2793 insertions, 0 deletions
diff --git a/tests/_utils.py b/tests/_utils.py
new file mode 100644
index 0000000..6ccd99c
--- /dev/null
+++ b/tests/_utils.py
@@ -0,0 +1,421 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+"""
+
+fxa._utils: miscellaneous low-level utilities for PyFxA
+
+This private-api stuff that will most likely change, move, refactor
+etc as we go. So don't import any of it outside of this package.
+
+"""
+from __future__ import absolute_import
+import os
+import time
+import hashlib
+import hmac
+import six
+from binascii import hexlify, unhexlify
+from base64 import b64encode
+try:
+ import cPickle as pickle
+except ImportError: # pragma: no cover
+ import pickle
+
+from six import PY3
+from six.moves.urllib.parse import urlparse, urljoin
+
+import requests
+import requests.auth
+import requests.utils
+import hawkauthlib
+
+import fxa
+import fxa.errors
+import fxa.crypto
+
+
+# Send a custom user-agent header
+# so we're easy to identify in server logs etc.
+
+USER_AGENT_HEADER = ' '.join((
+ 'Mozilla/5.0 (Mobile; Firefox Accounts; rv:1.0)',
+ 'PyFxA/%s' % (fxa.__version__),
+ requests.utils.default_user_agent(),
+))
+
+
+if not PY3:
+ hexstr = hexlify
+else: # pragma: no cover
+ def hexstr(data):
+ """Like binascii.hexlify, but always returns a str instance."""
+ return hexlify(data).decode("ascii")
+
+
+def uniq(size=10):
+ """Generate a short random hex string."""
+ return hexstr(os.urandom(size // 2 + 1))[:size]
+
+
+def get_hmac(data, secret, algorithm=hashlib.sha256):
+ """Generate an hexdigest hmac for given data, secret and algorithm."""
+ return hmac.new(secret.encode('utf-8'),
+ data.encode('utf-8'),
+ algorithm).hexdigest()
+
+
+def scope_matches(provided, required):
+ """Check that required scopes match the ones provided. This is used during
+ token verification to raise errors if expected scopes are not met.
+
+ :note:
+
+ The rules for parsing and matching scopes in FxA are documented at
+ https://github.com/mozilla/fxa-oauth-server/blob/master/docs/scopes.md
+
+ :param provided: list of scopes provided for the current token.
+ :param required: the scope required (e.g. by the application).
+ :returns: ``True`` if all required scopes are provided, ``False`` if not.
+ """
+ if isinstance(provided, six.string_types):
+ raise ValueError("Provided scopes must be a list, not a single string")
+
+ if not isinstance(required, (list, tuple)):
+ required = [required]
+
+ for req in required:
+ if not any(_match_single_scope(prov, req) for prov in provided):
+ return False
+
+ return True
+
+
+def _match_single_scope(provided, required):
+ if provided.startswith('https:'):
+ return _match_url_scope(provided, required)
+ else:
+ return _match_shortname_scope(provided, required)
+
+
+def _match_shortname_scope(provided, required):
+ if required.startswith('https:'):
+ return False
+ prov_names = provided.split(':')
+ req_names = required.split(':')
+ # If we require :write, it must be provided.
+ if req_names[-1] == 'write':
+ if prov_names[-1] != 'write':
+ return False
+ req_names.pop()
+ prov_names.pop()
+ elif prov_names[-1] == 'write':
+ prov_names.pop()
+ # Provided names must be a prefix of required names.
+ if len(prov_names) > len(req_names):
+ return False
+ for (p, r) in zip(prov_names, req_names):
+ if p != r:
+ return False
+ # It matches!
+ return True
+
+
+def _match_url_scope(provided, required):
+ if not required.startswith('https:'):
+ return False
+ # Pop the hash fragments
+ (prov_url, prov_hash) = (provided.rsplit('#', 1) + [None])[:2]
+ (req_url, req_hash) = (required.rsplit('#', 1) + [None])[:2]
+ # Provided URL must be a prefix of required.
+ if req_url != prov_url:
+ if not (req_url.startswith(prov_url + '/')):
+ return False
+ # If hash is provided, it must match that required.
+ if prov_hash:
+ if not req_hash or req_hash != prov_hash:
+ return False
+ # It matches!
+ return True
+
+
+class APIClient(object):
+ """A requests.Session wrapper specialized for FxA API access.
+
+ An instance of this class should be used for making requests to an FxA
+ web API endpoint. It wraps a requests.Session instance and provides
+ a broadly similar interface, with some additional functionality that's
+ specific to Firefox Accounts:
+
+ * default base server URL
+ * backoff protocol support
+ * sensible request timeouts
+ * timestamp skew tracking with automatic retry on clockskew error
+
+ """
+
+ def __init__(self, server_url, session=None):
+ if session is None:
+ session = requests.Session()
+ # Properties that can be customized to change behaviour.
+ self.server_url = server_url
+ self.timeout = 30
+ self.max_retry_after = None
+ # Internal state.
+ self._session = session
+ self._backoff_until = 0
+ self._backoff_response = None
+ self._clockskew = None
+
+ # Reflect useful properties of the wrapped Session object.
+
+ @property
+ def headers(self):
+ return self._session.headers
+
+ @headers.setter
+ def headers(self, value):
+ self._session.headers = value
+
+ @property
+ def auth(self):
+ return self._session.auth
+
+ @auth.setter
+ def auth(self, value):
+ if getattr(value, "apiclient", None) is None:
+ value.apiclient = self
+ self._session.auth = value
+
+ @property
+ def hooks(self):
+ return self._session.hooks
+
+ @hooks.setter
+ def hooks(self, value):
+ self._session.hooks = value
+
+ @property
+ def verify(self):
+ return self._session.verify
+
+ @verify.setter
+ def verify(self, value):
+ self._session.verify = value
+
+ # Add some handy utility methods of our own.
+
+ def client_curtime(self):
+ """Get the current timestamp, as seen by the client.
+
+ This is a helper function that returns the current local time.
+ It's mostly here for symmetry with server_curtime() and to assist
+ in testability of this class.
+ """
+ return time.time()
+
+ def server_curtime(self):
+ """Get the current timestamp, as seen by the server.
+
+ This is a helper function that automatically applies any detected
+ clock-skew, to report what the current timestamp is on the server
+ instead of on the client.
+ """
+ return self.client_curtime() + (self._clockskew or 0)
+
+ # The actual request-making stuff.
+
+ def request(self, method, url, json=None, retry_auth_errors=True, **kwds):
+ """Make a request to the API and process the response.
+
+ This method implements the low-level details of interacting with an
+ FxA Web API, stripping away most of the details of HTTP. It will
+ return the parsed JSON of a successful responses, or raise an exception
+ for an error response. It's also responsible for backoff handling
+ and clock-skew tracking.
+ """
+ # Don't make requests if we're in backoff.
+ # Instead just synthesize a backoff response.
+ if self._backoff_response is not None:
+ if self._backoff_until >= self.client_curtime():
+ resp = pickle.loads(self._backoff_response)
+ resp.request = None
+ resp.headers["Timestamp"] = str(int(self.server_curtime()))
+ return resp
+ else:
+ self._backoff_until = 0
+ self._backoff_response = None
+
+ # Apply defaults and perform the request.
+ while url.startswith("/"):
+ url = url[1:]
+ if self.server_url.endswith("/"):
+ url = urljoin(self.server_url, url)
+ else:
+ url = urljoin(self.server_url + "/", url)
+ if self.timeout is not None:
+ kwds.setdefault("timeout", self.timeout)
+
+ # Configure the user agent
+ headers = kwds.get('headers', {})
+ headers.setdefault('User-Agent', USER_AGENT_HEADER)
+ kwds['headers'] = headers
+
+ resp = self._session.request(method, url, json=json, **kwds)
+
+ # Everything should return a valid JSON response. Even errors.
+ content_type = resp.headers.get("content-type", "")
+ if not content_type.startswith("application/json"):
+ msg = "API responded with non-json content-type: {0}"
+ raise fxa.errors.OutOfProtocolError(msg.format(content_type))
+ try:
+ body = resp.json()
+ except ValueError as e:
+ msg = "API responded with invalid json: {0}"
+ raise fxa.errors.OutOfProtocolError(msg.format(e))
+
+ # Check for backoff indicator from the server.
+ # If found, backoff up to the client-specified max time.
+ if resp.status_code in (429, 500, 503):
+ try:
+ retry_after = int(resp.headers["retry-after"])
+ except (KeyError, ValueError):
+ pass
+ else:
+ if self.max_retry_after is not None:
+ retry_after = max(retry_after, self.max_retry_after)
+ self._backoff_until = self.client_curtime() + retry_after
+ self._backoff_response = pickle.dumps(resp)
+
+ # If we get a 401 with "serverTime" field in the body, then we're
+ # probably out of sync with the server's clock. Check our skew,
+ # adjust if necessary and try again.
+ if retry_auth_errors:
+ if resp.status_code == 401 and "serverTime" in body:
+ try:
+ server_timestamp = int(body["serverTime"])
+ except ValueError:
+ msg = "API responded with non-integer serverTime: {0}"
+ msg = msg.format(body["serverTime"])
+ raise fxa.errors.OutOfProtocolError(msg)
+ # If our guestimate is more than 30 seconds out, try again.
+ # This assumes the auth hook will use the updated clockskew.
+ if abs(server_timestamp - self.server_curtime()) > 30:
+ self._clockskew = server_timestamp - self.client_curtime()
+ return self.request(method, url, json, False, **kwds)
+
+ # See if we need to adjust for clock skew between client and server.
+ # We do this automatically once per session in the hopes of avoiding
+ # having to retry subsequent auth failures. We do it *after* the retry
+ # checking above, because it wrecks the "were we out of sync?" check.
+ if self._clockskew is None and "timestamp" in resp.headers:
+ try:
+ server_timestamp = int(resp.headers["timestamp"])
+ except ValueError:
+ msg = "API responded with non-integer timestamp: {0}"
+ msg = msg.format(resp.headers["timestamp"])
+ raise fxa.errors.OutOfProtocolError(msg)
+ else:
+ self._clockskew = server_timestamp - self.client_curtime()
+
+ # Raise exceptions for any error responses.
+ # XXX TODO: hooks for raising error subclass based on errno.
+ if 400 <= resp.status_code < 500:
+ raise fxa.errors.ClientError(body)
+ if 500 <= resp.status_code < 600:
+ raise fxa.errors.ServerError(body)
+ if resp.status_code < 200 or resp.status_code >= 300:
+ msg = "API responded with unexpected status code: {0}"
+ raise fxa.errors.OutOfProtocolError(msg.format(resp.status_code))
+
+ # Return the parsed JSON body for successful responses.
+ return body
+
+ def get(self, url, **kwds):
+ return self.request("GET", url, **kwds)
+
+ def post(self, url, json=None, **kwds):
+ return self.request("POST", url, json, **kwds)
+
+ def put(self, url, json=None, **kwds):
+ return self.request("PUT", url, json, **kwds)
+
+ def delete(self, url, **kwds):
+ return self.request("DELETE", url, **kwds)
+
+
+class HawkTokenAuth(requests.auth.AuthBase):
+ """A requests auth hook implementing token-based hawk auth.
+
+ This auth hook implements the hkdf-derived-hawk-token auth scheme
+ as used by the Firefox Accounts auth server. It uses HKDF to derive
+ an id and secret key from a random 32-byte token, then signs the request
+ with those credentials using the Hawk request-signing scheme.
+ """
+
+ def __init__(self, token, tokentype, apiclient=None):
+ tokendata = unhexlify(token)
+ key_material = fxa.crypto.derive_key(tokendata, tokentype, 3*32)
+ self.id = hexstr(key_material[:32])
+ self.auth_key = key_material[32:64]
+ self.bundle_key = key_material[64:]
+ self.apiclient = apiclient
+
+ def __call__(self, req):
+ # Requests doesn't include the port in the Host header by default.
+ # Ensure a fully-correct value so that signatures work properly.
+ req.headers["Host"] = urlparse(req.url).netloc
+ params = {}
+ if req.body:
+ body = _encoded(req.body, 'utf-8')
+ hasher = hashlib.sha256()
+ hasher.update(b"hawk.1.payload\napplication/json\n")
+ hasher.update(body)
+ hasher.update(b"\n")
+ hash = b64encode(hasher.digest())
+ if PY3:
+ hash = hash.decode("ascii")
+ params["hash"] = hash
+ if self.apiclient is not None:
+ params["ts"] = str(int(self.apiclient.server_curtime()))
+ hawkauthlib.sign_request(req, self.id, self.auth_key, params=params)
+ return req
+
+ def bundle(self, namespace, payload):
+ """Bundle encrypted response data."""
+ return fxa.crypto.bundle(self.bundle_key, namespace, payload)
+
+ def unbundle(self, namespace, payload):
+ """Unbundle encrypted response data."""
+ return fxa.crypto.unbundle(self.bundle_key, namespace, payload)
+
+
+class BearerTokenAuth(requests.auth.AuthBase):
+ """A requests auth hook implementing OAuth bearer-token-based auth.
+
+ This auth hook implements the simple "bearer token" auth scheme.
+ The provided token is passed directly in the Authorization header.
+ """
+
+ def __init__(self, token, apiclient=None):
+ self.token = token
+
+ def __call__(self, req):
+ req.headers["Authorization"] = "Bearer {0}".format(self.token)
+ return req
+
+
+def _decoded(value, encoding='utf-8'):
+ """Make sure the value is of type ``unicode`` in both PY2 and PY3."""
+ value_type = type(value)
+ if value_type != six.text_type:
+ value = value.decode(encoding)
+ return value
+
+
+def _encoded(value, encoding='utf-8'):
+ """Make sure the value is of type ``bytes`` in both PY2 and PY3."""
+ value_type = type(value)
+ if value_type != six.binary_type:
+ return value.encode(encoding)
+ return value
diff --git a/tests/api.py b/tests/api.py
new file mode 100644
index 0000000..9d2f70d
--- /dev/null
+++ b/tests/api.py
@@ -0,0 +1,252 @@
+import asyncio
+import base64
+import binascii
+import http.server
+import http_ece
+import json
+import os
+import queue
+import quopri
+import threading
+from _utils import HawkTokenAuth, APIClient, hexstr
+from aiosmtpd.controller import Controller
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import ec
+from fxa.crypto import quick_stretch_password, derive_key, xor
+
+AUTH_URL = "http://localhost:8000/auth"
+PROFILE_URL = "http://localhost:8000/profile"
+OAUTH_URL = "http://localhost:8000/oauth"
+INVITE_URL = "http://localhost:8000/_invite"
+PUSH_PORT = 10264
+SMTP_PORT = 2525
+
+def auth_pw(email, pw):
+ return derive_key(quick_stretch_password(email, pw), "authPW").hex()
+
+class AuthClient:
+ def __init__(self, /, email=None, session=None, bearer=None, props=None):
+ self.password = ""
+ self.client = APIClient(f"{AUTH_URL}/v1")
+ self.email = email
+ assert int(session is not None) + int(bearer is not None) < 2
+ self.session = session
+ self.auth = HawkTokenAuth(session, "sessionToken", self.client) if session else None
+ self.bearer = bearer
+ self.headers = { 'authorization': f'bearer {bearer}' } if bearer else {}
+ self.props = props
+
+ def post(self, url, json=None, **kwds):
+ return self.client.post(url, json, **kwds)
+ def post_a(self, url, json=None, **kwds):
+ kwds.setdefault('headers', {})
+ kwds['headers'] |= self.headers
+ return self.client.post(url, json, auth=self.auth, **kwds)
+
+ def get(self, url, **kwds):
+ return self.client.get(url, **kwds)
+ def get_a(self, url, **kwds):
+ kwds.setdefault('headers', {})
+ kwds['headers'] |= self.headers
+ return self.client.get(url, auth=self.auth, **kwds)
+
+ def delete(self, url, **kwds):
+ return self.client.delete(url, **kwds)
+ def delete_a(self, url, **kwds):
+ kwds.setdefault('headers', {})
+ kwds['headers'] |= self.headers
+ return self.client.delete(url, auth=self.auth, **kwds)
+
+ def create_account(self, email, pw, keys=None, invite=None, **kwds):
+ body = {
+ "email": email,
+ "authPW": hexstr(derive_key(quick_stretch_password(email, pw), "authPW")),
+ "style": invite,
+ }
+ params = { 'keys': str(keys).lower() } if keys is not None else {}
+ resp = self.client.post("/account/create", body, params=params, **kwds)
+ return AuthClient(email=email, session=resp['sessionToken'], props=resp)
+ def destroy_account(self, email, pw, **kwds):
+ body = { "email": email, "authPW": hexstr(derive_key(quick_stretch_password(email, pw), "authPW")) }
+ return self.client.post("/account/destroy", body)
+
+ def fetch_keys(self, key_fetch_token, pw):
+ pw = quick_stretch_password(self.email, pw)
+ auth = HawkTokenAuth(key_fetch_token, "keyFetchToken", self.client)
+ resp = self.client.get("/account/keys", auth=auth)
+ bundle = binascii.unhexlify(resp["bundle"])
+ keys = auth.unbundle("account/keys", bundle)
+ unwrap_key = derive_key(pw, "unwrapBkey")
+ return (keys[:32], xor(keys[32:], unwrap_key))
+
+ def login(self, email, pw, keys=None, **kwds):
+ body = { "email": email, "authPW": hexstr(derive_key(quick_stretch_password(email, pw), "authPW")) }
+ params = { "keys": str(keys).lower() } if keys is not None else {}
+ resp = self.client.post("/account/login", body, params=params)
+ return AuthClient(email=email, session=resp['sessionToken'], props=resp)
+ def destroy_session(self, **kwds):
+ return self.post_a("/session/destroy", kwds)
+
+ def profile(self):
+ token = self.post_a("/oauth/token", {
+ "client_id": "5882386c6d801776",
+ "ttl": 60,
+ "grant_type": "fxa-credentials",
+ "access_type": "online",
+ "scope": "profile:write",
+ })
+ return Profile(token['access_token'])
+
+class Invite:
+ def __init__(self, token):
+ self.client = APIClient(f"{INVITE_URL}/v1")
+ self.auth = HawkTokenAuth(token, "sessionToken", self.client)
+
+ def post(self, url, json=None, **kwds):
+ return self.client.post(url, json, **kwds)
+ def post_a(self, url, json=None, **kwds):
+ return self.client.post(url, json, auth=self.auth, **kwds)
+
+class PasswordChange:
+ def __init__(self, client, token, hkdf='passwordChangeToken'):
+ self.client = client
+ self.auth = HawkTokenAuth(token, hkdf, self.client)
+
+ def post(self, url, json=None, **kwds):
+ return self.client.post(url, json, **kwds)
+ def post_a(self, url, json=None, **kwds):
+ return self.client.post(url, json, auth=self.auth, **kwds)
+
+class AccountReset:
+ def __init__(self, client, token):
+ self.client = client
+ self.auth = HawkTokenAuth(token, 'accountResetToken', self.client)
+
+ def post(self, url, json=None, **kwds):
+ return self.client.post(url, json, **kwds)
+ def post_a(self, url, json=None, **kwds):
+ return self.client.post(url, json, auth=self.auth, **kwds)
+
+class Profile:
+ def __init__(self, token):
+ self.client = APIClient(f"{PROFILE_URL}/v1")
+ self.token = token
+
+ def get(self, url, **kwds):
+ return self.client.get(url, **kwds)
+ def get_a(self, url, **kwds):
+ kwds.setdefault('headers', {})
+ kwds['headers']['authorization'] = f'bearer {self.token}'
+ return self.client.get(url, **kwds)
+
+ def post(self, url, json=None, **kwds):
+ return self.client.post(url, json, **kwds)
+ def post_a(self, url, json=None, **kwds):
+ kwds.setdefault('headers', {})
+ kwds['headers']['authorization'] = f'bearer {self.token}'
+ return self.client.post(url, json, **kwds)
+
+ def delete(self, url, **kwds):
+ return self.client.delete(url, **kwds)
+ def delete_a(self, url, **kwds):
+ kwds.setdefault('headers', {})
+ kwds['headers']['authorization'] = f'bearer {self.token}'
+ return self.client.delete(url, **kwds)
+
+class Oauth:
+ def __init__(self):
+ self.client = APIClient(f"{OAUTH_URL}/v1")
+
+ def post(self, url, json=None, **kwds):
+ return self.client.post(url, json, **kwds)
+
+class Device:
+ def __init__(self, auth, name, type="desktop", commands={}, pcb=None):
+ self.auth = auth
+ dev = auth.post_a("/account/device", {
+ "name": name,
+ "type": type,
+ "availableCommands": commands,
+ } | self._mk_push(pcb))
+ self.id = dev['id']
+ self.props = dev
+
+ def _mk_push(self, pcb):
+ if not pcb:
+ return {}
+
+ self.priv = ec.generate_private_key(ec.SECP256R1, default_backend())
+ self.public = self.priv.public_key().public_bytes(
+ encoding=serialization.Encoding.X962,
+ format=serialization.PublicFormat.UncompressedPoint)
+ self.authkey = os.urandom(16)
+ return {
+ "pushCallback": pcb,
+ "pushPublicKey": base64.urlsafe_b64encode(self.public).decode('utf8'),
+ "pushAuthKey": base64.urlsafe_b64encode(self.authkey).decode('utf8'),
+ }
+
+ def update_pcb(self, pcb):
+ self.props = self.auth.post_a("/account/device", { "id": self.id } | self._mk_push(pcb))
+
+ def decrypt(self, data):
+ raw = http_ece.decrypt(data, private_key=self.priv, auth_secret=self.authkey)
+ return json.loads(raw.decode('utf8'))
+
+class PushServer:
+ def __init__(self):
+ q = self.q = queue.Queue()
+
+ class Handler(http.server.BaseHTTPRequestHandler):
+ def do_POST(self):
+ if self.path.startswith("/err/"):
+ self.send_response(410)
+ self.end_headers()
+ else:
+ self.send_response(200)
+ self.end_headers()
+
+ q.put((self.path, self.headers, self.rfile.read(int(self.headers['content-length']))))
+
+ server = self.server = http.server.ThreadingHTTPServer(("localhost", PUSH_PORT), Handler)
+ threading.Thread(target=server.serve_forever).start()
+
+ def wait(self, timeout=2):
+ return self.q.get(timeout=timeout)
+ def done(self, timeout=2):
+ try:
+ self.q.get(timeout=timeout)
+ return False
+ except queue.Empty:
+ return True
+
+ def good(self, id):
+ return f"http://localhost:{PUSH_PORT}/{id}"
+ def bad(self, id):
+ return f"http://localhost:{PUSH_PORT}/err/{id}"
+
+class MailServer:
+ def __init__(self):
+ q = self.q = queue.Queue()
+
+ class Handler:
+ async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
+ envelope.rcpt_tos.append(address)
+ return '250 OK'
+
+ async def handle_DATA(self, server, session, envelope):
+ headers, body = envelope.content.decode('utf8').split("\r\n\r\n", maxsplit=1)
+ if "Content-Transfer-Encoding: quoted-printable" in headers:
+ body = quopri.decodestring(body).decode('utf8')
+ q.put((envelope.rcpt_tos, body))
+ return '250 Message accepted for delivery'
+
+ self.controller = Controller(Handler(), hostname="localhost", port=SMTP_PORT)
+ self.controller.start()
+
+ def stop(self):
+ self.controller.stop()
+
+ def wait(self, timeout=2):
+ return self.q.get(timeout=timeout)
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..15149cb
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,115 @@
+import base64
+import json
+import pytest
+from fxa.errors import ClientError
+
+from api import *
+
+@pytest.fixture
+def push_server():
+ s = PushServer()
+ yield s
+ s.server.shutdown()
+ s.server.server_close()
+
+@pytest.fixture
+def mail_server():
+ s = MailServer()
+ yield s
+ s.stop()
+
+@pytest.fixture
+def client():
+ return AuthClient()
+
+def _login(client, email, mail_server):
+ # unverified accounts and unverified session behave the same, so we don't bother
+ # with dedicated unverified-session tests and just always verify.
+ c = client.login(email, "")
+ (to, body) = mail_server.wait()
+ assert to == [email]
+ c.post_a('/session/verify_code', { 'code': body.strip() })
+ return c
+
+def _account(client, primary, email, mail_server):
+ s = client.create_account(email, "")
+ try:
+ (to, body) = mail_server.wait()
+ assert to == [email]
+ data = json.loads(base64.urlsafe_b64decode(body.split("#/verify/", maxsplit=1)[1]).decode('utf8'))
+ s.post_a('/recovery_email/verify_code', { 'uid': data['uid'], 'code': data['code'] })
+ if primary:
+ yield s
+ else:
+ c = _login(client, email, mail_server)
+ yield c
+ s.password = c.password
+ finally:
+ try:
+ s.destroy_account(email, s.password)
+ except ClientError as e:
+ # don't fail if the account was already deleted
+ if e.details['errno'] != 102:
+ raise
+
+@pytest.fixture(params=[True, False], ids=["primary", "secondary"])
+def account(client, request, mail_server):
+ for a in _account(client, request.param, "test.account@test-auth", mail_server):
+ yield a
+@pytest.fixture(params=[True, False], ids=["primary", "secondary"])
+def account2(client, request, mail_server):
+ for a in _account(client, request.param, "test.account2@test-auth", mail_server):
+ yield a
+
+@pytest.fixture
+def unverified_account(client, mail_server):
+ s = client.create_account("test.account@test-auth", "")
+ yield s
+ s.destroy_account("test.account@test-auth", "")
+
+@pytest.fixture
+def login(client, mail_server):
+ return _login(client, "test.account@test-auth", mail_server)
+@pytest.fixture
+def login2(client, mail_server):
+ return _login(client, "test.account2@test-auth", mail_server)
+
+def _refresh_token(account, scope):
+ body = {
+ "client_id": "5882386c6d801776",
+ "ttl": 60,
+ "grant_type": "fxa-credentials",
+ "access_type": "offline",
+ "scope": scope,
+ }
+ yield account.post_a("/oauth/token", body)
+
+@pytest.fixture
+def refresh_token(account):
+ for r in _refresh_token(account, "profile https://identity.mozilla.com/apps/oldsync https://identity.mozilla.com/tokens/session"):
+ yield AuthClient(email=account.email, bearer=r['refresh_token'], props=r)
+@pytest.fixture
+def narrow_refresh_token(account):
+ for r in _refresh_token(account, "profile https://identity.mozilla.com/tokens/session"):
+ yield AuthClient(email=account.email, bearer=r['refresh_token'], props=r)
+
+def _account_or_rt(account, request, scope):
+ if request.param:
+ yield account
+ else:
+ for r in _refresh_token(account, scope):
+ yield AuthClient(email=account.email, bearer=r['refresh_token'], props=r)
+
+@pytest.fixture(params=[True, False], ids=["session", "refresh_token"])
+def account_or_rt(account, request):
+ for r in _account_or_rt(account, request, "profile https://identity.mozilla.com/apps/oldsync https://identity.mozilla.com/tokens/session"):
+ yield r
+
+@pytest.fixture
+def forgot_token(account):
+ resp = account.post_a("/password/forgot/send_code", { 'email': account.email })
+ assert 'passwordForgotToken' in resp
+ assert resp['ttl'] == 300
+ assert resp['codeLength'] == 16
+ assert resp['tries'] == 1
+ return PasswordChange(account.client, resp['passwordForgotToken'], 'passwordForgotToken')
diff --git a/tests/integration.rs b/tests/integration.rs
new file mode 100644
index 0000000..afded52
--- /dev/null
+++ b/tests/integration.rs
@@ -0,0 +1,73 @@
+use std::env;
+
+use anyhow::Context;
+use base64::URL_SAFE_NO_PAD;
+use chrono::{Duration, Utc};
+use futures::channel::oneshot::channel;
+use minor_skulk::{build, db::Db};
+use password_hash::rand_core::OsRng;
+use rand::RngCore;
+use rocket::{
+ fairing::AdHoc,
+ tokio::{process::Command, spawn},
+};
+
+#[macro_use]
+extern crate rocket;
+extern crate anyhow;
+
+async fn run_pytest(markers: &'static str, invite: bool) -> anyhow::Result<()> {
+ dotenv::dotenv().ok();
+ // at this point this is only a test runner to be used by the nix build.
+ env::set_var("ROCKET_LOG_LEVEL", "off");
+
+ let (tx, rx) = channel();
+ let rocket = build().await?.attach(AdHoc::on_liftoff("notify startup", move |rocket| {
+ Box::pin(async move {
+ // add an invite code as-if generated during startup and move it to an
+ // env var, emulating the user looking at the logs and copying the code.
+ // invite_only needs this to function.
+ if invite {
+ let db = rocket.state::<Db>().unwrap();
+ let tx = db.begin().await.unwrap();
+ let mut code = [0; 32];
+ OsRng.fill_bytes(&mut code);
+ let code = base64::encode_config(code, URL_SAFE_NO_PAD);
+ tx.add_invite_code(&code, Utc::now() + Duration::minutes(5)).await.unwrap();
+ tx.commit().await.unwrap();
+ env::set_var("INVITE_CODE", code);
+ }
+
+ tx.send(()).unwrap();
+ })
+ }));
+
+ spawn(async move { rocket.launch().await });
+ let test = spawn(async move {
+ rx.await.expect("test trigger failed");
+ let mut child = Command::new("pytest")
+ .arg("-vvv")
+ .arg("-m")
+ .arg(markers)
+ .spawn()
+ .expect("failed to spawn");
+ child.wait().await
+ });
+
+ assert!(test.await.context("starting pytest")?.context("running pytest")?.success());
+
+ Ok(())
+}
+
+#[async_test]
+async fn open() -> anyhow::Result<()> {
+ env::set_var("ROCKET_INVITE_ONLY", "false");
+ run_pytest("not invite", false).await
+}
+
+#[async_test]
+async fn invite_only() -> anyhow::Result<()> {
+ env::set_var("ROCKET_INVITE_ONLY", "true");
+ env::set_var("ROCKET_INVITE_ADMIN_ADDRESS", "test.account@test-auth");
+ run_pytest("invite", true).await
+}
diff --git a/tests/smtp.py b/tests/smtp.py
new file mode 100644
index 0000000..fa6e16b
--- /dev/null
+++ b/tests/smtp.py
@@ -0,0 +1,27 @@
+import asyncio
+import quopri
+from aiosmtpd.controller import Controller
+
+class PrintHandler:
+ async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
+ envelope.rcpt_tos.append(address)
+ return '250 OK'
+
+ async def handle_DATA(self, server, session, envelope):
+ print('Message from %s' % envelope.mail_from)
+ print('Message for %s' % envelope.rcpt_tos)
+ print('Message data:\n')
+ headers, body = envelope.content.decode('utf8').split("\r\n\r\n", maxsplit=1)
+ if "Content-Transfer-Encoding: quoted-printable" in headers:
+ body = quopri.decodestring(body).decode('utf8')
+ print(headers)
+ print()
+ print(body)
+ print()
+ print('End of message')
+ return '250 Message accepted for delivery'
+
+if __name__ == '__main__':
+ controller = Controller(PrintHandler(), hostname="localhost", port=2525)
+ controller.start()
+ input()
diff --git a/tests/test_auth_account.py b/tests/test_auth_account.py
new file mode 100644
index 0000000..68a407b
--- /dev/null
+++ b/tests/test_auth_account.py
@@ -0,0 +1,348 @@
+import os
+import pytest
+from fxa.errors import ClientError
+
+from api import *
+
+def test_create_no_args(client):
+ with pytest.raises(ClientError) as e:
+ client.post("/account/create")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 106,
+ 'error': 'Bad Request',
+ 'message': 'invalid json in request body'
+ }
+
+@pytest.mark.parametrize("args", [
+ {"email": "", "authPW": "", "extra": ""},
+ {"email": "", "authPW": "00"},
+ {"email": "a" * 257, "authPW": "0" * 64},
+ {"email": "a@test", "authPW": "00"},
+ {"email": "a@test", "authPW": "00" * 32, "style": "foo"},
+])
+def test_create_invalid_args(client, args):
+ with pytest.raises(ClientError) as e:
+ client.post("/account/create", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_create_nomail(client):
+ with pytest.raises(ClientError) as e:
+ client.create_account("test.account@test-auth", "")
+ assert e.value.details == {
+ 'code': 422,
+ 'errno': 151,
+ 'error': 'Unprocessable Entity',
+ 'message': 'failed to send email'
+ }
+
+def test_create_exists(account):
+ with pytest.raises(ClientError) as e:
+ account.create_account(account.email, "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 101,
+ 'error': 'Bad Request',
+ 'message': 'account already exists'
+ }
+
+@pytest.mark.parametrize("keys", [None, False, True])
+@pytest.mark.parametrize("verify", [False, True])
+def test_create_destroy(client, keys, verify, mail_server):
+ s = client.create_account("test.create.destroy@test-auth", "", keys=keys)
+ try:
+ if verify:
+ (to, body) = mail_server.wait()
+ assert to == [s.email]
+ data = json.loads(base64.urlsafe_b64decode(body.split("#/verify/", maxsplit=1)[1]).decode('utf8'))
+ s.post_a('/recovery_email/verify_code', { 'uid': data['uid'], 'code': data['code'] })
+ if keys:
+ k = s.fetch_keys(s.props['keyFetchToken'], "")
+ with pytest.raises(ClientError) as e:
+ s.fetch_keys(s.props['keyFetchToken'], "")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+ finally:
+ client.destroy_account("test.create.destroy@test-auth", "")
+
+def test_login_no_args(client):
+ with pytest.raises(ClientError) as e:
+ client.post("/account/login")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 106,
+ 'error': 'Bad Request',
+ 'message': 'invalid json in request body'
+ }
+
+@pytest.mark.parametrize("args", [
+ {"email": "", "authPW": "", "extra": ""},
+ {"email": "", "authPW": "00"},
+ {"email": "a" * 257, "authPW": "0" * 64},
+ {"email": "a@test", "authPW": "00"},
+])
+def test_login_invalid_args(client, args):
+ with pytest.raises(ClientError) as e:
+ client.post("/account/login", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_login_noaccount(client):
+ with pytest.raises(ClientError) as e:
+ client.login("test@test", "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 102,
+ 'error': 'Bad Request',
+ 'message': 'unknown account'
+ }
+
+def test_login_unverified(client, unverified_account):
+ with pytest.raises(ClientError) as e:
+ client.login(unverified_account.email, "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 104,
+ 'error': 'Bad Request',
+ 'message': 'unverified account'
+ }
+
+def test_login_badcase(account):
+ with pytest.raises(ClientError) as e:
+ account.login(account.email.upper(), "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 120,
+ 'error': 'Bad Request',
+ 'message': 'incorrect email case'
+ }
+
+def test_login_badpasswd(account):
+ with pytest.raises(ClientError) as e:
+ account.login(account.email, "ca0cb780-f114-405a-a5c2-1a4deb933a51")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 103,
+ 'error': 'Bad Request',
+ 'message': 'incorrect password'
+ }
+
+@pytest.mark.parametrize("keys", [None, False, True])
+def test_login(client, account, keys):
+ s = client.login(account.email, "", keys=keys)
+ try:
+ if keys:
+ s.fetch_keys(s.props['keyFetchToken'], "")
+ with pytest.raises(ClientError) as e:
+ s.fetch_keys(s.props['keyFetchToken'], "")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+ finally:
+ s.destroy_session()
+
+@pytest.mark.parametrize("args", [
+ {"email": "", "authPW": "", "extra": ""},
+ {"email": "", "authPW": "00"},
+ {"email": "a" * 257, "authPW": "0" * 64},
+ {"email": "a@test", "authPW": "00"},
+])
+def test_destroy_invalid_args(client, args):
+ with pytest.raises(ClientError) as e:
+ client.post("/account/destroy", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_destroy_noaccount(client):
+ with pytest.raises(ClientError) as e:
+ client.destroy_account("test@test", "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 102,
+ 'error': 'Bad Request',
+ 'message': 'unknown account'
+ }
+
+def test_destroy_badcase(account):
+ with pytest.raises(ClientError) as e:
+ account.destroy_account(account.email.upper(), "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 120,
+ 'error': 'Bad Request',
+ 'message': 'incorrect email case'
+ }
+
+def test_destroy_badpasswd(account):
+ with pytest.raises(ClientError) as e:
+ account.destroy_account(account.email, "ca0cb780-f114-405a-a5c2-1a4deb933a51")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 103,
+ 'error': 'Bad Request',
+ 'message': 'incorrect password'
+ }
+
+@pytest.mark.parametrize("auth", [
+ {},
+ {"authorization": "bearer invalid"},
+ {"authorization": "hawk id=invalid"},
+])
+def test_keys_invalid(client, auth):
+ with pytest.raises(ClientError) as e:
+ client.get("/account/keys", headers=auth)
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+# create and login do the remaining keyfetch tests.
+# we don't check whether the bundle is actually *valid* because we'd have to look
+# into the db to do a full test, so we'll trust it is correct if sync works.
+
+@pytest.fixture
+def reset_token(account, forgot_token, mail_server):
+ (to, body) = mail_server.wait()
+ assert account.email in to
+ resp = forgot_token.post_a("/password/forgot/verify_code", { 'code': body.strip() })
+ return AccountReset(account.client, resp['accountResetToken'])
+
+@pytest.mark.parametrize("args", [
+ { 'authPW': '00', },
+ { 'authPW': '00' * 32, 'extra': 0, },
+])
+def test_reset_invalid(reset_token, args):
+ with pytest.raises(ClientError) as e:
+ reset_token.post_a("/account/reset", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_reset(account, reset_token, mail_server, push_server):
+ dev = Device(account, "dev", pcb=push_server.good("d4105515-f4f0-4d26-85c1-f48c5c348edb"))
+ resp = reset_token.post_a("/account/reset", { 'authPW': auth_pw(account.email, "") })
+ assert resp == {}
+ (to, body) = mail_server.wait()
+ assert account.email in to
+ assert 'account has been reset' in body
+ p = push_server.wait()
+ assert p[0] == "/d4105515-f4f0-4d26-85c1-f48c5c348edb"
+ assert dev.decrypt(p[2]) == {'command': 'fxaccounts:password_reset', 'version': 1}
+ p = push_server.wait()
+ assert p[0] == "/d4105515-f4f0-4d26-85c1-f48c5c348edb"
+ assert dev.decrypt(p[2]) == {
+ 'command': 'fxaccounts:device_disconnected',
+ 'data': {'id': dev.id},
+ 'version': 1
+ }
+ assert push_server.done()
+
+def test_reset(account, reset_token, mail_server, push_server):
+ reset_token.post_a("/account/reset", { 'authPW': auth_pw(account.email, "") })
+ with pytest.raises(ClientError) as e:
+ reset_token.post_a("/account/reset", { 'authPW': auth_pw(account.email, "") })
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+@pytest.mark.invite
+def test_create_noinvite(client):
+ with pytest.raises(ClientError) as e:
+ client.create_account("test.create.destroy@test-auth", "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': -1,
+ 'error': 'Bad Request',
+ 'message': 'invite code required'
+ }
+
+@pytest.mark.invite
+def test_create_badinvite(client):
+ with pytest.raises(ClientError) as e:
+ client.create_account("test.create.destroy@test-auth", "", { 'style': '' })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': -1,
+ 'error': 'Bad Request',
+ 'message': 'invite code required'
+ }
+
+@pytest.mark.invite
+def test_create_invite(client, mail_server):
+ # all in one test because restarting the server from python would be a pain
+ c = client.create_account("test.account@test-auth", "", invite=os.environ['INVITE_CODE'])
+ c2 = None
+ try:
+ invite = Invite(c.props['sessionToken'])
+ # unverified sessions fail
+ with pytest.raises(ClientError) as e:
+ code = invite.post_a('/generate', { 'ttl_hours': 1 })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 138,
+ 'error': 'Bad Request',
+ 'message': 'unverified session'
+ }
+ # verified session works
+ (to, body) = mail_server.wait()
+ data = json.loads(base64.urlsafe_b64decode(body.split("#/verify/", maxsplit=1)[1]).decode('utf8'))
+ c.post_a('/recovery_email/verify_code', { 'uid': data['uid'], 'code': data['code'] })
+ code = invite.post_a('/generate', { 'ttl_hours': 1 })
+ assert 'url' in code
+ code = code['url'].split('/')[-1]
+ # code allows registration
+ c2 = client.create_account("test.account2@test-auth", "", invite=code)
+ (to, body) = mail_server.wait()
+ data = json.loads(base64.urlsafe_b64decode(body.split("#/verify/", maxsplit=1)[1]).decode('utf8'))
+ c2.post_a('/recovery_email/verify_code', { 'uid': data['uid'], 'code': data['code'] })
+ # token is consumed
+ with pytest.raises(ClientError) as e:
+ client.create_account("test.account3@test-auth", "", invite=code)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': -2,
+ 'error': 'Bad Request',
+ 'message': 'invite code not found'
+ }
+ # non-admin accounts can't generate tokens
+ with pytest.raises(ClientError) as e:
+ invite2 = Invite(c2.props['sessionToken'])
+ invite2.post_a('/generate', { 'ttl_hours': 1 })
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 110,
+ 'error': 'Unauthorized',
+ 'message': 'invalid authentication token'
+ }
+ finally:
+ c.destroy_account(c.email, "")
+ if c2 is not None:
+ c2.destroy_account(c2.email, "")
diff --git a/tests/test_auth_device.py b/tests/test_auth_device.py
new file mode 100644
index 0000000..8504ba7
--- /dev/null
+++ b/tests/test_auth_device.py
@@ -0,0 +1,434 @@
+import copy
+import pytest
+import random
+import time
+from fxa.errors import ClientError
+
+from api import *
+
+device_data = [
+ { "name": "testdev1", "type": "desktop", "availableCommands": { "a": "b" } },
+ { "name": "testdev2", "type": "desktop", "availableCommands": { "a": "b" } },
+]
+
+@pytest.fixture
+def populate_devices(account_or_rt, login):
+ devs = []
+ for (account, dev) in [(account_or_rt, device_data[0]), (login, device_data[1])]:
+ dev = account.post_a("/account/device", dev)
+ devs.append(dev)
+ return devs
+
+def test_create_noauth(client):
+ with pytest.raises(ClientError) as e:
+ client.post_a("/account/device", device_data[0])
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+@pytest.mark.parametrize("args,code,errno,error,message", [
+ ({ 'name': 'dev', 'availableCommands': {'a':1} },
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({ 'name': 'dev', 'availableCommands': {'a':1}, 'extra': 0 },
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({},
+ 400, 108, 'Bad Request', 'missing parameter in request body'),
+ ({ 'id': '00' * 16, 'name': 'dev' },
+ 400, 108, 'Bad Request', 'missing parameter in request body'),
+])
+def test_create_invalid(account_or_rt, args, code, errno, error, message):
+ with pytest.raises(ClientError) as e:
+ account_or_rt.post_a("/account/device", args)
+ assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message}
+
+def test_destroy_noauth(client, populate_devices):
+ with pytest.raises(ClientError) as e:
+ client.post_a("/account/device/destroy")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+ with pytest.raises(ClientError) as e:
+ client.post_a("/account/device/destroy", {'id': populate_devices[0]['id']})
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+@pytest.mark.parametrize("args,code,errno,error,message", [
+ ({ 'id': '00' },
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({ 'id': '00' * 16, 'extra': 0 },
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({ 'id': '00' * 16 },
+ 400, 123, 'Bad Request', 'unknown device'),
+])
+def test_destroy_invalid(account_or_rt, args, code, errno, error, message):
+ with pytest.raises(ClientError) as e:
+ account_or_rt.post_a("/account/device/destroy", args)
+ assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message}
+
+def test_create_destroy(account_or_rt):
+ dev = account_or_rt.post_a("/account/device", device_data[0])
+ account_or_rt.post_a("/account/device/destroy", {'id': dev['id']})
+
+def test_create_unverified(unverified_account):
+ unverified_account.post_a("/account/device", device_data[0])
+
+def test_list_noauth(client):
+ with pytest.raises(ClientError) as e:
+ client.get("/account/devices")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_list_unverified(unverified_account):
+ with pytest.raises(ClientError) as e:
+ unverified_account.get_a("/account/devices")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 138,
+ 'error': 'Bad Request',
+ 'message': 'unverified session'
+ }
+
+def test_list_none(account_or_rt):
+ devs = account_or_rt.get_a("/account/devices")
+ assert devs == []
+
+@pytest.mark.usefixtures("populate_devices")
+def test_list(account_or_rt):
+ devs = account_or_rt.get_a("/account/devices")
+ assert len(devs) == 2
+ (first, second) = (0, 1) if devs[0]['name'] == 'testdev1' else (1, 0)
+ assert devs[first]['isCurrentDevice']
+ assert not devs[second]['isCurrentDevice']
+ for (k, v) in device_data[first].items():
+ assert devs[0][k] == v
+ for (k, v) in device_data[second].items():
+ assert devs[1][k] == v
+
+def test_change(account_or_rt, populate_devices):
+ devs1 = populate_devices
+ devs = copy.deepcopy(devs1)
+ (devs[0]['name'], devs[1]['name']) = (devs[1]['name'], devs[0]['name'])
+ (devs[0]['pushCallback'], devs[1]['pushCallback']) = (devs[1]['pushCallback'] or "", devs[0]['pushCallback'] or "")
+ (devs[0]['pushPublicKey'], devs[1]['pushPublicKey']) = (devs[1]['pushPublicKey'] or "", devs[0]['pushPublicKey'] or "")
+ (devs[0]['pushAuthKey'], devs[1]['pushAuthKey']) = (devs[1]['pushAuthKey'] or "", devs[0]['pushAuthKey'] or "")
+ for dev in devs:
+ del dev['isCurrentDevice']
+ del dev['lastAccessTime']
+ del dev['pushEndpointExpired']
+ account_or_rt.post_a("/account/device", dev)
+ devs2 = account_or_rt.get_a("/account/devices")
+ mdevs1 = {
+ devs1[0]['id']: devs1[0],
+ devs1[1]['id']: devs1[1],
+ }
+ mdevs2 = {
+ devs2[0]['id']: devs2[0],
+ devs2[1]['id']: devs2[1],
+ }
+ (id1, id2) = mdevs1.keys()
+ for (i1, i2) in [(id1, id2), (id2, id1)]:
+ assert mdevs1[i1]['name'] == mdevs2[i2]['name']
+ assert mdevs1[i1]['pushCallback'] or '' == mdevs2[i2]['pushCallback'] or ''
+ assert mdevs1[i1]['pushPublicKey'] or '' == mdevs2[i2]['pushPublicKey'] or ''
+ assert mdevs1[i1]['pushAuthKey'] or '' == mdevs2[i2]['pushAuthKey'] or ''
+
+def test_invoke_noauth(client):
+ body = {"target": "0" * 32, "command": "foo", "payload": {}, "ttl": 10}
+ with pytest.raises(ClientError) as e:
+ client.post_a("/account/devices/invoke_command", body)
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_invoke_unverified(unverified_account):
+ body = {"target": "0" * 32, "command": "foo", "payload": {}, "ttl": 10}
+ with pytest.raises(ClientError) as e:
+ unverified_account.post_a("/account/devices/invoke_command", body)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 138,
+ 'error': 'Bad Request',
+ 'message': 'unverified session'
+ }
+
+@pytest.mark.parametrize("args,code,errno,error,message", [
+ ({"target": "00", "command": "foo", "payload": {}, "ttl": 10},
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({"target": "00" * 16, "command": "foo", "payload": {}, "ttl": 10, 'extra': 0},
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({"target": "0" * 32, "command": "foo", "payload": {}, "ttl": 10},
+ 400, 123, 'Bad Request', 'unknown device'),
+])
+def test_invoke_invalid(account_or_rt, args, code, errno, error, message):
+ with pytest.raises(ClientError) as e:
+ account_or_rt.post_a("/account/devices/invoke_command", args)
+ assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message}
+
+def test_invoke_nocmd(account_or_rt, populate_devices):
+ body = {"target": populate_devices[0]['id'], "command": "foo", "payload": {}, "ttl": 10}
+ with pytest.raises(ClientError) as e:
+ account_or_rt.post_a("/account/devices/invoke_command", body)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 157,
+ 'error': 'Bad Request',
+ 'message': 'unavailable device command'
+ }
+
+def test_invoke_other_account(account_or_rt, account2):
+ dev = account2.post_a("/account/device", device_data[0])
+ body = {"target": dev['id'], "command": "foo", "payload": {}, "ttl": 10}
+ with pytest.raises(ClientError) as e:
+ account_or_rt.post_a("/account/devices/invoke_command", body)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 123,
+ 'error': 'Bad Request',
+ 'message': 'unknown device'
+ }
+
+@pytest.mark.parametrize("ttl", [None, 1, 60, 999999999])
+@pytest.mark.parametrize("has_sender", [False, True])
+def test_invoke(account_or_rt, login, ttl, has_sender):
+ sender = account_or_rt.post_a("/account/device", device_data[1])['id'] if has_sender else None
+ dev = login.post_a("/account/device", device_data[0])
+
+ body = {
+ "target": dev['id'],
+ "command": "a",
+ "payload": { "data": str(random.random()), },
+ "ttl": ttl,
+ }
+ resp = account_or_rt.post_a("/account/devices/invoke_command", body)
+ assert resp['enqueued']
+ assert not resp['notified']
+ assert resp['notifyError'] == "no push callback"
+
+ cmd = login.get_a("/account/device/commands?index=0&limit=11")
+ assert cmd['last']
+ assert len(cmd['messages']) == 1
+ assert cmd['messages'][0]['data']['command'] == 'a'
+ assert cmd['messages'][0]['data']['payload'] == body['payload']
+ assert cmd['messages'][0]['data']['sender'] == sender
+
+def test_commands_noauth(client):
+ with pytest.raises(ClientError) as e:
+ client.get_a("/account/device/commands?index=1")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_commands_unverified(unverified_account):
+ with pytest.raises(ClientError) as e:
+ unverified_account.get_a("/account/device/commands?index=1")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 138,
+ 'error': 'Bad Request',
+ 'message': 'unverified session'
+ }
+
+def test_commands_nodev(account_or_rt):
+ with pytest.raises(ClientError) as e:
+ account_or_rt.get_a("/account/device/commands?index=1")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 123,
+ 'error': 'Bad Request',
+ 'message': 'unknown device'
+ }
+
+@pytest.mark.parametrize('n_cmds,offset,limit', [
+ (1, 0, 0),
+ (1, 0, 1),
+ (1, 0, 100),
+ (1, 1, 0),
+ (1, 1, 1),
+ (1, 1, 100),
+ (2, 0, 0),
+ (2, 0, 1),
+ (2, 0, 100),
+ (2, 1, 0),
+ (2, 1, 1),
+ (2, 1, 100),
+ (101, 0, 100),
+ (101, 10, 100),
+])
+def test_device_commands_list(account_or_rt, login, n_cmds, offset, limit):
+ account_or_rt.post_a("/account/device", device_data[1])['id']
+ dev = login.post_a("/account/device", device_data[0])
+
+ bodies = [ {
+ "target": dev['id'],
+ "command": "a",
+ "payload": { "data": str(i), },
+ } for i in range(0, n_cmds) ]
+
+ for b in bodies:
+ resp = account_or_rt.post_a("/account/devices/invoke_command", b)
+ assert resp['enqueued']
+ assert not resp['notified']
+ assert resp['notifyError'] == "no push callback"
+
+ cmd = login.get_a("/account/device/commands", params={ "index": 0, "limit": 1 })
+ assert cmd == login.get_a("/account/device/commands", params={ "index": 0, "limit": 1 })
+
+ first_index = cmd['index']
+ cmds = login.get_a("/account/device/commands", params={ "limit": limit, "index": first_index + offset })
+ assert cmds['last'] == (offset + limit >= n_cmds)
+ assert len(cmds['messages']) == min(max(n_cmds - offset, 0), limit)
+
+def test_attached_clients_unauth(client):
+ with pytest.raises(ClientError) as e:
+ client.get_a("/account/attached_clients")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_attached_clients_unverified(unverified_account):
+ with pytest.raises(ClientError) as e:
+ unverified_account.get_a("/account/attached_clients")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 138,
+ 'error': 'Bad Request',
+ 'message': 'unverified session'
+ }
+
+def test_attached_clients_badauth(refresh_token):
+ with pytest.raises(ClientError) as e:
+ refresh_token.get_a("/account/attached_clients")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_attached_clients(account, refresh_token, push_server):
+ dev1 = Device(account, "dev1")
+ dev2 = Device(refresh_token, "dev2", "mobile")
+ # filter potential login-only sessions
+ devs = [ d for d in account.get_a("/account/attached_clients") if d['name'] != None ]
+ assert len(devs) == 2
+ devs = {
+ devs[0]['name']: devs[0],
+ devs[1]['name']: devs[1],
+ }
+ assert devs['dev1']['deviceId'] == dev1.id
+ assert devs['dev1']['sessionTokenId'] != None
+ assert devs['dev1']['refreshTokenId'] == None
+ assert devs['dev1']['isCurrentSession']
+ assert devs['dev1']['deviceType'] == "desktop"
+ assert (time.time() - devs['dev1']['createdTime']) < 10
+ assert (time.time() - devs['dev1']['lastAccessTime']) < 10
+ assert devs['dev1']['scope'] == None
+ #
+ assert devs['dev2']['deviceId'] == dev2.id
+ assert devs['dev2']['sessionTokenId'] != None
+ assert devs['dev2']['refreshTokenId'] != None
+ assert not devs['dev2']['isCurrentSession']
+ assert devs['dev2']['deviceType'] == "mobile"
+ assert (time.time() - devs['dev2']['createdTime']) < 10
+ assert (time.time() - devs['dev2']['lastAccessTime']) < 10
+ assert devs['dev2']['scope'] == "profile https://identity.mozilla.com/apps/oldsync"
+
+def test_attached_client_destroy_unauth(client):
+ with pytest.raises(ClientError) as e:
+ client.post_a("/account/attached_client/destroy")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_attached_client_destroy_unverified(unverified_account):
+ with pytest.raises(ClientError) as e:
+ unverified_account.post_a("/account/attached_client/destroy")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 138,
+ 'error': 'Bad Request',
+ 'message': 'unverified session'
+ }
+
+def test_attached_client_destroy_badauth(refresh_token):
+ with pytest.raises(ClientError) as e:
+ refresh_token.post_a("/account/attached_client/destroy")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+@pytest.mark.parametrize("args,code,errno,error,message", [
+ ({"sessionTokenId": "0"},
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({"refreshTokenId": "0"},
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({"deviceId": "0"},
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({"sessionTokenId": "00" * 16, 'extra': 0},
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({"sessionTokenId": "00" * 16, 'refreshTokenId': "00" * 16},
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({"sessionTokenId": "00" * 16, 'refreshTokenId': "00" * 16, 'deviceId': "00" * 16},
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({'refreshTokenId': "00" * 16, 'deviceId': "00" * 16},
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+])
+def test_attached_client_destroy_invalid(account, args, code, errno, error, message):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/account/attached_client/destroy", args)
+ assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message}
+
+@pytest.mark.parametrize("fn", [
+ (lambda d: {'sessionTokenId': d['sessionTokenId']}),
+ (lambda d: {'refreshTokenId': d['refreshTokenId']}),
+ (lambda d: {'deviceId': d['deviceId']}),
+], ids=["session", "refresh", "device"])
+def test_attached_client_destroy(account, refresh_token, fn):
+ Device(refresh_token, "dev2")
+ devs = account.get_a("/account/attached_clients")
+ account.post_a("/account/attached_client/destroy", fn(devs[0]))
+ devs2 = account.get_a("/account/attached_clients")
+ assert len(devs2) == len(devs) - 1
+
+def test_attached_client_destroy_push(account, refresh_token, push_server):
+ dev = Device(account, "dev")
+ dev2 = Device(refresh_token, "dev2")
+ dev.update_pcb(push_server.good("4ed8d1d3-e756-4866-9169-aafe612eb1e9"))
+ account.post_a("/account/attached_client/destroy", { 'deviceId': dev2.id })
+ p = push_server.wait()
+ assert p[0] == "/4ed8d1d3-e756-4866-9169-aafe612eb1e9"
+ assert dev.decrypt(p[2]) == {
+ 'command': 'fxaccounts:device_disconnected',
+ 'data': {'id': dev2.id},
+ 'version': 1
+ }
+ assert push_server.done()
diff --git a/tests/test_auth_email.py b/tests/test_auth_email.py
new file mode 100644
index 0000000..319f2d4
--- /dev/null
+++ b/tests/test_auth_email.py
@@ -0,0 +1,96 @@
+import pytest
+from fxa.errors import ClientError
+
+from api import *
+
+def test_status_noauth(client, refresh_token):
+ with pytest.raises(ClientError) as e:
+ client.post_a("/recovery_email/status")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+ with pytest.raises(ClientError) as e:
+ refresh_token.post_a("/recovery_email/status")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_status_unverified(unverified_account):
+ resp = unverified_account.get_a("/recovery_email/status")
+ assert resp == {
+ 'email': unverified_account.email,
+ 'emailVerified': False,
+ 'sessionVerified': False,
+ 'verified': False
+ }
+
+def test_status_verified(account):
+ resp = account.get_a("/recovery_email/status")
+ assert resp == {
+ 'email': account.email,
+ 'emailVerified': True,
+ 'sessionVerified': True,
+ 'verified': True
+ }
+
+@pytest.mark.parametrize("args,code,errno,error,message", [
+ ({ 'uid': '00', 'code': "" },
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({ 'id': '00' * 16, 'code': 0 },
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+ ({ 'id': '00' * 16, 'code': "", 'extra': 0 },
+ 400, 107, 'Bad Request', 'invalid parameter in request body'),
+])
+def test_verify_code_invalid(unverified_account, args, code, errno, error, message):
+ with pytest.raises(ClientError) as e:
+ unverified_account.post_a("/recovery_email/verify_code", args)
+ assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message}
+
+def test_verify_code(account):
+ # fixture does all the work
+ pass
+
+def test_verify_code_reuse(client, mail_server):
+ s = client.create_account("test@test", "")
+ (to, body) = mail_server.wait()
+ assert to == ["test@test"]
+ data = json.loads(base64.urlsafe_b64decode(body.split("#/verify/", maxsplit=1)[1]).decode('utf8'))
+ s.post_a('/recovery_email/verify_code', { 'uid': data['uid'], 'code': data['code'] })
+ with pytest.raises(ClientError) as e:
+ s.post_a('/recovery_email/verify_code', { 'uid': data['uid'], 'code': data['code'] })
+ s.destroy_account("test@test", "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 105,
+ 'error': 'Bad Request',
+ 'message': 'invalid verification code'
+ }
+
+def test_resend_code(client, mail_server):
+ s = client.create_account("test@test", "")
+ (to, body) = mail_server.wait()
+ assert to == ["test@test"]
+ data = json.loads(base64.urlsafe_b64decode(body.split("#/verify/", maxsplit=1)[1]).decode('utf8'))
+ s.post_a('/recovery_email/resend_code', {})
+ (to2, body2) = mail_server.wait()
+ assert to == to2
+ assert body == body2
+ s.post_a('/recovery_email/verify_code', { 'uid': data['uid'], 'code': data['code'] })
+ with pytest.raises(ClientError) as e:
+ s.post_a('/recovery_email/resend_code', {})
+ (to, body) = mail_server.wait()
+ assert to == ["test@test"]
+ data = json.loads(base64.urlsafe_b64decode(body.split("#/verify/", maxsplit=1)[1]).decode('utf8'))
+ s.destroy_account("test@test", "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 105,
+ 'error': 'Bad Request',
+ 'message': 'invalid verification code'
+ }
diff --git a/tests/test_auth_oauth.py b/tests/test_auth_oauth.py
new file mode 100644
index 0000000..f8ad201
--- /dev/null
+++ b/tests/test_auth_oauth.py
@@ -0,0 +1,369 @@
+import copy
+import pytest
+import time
+from fxa.errors import ClientError
+
+@pytest.mark.parametrize("client_id,scopes", [
+ # firefox
+ ("5882386c6d801776", "profile:write https://identity.mozilla.com/apps/oldsync https://identity.mozilla.com/tokens/session"),
+ # fenix
+ ("a2270f727f45f648", "profile https://identity.mozilla.com/apps/oldsync https://identity.mozilla.com/tokens/session"),
+])
+def test_oauth_client_scopes(account, client_id, scopes):
+ body = {
+ "client_id": client_id,
+ "ttl": 60,
+ "grant_type": "fxa-credentials",
+ "access_type": "online",
+ "scope": scopes,
+ }
+ s = account.post_a("/oauth/token", body)['access_token']
+ account.post_a("/oauth/destroy", { "client_id": client_id, "token": s })
+
+def test_oauth_authorization_noauth(account):
+ body = {
+ "client_id": "5882386c6d801776",
+ "ttl": 60,
+ "grant_type": "fxa-credentials",
+ "access_type": "online",
+ "scope": "profile",
+ }
+ with pytest.raises(ClientError) as e:
+ account.post("/oauth/authorization", body)
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_oauth_authorization_unverified(unverified_account):
+ body = {
+ "client_id": "5882386c6d801776",
+ "ttl": 60,
+ "grant_type": "fxa-credentials",
+ "access_type": "online",
+ "scope": "profile",
+ }
+ with pytest.raises(ClientError) as e:
+ unverified_account.post_a("/oauth/authorization", body)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 138,
+ 'error': 'Bad Request',
+ 'message': 'unverified session'
+ }
+
+@pytest.mark.parametrize("args", [
+ { "client_id": "5882386c6d801776", "state": "", "scope": "profile", "access_type": "invalid",
+ "code_challenge": "", "code_challenge_method": "S256", "response_type": "code" },
+ { "client_id": "5882386c6d801776", "state": "", "scope": "profile", "access_type": "online",
+ "code_challenge": "", "code_challenge_method": "invalid", "response_type": "code" },
+ { "client_id": "5882386c6d801776", "state": "", "scope": "profile", "access_type": "online",
+ "code_challenge": "", "code_challenge_method": "S256", "response_type": "invalid" },
+])
+def test_oauth_authorization_invalid(account, args):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/oauth/authorization", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_oauth_authorization_badclientid(account):
+ args = { "client_id": "invalid", "state": "", "scope": "profile", "access_type": "online",
+ "code_challenge": "", "code_challenge_method": "S256", "response_type": "code" }
+ with pytest.raises(ClientError) as e:
+ account.post_a("/oauth/authorization", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 162,
+ 'error': 'Bad Request',
+ 'message': 'unknown client_id'
+ }
+
+def test_oauth_authorization_badscope(account):
+ args = { "client_id": "5882386c6d801776", "state": "", "scope": "invalid", "access_type": "online",
+ "code_challenge": "", "code_challenge_method": "S256", "response_type": "code" }
+ with pytest.raises(ClientError) as e:
+ account.post_a("/oauth/authorization", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 169,
+ 'error': 'Bad Request',
+ 'message': 'requested scopes not allowed'
+ }
+
+# see below for combined /authorization + unauthed /token
+
+def test_oauth_destroy_notoken(account):
+ args = { "client_id": "5882386c6d801776", "token": "00" * 32 }
+ with pytest.raises(ClientError) as e:
+ account.post("/oauth/destroy", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_oauth_destroy_badclient(account, refresh_token):
+ args = { "client_id": "other", "token": refresh_token.bearer }
+ with pytest.raises(ClientError) as e:
+ account.post("/oauth/destroy", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_oauth_scoped_keys_badclient(account):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/account/scoped-key-data", {
+ "client_id": "invalid",
+ "scope": "https://identity.mozilla.com/apps/oldsync"
+ })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 162,
+ 'error': 'Bad Request',
+ 'message': 'unknown client_id'
+ }
+
+def test_oauth_scoped_keys_badscope(account):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/account/scoped-key-data", {
+ "client_id": "5882386c6d801776",
+ "scope": "scope"
+ })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 169,
+ 'error': 'Bad Request',
+ 'message': 'requested scopes not allowed'
+ }
+
+def test_oauth_scoped_unverified(unverified_account):
+ with pytest.raises(ClientError) as e:
+ unverified_account.post_a("/account/scoped-key-data", {
+ "client_id": "5882386c6d801776",
+ "scope": "https://identity.mozilla.com/apps/oldsync"
+ })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 138,
+ 'error': 'Bad Request',
+ 'message': 'unverified session'
+ }
+
+def test_oauth_scoped_keys(account):
+ resp = account.post_a("/account/scoped-key-data", {
+ "client_id": "5882386c6d801776",
+ "scope": "https://identity.mozilla.com/apps/oldsync"
+ })
+ assert resp == {
+ "https://identity.mozilla.com/apps/oldsync": {
+ "identifier": "https://identity.mozilla.com/apps/oldsync",
+ "keyRotationSecret": "00" * 32,
+ "keyRotationTimestamp": 0,
+ },
+ }
+
+@pytest.mark.parametrize("access_type", ["online", "offline"])
+def test_oauth_token_fxa_badclient(account, access_type):
+ body = { "client_id": "invalid", "ttl": 60, "grant_type": "fxa-credentials",
+ "access_type": access_type, "scope": "profile" }
+ with pytest.raises(ClientError) as e:
+ account.post_a("/oauth/token", body)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 162,
+ 'error': 'Bad Request',
+ 'message': 'unknown client_id'
+ }
+
+@pytest.mark.parametrize("access_type", ["online", "offline"])
+def test_oauth_token_fxa_badscope(account, access_type):
+ body = { "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "fxa-credentials",
+ "access_type": access_type, "scope": "scope" }
+ with pytest.raises(ClientError) as e:
+ account.post_a("/oauth/token", body)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 169,
+ 'error': 'Bad Request',
+ 'message': 'requested scopes not allowed'
+ }
+
+@pytest.mark.parametrize("grant_type,access_type", [
+ ("authorization_code", "online"),
+ ("refresh_token", "online"),
+ ("fxa-credentials", "foo"),
+])
+def test_oauth_token_fxa_invalid(account, grant_type, access_type):
+ body = { "client_id": "5882386c6d801776", "ttl": 60, "grant_type": grant_type,
+ "access_type": access_type, "scope": "scope" }
+ with pytest.raises(ClientError) as e:
+ account.post_a("/oauth/token", body)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_oauth_token_unverified(unverified_account):
+ body = { "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "fxa-credentials",
+ "access_type": "online", "scope": "profile" }
+ with pytest.raises(ClientError) as e:
+ unverified_account.post_a("/oauth/token", body)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 138,
+ 'error': 'Bad Request',
+ 'message': 'unverified session'
+ }
+
+@pytest.fixture
+def auth_code(account):
+ body = {
+ "client_id": "5882386c6d801776",
+ "state": "test",
+ "scope": "profile",
+ "access_type": "online",
+ "code_challenge": "n4bQgYhMfWWaL-qgxVrQFaO_TxsrC4Is0V1sFbDwCgg", # "test"
+ "code_challenge_method": "S256",
+ "response_type": "code",
+ }
+ return account.post_a("/oauth/authorization", body)['code']
+
+@pytest.mark.parametrize("args,code,error,errno,message", [
+ ({ "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "authorization_code",
+ "code": "invalid", "code_verifier": "test" },
+ 400, 'Bad Request', 107, 'invalid parameter in request body'),
+ ({ "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "authorization_code",
+ "code_verifier": "test", "extra": 0 },
+ 400, 'Bad Request', 107, 'invalid parameter in request body'),
+ ({ "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "authorization_code",
+ "code": "00" * 32, "code_verifier": "test" },
+ 401, 'Unauthorized', 110, 'invalid authentication token'),
+ ({ "client_id": "invalid", "ttl": 60, "grant_type": "authorization_code",
+ "code_verifier": "test" },
+ 400, 'Bad Request', 162, 'unknown client_id'),
+ ({ "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "authorization_code",
+ "code_verifier": "invalid" },
+ 400, 'Bad Request', 107, 'invalid parameter in request body'),
+])
+def test_oauth_token_other_invalidcode(account, args, code, error, errno, message, auth_code):
+ args = copy.deepcopy(args)
+ if 'code' not in args: args['code'] = auth_code
+ with pytest.raises(ClientError) as e:
+ account.post("/oauth/token", args)
+ assert e.value.details == { 'code': code, 'errno': errno, 'error': error, 'message': message }
+
+@pytest.mark.parametrize("args,code,error,errno,message", [
+ ({ "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "fxa-credentials",
+ "scope": "profile", "access_type": "online" },
+ 400, 'Bad Request', 107, 'invalid parameter in request body'),
+ ({ "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "refresh_token",
+ "refresh_token": "invalid", "code_verifier": "test" },
+ 400, 'Bad Request', 107, 'invalid parameter in request body'),
+ ({ "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "refresh_token",
+ "scope": "profile", "extra": 0 },
+ 400, 'Bad Request', 107, 'invalid parameter in request body'),
+ ({ "client_id": "invalid", "ttl": 60, "grant_type": "refresh_token",
+ "scope": "profile" },
+ 400, 'Bad Request', 162, 'unknown client_id'),
+ ({ "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "refresh_token",
+ "scope": "foo" },
+ 400, 'Bad Request', 169, 'requested scopes not allowed'),
+ ({ "client_id": "5882386c6d801776", "ttl": 60, "grant_type": "refresh_token",
+ "scope": "profile:write" },
+ 400, 'Bad Request', 169, 'requested scopes not allowed'),
+])
+def test_oauth_token_other_invalidrefresh(account, args, code, error, errno, message, refresh_token):
+ args = copy.deepcopy(args)
+ if 'refresh_token' not in args: args['refresh_token'] = refresh_token.bearer
+ with pytest.raises(ClientError) as e:
+ account.post("/oauth/token", args)
+ assert e.value.details == { 'code': code, 'errno': errno, 'error': error, 'message': message }
+
+@pytest.mark.parametrize("refresh", [False, True])
+def test_oauth_fxa(account, refresh):
+ body = {
+ "client_id": "5882386c6d801776",
+ "ttl": 60,
+ "grant_type": "fxa-credentials",
+ "access_type": "offline" if refresh else "online",
+ "scope": "profile",
+ }
+ resp = account.post_a("/oauth/token", body)
+
+ assert 'access_token' in resp
+ assert ('refresh_token' in resp) == refresh
+ assert 'session_token' not in resp
+ assert resp['scope'] == 'profile'
+ assert resp['token_type'] == 'bearer'
+ assert resp['expires_in'] <= 60
+ assert (resp['auth_at'] - time.time()) < 10
+ assert 'keys_jwe' not in resp
+
+@pytest.mark.parametrize("keys_jwe", [None, "keyskeyskeys"])
+@pytest.mark.parametrize("refresh,session", [
+ (False, False),
+ (True, False),
+ (True, True),
+])
+def test_oauth_auth_code(account, keys_jwe, refresh, session):
+ scope = "profile" + (" https://identity.mozilla.com/tokens/session" if session else "")
+ body = {
+ "client_id": "5882386c6d801776",
+ "state": "test",
+ "keys_jwe": keys_jwe,
+ "scope": scope,
+ "access_type": "offline" if refresh else "online",
+ "code_challenge": "n4bQgYhMfWWaL-qgxVrQFaO_TxsrC4Is0V1sFbDwCgg", # "test"
+ "code_challenge_method": "S256",
+ "response_type": "code",
+ }
+ resp = account.post_a("/oauth/authorization", body)
+ assert resp['state'] == "test"
+
+ body = {
+ "client_id": "5882386c6d801776",
+ "ttl": 60,
+ "grant_type": "authorization_code",
+ "code": resp['code'],
+ "code_verifier": "test",
+ }
+ resp = account.post("/oauth/token", body)
+ assert 'access_token' in resp
+ assert ('refresh_token' in resp) == refresh
+ assert ('session_token' in resp) == session
+ assert resp['scope'] == 'profile'
+ assert resp['token_type'] == 'bearer'
+ assert resp['expires_in'] <= 60
+ assert (resp['auth_at'] - time.time()) < 10
+ assert keys_jwe is None or (resp['keys_jwe'] == keys_jwe)
+
+def test_oauth_refresh(account, refresh_token):
+ body = {
+ "client_id": "5882386c6d801776",
+ "ttl": 60,
+ "grant_type": "refresh_token",
+ "refresh_token": refresh_token.bearer,
+ "scope": "profile",
+ }
+ resp = account.post("/oauth/token", body)
+
+ assert 'access_token' in resp
+ assert 'refresh_token' not in resp
+ assert 'session_token' not in resp
+ assert resp['scope'] == 'profile'
+ assert resp['token_type'] == 'bearer'
+ assert resp['expires_in'] <= 60
+ assert (resp['auth_at'] - time.time()) < 10
+ assert 'keys_jwe' not in resp
diff --git a/tests/test_auth_password.py b/tests/test_auth_password.py
new file mode 100644
index 0000000..7c2064a
--- /dev/null
+++ b/tests/test_auth_password.py
@@ -0,0 +1,211 @@
+import pytest
+from fxa.crypto import derive_key, quick_stretch_password
+from fxa.errors import ClientError
+
+from api import *
+
+@pytest.mark.parametrize("args", [
+ { 'email': "", 'oldAuthPW': '00' * 32 },
+ { 'email': "test0@test", 'oldAuthPW': '00' },
+ { 'email': "test0@test", 'oldAuthPW': '00' * 32, 'extra': 0 },
+])
+def test_change_start_invalid(account, args):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/password/change/start", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_change_start_badaccount(account):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/password/change/start", { 'email': "test0@test", 'oldAuthPW': '00' * 32 })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 102,
+ 'error': 'Bad Request',
+ 'message': 'unknown account'
+ }
+ with pytest.raises(ClientError) as e:
+ account.post_a("/password/change/start", { 'email': account.email.upper(), 'oldAuthPW': '00' * 32 })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 120,
+ 'error': 'Bad Request',
+ 'message': 'incorrect email case'
+ }
+
+def test_change_start_unverified(unverified_account):
+ with pytest.raises(ClientError) as e:
+ unverified_account.post_a("/password/change/start", {
+ 'email': unverified_account.email,
+ 'oldAuthPW': '00' * 32
+ })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 104,
+ 'error': 'Bad Request',
+ 'message': 'unverified account'
+ }
+
+def test_change_start_badpw(account):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/password/change/start", { 'email': account.email, 'oldAuthPW': '00' * 32 })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 103,
+ 'error': 'Bad Request',
+ 'message': 'incorrect password'
+ }
+
+@pytest.fixture
+def change_token(account):
+ pw = auth_pw(account.email, "")
+ resp = account.post_a("/password/change/start", { 'email': account.email, 'oldAuthPW': pw })
+ assert 'keyFetchToken' in resp
+ return PasswordChange(account.client, resp['passwordChangeToken'])
+
+@pytest.mark.parametrize("args", [
+ { 'authPW': '00', 'wrapKb': '00' * 32, 'sessionToken': '00' * 32, },
+ { 'authPW': '00' * 32, 'wrapKb': '00', 'sessionToken': '00' * 32, },
+ { 'authPW': '00' * 32, 'wrapKb': '00' * 32, 'sessionToken': '00', },
+])
+def test_change_finish_invalid(change_token, args):
+ with pytest.raises(ClientError) as e:
+ change_token.post_a("/password/change/finish", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_change_finish(account, change_token, mail_server):
+ pw = auth_pw(account.email, "new")
+ change_token.post_a("/password/change/finish", {
+ 'authPW': pw,
+ 'wrapKb': '00' * 32,
+ })
+ account.password = "new" # for fixture teardown
+ (to, body) = mail_server.wait()
+ assert account.email in to
+ assert 'password has been changed' in body
+
+ # just do a login test to see that the password was really changed
+ account.login(account.email, "new")
+ with pytest.raises(ClientError) as e:
+ account.login(account.email, "")
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 103,
+ 'error': 'Bad Request',
+ 'message': 'incorrect password'
+ }
+
+def test_change_finish_twice(account, change_token, mail_server):
+ pw = auth_pw(account.email, "new")
+ change_token.post_a("/password/change/finish", {
+ 'authPW': pw,
+ 'wrapKb': '00' * 32,
+ })
+ account.password = "new" # for fixture teardown
+
+ with pytest.raises(ClientError) as e:
+ change_token.post_a("/password/change/finish", {
+ 'authPW': pw,
+ 'wrapKb': '00' * 32,
+ })
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+@pytest.mark.parametrize("args", [
+ { 'email': "" },
+ { 'email': "test0@test", 'extra': 0 },
+])
+def test_forgot_start_invalid(account, args):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/password/forgot/send_code", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_change_forgot_badaccount(account):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/password/forgot/send_code", { 'email': "test0@test" })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 102,
+ 'error': 'Bad Request',
+ 'message': 'unknown account'
+ }
+ with pytest.raises(ClientError) as e:
+ account.post_a("/password/forgot/send_code", { 'email': account.email.upper() })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 120,
+ 'error': 'Bad Request',
+ 'message': 'incorrect email case'
+ }
+
+def test_change_forgot_unverified(unverified_account):
+ with pytest.raises(ClientError) as e:
+ unverified_account.post_a("/password/forgot/send_code", { 'email': unverified_account.email })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 104,
+ 'error': 'Bad Request',
+ 'message': 'unverified account'
+ }
+
+@pytest.mark.parametrize("args", [
+ { 'code': '', 'extra': 0, },
+])
+def test_forgot_finish_invalid(change_token, args):
+ with pytest.raises(ClientError) as e:
+ change_token.post_a("/password/forgot/send_code", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_forgot_finish_badcode(account, forgot_token, mail_server):
+ (to, body) = mail_server.wait()
+ assert account.email in to
+ with pytest.raises(ClientError) as e:
+ resp = forgot_token.post_a("/password/forgot/verify_code", { 'code': '' })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 105,
+ 'error': 'Bad Request',
+ 'message': 'invalid verification code'
+ }
+
+def test_forgot_finish(account, forgot_token, mail_server):
+ (to, body) = mail_server.wait()
+ assert account.email in to
+ resp = forgot_token.post_a("/password/forgot/verify_code", { 'code': body.strip() })
+ assert 'accountResetToken' in resp
+
+def test_forgot_finish_twice(account, forgot_token, mail_server):
+ (to, body) = mail_server.wait()
+ forgot_token.post_a("/password/forgot/verify_code", { 'code': body.strip() })
+
+ with pytest.raises(ClientError) as e:
+ forgot_token.post_a("/password/forgot/verify_code", { 'code': body.strip() })
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
diff --git a/tests/test_auth_session.py b/tests/test_auth_session.py
new file mode 100644
index 0000000..3a6e7c4
--- /dev/null
+++ b/tests/test_auth_session.py
@@ -0,0 +1,69 @@
+import pytest
+from fxa.errors import ClientError
+
+from api import *
+
+def test_session_loggedout(client):
+ with pytest.raises(ClientError) as e:
+ client.post("/session/destroy")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_status(account):
+ resp = account.get_a("/session/status")
+ assert resp == { 'state': '', 'uid': account.props['uid'] }
+
+def test_resend(account, mail_server):
+ c = account.login(account.email, "")
+ (to, body) = mail_server.wait()
+ assert to == [account.email]
+ c.post_a("/session/resend_code", {})
+ (to2, body2) = mail_server.wait()
+ assert to == to2
+ assert body == body2
+
+@pytest.mark.parametrize("args", [
+ { 'custom_session_id': '00' },
+ { 'extra': '00' },
+])
+def test_session_invalid(account, args):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/session/destroy", args)
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 107,
+ 'error': 'Bad Request',
+ 'message': 'invalid parameter in request body'
+ }
+
+def test_session_noid(account):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/session/destroy", { 'custom_session_id': '0' * 64 })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 123,
+ 'error': 'Bad Request',
+ 'message': 'unknown device'
+ }
+
+def test_session_destroy_other(account, account2):
+ with pytest.raises(ClientError) as e:
+ account.post_a("/session/destroy", { 'custom_session_id': account2.auth.id })
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 123,
+ 'error': 'Bad Request',
+ 'message': 'unknown device'
+ }
+
+def test_session_destroy_unverified(unverified_account):
+ unverified_account.destroy_session()
+ unverified_account.destroy_session = lambda *args: None
+
+def test_session_destroy(account):
+ s = account.login(account.email, "")
+ s.destroy_session()
diff --git a/tests/test_oauth.py b/tests/test_oauth.py
new file mode 100644
index 0000000..3eb32ac
--- /dev/null
+++ b/tests/test_oauth.py
@@ -0,0 +1,97 @@
+import pytest
+from fxa.errors import ClientError
+
+from api import *
+
+@pytest.fixture
+def oauth():
+ return Oauth()
+
+@pytest.fixture
+def access_token(account):
+ body = {
+ "client_id": "5882386c6d801776",
+ "ttl": 60,
+ "grant_type": "fxa-credentials",
+ "access_type": "online",
+ "scope": "profile",
+ }
+ resp = account.post_a("/oauth/token", body)
+ return resp['access_token']
+
+@pytest.mark.parametrize("args,code,errno,error,message", [
+ ({"access_token": "0"},
+ 400, 109, 'Bad Request', 'invalid request parameter'),
+ ({"refresh_token": "0"},
+ 400, 109, 'Bad Request', 'invalid request parameter'),
+ ({"token": "0"},
+ 400, 109, 'Bad Request', 'invalid request parameter'),
+])
+def test_destroy_invalid(oauth, args, code, errno, error, message):
+ with pytest.raises(ClientError) as e:
+ oauth.post("/destroy", args)
+ assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message}
+
+def test_destroy_access(oauth, access_token):
+ oauth.post("/verify", {'token': access_token})
+ oauth.post("/destroy", {'access_token': access_token})
+ with pytest.raises(ClientError) as e:
+ oauth.post("/verify", {'token': access_token})
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 109,
+ 'error': 'Bad Request',
+ 'message': 'invalid request parameter'
+ }
+
+def test_destroy_refresh(oauth, refresh_token):
+ refresh_token.get_a("/account/devices")
+ oauth.post("/destroy", {'refresh_token': refresh_token.bearer})
+ with pytest.raises(ClientError) as e:
+ refresh_token.get_a("/account/devices")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_destroy_any(oauth, access_token, refresh_token):
+ oauth.post("/verify", {'token': access_token})
+ oauth.post("/destroy", {'token': access_token})
+ with pytest.raises(ClientError) as e:
+ oauth.post("/verify", {'token': access_token})
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 109,
+ 'error': 'Bad Request',
+ 'message': 'invalid request parameter'
+ }
+
+ refresh_token.get_a("/account/devices")
+ oauth.post("/destroy", {'token': refresh_token.bearer})
+ with pytest.raises(ClientError) as e:
+ refresh_token.get_a("/account/devices")
+ assert e.value.details == {
+ 'code': 401,
+ 'errno': 109,
+ 'error': 'Unauthorized',
+ 'message': 'invalid request signature'
+ }
+
+def test_oauth_verify(account, oauth, access_token):
+ assert oauth.post("/verify", {'token': access_token}) == {
+ 'user': account.props['uid'],
+ 'client_id': "5882386c6d801776",
+ 'scope': ['profile'],
+ }
+
+def test_oauth_verify_refresh(oauth, refresh_token):
+ with pytest.raises(ClientError) as e:
+ oauth.post("/verify", {'token': refresh_token.bearer})
+ assert e.value.details == {
+ 'code': 400,
+ 'errno': 109,
+ 'error': 'Bad Request',
+ 'message': 'invalid request parameter'
+ }
diff --git a/tests/test_profile.py b/tests/test_profile.py
new file mode 100644
index 0000000..5e7308a
--- /dev/null
+++ b/tests/test_profile.py
@@ -0,0 +1,134 @@
+import pytest
+from fxa.errors import ClientError
+
+from api import *
+
+@pytest.fixture
+def profile(account):
+ return account.profile()
+
+def test_profile_noauth(profile):
+ with pytest.raises(ClientError) as e:
+ profile.get("/profile")
+ assert e.value.details == {
+ 'code': 403,
+ 'errno': 100,
+ 'error': 'Forbidden',
+ 'message': 'unauthorized'
+ }
+
+def test_display_name_noauth(profile):
+ with pytest.raises(ClientError) as e:
+ profile.post("/display_name", {'displayName': 'foo'})
+ assert e.value.details == {
+ 'code': 403,
+ 'errno': 100,
+ 'error': 'Forbidden',
+ 'message': 'unauthorized'
+ }
+
+def test_avatar_upload_noauth(profile):
+ with pytest.raises(ClientError) as e:
+ profile.post("/avatar/upload", "foo", headers={'content-type': 'image/png'})
+ assert e.value.details == {
+ 'code': 403,
+ 'errno': 100,
+ 'error': 'Forbidden',
+ 'message': 'unauthorized'
+ }
+
+def test_avatar_delete_noauth(profile):
+ with pytest.raises(ClientError) as e:
+ profile.delete("/avatar/00000000000000000000000000000000")
+ assert e.value.details == {
+ 'code': 403,
+ 'errno': 100,
+ 'error': 'Forbidden',
+ 'message': 'unauthorized'
+ }
+
+def test_profile(account, profile):
+ resp = profile.get_a("/profile")
+ assert resp == {
+ 'amrValues': None,
+ 'avatar': 'http://localhost:8000/avatars/00000000000000000000000000000000',
+ 'avatarDefault': True,
+ 'displayName': None,
+ 'email': account.email,
+ 'locale': None,
+ 'subscriptions': None,
+ 'twoFactorAuthentication': False,
+ 'uid': account.props['uid']
+ }
+
+def test_display_name(account, profile):
+ resp = profile.get_a("/profile")
+ assert resp == {
+ 'amrValues': None,
+ 'avatar': 'http://localhost:8000/avatars/00000000000000000000000000000000',
+ 'avatarDefault': True,
+ 'displayName': None,
+ 'email': account.email,
+ 'locale': None,
+ 'subscriptions': None,
+ 'twoFactorAuthentication': False,
+ 'uid': account.props['uid']
+ }
+ profile.post_a("/display_name", {'displayName': 'foo'})
+ resp = profile.get_a("/profile")
+ assert resp == {
+ 'amrValues': None,
+ 'avatar': 'http://localhost:8000/avatars/00000000000000000000000000000000',
+ 'avatarDefault': True,
+ 'displayName': 'foo',
+ 'email': account.email,
+ 'locale': None,
+ 'subscriptions': None,
+ 'twoFactorAuthentication': False,
+ 'uid': account.props['uid']
+ }
+
+def test_avatar(account, profile):
+ resp = profile.get_a("/avatar")
+ assert resp == {
+ 'avatar': 'http://localhost:8000/avatars/00000000000000000000000000000000',
+ 'avatarDefault': True,
+ 'id': '00000000000000000000000000000000'
+ }
+
+def test_avatar_upload(account, profile):
+ # server does not parse the bytes
+ profile.post_a("/avatar/upload", "foo", headers={'content-type': 'image/png'})
+ resp = profile.get_a("/avatar")
+ new_id = resp['id']
+ assert resp['avatar'] != 'http://localhost:8000/avatars/00000000000000000000000000000000'
+ assert not resp['avatarDefault']
+ assert resp['id'] != '00000000000000000000000000000000'
+ resp = profile.get_a("/profile")
+ assert resp['avatar'] != 'http://localhost:8000/avatars/00000000000000000000000000000000'
+ assert not resp['avatarDefault']
+
+def test_avatar_delete(account, profile):
+ # server does not parse the bytes
+ profile.post_a("/avatar/upload", "foo", headers={'content-type': 'image/png'})
+ resp = profile.get_a("/avatar")
+ new_id = resp['id']
+ profile.delete_a(f"/avatar/{new_id}")
+ resp = profile.get_a("/avatar")
+ assert resp == {
+ 'avatar': 'http://localhost:8000/avatars/00000000000000000000000000000000',
+ 'avatarDefault': True,
+ 'id': '00000000000000000000000000000000'
+ }
+ resp = profile.get_a("/profile")
+ assert resp == {
+ 'amrValues': None,
+ 'avatar': 'http://localhost:8000/avatars/00000000000000000000000000000000',
+ 'avatarDefault': True,
+ 'displayName': None,
+ 'email': account.email,
+ 'locale': None,
+ 'subscriptions': None,
+ 'twoFactorAuthentication': False,
+ 'uid': account.props['uid']
+ }
diff --git a/tests/test_push.py b/tests/test_push.py
new file mode 100644
index 0000000..e6d732a
--- /dev/null
+++ b/tests/test_push.py
@@ -0,0 +1,147 @@
+import pytest
+
+from api import *
+
+def test_account_destroy(account, push_server):
+ dev = Device(account, "dev", pcb=push_server.good("09d0114e-3b23-4ba3-8474-efac7433e3ba"))
+ account.destroy_account(account.email, "")
+ p = push_server.wait()
+ assert p[0] == "/09d0114e-3b23-4ba3-8474-efac7433e3ba"
+ assert dev.decrypt(p[2]) == {
+ 'command': 'fxaccounts:account_destroyed',
+ 'data': {'uid': account.props['uid']},
+ 'version': 1,
+ }
+ assert push_server.done()
+
+def test_account_verify(client, push_server, mail_server):
+ account = client.create_account("test@test", "")
+ try:
+ (to, body) = mail_server.wait()
+ assert to == ["test@test"]
+ data = json.loads(base64.urlsafe_b64decode(body.split("#/verify/", maxsplit=1)[1]).decode('utf8'))
+ dev = Device(account, "dev", pcb=push_server.good("8accbe08-4040-44c2-8fd9-cf2669b56cb1"))
+ account.post_a('/recovery_email/verify_code', { 'uid': data['uid'], 'code': data['code'] })
+ p = push_server.wait()
+ assert p[0] == "/8accbe08-4040-44c2-8fd9-cf2669b56cb1"
+ assert p[2] == b''
+ assert push_server.done()
+ finally:
+ account.destroy_account(account.email, "")
+
+def test_session_destroy(client, account, push_server):
+ dev1 = Device(account, "dev1")
+ session = client.login(account.email, "")
+ dev2 = Device(session, "dev2")
+ dev1.update_pcb(push_server.good("e6d21a00-9e5e-4d21-92bc-90860cba836e"))
+ session.destroy_session()
+ p = push_server.wait()
+ assert p[0] == "/e6d21a00-9e5e-4d21-92bc-90860cba836e"
+ assert dev1.decrypt(p[2]) == {
+ 'command': 'fxaccounts:device_disconnected',
+ 'data': {'id': dev2.id},
+ 'version': 1,
+ }
+ assert push_server.done()
+
+def test_device_connected(client, account, push_server):
+ dev1 = Device(account, "dev1", pcb=push_server.good("236b8205-daee-4879-b911-64b1f4fd8fd7"))
+ session = client.login(account.email, "")
+ dev2 = Device(session, "dev2")
+ p = push_server.wait()
+ assert p[0] == "/236b8205-daee-4879-b911-64b1f4fd8fd7"
+ assert dev1.decrypt(p[2]) == {
+ 'command': 'fxaccounts:device_connected',
+ 'data': {'deviceName': 'dev2'},
+ 'version': 1,
+ }
+ assert push_server.done()
+
+def test_device_invoke(account, login, push_server):
+ dev = Device(account, "dev1", commands={'a':'a'}, pcb=push_server.good("3610b071-e2ef-4daa-a4e3-eaa74e50f2a0"))
+ account.post_a("/account/devices/invoke_command", {
+ "target": dev.id,
+ "command": "a",
+ "payload": {"data": "foo"},
+ "ttl": 10,
+ })
+ p = push_server.wait()
+ assert p[0] == "/3610b071-e2ef-4daa-a4e3-eaa74e50f2a0"
+ msg = dev.decrypt(p[2])
+ # NOTE needed because index is unpredictable due to pg sequence use
+ del msg['data']['index']
+ del msg['data']['url']
+ assert msg == {
+ 'command': 'fxaccounts:command_received',
+ 'data': {'command': 'a', 'sender': dev.id},
+ 'version': 1,
+ }
+ assert push_server.done()
+
+def test_expiry(account, login, push_server):
+ dev = Device(account, "dev1", commands={'a':'a'}, pcb=push_server.bad("59ba8fcc-f3b0-4b1f-ac27-36d66e022d1e"))
+ account.post_a("/account/devices/invoke_command", {
+ "target": dev.id,
+ "command": "a",
+ "payload": {"data": "foo"},
+ "ttl": 10,
+ })
+ p = push_server.wait()
+ assert p[0] == "/err/59ba8fcc-f3b0-4b1f-ac27-36d66e022d1e"
+ account.post_a("/account/devices/invoke_command", {
+ "target": dev.id,
+ "command": "a",
+ "payload": {"data": "foo"},
+ "ttl": 10,
+ })
+ with pytest.raises(queue.Empty):
+ push_server.wait()
+ dev.update_pcb(push_server.good("59ba8fcc-f3b0-4b1f-ac27-36d66e022d1e"))
+ account.post_a("/account/devices/invoke_command", {
+ "target": dev.id,
+ "command": "a",
+ "payload": {"data": "foo"},
+ "ttl": 10,
+ })
+ p = push_server.wait()
+ assert p[0] == "/59ba8fcc-f3b0-4b1f-ac27-36d66e022d1e"
+ assert push_server.done()
+
+def test_device_notify(account, login, push_server):
+ dev1 = Device(account, "dev1")
+ dev2 = Device(login, "dev2")
+ dev1.update_pcb(push_server.good("738ac7e3-96ef-461c-880c-0af20e311354"))
+ dev2.update_pcb(push_server.good("85a98191-9486-46e2-877a-1152e3d4af4e"))
+ account.post_a("/account/devices/notify", {
+ "to": "all",
+ "_endpointAction": "accountVerify",
+ "excluded": [dev2.id],
+ "payload": {'a':1},
+ "TTL": 0,
+ })
+ p = push_server.wait()
+ assert p[0] == "/738ac7e3-96ef-461c-880c-0af20e311354"
+ assert dev1.decrypt(p[2]) == {'a':1}
+ account.post_a("/account/devices/notify", {
+ "to": [dev2.id],
+ "_endpointAction": "accountVerify",
+ "payload": {'a':2},
+ "TTL": 0,
+ })
+ p = push_server.wait()
+ assert p[0] == "/85a98191-9486-46e2-877a-1152e3d4af4e"
+ assert dev2.decrypt(p[2]) == {'a':2}
+ assert push_server.done()
+
+def test_profile(account, push_server):
+ dev = Device(account, "dev", pcb=push_server.good("12608154-8942-4f1c-9de2-a56465d27d6e"))
+ profile = account.profile()
+ profile.post_a("/display_name", {"displayName": "foo"})
+ p = push_server.wait()
+ assert p[0] == "/12608154-8942-4f1c-9de2-a56465d27d6e"
+ assert dev.decrypt(p[2]) == {'command': 'fxaccounts:profile_updated', 'version': 1}
+ profile.post_a("/avatar/upload", "doesn't parse the image", headers={'content-type': 'image/png'})
+ p = push_server.wait()
+ assert p[0] == "/12608154-8942-4f1c-9de2-a56465d27d6e"
+ assert dev.decrypt(p[2]) == {'command': 'fxaccounts:profile_updated', 'version': 1}
+ assert push_server.done()