From 2f8dce44d3f2be74b5c6ec0a2e7f4ceced715328 Mon Sep 17 00:00:00 2001 From: pennae Date: Wed, 13 Jul 2022 10:33:30 +0200 Subject: initial import --- tests/test_auth_device.py | 434 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 434 insertions(+) create mode 100644 tests/test_auth_device.py (limited to 'tests/test_auth_device.py') diff --git a/tests/test_auth_device.py b/tests/test_auth_device.py new file mode 100644 index 0000000..8504ba7 --- /dev/null +++ b/tests/test_auth_device.py @@ -0,0 +1,434 @@ +import copy +import pytest +import random +import time +from fxa.errors import ClientError + +from api import * + +device_data = [ + { "name": "testdev1", "type": "desktop", "availableCommands": { "a": "b" } }, + { "name": "testdev2", "type": "desktop", "availableCommands": { "a": "b" } }, +] + +@pytest.fixture +def populate_devices(account_or_rt, login): + devs = [] + for (account, dev) in [(account_or_rt, device_data[0]), (login, device_data[1])]: + dev = account.post_a("/account/device", dev) + devs.append(dev) + return devs + +def test_create_noauth(client): + with pytest.raises(ClientError) as e: + client.post_a("/account/device", device_data[0]) + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + +@pytest.mark.parametrize("args,code,errno,error,message", [ + ({ 'name': 'dev', 'availableCommands': {'a':1} }, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({ 'name': 'dev', 'availableCommands': {'a':1}, 'extra': 0 }, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({}, + 400, 108, 'Bad Request', 'missing parameter in request body'), + ({ 'id': '00' * 16, 'name': 'dev' }, + 400, 108, 'Bad Request', 'missing parameter in request body'), +]) +def test_create_invalid(account_or_rt, args, code, errno, error, message): + with pytest.raises(ClientError) as e: + account_or_rt.post_a("/account/device", args) + assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message} + +def test_destroy_noauth(client, populate_devices): + with pytest.raises(ClientError) as e: + client.post_a("/account/device/destroy") + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + with pytest.raises(ClientError) as e: + client.post_a("/account/device/destroy", {'id': populate_devices[0]['id']}) + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + +@pytest.mark.parametrize("args,code,errno,error,message", [ + ({ 'id': '00' }, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({ 'id': '00' * 16, 'extra': 0 }, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({ 'id': '00' * 16 }, + 400, 123, 'Bad Request', 'unknown device'), +]) +def test_destroy_invalid(account_or_rt, args, code, errno, error, message): + with pytest.raises(ClientError) as e: + account_or_rt.post_a("/account/device/destroy", args) + assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message} + +def test_create_destroy(account_or_rt): + dev = account_or_rt.post_a("/account/device", device_data[0]) + account_or_rt.post_a("/account/device/destroy", {'id': dev['id']}) + +def test_create_unverified(unverified_account): + unverified_account.post_a("/account/device", device_data[0]) + +def test_list_noauth(client): + with pytest.raises(ClientError) as e: + client.get("/account/devices") + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + +def test_list_unverified(unverified_account): + with pytest.raises(ClientError) as e: + unverified_account.get_a("/account/devices") + assert e.value.details == { + 'code': 400, + 'errno': 138, + 'error': 'Bad Request', + 'message': 'unverified session' + } + +def test_list_none(account_or_rt): + devs = account_or_rt.get_a("/account/devices") + assert devs == [] + +@pytest.mark.usefixtures("populate_devices") +def test_list(account_or_rt): + devs = account_or_rt.get_a("/account/devices") + assert len(devs) == 2 + (first, second) = (0, 1) if devs[0]['name'] == 'testdev1' else (1, 0) + assert devs[first]['isCurrentDevice'] + assert not devs[second]['isCurrentDevice'] + for (k, v) in device_data[first].items(): + assert devs[0][k] == v + for (k, v) in device_data[second].items(): + assert devs[1][k] == v + +def test_change(account_or_rt, populate_devices): + devs1 = populate_devices + devs = copy.deepcopy(devs1) + (devs[0]['name'], devs[1]['name']) = (devs[1]['name'], devs[0]['name']) + (devs[0]['pushCallback'], devs[1]['pushCallback']) = (devs[1]['pushCallback'] or "", devs[0]['pushCallback'] or "") + (devs[0]['pushPublicKey'], devs[1]['pushPublicKey']) = (devs[1]['pushPublicKey'] or "", devs[0]['pushPublicKey'] or "") + (devs[0]['pushAuthKey'], devs[1]['pushAuthKey']) = (devs[1]['pushAuthKey'] or "", devs[0]['pushAuthKey'] or "") + for dev in devs: + del dev['isCurrentDevice'] + del dev['lastAccessTime'] + del dev['pushEndpointExpired'] + account_or_rt.post_a("/account/device", dev) + devs2 = account_or_rt.get_a("/account/devices") + mdevs1 = { + devs1[0]['id']: devs1[0], + devs1[1]['id']: devs1[1], + } + mdevs2 = { + devs2[0]['id']: devs2[0], + devs2[1]['id']: devs2[1], + } + (id1, id2) = mdevs1.keys() + for (i1, i2) in [(id1, id2), (id2, id1)]: + assert mdevs1[i1]['name'] == mdevs2[i2]['name'] + assert mdevs1[i1]['pushCallback'] or '' == mdevs2[i2]['pushCallback'] or '' + assert mdevs1[i1]['pushPublicKey'] or '' == mdevs2[i2]['pushPublicKey'] or '' + assert mdevs1[i1]['pushAuthKey'] or '' == mdevs2[i2]['pushAuthKey'] or '' + +def test_invoke_noauth(client): + body = {"target": "0" * 32, "command": "foo", "payload": {}, "ttl": 10} + with pytest.raises(ClientError) as e: + client.post_a("/account/devices/invoke_command", body) + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + +def test_invoke_unverified(unverified_account): + body = {"target": "0" * 32, "command": "foo", "payload": {}, "ttl": 10} + with pytest.raises(ClientError) as e: + unverified_account.post_a("/account/devices/invoke_command", body) + assert e.value.details == { + 'code': 400, + 'errno': 138, + 'error': 'Bad Request', + 'message': 'unverified session' + } + +@pytest.mark.parametrize("args,code,errno,error,message", [ + ({"target": "00", "command": "foo", "payload": {}, "ttl": 10}, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({"target": "00" * 16, "command": "foo", "payload": {}, "ttl": 10, 'extra': 0}, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({"target": "0" * 32, "command": "foo", "payload": {}, "ttl": 10}, + 400, 123, 'Bad Request', 'unknown device'), +]) +def test_invoke_invalid(account_or_rt, args, code, errno, error, message): + with pytest.raises(ClientError) as e: + account_or_rt.post_a("/account/devices/invoke_command", args) + assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message} + +def test_invoke_nocmd(account_or_rt, populate_devices): + body = {"target": populate_devices[0]['id'], "command": "foo", "payload": {}, "ttl": 10} + with pytest.raises(ClientError) as e: + account_or_rt.post_a("/account/devices/invoke_command", body) + assert e.value.details == { + 'code': 400, + 'errno': 157, + 'error': 'Bad Request', + 'message': 'unavailable device command' + } + +def test_invoke_other_account(account_or_rt, account2): + dev = account2.post_a("/account/device", device_data[0]) + body = {"target": dev['id'], "command": "foo", "payload": {}, "ttl": 10} + with pytest.raises(ClientError) as e: + account_or_rt.post_a("/account/devices/invoke_command", body) + assert e.value.details == { + 'code': 400, + 'errno': 123, + 'error': 'Bad Request', + 'message': 'unknown device' + } + +@pytest.mark.parametrize("ttl", [None, 1, 60, 999999999]) +@pytest.mark.parametrize("has_sender", [False, True]) +def test_invoke(account_or_rt, login, ttl, has_sender): + sender = account_or_rt.post_a("/account/device", device_data[1])['id'] if has_sender else None + dev = login.post_a("/account/device", device_data[0]) + + body = { + "target": dev['id'], + "command": "a", + "payload": { "data": str(random.random()), }, + "ttl": ttl, + } + resp = account_or_rt.post_a("/account/devices/invoke_command", body) + assert resp['enqueued'] + assert not resp['notified'] + assert resp['notifyError'] == "no push callback" + + cmd = login.get_a("/account/device/commands?index=0&limit=11") + assert cmd['last'] + assert len(cmd['messages']) == 1 + assert cmd['messages'][0]['data']['command'] == 'a' + assert cmd['messages'][0]['data']['payload'] == body['payload'] + assert cmd['messages'][0]['data']['sender'] == sender + +def test_commands_noauth(client): + with pytest.raises(ClientError) as e: + client.get_a("/account/device/commands?index=1") + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + +def test_commands_unverified(unverified_account): + with pytest.raises(ClientError) as e: + unverified_account.get_a("/account/device/commands?index=1") + assert e.value.details == { + 'code': 400, + 'errno': 138, + 'error': 'Bad Request', + 'message': 'unverified session' + } + +def test_commands_nodev(account_or_rt): + with pytest.raises(ClientError) as e: + account_or_rt.get_a("/account/device/commands?index=1") + assert e.value.details == { + 'code': 400, + 'errno': 123, + 'error': 'Bad Request', + 'message': 'unknown device' + } + +@pytest.mark.parametrize('n_cmds,offset,limit', [ + (1, 0, 0), + (1, 0, 1), + (1, 0, 100), + (1, 1, 0), + (1, 1, 1), + (1, 1, 100), + (2, 0, 0), + (2, 0, 1), + (2, 0, 100), + (2, 1, 0), + (2, 1, 1), + (2, 1, 100), + (101, 0, 100), + (101, 10, 100), +]) +def test_device_commands_list(account_or_rt, login, n_cmds, offset, limit): + account_or_rt.post_a("/account/device", device_data[1])['id'] + dev = login.post_a("/account/device", device_data[0]) + + bodies = [ { + "target": dev['id'], + "command": "a", + "payload": { "data": str(i), }, + } for i in range(0, n_cmds) ] + + for b in bodies: + resp = account_or_rt.post_a("/account/devices/invoke_command", b) + assert resp['enqueued'] + assert not resp['notified'] + assert resp['notifyError'] == "no push callback" + + cmd = login.get_a("/account/device/commands", params={ "index": 0, "limit": 1 }) + assert cmd == login.get_a("/account/device/commands", params={ "index": 0, "limit": 1 }) + + first_index = cmd['index'] + cmds = login.get_a("/account/device/commands", params={ "limit": limit, "index": first_index + offset }) + assert cmds['last'] == (offset + limit >= n_cmds) + assert len(cmds['messages']) == min(max(n_cmds - offset, 0), limit) + +def test_attached_clients_unauth(client): + with pytest.raises(ClientError) as e: + client.get_a("/account/attached_clients") + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + +def test_attached_clients_unverified(unverified_account): + with pytest.raises(ClientError) as e: + unverified_account.get_a("/account/attached_clients") + assert e.value.details == { + 'code': 400, + 'errno': 138, + 'error': 'Bad Request', + 'message': 'unverified session' + } + +def test_attached_clients_badauth(refresh_token): + with pytest.raises(ClientError) as e: + refresh_token.get_a("/account/attached_clients") + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + +def test_attached_clients(account, refresh_token, push_server): + dev1 = Device(account, "dev1") + dev2 = Device(refresh_token, "dev2", "mobile") + # filter potential login-only sessions + devs = [ d for d in account.get_a("/account/attached_clients") if d['name'] != None ] + assert len(devs) == 2 + devs = { + devs[0]['name']: devs[0], + devs[1]['name']: devs[1], + } + assert devs['dev1']['deviceId'] == dev1.id + assert devs['dev1']['sessionTokenId'] != None + assert devs['dev1']['refreshTokenId'] == None + assert devs['dev1']['isCurrentSession'] + assert devs['dev1']['deviceType'] == "desktop" + assert (time.time() - devs['dev1']['createdTime']) < 10 + assert (time.time() - devs['dev1']['lastAccessTime']) < 10 + assert devs['dev1']['scope'] == None + # + assert devs['dev2']['deviceId'] == dev2.id + assert devs['dev2']['sessionTokenId'] != None + assert devs['dev2']['refreshTokenId'] != None + assert not devs['dev2']['isCurrentSession'] + assert devs['dev2']['deviceType'] == "mobile" + assert (time.time() - devs['dev2']['createdTime']) < 10 + assert (time.time() - devs['dev2']['lastAccessTime']) < 10 + assert devs['dev2']['scope'] == "profile https://identity.mozilla.com/apps/oldsync" + +def test_attached_client_destroy_unauth(client): + with pytest.raises(ClientError) as e: + client.post_a("/account/attached_client/destroy") + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + +def test_attached_client_destroy_unverified(unverified_account): + with pytest.raises(ClientError) as e: + unverified_account.post_a("/account/attached_client/destroy") + assert e.value.details == { + 'code': 400, + 'errno': 138, + 'error': 'Bad Request', + 'message': 'unverified session' + } + +def test_attached_client_destroy_badauth(refresh_token): + with pytest.raises(ClientError) as e: + refresh_token.post_a("/account/attached_client/destroy") + assert e.value.details == { + 'code': 401, + 'errno': 109, + 'error': 'Unauthorized', + 'message': 'invalid request signature' + } + +@pytest.mark.parametrize("args,code,errno,error,message", [ + ({"sessionTokenId": "0"}, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({"refreshTokenId": "0"}, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({"deviceId": "0"}, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({"sessionTokenId": "00" * 16, 'extra': 0}, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({"sessionTokenId": "00" * 16, 'refreshTokenId': "00" * 16}, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({"sessionTokenId": "00" * 16, 'refreshTokenId': "00" * 16, 'deviceId': "00" * 16}, + 400, 107, 'Bad Request', 'invalid parameter in request body'), + ({'refreshTokenId': "00" * 16, 'deviceId': "00" * 16}, + 400, 107, 'Bad Request', 'invalid parameter in request body'), +]) +def test_attached_client_destroy_invalid(account, args, code, errno, error, message): + with pytest.raises(ClientError) as e: + account.post_a("/account/attached_client/destroy", args) + assert e.value.details == {'code': code, 'errno': errno, 'error': error, 'message': message} + +@pytest.mark.parametrize("fn", [ + (lambda d: {'sessionTokenId': d['sessionTokenId']}), + (lambda d: {'refreshTokenId': d['refreshTokenId']}), + (lambda d: {'deviceId': d['deviceId']}), +], ids=["session", "refresh", "device"]) +def test_attached_client_destroy(account, refresh_token, fn): + Device(refresh_token, "dev2") + devs = account.get_a("/account/attached_clients") + account.post_a("/account/attached_client/destroy", fn(devs[0])) + devs2 = account.get_a("/account/attached_clients") + assert len(devs2) == len(devs) - 1 + +def test_attached_client_destroy_push(account, refresh_token, push_server): + dev = Device(account, "dev") + dev2 = Device(refresh_token, "dev2") + dev.update_pcb(push_server.good("4ed8d1d3-e756-4866-9169-aafe612eb1e9")) + account.post_a("/account/attached_client/destroy", { 'deviceId': dev2.id }) + p = push_server.wait() + assert p[0] == "/4ed8d1d3-e756-4866-9169-aafe612eb1e9" + assert dev.decrypt(p[2]) == { + 'command': 'fxaccounts:device_disconnected', + 'data': {'id': dev2.id}, + 'version': 1 + } + assert push_server.done() -- cgit v1.2.3