Skip to content
This repository has been archived by the owner on Feb 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #129 from justinjeffery-ipf/ipf-v5.0
Browse files Browse the repository at this point in the history
Ipf v5.0
  • Loading branch information
Justin Jeffery authored Jul 1, 2022
2 parents 19d1534 + 83e58a3 commit e7da978
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 55 deletions.
26 changes: 11 additions & 15 deletions examples/settings/user_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,35 @@

print(usermgmt.users[0])
"""
username='justin' scope=['read', 'write', 'settings'] email='justin.jeffrey@ipfabric.io' user_id='1108612054'
local=True sso_provider=None domains='' custom_scope=True ldap_id=None
username='test' email='test@test.ipfabric.io' user_id='50632669251' local=True sso_provider=None domains=''
role_names=['admin'] role_ids=['admin'] ldap_id=None timezone='UTC'
"""
print()

user = usermgmt.add_user(username='Test', email='test@ipfabric.io', password='8characters',
scope=['read', 'write', 'settings', 'team'])
roles=[usermgmt.roles[0].role_id])
print(user)
"""
username='Test' scope=['read', 'write', 'settings', 'team'] email='test@ipfabric.io' user_id='1168572704'
local=None sso_provider=None domains=None custom_scope=True ldap_id=None
username='Test' email='test@ipfabric.io' user_id='52914335501' local=True sso_provider=None domains=None
role_names=['Network engineer'] role_ids=['Network engineer'] ldap_id=None timezone='UTC'
"""
print()

print(usermgmt.get_user_by_id(user_id=user.user_id))
"""
username='Test' scope=['read', 'write', 'settings', 'team'] email='test@ipfabric.io' user_id='1168572704'
local=None sso_provider=None domains=None custom_scope=True ldap_id=None
username='Test' email='test@ipfabric.io' user_id='52914335501' local=True sso_provider=None domains=None
role_names=['Network engineer'] role_ids=['Network engineer'] ldap_id=None timezone='UTC'
"""
print()

print(usermgmt.get_users(username=user.username))
"""
[User(username='Test', scope=['read', 'write', 'settings', 'team'], email='test@ipfabric.io',
user_id='1168572704', local=True, sso_provider=None, domains='', custom_scope=True, ldap_id=None)]
[User(username='Test', email='test@ipfabric.io', user_id='52914337160', local=True, sso_provider=None, domains='',
role_names=['Network engineer'], role_ids=['Network engineer'], ldap_id=None, timezone='UTC')]
"""
print()

print(usermgmt.delete_user(user_id=user.user_id))
"""
[User(username='justin', scope=['read', 'write', 'settings'], email='justin.jeffrey@ipfabric.io',
user_id='1108612054', local=True, sso_provider=None, domains='', custom_scope=True, ldap_id=None),
User(username='vector', scope=['read', 'write', 'settings'], email='vector@vector.pl', user_id='1083776225',
local=True, sso_provider=None, domains='', custom_scope=True, ldap_id=None), ]
"""

print()
77 changes: 55 additions & 22 deletions ipfabric/settings/user_mgmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,56 @@

class User(BaseModel):
username: str
scope: list
email: str
email: Optional[str]
user_id: str = Field(alias="id")
local: Optional[bool] = Field(alias="isLocal")
sso_provider: Optional[Any] = Field(alias="ssoProvider")
domains: Optional[Any] = Field(alias="domainSuffixes")
custom_scope: bool = Field(alias="customScope")
role_names: Optional[list] = Field(alias="roleNames", default_factory=list)
role_ids: list = Field(alias="roleNames")
ldap_id: Any = Field(alias="ldapId")
timezone: Optional[str] = None
timezone: str


class Role(BaseModel):
role_id: str = Field(alias="id")
name: str
description: Optional[str]
role_type: str = Field(alias="type")
admin: bool = Field(alias="isAdmin")
system: bool = Field(alias="isSystem")


class UserMgmt:
def __init__(self, client):
self.client: Any = client
self.roles = self.get_roles()
self.users = self.get_users()

def get_roles(self, role_name: str = None):
"""
Gets all users or filters on one of the options.
:param role_name: str: Role Name to filter
:return: List of roles
"""
payload = {
"columns": [
"id",
"name",
"description",
"type",
"isAdmin",
"isSystem",
]
}
if role_name:
payload["filters"] = {"name": ["ieq", role_name]}
return [Role(**role) for role in self.client._ipf_pager("tables/roles", payload)]

@property
def roles_by_id(self):
return {r.role_id: r for r in self.roles}

