Compare commits

..

60 Commits
1.6.0 ... 1.7.0

Author SHA1 Message Date
bc85c532dd build: bump version 1.6.0 -> 1.7.0 2025-01-04 10:53:44 +00:00
Jon
660eafe598 Merge pull request #452 from nofusscomputing/feature-next-release 2025-01-04 20:04:41 +09:30
Jon
2543047331 Merge pull request #459 from nofusscomputing/refactor-inventory 2025-01-04 19:51:37 +09:30
Jon
4ebff09671 fix(api): Ensure ALL required classes for viewset are inherited
ref: #459
2025-01-04 19:14:25 +09:30
Jon
e7601e311a test(access): Skip test case for appsettings different organization due to model not being tenancy model.
ref: #448 #459
2025-01-04 18:26:12 +09:30
Jon
1087dde2d5 test(access): Ensure items returned from query are from user organization and/or globally set organization
ref: #459 closes #448
2025-01-04 18:04:02 +09:30
Jon
dd72843ffb feat(access): Enable Objects from global organization to be viewable by user with the permission
ref: #448 #459
2025-01-03 12:14:26 +09:30
Jon
0d5f329146 feat(access): Enable Objects from globally set organization to return within query
ref: #448 #459
2025-01-03 12:00:46 +09:30
Jon
4b2a89c992 chore: test cleanup
ref: #459 closes #437
2025-01-03 10:09:38 +09:30
Jon
f218d8e2fa docs(roadmap): added release management
ref: #459
2025-01-03 10:07:50 +09:30
Jon
a4a9f2c3a9 feat(access): Enable the calling of the dynamic permissions function to obtain permissions
creates recursive loop

ref: #437 #459
2025-01-02 17:01:01 +09:30
Jon
e69c1e1b99 fix(itam): Dont query parent class for permissions
creates recursive loop

ref: #437 #459
2025-01-02 17:00:14 +09:30
Jon
0b362f04ee test(itam): API v2 Inventory Permission Check skip diff org
ref: #437 #459 #461
2025-01-02 16:57:30 +09:30
Jon
d1dc330744 test(itam): API v2 Inventory Permission Checks
ref: #437 #459
2025-01-02 16:54:55 +09:30
Jon
ed8f8ae411 test: mv inventory test to itam app
ref: #437 #459
2025-01-02 13:58:06 +09:30
Jon
d33cf96db3 fix(core): If no org specified serializer fetch, dont attempt to access
ref: #459
2025-01-01 18:33:30 +09:30
Jon
f7b444b8e4 fix(access): If no org specified during permission check, rtn false for permission
ref: #437 #459
2025-01-01 18:25:44 +09:30
Jon
1fd433e621 feat(itam): Cater for RabbitMQ errors when uploading inventory
ref: #437 #459
2025-01-01 17:31:39 +09:30
Jon
df037e59c9 fix(itam): return serializer for inventory endpoint
ref: #437 #459
2025-01-01 17:31:06 +09:30
Jon
ddbce0c0ce feat(itam): On Inventory upload validate existing device
ref: #437 #459
2024-12-31 14:10:35 +09:30
Jon
3c120291d2 refactor(itam): Device UUID field requires no default
ref: #437 #459
2024-12-31 13:29:37 +09:30
Jon
d6eea69c52 refactor(itam): mv inventory task to itam app
ref: #437
2024-12-31 11:32:18 +09:30
Jon
b32346d3f0 Merge pull request #456 from nofusscomputing/refactor-permission_checking 2024-12-28 21:54:48 +09:30
Jon
845a5fb473 test(access): Test Cases for Organization Permission Mixin
ref: #456 # closes #442
2024-12-28 21:37:01 +09:30
Jon
14b7c6d55b fix(api): base index must inherit from IndexViewset
ref: #442 #456
2024-12-28 19:53:56 +09:30
Jon
406fd1bb01 fix(core): Dont attempt to access the object if it doesn't exist when fetching ticket permissions
ref: #442 #456
2024-12-28 18:32:25 +09:30
Jon
4656617583 fix(access): Cached list objects must be a list including an empty one as required
ref: #442 #456
2024-12-28 18:31:53 +09:30
Jon
08b113b1ba feat(access): During permission checking also capture Http404
ref: #442 #456
2024-12-28 18:06:22 +09:30
Jon
a07dee370c refactor(access): Use exceptions for permission flow as required
ref: #442 #456
2024-12-28 17:33:11 +09:30
Jon
fbaf8770df feat(access): Super User to be granted permission
ref: #442 #456
2024-12-28 16:19:35 +09:30
Jon
e96916768e feat(access): Cache the permission required during permission checking
ref: #442 #456
2024-12-28 15:47:12 +09:30
Jon
8e1cf2401a fix(core): when gather ticket permissions, use getter as object may not exist
ref: #442 #456
2024-12-28 15:45:47 +09:30
Jon
580abaefa6 fix(core): action metadata to use view permission for tickets
ref: #442 #456
2024-12-28 15:11:50 +09:30
Jon
193c6c3b7f feat(api): Add IndexViewset to ViewSet mixin
ref: #442 #456
2024-12-27 23:03:39 +09:30
Jon
62fcb5aa01 test(api): Adjust test case for metadata visibility
view user only

ref: #442 #456
2024-12-27 22:28:12 +09:30
Jon
1c87eeb188 feat(access): If the user lacks the permission during permission checks, return sooner
ref: #442 #456
2024-12-27 18:51:10 +09:30
Jon
17e437ce68 fix(access): Use request.method for determining the HTTP/Method for permission checks
ref: #442 #456
2024-12-27 18:50:09 +09:30
Jon
7c62643c6c feat(access): Enforce view action and HTTP/Method match for permission checks
ref: #442 #456
2024-12-27 18:26:58 +09:30
Jon
f211f022a0 Merge pull request #454 from nofusscomputing/refactor-permission-checking 2024-12-26 21:55:21 +09:30
Jon
dc553317de refactor(api): dedup code within viewset mixin
ref: #442 #454
2024-12-26 20:52:27 +09:30
Jon
aee6ccfb7a test(core): remove different org testcase from history checks
this model is not a tenancy model yet. See #455 for details

ref: #442 #454 #455
2024-12-26 16:10:21 +09:30
Jon
10becacbf7 fix(access): Add HTTP/Method=DELETE as valid option for object delete/destroy.
ref: #442 #454
2024-12-26 15:31:02 +09:30
Jon
4545b3d721 test(core): When testing if history access is possible for user with perms, correct status is HTTP/200
ref: #442 #454
2024-12-26 14:12:07 +09:30
Jon
534186a7f9 fix(access): Ensure Object permission are checked when an object is having an action performed against it.
ref: #442 #454
2024-12-26 14:11:05 +09:30
Jon
4be1e97cbe refactor(access): Object permission checking moved to has_object_permission function
ref: #442 #454
2024-12-26 00:49:40 +09:30
Jon
f2181b018d refactor(access): move ability to get required permissions from permissions mixin to organization mixin
ref: #442 #454
2024-12-26 00:33:21 +09:30
Jon
d39f9ad463 refactor(core): move ticket linked item to dynamic parent model
now possible due to org mixins re-write

ref: #442 #454
2024-12-25 21:00:11 +09:30
Jon
6ff3d83222 refactor(api): Use new re-writen Mixins for Tenancy and Permission checks
ref: #442 #454
2024-12-25 20:59:14 +09:30
Jon
96ff5bd839 refactor(access): Organization Permission Mixin now caters for API ONLY
ref: #442 #454
2024-12-25 20:57:45 +09:30
Jon
d61929adaa refactor(access): Organization Mixin now caters for API ONLY
ref: #442 #454
2024-12-25 20:57:31 +09:30
Jon
04d1795a1b fix(core): History View is a read-only view
ref: #442 #454
2024-12-25 19:05:48 +09:30
Jon
7ced4cf524 fix(core): Permissions for Related ticket to be derived from ticket org
ref: #442 #454
2024-12-25 19:05:19 +09:30
Jon
bc1600e07b fix(access): Team User permission organiztion is team org
ref: #442 #454
2024-12-25 19:04:19 +09:30
Jon
2c715d69fa test(access): When adding org, test case must use non-super user
ref: #442 #454
2024-12-25 19:02:04 +09:30
Jon
595209709b test(itim): Ensure external_links are returned as part of _urls
ref: #450 #452
2024-12-24 16:44:13 +09:30
Jon
0db83614a7 feat(itim): External Links to display on cluster details page
ref: #450 #452
2024-12-24 16:40:15 +09:30
Jon
b13bfdb47d test(itim): Add API v2 permission checks for cluster services
ref: #452 fixes #451
2024-12-24 16:12:19 +09:30
Jon
0b6ec7bba8 test(itim): Add API v2 permission checks for device services
ref: #451 #452
2024-12-24 16:11:43 +09:30
Jon
1fcab6f245 feat(api): Add API v2 Endpoint for cluster services
ref: #451 #452
2024-12-24 16:10:57 +09:30
Jon
384b0e1d10 feat(api): distinguish between read-only and authenticateed user permissions
ref: #451 #452
2024-12-24 16:08:22 +09:30
108 changed files with 10370 additions and 689 deletions

View File

@ -17,5 +17,5 @@ commitizen:
prerelease_offset: 1
tag_format: $version
update_changelog_on_bump: false
version: 1.6.0
version: 1.7.0
version_scheme: semver

View File

@ -8,7 +8,8 @@
// "-v",
// "--cov",
// "--cov-report xml",
"app"
"-s",
"app",
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,

View File

@ -1,3 +1,70 @@
## 1.7.0 (2025-01-04)
### feat
- **access**: Enable Objects from global organization to be viewable by user with the permission
- **access**: Enable Objects from globally set organization to return within query
- **access**: Enable the calling of the dynamic permissions function to obtain permissions
- **itam**: Cater for RabbitMQ errors when uploading inventory
- **itam**: On Inventory upload validate existing device
- **access**: During permission checking also capture Http404
- **access**: Super User to be granted permission
- **access**: Cache the permission required during permission checking
- **api**: Add `IndexViewset` to ViewSet mixin
- **access**: If the user lacks the permission during permission checks, return sooner
- **access**: Enforce view action and HTTP/Method match for permission checks
- **itim**: External Links to display on cluster details page
- **api**: Add API v2 Endpoint for cluster services
- **api**: distinguish between read-only and authenticateed user permissions
### Fixes
- **api**: Ensure ALL required classes for viewset are inherited
- **itam**: Dont query parent class for permissions
- **core**: If no org specified serializer fetch, dont attempt to access
- **access**: If no org specified during permission check, rtn false for permission
- **itam**: return serializer for inventory endpoint
- **api**: base index must inherit from IndexViewset
- **core**: Dont attempt to access the object if it doesn't exist when fetching ticket permissions
- **access**: Cached list objects must be a list including an empty one as required
- **core**: when gather ticket permissions, use getter as object may not exist
- **core**: action metadata to use view permission for tickets
- **access**: Use request.method for determining the HTTP/Method for permission checks
- **access**: Add HTTP/Method=DELETE as valid option for object delete/destroy.
- **access**: Ensure Object permission are checked when an object is having an action performed against it.
- **core**: History View is a read-only view
- **core**: Permissions for Related ticket to be derived from ticket org
- **access**: Team User permission organiztion is team org
### Refactoring
- **itam**: Device UUID field requires no default
- **itam**: mv inventory task to itam app
- **access**: Use exceptions for permission flow as required
- **api**: dedup code within viewset mixin
- **access**: Object permission checking moved to `has_object_permission` function
- **access**: move ability to get required permissions from permissions mixin to organization mixin
- **core**: move ticket linked item to dynamic parent model
- **api**: Use new re-writen Mixins for Tenancy and Permission checks
- **access**: Organization Permission Mixin now caters for API ONLY
- **access**: Organization Mixin now caters for API ONLY
### Tests
- **access**: Skip test case for appsettings different organization due to model not being tenancy model.
- **access**: Ensure items returned from query are from user organization and/or globally set organization
- **itam**: API v2 Inventory Permission Check skip diff org
- **itam**: API v2 Inventory Permission Checks
- mv inventory test to itam app
- **access**: Test Cases for Organization Permission Mixin
- **api**: Adjust test case for metadata visibility
- **core**: remove different org testcase from history checks
- **core**: When testing if history access is possible for user with perms, correct status is HTTP/200
- **access**: When adding org, test case must use non-super user
- **itim**: Ensure external_links are returned as part of _urls
- **itim**: Add API v2 permission checks for cluster services
- **itim**: Add API v2 permission checks for device services
## 1.6.0 (2024-12-23)
### feat

