diff --git a/examples/settings/user_management.py b/examples/settings/user_management.py index 1171e81..d624350 100644 --- a/examples/settings/user_management.py +++ b/examples/settings/user_management.py @@ -12,39 +12,35 @@ print(usermgmt.users[0]) """ - username='justin' scope=['read', 'write', 'settings'] email='justin.jeffrey@ipfabric.io' user_id='1108612054' - local=True sso_provider=None domains='' custom_scope=True ldap_id=None + username='test' email='test@test.ipfabric.io' user_id='50632669251' local=True sso_provider=None domains='' + role_names=['admin'] role_ids=['admin'] ldap_id=None timezone='UTC' """ print() user = usermgmt.add_user(username='Test', email='test@ipfabric.io', password='8characters', - scope=['read', 'write', 'settings', 'team']) + roles=[usermgmt.roles[0].role_id]) print(user) """ - username='Test' scope=['read', 'write', 'settings', 'team'] email='test@ipfabric.io' user_id='1168572704' - local=None sso_provider=None domains=None custom_scope=True ldap_id=None + username='Test' email='test@ipfabric.io' user_id='52914335501' local=True sso_provider=None domains=None + role_names=['Network engineer'] role_ids=['Network engineer'] ldap_id=None timezone='UTC' + """ print() print(usermgmt.get_user_by_id(user_id=user.user_id)) """ - username='Test' scope=['read', 'write', 'settings', 'team'] email='test@ipfabric.io' user_id='1168572704' - local=None sso_provider=None domains=None custom_scope=True ldap_id=None + username='Test' email='test@ipfabric.io' user_id='52914335501' local=True sso_provider=None domains=None + role_names=['Network engineer'] role_ids=['Network engineer'] ldap_id=None timezone='UTC' """ print() print(usermgmt.get_users(username=user.username)) """ - [User(username='Test', scope=['read', 'write', 'settings', 'team'], email='test@ipfabric.io', - user_id='1168572704', local=True, sso_provider=None, domains='', custom_scope=True, ldap_id=None)] + [User(username='Test', email='test@ipfabric.io', user_id='52914337160', local=True, sso_provider=None, domains='', + role_names=['Network engineer'], role_ids=['Network engineer'], ldap_id=None, timezone='UTC')] """ print() print(usermgmt.delete_user(user_id=user.user_id)) - """ - [User(username='justin', scope=['read', 'write', 'settings'], email='justin.jeffrey@ipfabric.io', - user_id='1108612054', local=True, sso_provider=None, domains='', custom_scope=True, ldap_id=None), - User(username='vector', scope=['read', 'write', 'settings'], email='vector@vector.pl', user_id='1083776225', - local=True, sso_provider=None, domains='', custom_scope=True, ldap_id=None), ] - """ + print() \ No newline at end of file diff --git a/ipfabric/settings/user_mgmt.py b/ipfabric/settings/user_mgmt.py index daf05a7..d1ca143 100644 --- a/ipfabric/settings/user_mgmt.py +++ b/ipfabric/settings/user_mgmt.py @@ -9,22 +9,56 @@ class User(BaseModel): username: str - scope: list - email: str + email: Optional[str] user_id: str = Field(alias="id") local: Optional[bool] = Field(alias="isLocal") sso_provider: Optional[Any] = Field(alias="ssoProvider") domains: Optional[Any] = Field(alias="domainSuffixes") - custom_scope: bool = Field(alias="customScope") + role_names: Optional[list] = Field(alias="roleNames", default_factory=list) + role_ids: list = Field(alias="roleNames") ldap_id: Any = Field(alias="ldapId") - timezone: Optional[str] = None + timezone: str + + +class Role(BaseModel): + role_id: str = Field(alias="id") + name: str + description: Optional[str] + role_type: str = Field(alias="type") + admin: bool = Field(alias="isAdmin") + system: bool = Field(alias="isSystem") class UserMgmt: def __init__(self, client): self.client: Any = client + self.roles = self.get_roles() self.users = self.get_users() + def get_roles(self, role_name: str = None): + """ + Gets all users or filters on one of the options. + :param role_name: str: Role Name to filter + :return: List of roles + """ + payload = { + "columns": [ + "id", + "name", + "description", + "type", + "isAdmin", + "isSystem", + ] + } + if role_name: + payload["filters"] = {"name": ["ieq", role_name]} + return [Role(**role) for role in self.client._ipf_pager("tables/roles", payload)] + + @property + def roles_by_id(self): + return {r.role_id: r for r in self.roles} + def get_users(self, username: str = None): """ Gets all users or filters on one of the options. @@ -40,12 +74,11 @@ def get_users(self, username: str = None): "ldapId", "domainSuffixes", "email", - "customScope", - "scope", + "roleNames", + "roleIds", + "timezone", ] } - if int(self.client.version) >= 4.2: - payload["columns"].append("timezone") if username: payload["filters"] = {"username": ["ieq", username]} users = self.client._ipf_pager("tables/users", payload) @@ -59,14 +92,15 @@ def get_user_by_id(self, user_id: str): """ resp = self.client.get("users/" + str(user_id)) resp.raise_for_status() - return User(**resp.json()) + user = resp.json() + return User(roleNames=[self.roles_by_id[r].name for r in user["roleIds"]], **user) def add_user( self, username: str, email: str, password: str, - scope: list, + roles: list, timezone: str = "UTC", ): """ @@ -74,30 +108,29 @@ def add_user( :param username: str: Username :param email: str: Email :param password: str: Must be 8 characters - :param scope: list: Accepted values: ['read', 'write', 'settings', 'team'] + :param roles: list: Role IDs for Users :param timezone: str: v4.2 and above, Defaults UTC. See pytz.all_timezones for correct syntax :return: User """ if len(password) < 8: raise SyntaxError("Password must be 8 characters.") - if not all(x in ["read", "write", "settings", "team"] for x in scope): - raise SyntaxError("Only accepted scopes are ['read', 'write', 'settings', 'team']") + if not all(x in [r.role_id for r in self.roles] for x in roles): + raise SyntaxError(f"Only accepted scopes are {[r.role_id for r in self.roles]}") payload = { "username": username, "email": email, "password": password, - "scope": scope, + "roleIds": roles, } - if int(self.client.version) >= 4.2: - if timezone not in pytz.all_timezones: - raise ValueError( - f"Timezone {timezone} is not located. This is case sensitive please see pytz.all_timezones." - ) - payload["timezone"] = timezone + if timezone not in pytz.all_timezones: + raise ValueError( + f"Timezone {timezone} is not located. This is case sensitive please see pytz.all_timezones." + ) + payload["timezone"] = timezone resp = self.client.post("users", json=payload) resp.raise_for_status() - user_id = resp.json()["id"] - return self.get_user_by_id(user_id) + user = resp.json() + return User(roleNames=[self.roles_by_id[r].name for r in user["roleIds"]], **user) def delete_user(self, user_id: str): """ diff --git a/tests/unittests/settings/test_users.py b/tests/unittests/settings/test_users.py index d136aa8..e7e653d 100644 --- a/tests/unittests/settings/test_users.py +++ b/tests/unittests/settings/test_users.py @@ -4,10 +4,9 @@ from ipfabric.settings import user_mgmt -class TestUsers(unittest.TestCase): - def setUp(self) -> None: - mock = MagicMock() - mock._ipf_pager.return_value = [ +def mock_pager(*args): + if args[0] == "tables/users": + return [ { "id": "1108612054", "isLocal": True, @@ -15,20 +14,19 @@ def setUp(self) -> None: "ssoProvider": None, "domainSuffixes": "", "email": "justin.jeffrey@ipfabric.io", - "customScope": True, - "scope": ["read", "write", "settings"], + "roleNames": ["admin"], + "timezone": "UTC" } ] - mock.get().json.return_value = { - "id": "1108612054", - "isLocal": True, - "username": "justin", - "ssoProvider": None, - "domainSuffixes": "", - "email": "justin.jeffrey@ipfabric.io", - "customScope": True, - "scope": ["read", "write", "settings"], - } + else: + return [{"id": "admin", "name": "admin", "description": "Administrator", "type": "Admin", "isAdmin": True, + "isSystem": True}] + + +class TestUsers(unittest.TestCase): + def setUp(self) -> None: + mock = MagicMock() + mock._ipf_pager.side_effect = mock_pager self.usermgmt = user_mgmt.UserMgmt(mock) def test_users(self): @@ -40,12 +38,31 @@ def test_get_users(self): self.assertIsInstance(u, user_mgmt.User) def test_get_user_by_id(self): + self.usermgmt.client.get().json.return_value = { + "id": "1108612054", + "isLocal": True, + "username": "justin", + "ssoProvider": None, + "domainSuffixes": "", + "email": "justin.jeffrey@ipfabric.io", + "roleIds": ["admin"], + "timezone": "UTC" + } u = self.usermgmt.get_user_by_id("1108612054") self.assertIsInstance(u, user_mgmt.User) def test_add_user(self): - self.usermgmt.client.post().json.return_value = {"id": 1} - u = self.usermgmt.add_user("test", "test", "test1234", ["read"]) + self.usermgmt.client.post().json.return_value = { + "id": "1108612054", + "isLocal": True, + "username": "justin", + "ssoProvider": None, + "domainSuffixes": "", + "email": "justin.jeffrey@ipfabric.io", + "roleIds": ["admin"], + "timezone": "UTC" + } + u = self.usermgmt.add_user("test", "test", "test1234", ["admin"]) self.assertIsInstance(u, user_mgmt.User) def test_add_user_fail(self):