def get_users(self, username: str = None):
"""
Gets all users or filters on one of the options.
Expand All @@ -40,12 +74,11 @@ def get_users(self, username: str = None):
"ldapId",
"domainSuffixes",
"email",
"customScope",
"scope",
"roleNames",
"roleIds",
"timezone",
]
}
if int(self.client.version) >= 4.2:
payload["columns"].append("timezone")
if username:
payload["filters"] = {"username": ["ieq", username]}
users = self.client._ipf_pager("tables/users", payload)
Expand All @@ -59,45 +92,45 @@ def get_user_by_id(self, user_id: str):
"""
resp = self.client.get("users/" + str(user_id))
resp.raise_for_status()
return User(**resp.json())
user = resp.json()
return User(roleNames=[self.roles_by_id[r].name for r in user["roleIds"]], **user)

def add_user(
self,
username: str,
email: str,
password: str,
scope: list,
roles: list,
timezone: str = "UTC",
):
"""
Adds a user
:param username: str: Username
:param email: str: Email
:param password: str: Must be 8 characters
:param scope: list: Accepted values: ['read', 'write', 'settings', 'team']
:param roles: list: Role IDs for Users
:param timezone: str: v4.2 and above, Defaults UTC. See pytz.all_timezones for correct syntax
:return: User
"""
if len(password) < 8:
raise SyntaxError("Password must be 8 characters.")
if not all(x in ["read", "write", "settings", "team"] for x in scope):
raise SyntaxError("Only accepted scopes are ['read', 'write', 'settings', 'team']")
if not all(x in [r.role_id for r in self.roles] for x in roles):
raise SyntaxError(f"Only accepted scopes are {[r.role_id for r in self.roles]}")
payload = {
"username": username,
"email": email,
"password": password,
"scope": scope,
"roleIds": roles,
}
if int(self.client.version) >= 4.2:
if timezone not in pytz.all_timezones:
raise ValueError(
f"Timezone {timezone} is not located. This is case sensitive please see pytz.all_timezones."
)
payload["timezone"] = timezone
if timezone not in pytz.all_timezones:
raise ValueError(
f"Timezone {timezone} is not located. This is case sensitive please see pytz.all_timezones."
)
payload["timezone"] = timezone
resp = self.client.post("users", json=payload)
resp.raise_for_status()
user_id = resp.json()["id"]
return self.get_user_by_id(user_id)
user = resp.json()
return User(roleNames=[self.roles_by_id[r].name for r in user["roleIds"]], **user)

def delete_user(self, user_id: str):
"""
Expand Down
53 changes: 35 additions & 18 deletions tests/unittests/settings/test_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,29 @@
from ipfabric.settings import user_mgmt


class TestUsers(unittest.TestCase):
def setUp(self) -> None:
mock = MagicMock()
mock._ipf_pager.return_value = [
def mock_pager(*args):
if args[0] == "tables/users":
return [
{
"id": "1108612054",
"isLocal": True,
"username": "justin",
"ssoProvider": None,
"domainSuffixes": "",
"email": "justin.jeffrey@ipfabric.io",
"customScope": True,
"scope": ["read", "write", "settings"],
"roleNames": ["admin"],
"timezone": "UTC"
}
]
mock.get().json.return_value = {
"id": "1108612054",
"isLocal": True,
"username": "justin",
"ssoProvider": None,
"domainSuffixes": "",
"email": "justin.jeffrey@ipfabric.io",
"customScope": True,
"scope": ["read", "write", "settings"],
}
else:
return [{"id": "admin", "name": "admin", "description": "Administrator", "type": "Admin", "isAdmin": True,
"isSystem": True}]


class TestUsers(unittest.TestCase):
def setUp(self) -> None:
mock = MagicMock()
mock._ipf_pager.side_effect = mock_pager
self.usermgmt = user_mgmt.UserMgmt(mock)

def test_users(self):
Expand All @@ -40,12 +38,31 @@ def test_get_users(self):
self.assertIsInstance(u, user_mgmt.User)

def test_get_user_by_id(self):
self.usermgmt.client.get().json.return_value = {
"id": "1108612054",
"isLocal": True,
"username": "justin",
"ssoProvider": None,
"domainSuffixes": "",
"email": "justin.jeffrey@ipfabric.io",
"roleIds": ["admin"],
"timezone": "UTC"
}
u = self.usermgmt.get_user_by_id("1108612054")
self.assertIsInstance(u, user_mgmt.User)

def test_add_user(self):
self.usermgmt.client.post().json.return_value = {"id": 1}
u = self.usermgmt.add_user("test", "test", "test1234", ["read"])
self.usermgmt.client.post().json.return_value = {
"id": "1108612054",
"isLocal": True,
"username": "justin",
"ssoProvider": None,
"domainSuffixes": "",
"email": "justin.jeffrey@ipfabric.io",
"roleIds": ["admin"],
"timezone": "UTC"
}
u = self.usermgmt.add_user("test", "test", "test1234", ["admin"])
self.assertIsInstance(u, user_mgmt.User)

def test_add_user_fail(self):
Expand Down

0 comments on commit e7da978

Please sign in to comment.