View File

@ -0,0 +1,410 @@
from django.contrib.auth.models import User, Group
from access.models import Organization, Team
class OrganizationMixin:
"""Organization Tenancy Mixin
This class is intended to be included in **ALL** View / Viewset classes as
it contains the functions/methods required to conduct the permission
checking.
"""
_obj_organization: int = None
"""Cached Object Organization"""
def get_obj_organization(self, obj = None, request = None) -> Organization:
"""Fetch the objects Organization
Args:
obj (Model): Model of object
Raises:
ValueError: When `obj` and `request` are both missing
Returns:
Organization: Organization the object is from
None: No Organization was found
"""
if obj is None and request is None:
raise ValueError('Missing Parameter. obj or request must be supplied')
if self._obj_organization:
return self._obj_organization
_obj_organization: Organization = None
if obj:
_obj_organization = getattr(obj, 'organization', None)
if not _obj_organization:
_obj_organization = getattr(obj, 'get_organization', lambda: None)()
elif request:
if getattr(request.stream, 'method', '') != 'DELETE':
data = getattr(request, 'data', None)
if data:
data_organization = self.kwargs.get('organization_id', None)
if not data_organization:
data_organization = request.data.get('organization_id', None)
if not data_organization:
data_organization = request.data.get('organization', None)
if data_organization:
_obj_organization = Organization.objects.get(
pk = int( data_organization )
)
if self.get_parent_model(): # if defined is to overwrite object organization
parent_obj = self.get_parent_obj()
_obj_organization = parent_obj.get_organization()
if _obj_organization:
self._obj_organization = _obj_organization
return self._obj_organization
def get_parent_model(self):
"""Get the Parent Model
This function exists so that dynamic parent models can be defined.
They are defined by overriding this method.
Returns:
Model: Parent Model
"""
return self.parent_model
def get_parent_obj(self):
""" Get the Parent Model Object
Use in views where the the model has no organization and the organization should be fetched from the parent model.
Requires attribute `parent_model` within the view with the value of the parent's model class
Returns:
parent_model (Model): with PK from kwargs['pk']
"""
return self.parent_model.objects.get(pk=self.kwargs[self.parent_model_pk_kwarg])
def get_permission_organizations(self, permission: str ) -> list([ int ]):
"""Return Organization(s) the permission belongs to
Searches the users organizations for the required permission, if found
the organization is added to the list to return.
Args:
permission (str): Permission to search users organizations for
Returns:
Organizations (list): All Organizations where the permission was found.
"""
_permission_organizations: list = []
for team in self.get_user_teams( self.request.user ):
for team_permission in team.permissions.all():
permission_value = str( team_permission.content_type.app_label + '.' + team_permission.codename )
if permission_value == permission:
_permission_organizations += [ team.organization.id ]
return _permission_organizations
_permission_required: str = None
"""Cached Permissions required"""
def get_permission_required(self) -> str:
""" Get / Generate Permission Required
If there is a requirement that there be custom/dynamic permissions,
this function can be safely overridden.
Raises:
ValueError: Unable to determin the view action
Returns:
str: Permission in format `<app_name>.<action>_<model_name>`
"""
if self._permission_required:
return self._permission_required
if hasattr(self, 'get_dynamic_permissions'):
self._permission_required = self.get_dynamic_permissions()
if type(self._permission_required) is list:
self._permission_required = self._permission_required[0]
return self._permission_required
view_action: str = None
if(
self.action == 'create'
or getattr(self.request._stream, 'method', '') == 'POST'
):
view_action = 'add'
elif (
self.action == 'partial_update'
or self.action == 'update'
or getattr(self.request._stream, 'method', '') == 'PATCH'
or getattr(self.request._stream, 'method', '') == 'PUT'
):
view_action = 'change'
elif(
self.action == 'destroy'
or getattr(self.request._stream, 'method', '') == 'DELETE'
):
view_action = 'delete'
elif (
self.action == 'list'
):
view_action = 'view'
elif self.action == 'retrieve':
view_action = 'view'
elif self.action == 'metadata':
view_action = 'view'
elif self.action is None:
return False
if view_action is None:
raise ValueError('view_action could not be defined.')
permission = self.model._meta.app_label + '.' + view_action + '_' + self.model._meta.model_name
permission_required = permission
self._permission_required = permission_required
return self._permission_required
parent_model: str = None
""" Parent Model
This attribute defines the parent model for the model in question. The parent model when defined
will be used as the object to obtain the permissions from.
"""
parent_model_pk_kwarg: str = 'pk'
"""Parent Model kwarg
This value is used to define the kwarg that is used as the parent objects primary key (pk).
"""
_user_organizations: list = []
"""Cached User Organizations"""
_user_teams: list = []
"""Cached User Teams"""
_user_permissions: list = []
"""Cached User User Permissions"""
def get_user_organizations(self, user: User) -> list([int]):
"""Get the Organization the user is a part of
Args:
user (User): User Making the request
Returns:
list(int()): List containing the organizations the user is a part of.
"""
if self._user_organizations and self._user_teams and self._user_permissions:
return self._user_organizations
teams = Team.objects.all()
_user_organizations: list([ int ]) = []
_user_teams: list([ Team ]) = []
_user_permissions: list([ str ]) = []
for group in user.groups.all():
team = teams.get(pk=group.id)
if team not in _user_teams:
_user_teams += [ team ]
for permission in team.permissions.all():
permission_value = str( permission.content_type.app_label + '.' + permission.codename )
if permission_value not in _user_permissions:
_user_permissions += [ permission_value ]
if team.organization.id not in _user_organizations:
_user_organizations += [ team.organization.id ]
if len(_user_organizations) > 0:
self._user_organizations = _user_organizations
if len(_user_teams) > 0:
self._user_teams = _user_teams
if len(_user_permissions) > 0:
self._user_permissions = _user_permissions
return self._user_organizations
def get_user_teams(self, user: User) -> list([ Team ]):
if not self._user_teams:
self.get_user_organizations( user = user )
return self._user_teams
def has_organization_permission(self, organization: int, permissions_required: list) -> bool:
""" Check if user has permission within organization.
Args:
organization (int): Organization to check.
permissions_required (list): if doing object level permissions, pass in required permission.
Returns:
bool: True for yes.
"""
has_permission: bool = False
if not organization:
return has_permission
from settings.models.app_settings import AppSettings
app_settings = AppSettings.objects.get(
owner_organization = None
)
for team in self.get_user_teams( user = self.request.user ):
if(
team.organization.id == int(organization)
or getattr(app_settings.global_organization, 'id', 0) == int(organization)
):
for permission in team.permissions.all():
assembled_permission = str(permission.content_type.app_label) + '.' + str( permission.codename )
if assembled_permission in permissions_required:
has_permission = True
return has_permission
def is_member(self, organization: int) -> bool:
"""Returns true if the current user is a member of the organization
iterates over the user_organizations list and returns true if the user is a member
Returns:
bool: _description_
"""
is_member: bool = False
if organization is None:
return False
if int(organization) in self.get_user_organizations(self.request.user):
is_member = True
return is_member

View File

@ -0,0 +1,306 @@
import traceback
from django.core.exceptions import ObjectDoesNotExist
from rest_framework import exceptions
from rest_framework.permissions import DjangoObjectPermissions
from access.models import TenancyObject
from core import exceptions as centurion_exceptions
class OrganizationPermissionMixin(
DjangoObjectPermissions,
):
"""Organization Permission Mixin
This class is to be used as the permission class for API `Views`/`ViewSets`.
In combination with the `OrganizationPermissionsMixin`, permission checking
will be done to ensure the user has the correct permissions to perform the
CRUD operation.
**Note:** If the user is not authenticated, they will be denied access
globally.
Permissions are broken down into two areas:
- `Tenancy` Objects
This object requires that the user have the correct permission and that
permission be assigned within the organiztion the object belongs to.
- `Non-Tenancy` Objects.
This object requires the the use have the correct permission assigned,
regardless of the organization the object is from. This includes objects
that have no organization.
"""
_is_tenancy_model: bool = None
def is_tenancy_model(self, view) -> bool:
"""Determin if the Model is a `Tenancy` Model
Will look at the model defined within the view unless a parent
model is found. If the latter is true, the parent_model will be used to
determin if the model is a `Tenancy` model
Args:
view (object): The View the HTTP request was mad to
Returns:
True (bool): Model is a Tenancy Model.
False (bool): Model is not a Tenancy model.
"""
if not self._is_tenancy_model:
if hasattr(view, 'model'):
self._is_tenancy_model = issubclass(view.model, TenancyObject)
if view.get_parent_model():
self._is_tenancy_model = issubclass(view.get_parent_model(), TenancyObject)
return self._is_tenancy_model
def has_permission(self, request, view):
""" Check if user has the required permission
Permission flow is as follows:
- Un-authenticated users. Access Denied
- Authenticated user whom make a request using wrong method. Access
Denied
- Authenticated user who is not in same organization as object. Access
Denied
- Authenticated user who is in same organization as object, however is
missing the correct permission. Access Denied
Depending upon user type, they will recieve different feedback. In order
they are:
- Non-authenticated users will **always** recieve HTTP/401
- Authenticated users who use an unsupported method, HTTP/405
- Authenticated users missing the correct permission recieve HTTP/403
Args:
request (object): The HTTP Request Object
view (_type_): The View/Viewset Object the request was made to
Raises:
PermissionDenied: User does not have the required permission.
NotAuthenticated: User is not logged into Centurion.
ValueError: Could not determin the view action.
Returns:
True (bool): User has the required permission.
False (bool): User does not have the required permission
"""
if request.user.is_anonymous:
raise centurion_exceptions.NotAuthenticated()
try:
view.get_user_organizations( request.user )
has_permission_required: bool = False
user_permissions = getattr(view, '_user_permissions', None)
permission_required = view.get_permission_required()
if permission_required and user_permissions:
# No permission_required couldnt get permissions
# No user_permissions, user missing the required permission
has_permission_required: bool = permission_required in user_permissions
if request.method not in view.allowed_methods:
raise centurion_exceptions.MethodNotAllowed(method = request.method)
elif not has_permission_required and not request.user.is_superuser:
raise centurion_exceptions.PermissionDenied()
obj_organization: Organization = view.get_obj_organization(
request = request
)
view_action: str = None
if(
view.action == 'create'
and request.method == 'POST'
):
view_action = 'add'
elif(
view.action == 'destroy'
and request.method == 'DELETE'
):
view_action = 'delete'
obj_organization: Organization = view.get_obj_organization(
obj = view.get_object()
)
elif (
view.action == 'list'
):
view_action = 'view'
elif (
view.action == 'partial_update'
and request.method == 'PATCH'
):
view_action = 'change'
obj_organization: Organization = view.get_obj_organization(
obj = view.get_object()
)
elif (
view.action == 'update'
and request.method == 'PUT'
):
view_action = 'change'
obj_organization: Organization = view.get_obj_organization(
obj = view.get_object()
)
elif(
view.action == 'retrieve'
and request.method == 'GET'
):
view_action = 'view'
obj_organization: Organization = view.get_obj_organization(
obj = view.get_object()
)
elif(
view.action == 'metadata'
and request.method == 'OPTIONS'
):
return True
if view_action is None:
raise ValueError('view_action could not be defined.')
if obj_organization is None or request.user.is_superuser:
return True
elif obj_organization is not None:
if view.has_organization_permission(
organization = obj_organization.id,
permissions_required = [ view.get_permission_required() ]
):
return True
except ValueError as e:
# ToDo: This exception could be used in traces as it provides
# information as to dodgy requests. This exception is raised
# when the method does not match the view action.
print(traceback.format_exc())
except centurion_exceptions.Http404 as e:
# This exception genrally means that the user is not in the same
# organization as the object as objects are filtered to users
# organizations ONLY.
pass
except centurion_exceptions.ObjectDoesNotExist as e:
# This exception genrally means that the user is not in the same
# organization as the object as objects are filtered to users
# organizations ONLY.
pass
except centurion_exceptions.PermissionDenied as e:
# This Exception will be raised after this function has returned
# False.
pass
return False
def has_object_permission(self, request, view, obj):
try:
if request.user.is_anonymous:
return False
object_organization: int = getattr(view.get_obj_organization( obj = obj ), 'id', None)
from settings.models.app_settings import AppSettings
app_settings = AppSettings.objects.get(
owner_organization = None
)
if object_organization:
if(
object_organization
in view.get_permission_organizations( view.get_permission_required() )
or request.user.is_superuser
or getattr(app_settings.global_organization, 'id', 0) == int(object_organization)
):
return True
elif not self.is_tenancy_model( view ) or request.user.is_superuser:
return True
except Exception as e:
print(traceback.format_exc())
return False

View File

@ -11,6 +11,7 @@ from core.middleware.get_request import get_request
from core.mixin.history_save import SaveHistory
class Organization(SaveHistory):
class Meta:
@ -178,6 +179,16 @@ class TenancyManager(models.Manager):
if request:
from settings.models.app_settings import AppSettings
app_settings = AppSettings.objects.get(
owner_organization = None
)
if app_settings.global_organization:
user_organizations += [ app_settings.global_organization.id ]
# user = request.user._wrapped if hasattr(request.user,'_wrapped') else request.user
user = request.user

View File

@ -198,6 +198,17 @@ class OrganizationPermissionsAPI(
TestCase
):
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
def test_add_has_permission(self):
@ -217,7 +228,7 @@ class OrganizationPermissionsAPI(
url = reverse( self.app_namespace + ':' + self.url_name + '-list' )
client.force_login( self.super_add_user )
client.force_login( self.add_user )
response = client.post( url, data = self.add_data )
@ -271,6 +282,17 @@ class OrganizationPermissionsAPI(
assert len(response.data['results']) == 2
def test_add_different_organization_denied(self):
""" Check correct permission for add
This test is a duplicate of a test case with the same name.
Organizations are not tenancy models so this test does nothing of value
attempt to add as user from different organization
"""
pass
class OrganizationViewSet(
ViewSetBase,

View File

@ -191,7 +191,17 @@ class TeamPermissionsAPI(
TestCase,
):
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -205,6 +205,21 @@ class TeamUserPermissionsAPI(
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
class TeamUserViewSet(
ViewSetBase,
SerializersTestCases,

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,8 @@ from django.contrib.auth.models import User
from django.shortcuts import reverse
from django.test import Client, TestCase
from rest_framework.permissions import IsAuthenticated
from access.models import Organization
from api.tests.abstract.viewsets import ViewSetCommon
@ -39,4 +41,18 @@ class AccessViewset(
client.force_login(self.view_user)
self.http_options_response_list = client.options(url)
self.http_options_response_list = client.options(url)
def test_view_attr_permission_classes_value(self):
"""Attribute Test
Attribute `permission_classes` must be metadata class `ReactUIMetadata`
"""
view_set = self.viewset()
assert view_set.permission_classes[0] is IsAuthenticated
assert len(view_set.permission_classes) == 1

View File

@ -3,12 +3,12 @@ from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.reverse import reverse
from api.viewsets.common import CommonViewSet
from api.viewsets.common import IndexViewset
@extend_schema(exclude = True)
class Index(CommonViewSet):
class Index(IndexViewset):
allowed_methods: list = [
'GET',

View File

@ -145,6 +145,10 @@ class ViewSet( ModelViewSet ):
model = TeamUsers
parent_model = Team
parent_model_pk_kwarg = 'team_id'
documentation: str = ''
view_description = 'Users belonging to a single team'

View File

@ -113,6 +113,16 @@ class APIPermissionView:
url = reverse(self.app_namespace + ':' + self.url_name + '-list')
viewable_organizations = [
self.organization.id,
]
if getattr(self, 'global_organization', None): # Cater for above test that also has global org
viewable_organizations += [ self.global_organization.id ]
client.force_login(self.view_user)
response = client.get(url)
@ -120,14 +130,59 @@ class APIPermissionView:
for item in response.data['results']:
if int(item['organization']['id']) != self.organization.id:
if int(item['organization']['id']) not in viewable_organizations:
contains_different_org = True
print(f'Failed returned row was: {item}')
assert not contains_different_org
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
Items returned from the query Must be from the users organization and
global ONLY!
"""
client = Client()
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs=self.url_kwargs)
only_from_user_org: bool = True
viewable_organizations = [
self.organization.id,
self.global_organization.id
]
assert getattr(self.global_organization, 'id', False) # fail if no global org set
assert getattr(self.global_org_item, 'id', False) # fail if no global item set
client.force_login(self.view_user)
response = client.get(url)
assert len(response.data['results']) >= 2 # fail if only one item extist.
for row in response.data['results']:
if row['organization']['id'] not in viewable_organizations:
only_from_user_org = False
print(f'Users org: {self.organization.id}')
print(f'global org: {self.global_organization.id}')
print(f'Failed returned row was: {row}')
assert only_from_user_org
class APIPermissionAdd:

View File

@ -615,54 +615,7 @@ class MetaDataNavigationEntriesFunctional:
content_type='application/json'
)
no_menu_entry_found: bool = True
for nav_menu in response.data['navigation']:
if nav_menu['name'] == self.menu_id:
for menu_entry in nav_menu['pages']:
if menu_entry['name'] == self.menu_entry_id:
no_menu_entry_found = False
assert no_menu_entry_found
def test_navigation_no_empty_menu_add_user(self):
"""Test HTTP/Options Method Navigation Entry
Ensure that a user with add permission, does not
have any nave menu without pages
"""
client = Client()
client.force_login(self.add_user)
if getattr(self, 'url_kwargs', None):
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs = self.url_kwargs)
else:
url = reverse(self.app_namespace + ':' + self.url_name + '-list')
response = client.options(
url,
content_type='application/json'
)
no_empty_menu_found: bool = True
for nav_menu in response.data['navigation']:
if len(nav_menu['pages']) == 0:
no_empty_menu_found = False
assert no_empty_menu_found
assert response.status_code == 403
@ -689,54 +642,8 @@ class MetaDataNavigationEntriesFunctional:
content_type='application/json'
)
no_menu_entry_found: bool = True
for nav_menu in response.data['navigation']:
if nav_menu['name'] == self.menu_id:
for menu_entry in nav_menu['pages']:
if menu_entry['name'] == self.menu_entry_id:
no_menu_entry_found = False
assert no_menu_entry_found
def test_navigation_no_empty_menu_change_user(self):
"""Test HTTP/Options Method Navigation Entry
Ensure that a user with change permission, does not
have any nave menu without pages
"""
client = Client()
client.force_login(self.change_user)
if getattr(self, 'url_kwargs', None):
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs = self.url_kwargs)
else:
url = reverse(self.app_namespace + ':' + self.url_name + '-list')
response = client.options(
url,
content_type='application/json'
)
no_empty_menu_found: bool = True
for nav_menu in response.data['navigation']:
if len(nav_menu['pages']) == 0:
no_empty_menu_found = False
assert no_empty_menu_found
assert response.status_code == 403
@ -763,54 +670,8 @@ class MetaDataNavigationEntriesFunctional:
content_type='application/json'
)
no_menu_entry_found: bool = True
for nav_menu in response.data['navigation']:
if nav_menu['name'] == self.menu_id:
for menu_entry in nav_menu['pages']:
if menu_entry['name'] == self.menu_entry_id:
no_menu_entry_found = False
assert no_menu_entry_found
def test_navigation_no_empty_menu_delete_user(self):
"""Test HTTP/Options Method Navigation Entry
Ensure that a user with delete permission, does not
have any nave menu without pages
"""
client = Client()
client.force_login(self.delete_user)
if getattr(self, 'url_kwargs', None):
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs = self.url_kwargs)
else:
url = reverse(self.app_namespace + ':' + self.url_name + '-list')
response = client.options(
url,
content_type='application/json'
)
no_empty_menu_found: bool = True
for nav_menu in response.data['navigation']:
if len(nav_menu['pages']) == 0:
no_empty_menu_found = False
assert no_empty_menu_found
assert response.status_code == 403

View File

@ -1,5 +1,6 @@
from access.mixins.permissions import OrganizationPermissionMixin
from api.react_ui_metadata import ReactUIMetadata
from api.views.mixin import OrganizationPermissionAPI
@ -145,7 +146,7 @@ class AllViewSet:
view_set = self.viewset()
assert view_set.permission_classes[0] is OrganizationPermissionAPI
assert view_set.permission_classes[0] is OrganizationPermissionMixin
assert len(view_set.permission_classes) == 1

View File

@ -2,6 +2,8 @@ from django.contrib.auth.models import User
from django.shortcuts import reverse
from django.test import Client, TestCase
from rest_framework.permissions import IsAuthenticated
from access.models import Organization
from api.tests.abstract.viewsets import ViewSetCommon
@ -39,4 +41,19 @@ class HomeViewset(
client.force_login(self.view_user)
self.http_options_response_list = client.options(url)
self.http_options_response_list = client.options(url)
def test_view_attr_permission_classes_value(self):
"""Attribute Test
Attribute `permission_classes` must be metadata class `ReactUIMetadata`
"""
view_set = self.viewset()
assert view_set.permission_classes[0] is IsAuthenticated
assert len(view_set.permission_classes) == 1

View File

@ -73,7 +73,8 @@ from itim.viewsets import (
port as port_v2,
problem,
service as service_v2,
service_device as service_device_v2
service_cluster,
service_device as service_device_v2,
)
from project_management.viewsets import (
@ -153,6 +154,7 @@ router.register('itam/software/(?P<software_id>[0-9]+)/version', software_versio
router.register('itim', itim_v2.Index, basename='_api_v2_itim_home')
router.register('itim/ticket/change', change.ViewSet, basename='_api_v2_ticket_change')
router.register('itim/cluster', cluster_v2.ViewSet, basename='_api_v2_cluster')
router.register('itim/cluster/(?P<cluster_id>[0-9]+)/service', service_cluster.ViewSet, basename='_api_v2_service_cluster')
router.register('itim/cluster/(?P<cluster_id>[0-9]+)/notes', notes_v2.ViewSet, basename='_api_v2_cluster_notes')
router.register('itim/ticket/incident', incident.ViewSet, basename='_api_v2_ticket_incident')
router.register('itim/ticket/problem', problem.ViewSet, basename='_api_v2_ticket_problem')

View File

@ -14,11 +14,10 @@ from api.serializers.inventory import Inventory
from core.http.common import Http
from itam.models.device import Device
from itam.tasks.inventory import process_inventory
from settings.models.user_settings import UserSettings
from api.tasks import process_inventory
class InventoryPermissions(OrganizationPermissionAPI):

View File

@ -5,11 +5,279 @@ from rest_framework.exceptions import APIException
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from access.mixin import OrganizationMixin
from access.mixins.organization import OrganizationMixin
from access.mixins.permissions import OrganizationPermissionMixin
from api.auth import TokenScheme
from api.react_ui_metadata import ReactUIMetadata
from api.views.mixin import OrganizationPermissionAPI
class Create(
viewsets.mixins.CreateModelMixin
):
def create(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().create(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
class Destroy(
viewsets.mixins.DestroyModelMixin
):
def destroy(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().destroy(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
class List(
viewsets.mixins.ListModelMixin
):
def list(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().list(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
# class PartialUpdate:
class Retrieve(
viewsets.mixins.RetrieveModelMixin
):
def retrieve(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().retrieve(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
else:
ex = e.get_full_details()
response = Response(
data = {
'message': ex['message']
},
status = e.status_code
)
return response
class Update(
viewsets.mixins.UpdateModelMixin
):
def partial_update(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().partial_update(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
def update(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().update(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
@ -65,7 +333,7 @@ class CommonViewSet(
for detail view, Enables the UI can setup the page layout.
"""
permission_classes = [ OrganizationPermissionAPI ]
permission_classes = [ OrganizationPermissionMixin ]
"""Permission Class
_Mandatory_, Permission check class
@ -188,7 +456,7 @@ class CommonViewSet(
def get_view_name(self):
if hasattr(self, 'model'):
if getattr(self, 'model', None):
if self.detail:
@ -268,374 +536,87 @@ class ModelViewSetBase(
class ModelViewSet(
ModelViewSetBase,
Create,
Retrieve,
Update,
Destroy,
List,
viewsets.ModelViewSet,
):
def retrieve(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().retrieve(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
pass
class ModelCreateViewSet(
ModelViewSetBase,
viewsets.mixins.CreateModelMixin,
Create,
viewsets.GenericViewSet,
):
def create(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().create(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
pass
class ModelListRetrieveDeleteViewSet(
viewsets.mixins.ListModelMixin,
viewsets.mixins.RetrieveModelMixin,
viewsets.mixins.DestroyModelMixin,
ModelViewSetBase,
List,
Retrieve,
Destroy,
viewsets.GenericViewSet,
ModelViewSetBase
):
""" Use for models that you wish to delete and view ONLY!"""
def list(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().list(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
def retrieve(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().retrieve(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
def destroy(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().destroy(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
pass
class ModelRetrieveUpdateViewSet(
viewsets.mixins.RetrieveModelMixin,
viewsets.mixins.UpdateModelMixin,
ModelViewSetBase,
Retrieve,
Update,
viewsets.GenericViewSet,
ModelViewSetBase
):
""" Use for models that you wish to update and view ONLY!"""
def partial_update(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().partial_update(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
def update(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().update(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
pass
class ReadOnlyModelViewSet(
viewsets.ReadOnlyModelViewSet,
ModelViewSetBase
ModelViewSetBase,
Retrieve,
List,
viewsets.GenericViewSet,
):
pass
class AuthUserReadOnlyModelViewSet(
ReadOnlyModelViewSet
):
"""Authenticated User Read-Only Viewset
Use this class if the model only requires that the user be authenticated
to obtain view permission.
Args:
ReadOnlyModelViewSet (class): Read-Only base class
"""
permission_classes = [
IsAuthenticated,
]
def retrieve(self, request, *args, **kwargs):
"""Sainty override
class IndexViewset(
ModelViewSetBase,
):
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
permission_classes = [
IsAuthenticated,
]
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().retrieve(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response
def list(self, request, *args, **kwargs):
"""Sainty override
This function overrides the function of the same name
in the parent class for the purpose of ensuring a
non-api exception will not have the API return a HTTP
500 error.
This function is a sanity check that if it triggers,
(an exception occured), the user will be presented with
a stack trace that they will hopefully report as a bug.
HTTP status set to HTTP/501 so it's distinguishable from
a HTTP/500 which is generally a random error that has not
been planned for. i.e. uncaught exception
"""
response = None
try:
response = super().list(request = request, *args, **kwargs)
except Exception as e:
if not isinstance(e, APIException):
response = Response(
data = {
'server_error': str(e)
},
status = 501
)
return response

View File

@ -3,12 +3,12 @@ from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.reverse import reverse
from api.viewsets.common import CommonViewSet
from api.viewsets.common import IndexViewset
@extend_schema(exclude = True)
class Index(CommonViewSet):
class Index(IndexViewset):
allowed_methods: list = [
'GET',

View File

@ -1,6 +1,6 @@
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse
from api.viewsets.common import ReadOnlyModelViewSet
from api.viewsets.common import AuthUserReadOnlyModelViewSet
from app.serializers.content_type import (
ContentType,
@ -26,7 +26,7 @@ from app.serializers.content_type import (
),
)
class ViewSet(
ReadOnlyModelViewSet
AuthUserReadOnlyModelViewSet
):

View File

@ -3,12 +3,12 @@ from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.reverse import reverse
from api.viewsets.common import CommonViewSet
from api.viewsets.common import IndexViewset
@extend_schema(exclude = True)
class Index(CommonViewSet):
class Index(IndexViewset):
allowed_methods: list = [
'GET',

View File

@ -1,6 +1,6 @@
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse
from api.viewsets.common import ReadOnlyModelViewSet
from api.viewsets.common import AuthUserReadOnlyModelViewSet
from app.serializers.permission import (
Permission,
@ -26,7 +26,7 @@ from app.serializers.permission import (
),
)
class ViewSet(
ReadOnlyModelViewSet
AuthUserReadOnlyModelViewSet
):

View File

@ -1,6 +1,6 @@
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse
from api.viewsets.common import ReadOnlyModelViewSet
from api.viewsets.common import AuthUserReadOnlyModelViewSet
from app.serializers.user import (
User,
@ -28,7 +28,7 @@ from app.serializers.user import (
),
)
class ViewSet(
ReadOnlyModelViewSet
AuthUserReadOnlyModelViewSet
):

View File

@ -16,6 +16,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from assistance.models.knowledge_base import KnowledgeBase
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -50,6 +53,31 @@ class ViewSetBase:
self.different_organization = different_organization
self.view_user = User.objects.create_user(username="test_user_view", password="password")
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
title = 'one',
content = 'some text for bodygfdgdf',
target_user = self.view_user
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
self.url_kwargs = {}
@ -124,7 +152,6 @@ class ViewSetBase:
self.no_permissions_user = User.objects.create_user(username="test_no_permissions", password="password")
self.view_user = User.objects.create_user(username="test_user_view", password="password")
self.view_user_b = User.objects.create_user(username="test_user_view_b", password="password")
teamuser = TeamUsers.objects.create(
team = view_team,

View File

@ -12,6 +12,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from assistance.models.knowledge_base import KnowledgeBaseCategory
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,8 +49,29 @@ class ViewSetBase:
self.different_organization = different_organization
self.view_user = User.objects.create_user(username="test_user_view", password="password")
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'onesdsad',
target_user = self.view_user
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
# self.url_kwargs = {}
view_permissions = Permission.objects.get(
@ -120,7 +144,6 @@ class ViewSetBase:
self.no_permissions_user = User.objects.create_user(username="test_no_permissions", password="password")
self.view_user = User.objects.create_user(username="test_user_view", password="password")
self.view_user_b = User.objects.create_user(username="test_user_view_b", password="password")
teamuser = TeamUsers.objects.create(
team = view_team,

View File

@ -226,6 +226,18 @@ class ModelKnowledgeBaseArticlePermissionsAPI(
):
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
@pytest.mark.skip( reason = 'not required' )
def test_delete_permission_change_denied(self):
"""This model does not have a change user"""

View File

@ -2,6 +2,8 @@ from django.contrib.auth.models import User
from django.shortcuts import reverse
from django.test import Client, TestCase
from rest_framework.permissions import IsAuthenticated
from access.models import Organization
from api.tests.abstract.viewsets import ViewSetCommon
@ -39,4 +41,18 @@ class AssistanceViewset(
client.force_login(self.view_user)
self.http_options_response_list = client.options(url)
self.http_options_response_list = client.options(url)
def test_view_attr_permission_classes_value(self):
"""Attribute Test
Attribute `permission_classes` must be metadata class `ReactUIMetadata`
"""
view_set = self.viewset()
assert view_set.permission_classes[0] is IsAuthenticated
assert len(view_set.permission_classes) == 1

View File

@ -3,12 +3,12 @@ from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.reverse import reverse
from api.viewsets.common import CommonViewSet
from api.viewsets.common import IndexViewset
@extend_schema(exclude = True)
class Index(CommonViewSet):
class Index(IndexViewset):
allowed_methods: list = [
'GET',

View File

@ -60,3 +60,16 @@ class NotePermissionsAPI(
self.url_view_kwargs = {'config_group_id': self.note_item.id, 'pk': self.item.pk }
self.add_data = {'note': 'a note added', 'organization': self.organization.id}
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -16,6 +16,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from config_management.models.groups import ConfigGroups
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -51,7 +54,27 @@ class ViewSetBase:
# self.url_kwargs = {}
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(

View File

@ -237,7 +237,18 @@ class ConfigGroupSoftwarePermissionsAPI(
TestCase,
):
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -2,6 +2,8 @@ from django.contrib.auth.models import User
from django.shortcuts import reverse
from django.test import Client, TestCase
from rest_framework.permissions import IsAuthenticated
from access.models import Organization
from api.tests.abstract.viewsets import ViewSetCommon
@ -39,4 +41,18 @@ class ConfigManagementViewset(
client.force_login(self.view_user)
self.http_options_response_list = client.options(url)
self.http_options_response_list = client.options(url)
def test_view_attr_permission_classes_value(self):
"""Attribute Test
Attribute `permission_classes` must be metadata class `ReactUIMetadata`
"""
view_set = self.viewset()
assert view_set.permission_classes[0] is IsAuthenticated
assert len(view_set.permission_classes) == 1

View File

@ -3,12 +3,12 @@ from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.reverse import reverse
from api.viewsets.common import CommonViewSet
from api.viewsets.common import IndexViewset
@extend_schema(exclude = True)
class Index(CommonViewSet):
class Index(IndexViewset):
allowed_methods: list = [
'GET',

View File

@ -1,6 +1,13 @@
from django.core.exceptions import (
ObjectDoesNotExist
)
from django.http import Http404
from rest_framework import exceptions, status
from rest_framework.exceptions import (
MethodNotAllowed,
NotAuthenticated,
ParseError,
PermissionDenied,
ValidationError,

View File

@ -303,6 +303,19 @@ class TicketViewSetBase:
class TicketViewSetPermissionsAPI( TicketViewSetBase, APIPermissions ):
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
def test_add_triage_user_denied(self):
""" Check correct permission for add

View File

@ -16,6 +16,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from core.models.manufacturer import Manufacturer
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -51,7 +54,28 @@ class ViewSetBase:
# self.url_kwargs = {}
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(

View File

@ -215,6 +215,19 @@ class RelatedTicketsPermissionsAPI(
TestCase,
):
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
def test_add_has_permission_post_not_allowed(self):
""" Check correct permission for add

View File

@ -231,6 +231,18 @@ class HistoryPermissionsAPI(
):
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
def test_view_list_has_permission(self):
""" Check correct permission for view
@ -263,7 +275,7 @@ class HistoryPermissionsAPI(
client.force_login(self.view_user)
response = client.get(url)
assert response.status_code == 403
assert response.status_code == 200
def test_add_has_permission_method_not_allowed(self):
@ -340,6 +352,22 @@ class HistoryPermissionsAPI(
pass
# item is not tenancy object
def test_view_different_organizaiton_denied(self):
""" Check correct permission for view
This test case is a duplicate of a test case with the same name. This
test is not required as currently the history model is not a tenancy
model.
see https://github.com/nofusscomputing/centurion_erp/issues/455 for
more details.
Attempt to view with user from different organization
"""
pass
class HistoryMetadata(
ViewSetBase,

View File

@ -147,7 +147,7 @@ class TaskResultPermissionsAPI(
self.item = self.model.objects.create(
task_id = 'd15233ee-a14d-4135-afe5-e406b1b61330',
task_name = 'api.tasks.process_inventory',
task_name = 'itam.tasks.process_inventory',
task_args = '{"random": "value"}',
task_kwargs = 'sdas',
status = "SUCCESS",
@ -209,6 +209,18 @@ class TaskResultPermissionsAPI(
)
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
def test_add_no_permission_denied(self):
""" Check correct permission for add

View File

@ -16,6 +16,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from core.models.ticket.ticket_category import TicketCategory
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -50,6 +53,34 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -242,6 +242,18 @@ class TicketCommentPermissionsAPI(
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
class TicketCommentMetadata(
ViewSetBase,

View File

@ -16,6 +16,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from core.models.ticket.ticket_comment_category import TicketCommentCategory
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -50,6 +53,33 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -247,7 +247,19 @@ class ViewSetBasePermissionsAPI(
APIPermissionView,
):
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -53,7 +53,7 @@ class CeleryTaskResultAPI(
self.item = self.model.objects.create(
task_id = 'd15233ee-a14d-4135-afe5-e406b1b61330',
task_name = 'api.tasks.process_inventory',
task_name = 'itam.tasks.process_inventory',
task_args = '{"random": "value"}',
task_kwargs = 'sdas',
status = "SUCCESS",

View File

@ -6,7 +6,7 @@ from core.serializers.celery_log import (
TaskResultViewSerializer
)
from api.viewsets.common import ReadOnlyModelViewSet
from api.viewsets.common import AuthUserReadOnlyModelViewSet
@ -29,7 +29,7 @@ from api.viewsets.common import ReadOnlyModelViewSet
}
),
)
class ViewSet(ReadOnlyModelViewSet):
class ViewSet(AuthUserReadOnlyModelViewSet):
filterset_fields = [
'periodic_task_name',

View File

@ -2,7 +2,7 @@ from django.db.models import Q
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse
from api.viewsets.common import ModelViewSet
from api.viewsets.common import ReadOnlyModelViewSet
from core.serializers.history import (
History,
@ -27,7 +27,7 @@ from core.serializers.history import (
update = extend_schema( exclude = True ),
partial_update = extend_schema( exclude = True )
)
class ViewSet(ModelViewSet):
class ViewSet(ReadOnlyModelViewSet):
allowed_methods = [
'GET',

View File

@ -7,6 +7,7 @@ from access.mixin import OrganizationMixin
from api.viewsets.common import ModelListRetrieveDeleteViewSet
from core.serializers.ticket_related import (
Ticket,
RelatedTickets,
RelatedTicketModelSerializer,
RelatedTicketViewSerializer,
@ -79,6 +80,10 @@ class ViewSet(ModelListRetrieveDeleteViewSet):
model = RelatedTickets
parent_model = Ticket
parent_model_pk_kwarg = 'ticket_id'
def get_serializer_class(self):

View File

@ -86,10 +86,14 @@ class TicketViewSet(ModelViewSet):
"""
def get_dynamic_permissions(self):
def get_permission_required(self):
organization = None
if self._permission_required:
return self._permission_required
if(
self.action == 'create'
@ -108,9 +112,13 @@ class TicketViewSet(ModelViewSet):
or self.action == 'update'
):
obj = list(self.queryset)[0]
queryset = self.get_queryset()
organization = obj.organization
if len(queryset) > 0:
obj = queryset[0]
organization = obj.organization
if self.action == 'create':
@ -170,7 +178,10 @@ class TicketViewSet(ModelViewSet):
action_keyword = 'triage'
elif self.action is None:
elif(
self.action is None
or self.action == 'metadata'
):
action_keyword = 'view'
@ -178,11 +189,12 @@ class TicketViewSet(ModelViewSet):
raise ValueError('unable to determin the action_keyword')
self.permission_required = [
str('core.' + action_keyword + '_ticket_' + self._ticket_type).lower().replace(' ', '_'),
]
self._permission_required = str(
'core.' + action_keyword + '_ticket_' + self._ticket_type).lower().replace(' ', '_'
)
return self._permission_required
return super().get_permission_required()
def get_queryset(self):
@ -273,57 +285,59 @@ class TicketViewSet(ModelViewSet):
).organization.pk
if ( # Must be first as the priority to pickup
self._ticket_type
and self.action != 'list'
and self.action != 'retrieve'
):
if organization:
if self.has_organization_permission(
organization = organization,
permissions_required = [
'core.import_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
if ( # Must be first as the priority to pickup
self._ticket_type
and self.action != 'list'
and self.action != 'retrieve'
):
serializer_prefix = serializer_prefix + 'Import'
elif self.has_organization_permission(
organization = organization,
permissions_required = [
'core.triage_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
):
if self.has_organization_permission(
organization = organization,
permissions_required = [
'core.import_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
):
serializer_prefix = serializer_prefix + 'Triage'
serializer_prefix = serializer_prefix + 'Import'
elif self.has_organization_permission(
organization = organization,
permissions_required = [
'core.change_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
):
elif self.has_organization_permission(
organization = organization,
permissions_required = [
'core.triage_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
):
serializer_prefix = serializer_prefix + 'Change'
serializer_prefix = serializer_prefix + 'Triage'
elif self.has_organization_permission(
organization = organization,
permissions_required = [
'core.add_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
):
elif self.has_organization_permission(
organization = organization,
permissions_required = [
'core.change_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
):
serializer_prefix = serializer_prefix + 'Add'
serializer_prefix = serializer_prefix + 'Change'
elif self.has_organization_permission(
organization = organization,
permissions_required = [
'core.view_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
):
elif self.has_organization_permission(
organization = organization,
permissions_required = [
'core.add_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
):
serializer_prefix = serializer_prefix + 'View'
serializer_prefix = serializer_prefix + 'Add'
elif self.has_organization_permission(
organization = organization,
permissions_required = [
'core.view_ticket_' + str(self._ticket_type).lower().replace(' ', '_')
]
):
serializer_prefix = serializer_prefix + 'View'
if (

View File

@ -127,6 +127,75 @@ class ViewSet(ModelViewSet):
model = TicketLinkedItem
def get_parent_model(self):
if not self.parent_model:
if 'ticket_id' in self.kwargs:
self.parent_model = Ticket
self.parent_model_pk_kwarg = 'ticket_id'
elif 'item_id' in self.kwargs:
item_type: int = None
self.parent_model_pk_kwarg = 'item_id'
for choice in list(map(lambda c: c.name, TicketLinkedItem.Modules)):
if str(getattr(TicketLinkedItem.Modules, 'CLUSTER').label).lower() == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'CLUSTER').value
self.parent_model = Cluster
elif str(getattr(TicketLinkedItem.Modules, 'CONFIG_GROUP').label).lower().replace(' ', '_') == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'CONFIG_GROUP').value
self.parent_model = ConfigGroups
elif str(getattr(TicketLinkedItem.Modules, 'DEVICE').label).lower() == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'DEVICE').value
self.parent_model = Device
elif str(getattr(TicketLinkedItem.Modules, 'KB').label).lower().replace(' ', '_') == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'KB').value
self.parent_model = KnowledgeBase
elif str(getattr(TicketLinkedItem.Modules, 'OPERATING_SYSTEM').label).lower().replace(' ', '_') == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'OPERATING_SYSTEM').value
self.parent_model = OperatingSystem
elif str(getattr(TicketLinkedItem.Modules, 'SERVICE').label).lower() == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'SERVICE').value
self.parent_model = Service
elif str(getattr(TicketLinkedItem.Modules, 'SOFTWARE').label).lower() == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'SOFTWARE').value
self.parent_model = Software
self.item_type = item_type
return self.parent_model
def get_serializer_class(self):
if (
@ -140,73 +209,21 @@ class ViewSet(ModelViewSet):
return globals()[str( self.model._meta.verbose_name).replace(' ', '') + 'ModelSerializer']
def get_queryset(self):
if 'ticket_id' in self.kwargs:
self.queryset = TicketLinkedItem.objects.filter(ticket=self.kwargs['ticket_id']).order_by('id')
self.parent_model = Ticket
self.parent_model_pk_kwarg = 'ticket_id'
elif 'item_id' in self.kwargs:
item_type: int = None
self.parent_model_pk_kwarg = 'item_id'
for choice in list(map(lambda c: c.name, TicketLinkedItem.Modules)):
if str(getattr(TicketLinkedItem.Modules, 'CLUSTER').label).lower() == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'CLUSTER').value
self.parent_model = Cluster
elif str(getattr(TicketLinkedItem.Modules, 'CONFIG_GROUP').label).lower().replace(' ', '_') == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'CONFIG_GROUP').value
self.parent_model = ConfigGroups
elif str(getattr(TicketLinkedItem.Modules, 'DEVICE').label).lower() == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'DEVICE').value
self.parent_model = Device
elif str(getattr(TicketLinkedItem.Modules, 'KB').label).lower().replace(' ', '_') == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'KB').value
self.parent_model = KnowledgeBase
elif str(getattr(TicketLinkedItem.Modules, 'OPERATING_SYSTEM').label).lower().replace(' ', '_') == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'OPERATING_SYSTEM').value
self.parent_model = OperatingSystem
elif str(getattr(TicketLinkedItem.Modules, 'SERVICE').label).lower() == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'SERVICE').value
self.parent_model = Service
elif str(getattr(TicketLinkedItem.Modules, 'SOFTWARE').label).lower() == self.kwargs['item_class']:
item_type = getattr(TicketLinkedItem.Modules, 'SOFTWARE').value
self.parent_model = Software
self.queryset = TicketLinkedItem.objects.filter(
item=int(self.kwargs['item_id']),
item_type = item_type
item_type = self.item_type
)
self.item_type = item_type
if 'pk' in self.kwargs:
self.queryset = self.queryset.filter(pk = self.kwargs['pk'])

View File

@ -0,0 +1,19 @@
# Generated by Django 5.1.4 on 2024-12-31 03:13
import itam.models.device
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('itam', '0006_alter_deviceoperatingsystem_device'),
]
operations = [
migrations.AlterField(
model_name='device',
name='uuid',
field=models.CharField(blank=True, help_text='System GUID/UUID.', max_length=50, null=True, unique=True, validators=[itam.models.device.Device.validate_uuid_format], verbose_name='UUID'),
),
]

View File

@ -183,7 +183,6 @@ class Device(DeviceCommonFieldsName, SaveHistory):
uuid = models.CharField(
blank = True,
default = None,
help_text = 'System GUID/UUID.',
max_length = 50,
null = True,

View File

@ -1,3 +1,4 @@
from django.db.models import Q
from django.urls import reverse
from rest_framework import serializers
@ -41,10 +42,60 @@ class InventorySerializer(serializers.Serializer):
):
raise centurion_exceptions.ValidationError(
detail = 'Serial Number or UUID is required',
detail = 'Serial Number and/or UUID is required',
code = 'no_serial_or_uuid'
)
obj = Device.objects.filter(
Q(
name=str(data['name']).lower(),
serial_number = str(data['serial_number']).lower()
)
|
Q(
name = str(data['name']).lower(),
uuid = str(data['uuid']).lower()
)
|
Q(
serial_number = str(data['serial_number']).lower()
)
|
Q(
uuid = str(data['uuid']).lower()
)
)
if len(obj) > 1:
raise centurion_exceptions.ValidationError(
detail = {
'detail': 'Object is not unique. Confirm that uuid and/or serial number is unique'
},
code = 'not_unique'
)
elif len(obj) == 1:
obj = obj[0]
if obj.name == str(data['name']).lower():
if(
obj.serial_number != str(data['serial_number']).lower()
and obj.uuid != str(data['uuid']).lower()
):
raise centurion_exceptions.ValidationError(
detail = {
'detail': 'Device exists, however the serial number and/or UUID dont match'
},
code = 'not_unique'
)
return data

View File

View File

@ -65,3 +65,15 @@ class DeviceNotePermissionsAPI(
self.url_view_kwargs = {'device_id': self.note_item.id, 'pk': self.item.pk }
self.add_data = {'note': 'a note added', 'organization': self.organization.id}
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -13,6 +13,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itam.models.device import Device
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -47,6 +49,27 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -12,6 +12,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itam.models.device import DeviceModel
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +49,32 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -247,7 +247,17 @@ class DeviceOperatingSystemPermissionsAPI(
TestCase,
):
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -214,8 +214,17 @@ class DeviceSoftwarePermissionsAPI(
TestCase
):
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
class DeviceSoftwareViewSet(

View File

@ -12,6 +12,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itam.models.device import DeviceType
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +49,34 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -170,7 +170,17 @@ class OperatingSystemInstallsPermissionsAPI(
TestCase
):
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -219,7 +219,17 @@ class SoftwareInstallsPermissionsAPI(
TestCase
):
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -0,0 +1,510 @@
import copy
import pytest
from unittest.mock import patch
from django.contrib.auth.models import AnonymousUser, User
from django.contrib.contenttypes.models import ContentType
from django.shortcuts import reverse
from django.test import Client, TestCase
from access.models import Organization, Team, TeamUsers, Permission
from api.tests.abstract.api_serializer_viewset import SerializersTestCases
from api.tests.abstract.api_permissions_viewset import (
APIPermissionAdd,
APIPermissionChange
)
from api.tests.abstract.test_metadata_functional import MetadataAttributesFunctional, MetaDataNavigationEntriesFunctional
from itam.models.device import Device
from itam.tasks.inventory import process_inventory
from settings.models.user_settings import UserSettings
class ViewSetBase:
model = Device
app_namespace = 'v2'
url_name = '_api_v2_inventory'
@classmethod
def setUpTestData(self):
"""Setup Test
1. Create an organization for user and item
. create an organization that is different to item
2. Create a team
3. create teams with each permission: view, add, change, delete
4. create a user per team
"""
organization = Organization.objects.create(name='test_org')
self.organization = organization
different_organization = Organization.objects.create(name='test_different_organization')
self.different_organization = different_organization
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
view_team = Team.objects.create(
team_name = 'view_team',
organization = organization,
)
view_team.permissions.set([view_permissions])
add_permissions = Permission.objects.get(
codename = 'add_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
add_team = Team.objects.create(
team_name = 'add_team',
organization = organization,
)
add_team.permissions.set([add_permissions])
change_permissions = Permission.objects.get(
codename = 'change_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
change_team = Team.objects.create(
team_name = 'change_team',
organization = organization,
)
change_team.permissions.set([change_permissions])
delete_permissions = Permission.objects.get(
codename = 'delete_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
delete_team = Team.objects.create(
team_name = 'delete_team',
organization = organization,
)
delete_team.permissions.set([delete_permissions])
self.no_permissions_user = User.objects.create_user(username="test_no_permissions", password="password")
self.view_user = User.objects.create_user(username="test_user_view", password="password")
teamuser = TeamUsers.objects.create(
team = view_team,
user = self.view_user
)
self.item = self.model.objects.create(
organization = self.organization,
name = 'one-add'
)
self.other_org_item = self.model.objects.create(
organization = different_organization,
name = 'other_item'
)
self.add_user = User.objects.create_user(username="test_user_add", password="password")
teamuser = TeamUsers.objects.create(
team = add_team,
user = self.add_user
)
user_settings = UserSettings.objects.get(
user = self.add_user
)
user_settings.default_organization = self.organization
user_settings.save()
self.change_user = User.objects.create_user(username="test_user_change", password="password")
teamuser = TeamUsers.objects.create(
team = change_team,
user = self.change_user
)
user_settings = UserSettings.objects.get(
user = self.change_user
)
user_settings.default_organization = self.organization
user_settings.save()
self.delete_user = User.objects.create_user(username="test_user_delete", password="password")
teamuser = TeamUsers.objects.create(
team = delete_team,
user = self.delete_user
)
self.different_organization_user = User.objects.create_user(username="test_different_organization_user", password="password")
user_settings = UserSettings.objects.get(
user = self.different_organization_user
)
user_settings.default_organization = different_organization
user_settings.save()
different_organization_team = Team.objects.create(
team_name = 'different_organization_team',
organization = different_organization,
)
different_organization_team.permissions.set([
view_permissions,
add_permissions,
change_permissions,
delete_permissions,
])
TeamUsers.objects.create(
team = different_organization_team,
user = self.different_organization_user
)
self.inventory: dict = {
"details": {
"name": "string",
"serial_number": "string",
"uuid": "fc65b513-3ddc-4c90-af20-215b2db73455"
},
"os": {
"name": "string",
"version_major": 1,
"version": "1.2"
},
"software": [
{
"name": "string",
"category": "string",
"version": "1.1.1"
}
]
}
self.add_data = copy.deepcopy(self.inventory)
self.change_data = copy.deepcopy(self.inventory)
self.change_data['details']['name'] = 'device2'
self.change_data['details']['serial_number'] = 'sn123'
self.change_data['details']['uuid'] = '93e8e991-ad07-4b7b-a1a6-59968a5b54f8'
Device.objects.create(
organization = self.organization,
name = self.change_data['details']['name'],
serial_number = self.change_data['details']['serial_number'],
uuid = self.change_data['details']['uuid'],
)
class DevicePermissionsAPI(
ViewSetBase,
# APIPermissionAdd,
# APIPermissionChange,
TestCase
):
url_kwargs = None
url_view_kwargs = None
@patch.object(process_inventory, 'delay')
def test_add_has_permission(self, process_inventory):
""" Check correct permission for add
This test case is a over-ride of a test case with the same name.
This was done as the testcase needed to be modified to work with the
itam inventory endpoint.
Attempt to add as user with permission
"""
client = Client()
if self.url_kwargs:
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs = self.url_kwargs)
else:
url = reverse(self.app_namespace + ':' + self.url_name + '-list')
client.force_login(self.add_user)
response = client.post(url, data=self.add_data, content_type = 'application/json')
assert response.status_code == 200
@patch.object(process_inventory, 'delay')
def test_add_user_anon_denied(self, process_inventory):
""" Check correct permission for add
Attempt to add as anon user
"""
client = Client()
if self.url_kwargs:
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs = self.url_kwargs)
else:
url = reverse(self.app_namespace + ':' + self.url_name + '-list')
response = client.put(url, data=self.add_data)
assert response.status_code == 401
@patch.object(process_inventory, 'delay')
def test_add_no_permission_denied(self, process_inventory):
""" Check correct permission for add
Attempt to add as user with no permissions
"""
client = Client()
if self.url_kwargs:
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs = self.url_kwargs)
else:
url = reverse(self.app_namespace + ':' + self.url_name + '-list')
client.force_login(self.no_permissions_user)
response = client.post(url, data=self.add_data)
assert response.status_code == 403
# # @pytest.mark.skip(reason="ToDO: figure out why fails")
# def test_add_different_organization_denied(self):
# """ Check correct permission for add
# attempt to add as user from different organization
# """
# client = Client()
# if self.url_kwargs:
# url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs = self.url_kwargs)
# else:
# url = reverse(self.app_namespace + ':' + self.url_name + '-list')
# client.force_login(self.different_organization_user)
# response = client.post(url, data=self.add_data)
# assert response.status_code == 403
@patch.object(process_inventory, 'delay')
def test_add_permission_view_denied(self, process_inventory):
""" Check correct permission for add
Attempt to add a user with view permission
"""
client = Client()
if self.url_kwargs:
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs = self.url_kwargs)
else:
url = reverse(self.app_namespace + ':' + self.url_name + '-list')
client.force_login(self.view_user)
response = client.post(url, data=self.add_data)
assert response.status_code == 403
@patch.object(process_inventory, 'delay')
def test_change_has_permission(self, process_inventory):
""" Check correct permission for change
This test case is a over-ride of a test case with the same name.
This was done as the testcase needed to be modified to work with the
itam inventory endpoint.
Make change with user who has change permission
"""
client = Client()
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs=self.url_view_kwargs)
client.force_login(self.change_user)
response = client.post(url, data=self.change_data, content_type='application/json')
assert response.status_code == 200
@patch.object(process_inventory, 'delay')
def test_change_user_anon_denied(self, process_inventory):
""" Check correct permission for change
Attempt to change as anon
"""
client = Client()
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs=self.url_view_kwargs)
response = client.post(url, data=self.change_data, content_type='application/json')
assert response.status_code == 401
@patch.object(process_inventory, 'delay')
def test_change_no_permission_denied(self, process_inventory):
""" Ensure permission view cant make change
Attempt to make change as user without permissions
"""
client = Client()
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs=self.url_view_kwargs)
client.force_login(self.no_permissions_user)
response = client.post(url, data=self.change_data, content_type='application/json')
assert response.status_code == 403
@pytest.mark.skip( reason = 'see https://github.com/nofusscomputing/centurion_erp/issues/461' )
@patch.object(process_inventory, 'delay')
def test_change_different_organization_denied(self, process_inventory):
""" Ensure permission view cant make change
Attempt to make change as user from different organization
"""
client = Client()
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs=self.url_view_kwargs)
client.force_login(self.different_organization_user)
response = client.post(url, data=self.change_data, content_type='application/json')
assert response.status_code == 403
@patch.object(process_inventory, 'delay')
def test_change_permission_view_denied(self, process_inventory):
""" Ensure permission view cant make change
Attempt to make change as user with view permission
"""
client = Client()
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs=self.url_view_kwargs)
client.force_login(self.view_user)
response = client.post(url, data=self.change_data, content_type='application/json')
assert response.status_code == 403
@patch.object(process_inventory, 'delay')
def test_change_permission_add_denied(self, process_inventory):
""" Ensure permission view cant make change
Attempt to make change as user with add permission
"""
client = Client()
url = reverse(self.app_namespace + ':' + self.url_name + '-list', kwargs=self.url_view_kwargs)
client.force_login(self.add_user)
response = client.post(url, data=self.change_data, content_type='application/json')
assert response.status_code == 403

View File

@ -65,3 +65,15 @@ class OperatingSystemNotePermissionsAPI(
self.url_view_kwargs = {'operating_system_id': self.note_item.id, 'pk': self.item.pk }
self.add_data = {'note': 'a note added', 'organization': self.organization.id}
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -12,6 +12,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itam.models.operating_system import OperatingSystem
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +49,34 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -12,6 +12,9 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itam.models.operating_system import OperatingSystem, OperatingSystemVersion
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +49,39 @@ class ViewSetBase:
self.different_organization = different_organization
os = OperatingSystem.objects.create(
organization = self.organization,
name = 'one-add'
)
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = '22',
operating_system = os
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
@ -122,11 +158,6 @@ class ViewSetBase:
user = self.view_user
)
os = OperatingSystem.objects.create(
organization = self.organization,
name = 'one-add'
)
os_b = OperatingSystem.objects.create(
organization = different_organization,
name = 'two-add'

View File

@ -65,3 +65,15 @@ class SoftwareNotePermissionsAPI(
self.url_view_kwargs = {'software_id': self.note_item.id, 'pk': self.item.pk }
self.add_data = {'note': 'a note added', 'organization': self.organization.id}
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -12,6 +12,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itam.models.software import Software
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +48,35 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -12,6 +12,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itam.models.software import SoftwareCategory
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +48,34 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -12,6 +12,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itam.models.software import Software, SoftwareVersion
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +48,39 @@ class ViewSetBase:
self.different_organization = different_organization
software = Software.objects.create(
organization = self.organization,
name = 'software'
)
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = '12',
software = software
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
@ -122,11 +157,6 @@ class ViewSetBase:
user = self.view_user
)
software = Software.objects.create(
organization = self.organization,
name = 'software'
)
software_b = Software.objects.create(
organization = different_organization,
name = 'software-b'

View File

@ -2,6 +2,8 @@ from django.contrib.auth.models import User
from django.shortcuts import reverse
from django.test import Client, TestCase
from rest_framework.permissions import IsAuthenticated
from access.models import Organization
from api.tests.abstract.viewsets import ViewSetCommon
@ -39,4 +41,18 @@ class ItamViewset(
client.force_login(self.view_user)
self.http_options_response_list = client.options(url)
self.http_options_response_list = client.options(url)
def test_view_attr_permission_classes_value(self):
"""Attribute Test
Attribute `permission_classes` must be metadata class `ReactUIMetadata`
"""
view_set = self.viewset()
assert view_set.permission_classes[0] is IsAuthenticated
assert len(view_set.permission_classes) == 1

View File

@ -16,11 +16,10 @@ from access.models import Organization, Team, TeamUsers, Permission
from api.views.mixin import OrganizationPermissionAPI
from api.serializers.inventory import Inventory
from api.tasks import process_inventory
from itam.models.device import Device, DeviceOperatingSystem, DeviceSoftware
from itam.models.operating_system import OperatingSystem, OperatingSystemVersion
from itam.models.software import Software, SoftwareCategory, SoftwareVersion
from itam.tasks.inventory import process_inventory
from settings.models.user_settings import UserSettings

View File

@ -3,12 +3,12 @@ from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.reverse import reverse
from api.viewsets.common import CommonViewSet
from api.viewsets.common import IndexViewset
@extend_schema(exclude = True)
class Index(CommonViewSet):
class Index(IndexViewset):
allowed_methods: list = [
'GET',

View File

@ -1,11 +1,13 @@
import json
from django.db.models import Q
from kombu.exceptions import OperationalError
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse
from rest_framework.response import Response
from api.tasks import process_inventory
from api.viewsets.common import ModelCreateViewSet
from core import exceptions as centurion_exception
@ -13,6 +15,7 @@ from core.http.common import Http
from itam.models.device import Device
from itam.serializers.inventory import InventorySerializer
from itam.tasks.inventory import process_inventory
from settings.models.user_settings import UserSettings
@ -101,16 +104,42 @@ class ViewSet( ModelCreateViewSet ):
self.default_organization = UserSettings.objects.get(user=request.user).default_organization
if Device.objects.filter(slug=str(data.validated_data['details']['name']).lower()).exists():
obj_organaization_id = getattr(self.default_organization, 'id', None)
self.obj = Device.objects.get(slug=str(data.validated_data['details']['name']).lower())
device = self.obj
obj = Device.objects.filter(
Q(
name=str(data.validated_data['details']['name']).lower(),
serial_number = str(data.validated_data['details']['serial_number']).lower()
task = process_inventory.delay(data.validated_data, self.default_organization.id)
)
|
Q(
name = str(data.validated_data['details']['name']).lower(),
uuid = str(data.validated_data['details']['uuid']).lower()
)
)
if len(obj) == 1:
obj_organaization_id = obj[0].organization.id
if not obj_organaization_id:
raise centurion_exception.ValidationError({
'detail': 'No Default organization set for user'
})
task = process_inventory.delay(data.validated_data, obj_organaization_id)
response_data: dict = {"task_id": f"{task.id}"}
except OperationalError as e:
status = 503
response_data = f'RabbitMQ error: {e.args[0]}'
except centurion_exception.PermissionDenied as e:
@ -193,4 +222,8 @@ class ViewSet( ModelCreateViewSet ):
self.inventory_action = 'new'
return super().get_permission_required()
return self.permission_required
def get_serializer_class(self):
return InventorySerializer

View File

@ -54,6 +54,7 @@ class ClusterModelSerializer(
return {
'_self': item.get_url( request = self._context['view'].request ),
'external_links': reverse("v2:_api_v2_external_link-list", request=self._context['view'].request) + '?cluster=true',
'history': reverse(
"v2:_api_v2_model_history-list",
request=self._context['view'].request,
@ -71,6 +72,7 @@ class ClusterModelSerializer(
}
),
'notes': reverse("v2:_api_v2_cluster_notes-list", request=self._context['view'].request, kwargs={'cluster_id': item.pk}),
'service': reverse("v2:_api_v2_service_cluster-list", request=self._context['view'].request, kwargs={'cluster_id': item.pk}),
'tickets': reverse(
"v2:_api_v2_item_tickets-list",
request=self._context['view'].request,

View File

@ -12,6 +12,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itim.models.clusters import Cluster
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +48,33 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -12,6 +12,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itim.models.clusters import ClusterType
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +48,31 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -12,6 +12,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from itim.models.services import Port
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +48,34 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
number = 8181,
protocol = Port.Protocol.TCP
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -60,3 +60,15 @@ class ServiceNotePermissionsAPI(
self.url_view_kwargs = {'service_id': self.note_item.id, 'pk': self.item.pk }
self.add_data = {'note': 'a note added', 'organization': self.organization.id}
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -14,6 +14,8 @@ from itam.models.device import Device
from itim.models.services import Service, Port
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -48,6 +50,40 @@ class ViewSetBase:
self.different_organization = different_organization
device = Device.objects.create(
organization=organization,
name = 'device'
)
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item',
device = device,
config_key_variable = 'value'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
@ -124,11 +160,6 @@ class ViewSetBase:
user = self.view_user
)
device = Device.objects.create(
organization=organization,
name = 'device'
)
port = Port.objects.create(
organization=organization,
number = 80,

View File

@ -0,0 +1,190 @@
import pytest
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.test import TestCase
from access.models import Organization, Team, TeamUsers, Permission
from api.tests.abstract.api_permissions_viewset import APIPermissionView
from api.tests.abstract.api_serializer_viewset import SerializerView
from api.tests.abstract.test_metadata_functional import MetadataAttributesFunctional, MetaDataNavigationEntriesFunctional
from itim.models.clusters import Cluster
from itim.models.services import Service, Port
class ViewSetBase:
model = Service
app_namespace = 'v2'
url_name = '_api_v2_service_cluster'
@classmethod
def setUpTestData(self):
"""Setup Test
1. Create an organization for user and item
. create an organization that is different to item
2. Create a team
3. create teams with each permission: view, add, change, delete
4. create a user per team
"""
organization = Organization.objects.create(name='test_org')
self.organization = organization
different_organization = Organization.objects.create(name='test_different_organization')
self.different_organization = different_organization
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
view_team = Team.objects.create(
team_name = 'view_team',
organization = organization,
)
view_team.permissions.set([view_permissions])
add_permissions = Permission.objects.get(
codename = 'add_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
change_permissions = Permission.objects.get(
codename = 'change_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
delete_permissions = Permission.objects.get(
codename = 'delete_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
self.no_permissions_user = User.objects.create_user(username="test_no_permissions", password="password")
self.view_user = User.objects.create_user(username="test_user_view", password="password")
teamuser = TeamUsers.objects.create(
team = view_team,
user = self.view_user
)
cluster = Cluster.objects.create(
organization=organization,
name = 'cluster'
)
port = Port.objects.create(
organization=organization,
number = 80,
protocol = Port.Protocol.TCP
)
self.item = self.model.objects.create(
organization=organization,
name = 'os name',
cluster = cluster,
config_key_variable = 'value'
)
self.other_org_item = self.model.objects.create(
organization=different_organization,
name = 'os name b',
cluster = cluster,
config_key_variable = 'values'
)
self.item.port.set([ port ])
self.url_view_kwargs = {'cluster_id': cluster.id, 'pk': self.item.id}
self.url_kwargs = {'cluster_id': cluster.id}
self.different_organization_user = User.objects.create_user(username="test_different_organization_user", password="password")
different_organization_team = Team.objects.create(
team_name = 'different_organization_team',
organization = different_organization,
)
different_organization_team.permissions.set([
view_permissions,
add_permissions,
change_permissions,
delete_permissions,
])
TeamUsers.objects.create(
team = different_organization_team,
user = self.different_organization_user
)
class ServicePermissionsAPI(ViewSetBase, APIPermissionView, TestCase):
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
class ServiceViewSet(ViewSetBase, SerializerView, TestCase):
pass
class ServiceMetadata(
ViewSetBase,
MetadataAttributesFunctional,
# MetaDataNavigationEntriesFunctional,
TestCase
):
# menu_id = 'itim'
# menu_entry_id = 'service'
pass

View File

@ -0,0 +1,192 @@
import pytest
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.test import TestCase
from access.models import Organization, Team, TeamUsers, Permission
from api.tests.abstract.api_permissions_viewset import APIPermissionView
from api.tests.abstract.api_serializer_viewset import SerializerView
from api.tests.abstract.test_metadata_functional import MetadataAttributesFunctional, MetaDataNavigationEntriesFunctional
from itim.models.clusters import Cluster
from itim.models.services import Service, Port
from itam.models.device import Device
class ViewSetBase:
model = Service
app_namespace = 'v2'
url_name = '_api_v2_service_device'
@classmethod
def setUpTestData(self):
"""Setup Test
1. Create an organization for user and item
. create an organization that is different to item
2. Create a team
3. create teams with each permission: view, add, change, delete
4. create a user per team
"""
organization = Organization.objects.create(name='test_org')
self.organization = organization
different_organization = Organization.objects.create(name='test_different_organization')
self.different_organization = different_organization
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
view_team = Team.objects.create(
team_name = 'view_team',
organization = organization,
)
view_team.permissions.set([view_permissions])
add_permissions = Permission.objects.get(
codename = 'add_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
change_permissions = Permission.objects.get(
codename = 'change_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
delete_permissions = Permission.objects.get(
codename = 'delete_' + self.model._meta.model_name,
content_type = ContentType.objects.get(
app_label = self.model._meta.app_label,
model = self.model._meta.model_name,
)
)
self.no_permissions_user = User.objects.create_user(username="test_no_permissions", password="password")
self.view_user = User.objects.create_user(username="test_user_view", password="password")
teamuser = TeamUsers.objects.create(
team = view_team,
user = self.view_user
)
device = Device.objects.create(
organization=organization,
name = 'cluster'
)
port = Port.objects.create(
organization=organization,
number = 80,
protocol = Port.Protocol.TCP
)
self.item = self.model.objects.create(
organization=organization,
name = 'os name',
device = device,
config_key_variable = 'value'
)
self.other_org_item = self.model.objects.create(
organization=different_organization,
name = 'os name b',
device = device,
config_key_variable = 'values'
)
self.item.port.set([ port ])
self.url_view_kwargs = {'device_id': device.id, 'pk': self.item.id}
self.url_kwargs = {'device_id': device.id}
self.different_organization_user = User.objects.create_user(username="test_different_organization_user", password="password")
different_organization_team = Team.objects.create(
team_name = 'different_organization_team',
organization = different_organization,
)
different_organization_team.permissions.set([
view_permissions,
add_permissions,
change_permissions,
delete_permissions,
])
TeamUsers.objects.create(
team = different_organization_team,
user = self.different_organization_user
)
class ServicePermissionsAPI(ViewSetBase, APIPermissionView, TestCase):
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
class ServiceViewSet(ViewSetBase, SerializerView, TestCase):
pass
class ServiceMetadata(
ViewSetBase,
MetadataAttributesFunctional,
# MetaDataNavigationEntriesFunctional,
TestCase
):
# menu_id = 'itim'
# menu_entry_id = 'service'
pass

View File

@ -2,6 +2,8 @@ from django.contrib.auth.models import User
from django.shortcuts import reverse
from django.test import Client, TestCase
from rest_framework.permissions import IsAuthenticated
from access.models import Organization
from api.tests.abstract.viewsets import ViewSetCommon
@ -39,4 +41,18 @@ class ITIMViewset(
client.force_login(self.view_user)
self.http_options_response_list = client.options(url)
self.http_options_response_list = client.options(url)
def test_view_attr_permission_classes_value(self):
"""Attribute Test
Attribute `permission_classes` must be metadata class `ReactUIMetadata`
"""
view_set = self.viewset()
assert view_set.permission_classes[0] is IsAuthenticated
assert len(view_set.permission_classes) == 1

View File

@ -467,3 +467,22 @@ class ClusterAPI(
"""
assert type(self.api_data['_urls']['tickets']) is str
def test_api_field_exists_urls_external_links(self):
""" Test for existance of API Field
_urls.external_links field must exist
"""
assert 'external_links' in self.api_data['_urls']
def test_api_field_type_urls_external_links(self):
""" Test for type for API Field
_urls.external_links field must be str
"""
assert type(self.api_data['_urls']['external_links']) is str

View File

@ -3,12 +3,12 @@ from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.reverse import reverse
from api.viewsets.common import CommonViewSet
from api.viewsets.common import IndexViewset
@extend_schema(exclude = True)
class Index(CommonViewSet):
class Index(IndexViewset):
allowed_methods: list = [
'GET',

View File

@ -0,0 +1,56 @@
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse
from api.viewsets.common import ReadOnlyModelViewSet
from itim.serializers.service import (
Service,
ServiceModelSerializer,
ServiceViewSerializer
)
@extend_schema_view(
list=extend_schema(exclude=True),
retrieve=extend_schema(exclude=True),
create=extend_schema(exclude=True),
update=extend_schema(exclude=True),
partial_update=extend_schema(exclude=True),
destroy=extend_schema(exclude=True)
)
class ViewSet(ReadOnlyModelViewSet):
filterset_fields = [
'cluster',
'port',
]
search_fields = [
'name',
]
model = Service
def get_queryset(self):
queryset = super().get_queryset()
queryset = queryset.filter(cluster_id=self.kwargs['cluster_id'])
self.queryset = queryset
return self.queryset
def get_serializer_class(self):
if (
self.action == 'list'
or self.action == 'retrieve'
):
return globals()[str( self.model._meta.verbose_name).replace(' ', '') + 'ViewSerializer']
return globals()[str( self.model._meta.verbose_name).replace(' ', '') + 'ModelSerializer']

View File

@ -1,6 +1,6 @@
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse
from api.viewsets.common import ModelViewSet
from api.viewsets.common import ReadOnlyModelViewSet
from itim.serializers.service import (
Service,
@ -18,7 +18,7 @@ from itim.serializers.service import (
partial_update=extend_schema(exclude=True),
destroy=extend_schema(exclude=True)
)
class ViewSet(ModelViewSet):
class ViewSet(ReadOnlyModelViewSet):
filterset_fields = [
'cluster',

View File

@ -13,6 +13,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from project_management.models.projects import Project
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -47,6 +49,31 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -199,7 +199,18 @@ class ViewSetBase:
class ProjectMilestonePermissionsAPI(ViewSetBase, APIPermissions, TestCase):
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -12,6 +12,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from project_management.models.project_states import ProjectState
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +48,32 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -12,6 +12,8 @@ from api.tests.abstract.test_metadata_functional import MetadataAttributesFuncti
from project_management.models.project_types import ProjectType
from settings.models.app_settings import AppSettings
class ViewSetBase:
@ -46,6 +48,31 @@ class ViewSetBase:
self.different_organization = different_organization
self.global_organization = Organization.objects.create(
name = 'test_global_organization'
)
self.global_org_item = self.model.objects.create(
organization = self.global_organization,
name = 'global_item'
)
app_settings = AppSettings.objects.get(
owner_organization = None
)
app_settings.global_organization = self.global_organization
app_settings.save()
view_permissions = Permission.objects.get(
codename = 'view_' + self.model._meta.model_name,
content_type = ContentType.objects.get(

View File

@ -2,6 +2,8 @@ from django.contrib.auth.models import User
from django.shortcuts import reverse
from django.test import Client, TestCase
from rest_framework.permissions import IsAuthenticated
from access.models import Organization
from api.tests.abstract.viewsets import ViewSetCommon
@ -40,4 +42,18 @@ class ProjectManagementViewset(
client.force_login(self.view_user)
self.http_options_response_list = client.options(url)
self.http_options_response_list = client.options(url)
def test_view_attr_permission_classes_value(self):
"""Attribute Test
Attribute `permission_classes` must be metadata class `ReactUIMetadata`
"""
view_set = self.viewset()
assert view_set.permission_classes[0] is IsAuthenticated
assert len(view_set.permission_classes) == 1

View File

@ -3,12 +3,12 @@ from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.reverse import reverse
from api.viewsets.common import CommonViewSet
from api.viewsets.common import IndexViewset
@extend_schema(exclude = True)
class Index(CommonViewSet):
class Index(IndexViewset):
allowed_methods: list = [
'GET',

View File

@ -197,6 +197,18 @@ class AppSettingsPermissionsAPI(
):
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
def test_add_create_not_allowed(self):
""" Check correct permission for add
@ -212,6 +224,18 @@ class AppSettingsPermissionsAPI(
assert e.typename == 'NoReverseMatch'
def test_change_different_organization_denied(self):
""" Ensure permission view cant make change
This test case is N/A as app settings are not a tenancy model
Attempt to make change as user from different organization
"""
pass
def test_delete_has_permission(self):
""" Check correct permission for delete
@ -237,6 +261,17 @@ class AppSettingsPermissionsAPI(
pass
def test_view_different_organizaiton_denied(self):
""" Check correct permission for view
This test case is N/A as app settings are not a tenancy model
Attempt to view with user from different organization
"""
pass
class AppSettingsViewSet(
ViewSetBase,
SerializerChange,

View File

@ -194,7 +194,17 @@ class ViewSetBase:
class ExternalLinkPermissionsAPI(ViewSetBase, APIPermissions, TestCase):
pass
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass

View File

@ -2,6 +2,8 @@ from django.contrib.auth.models import User
from django.shortcuts import reverse
from django.test import Client, TestCase
from rest_framework.permissions import IsAuthenticated
from access.models import Organization
from api.tests.abstract.viewsets import ViewSetCommon
@ -40,4 +42,18 @@ class SettingsViewset(
client.force_login(self.view_user)
self.http_options_response_list = client.options(url)
self.http_options_response_list = client.options(url)
def test_view_attr_permission_classes_value(self):
"""Attribute Test
Attribute `permission_classes` must be metadata class `ReactUIMetadata`
"""
view_set = self.viewset()
assert view_set.permission_classes[0] is IsAuthenticated
assert len(view_set.permission_classes) == 1

View File

@ -198,6 +198,19 @@ class UserSettingsPermissionsAPI(
):
def test_returned_data_from_user_and_global_organizations_only(self):
"""Check items returned
This test case is a over-ride of a test case with the same name.
This model is not a tenancy model making this test not-applicable.
Items returned from the query Must be from the users organization and
global ONLY!
"""
pass
def test_add_create_not_allowed(self):
""" Check correct permission for add

Some files were not shown because too many files have changed in this diff Show More