Skip to content

Commit

Permalink
✨(teams) return parent teams in resource server
Browse files Browse the repository at this point in the history
Also return the parent teams in the user's team endpoints.
  • Loading branch information
qbey committed Jan 6, 2025
1 parent 8c982b3 commit e7202a1
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 5 deletions.
41 changes: 38 additions & 3 deletions src/backend/core/api/resource_server/viewsets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Resource server API endpoints"""

from django.db.models import OuterRef, Prefetch, Subquery
import operator
from functools import reduce

from django.db.models import OuterRef, Prefetch, Q, Subquery, Value
from django.db.models.functions import Coalesce

from rest_framework import (
filters,
Expand Down Expand Up @@ -56,6 +60,14 @@ class TeamViewSet( # pylint: disable=too-many-ancestors

def get_queryset(self):
"""Custom queryset to get user related teams."""
teams_queryset = models.Team.objects.filter(
accesses__user=self.request.user,
)
depth_path = teams_queryset.values("depth", "path")

if not depth_path:
return models.Team.objects.none()

user_role_query = models.TeamAccess.objects.filter(
user=self.request.user, team=OuterRef("pk")
).values("role")[:1]
Expand All @@ -74,10 +86,33 @@ def get_queryset(self):
service_provider_prefetch,
)
.filter(
accesses__user=self.request.user,
reduce(
operator.or_,
(
Q(
# The team the user has access to
depth=d["depth"],
path=d["path"],
)
| Q(
# The parent team the user has access to
depth__lt=d["depth"],
path__startswith=d["path"][: models.Team.steplen],
organization_id=self.request.user.organization_id,
)
for d in depth_path
),
),
service_providers__audience_id=service_provider_audience,
)
.annotate(user_role=Subquery(user_role_query))
# Abilities are computed based on logged-in user's role for the team
# and if the user does not have access, it's ok to consider them as a member
# because it's a parent team.
.annotate(
user_role=Coalesce(
Subquery(user_role_query), Value(models.RoleChoices.MEMBER.value)
)
)
)

def perform_create(self, serializer):
Expand Down
81 changes: 80 additions & 1 deletion src/backend/core/tests/resource_server_api/teams/test_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ def test_api_teams_list_authenticated( # pylint: disable=too-many-locals

# Authenticate using the resource server, ie via the Authorization header
with force_login_via_resource_server(client, user, service_provider.audience_id):
with django_assert_num_queries(4): # Count, Team, ServiceProvider, TeamAccess
with django_assert_num_queries(5):
# queries: Team path, Count, Team, ServiceProvider, TeamAccess
response = client.get(
"/resource-server/v1.0/teams/?ordering=created_at",
format="json",
Expand Down Expand Up @@ -182,3 +183,81 @@ def test_api_teams_order_param(client, force_login_via_resource_server):
assert (
response_team_ids == team_ids
), "created_at values are not sorted from oldest to newest"


def test_api_teams_list_with_parent_teams(client, force_login_via_resource_server):
"""
Authenticated users should be able to list teams including parent teams.
Parent teams should not be listed if they don't have the service provider.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()

root_team = factories.TeamFactory(name="Root", service_providers=[service_provider])
first_team = factories.TeamFactory(name="First", parent_id=root_team.pk)
second_team = factories.TeamFactory(
name="Second", parent_id=first_team.pk, service_providers=[service_provider]
)

factories.TeamAccessFactory(user=user, team=second_team, role="member")

with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
"/resource-server/v1.0/teams/",
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)

assert response.status_code == HTTP_200_OK
response_data = response.json()
assert response_data["count"] == 2

team_ids = [team["id"] for team in response_data["results"]]
assert len(team_ids) == 2
assert set(team_ids) == {str(root_team.id), str(second_team.id)}


def test_api_teams_list_with_parent_teams_other_organization(
client, force_login_via_resource_server
):
"""
Authenticated users should be able to list teams including parent teams.
Parent teams should not be listed if they don't have the service provider
or if the user does not belong to the organization.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()

other_organization = factories.OrganizationFactory(with_registration_id=True)
root_team = factories.TeamFactory(
name="Root",
service_providers=[service_provider],
organization=other_organization,
)
first_team = factories.TeamFactory(
name="First", parent_id=root_team.pk, organization=other_organization
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=other_organization,
)

factories.TeamAccessFactory(user=user, team=second_team, role="member")

with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
"/resource-server/v1.0/teams/",
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)

assert response.status_code == HTTP_200_OK
response_data = response.json()
assert response_data["count"] == 1

team_ids = [team["id"] for team in response_data["results"]]
assert len(team_ids) == 1
assert set(team_ids) == {str(second_team.id)}
128 changes: 127 additions & 1 deletion src/backend/core/tests/resource_server_api/teams/test_retrieve.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from rest_framework.test import APIClient

from core import factories
from core.factories import UserFactory
from core.factories import TeamAccessFactory, UserFactory

pytestmark = pytest.mark.django_db

Expand Down Expand Up @@ -97,3 +97,129 @@ def test_api_teams_retrieve_authenticated_other_service_provider(
)

assert response.status_code == HTTP_404_NOT_FOUND


def test_api_teams_retrieve_authenticated_related_parent_same_organization(
client, force_login_via_resource_server
):
"""
Authenticated users should be allowed to retrieve a parent team
of a team to which they are related, only if they belong to the
same organization.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()

root_team = factories.TeamFactory(
name="Root",
organization=organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=organization,
)
TeamAccessFactory(user=user, team=second_team, role="member")

with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{first_team.pk}/",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)

assert response.status_code == status.HTTP_200_OK
assert response.json() == {
"id": str(first_team.pk),
"name": first_team.name,
"created_at": first_team.created_at.isoformat().replace("+00:00", "Z"),
"updated_at": first_team.updated_at.isoformat().replace("+00:00", "Z"),
}


def test_api_teams_retrieve_authenticated_related_parent_other_organization(
client, force_login_via_resource_server
):
"""
Authenticated users should be allowed to retrieve a parent team
of a team to which they are related, only if they belong to the
same organization.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()

other_organization = factories.OrganizationFactory(with_registration_id=True)
root_team = factories.TeamFactory(
name="Root",
organization=other_organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=other_organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=other_organization,
)
TeamAccessFactory(user=user, team=second_team, role="member")

with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{first_team.pk}/",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)

assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}


def test_api_teams_retrieve_authenticated_related_child_same_organization(
client, force_login_via_resource_server
):
"""
Authenticated users should NOT be allowed to retrieve a child team
of a team to which they are related, even if they belong to the
same organization.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()

root_team = factories.TeamFactory(
name="Root",
organization=organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=organization,
)
TeamAccessFactory(user=user, team=first_team, role="member")

with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{second_team.pk}/",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)

assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}
92 changes: 92 additions & 0 deletions src/backend/core/tests/resource_server_api/teams/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,95 @@ def test_api_teams_update_administrator_or_owner_of_another(
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values


@pytest.mark.parametrize("role", ["administrator", "member", "owner"])
def test_api_teams_update_parent_team(client, force_login_via_resource_server, role):
"""
Belonging to a team should NOT grant authorization to update
another parent team.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()

root_team = factories.TeamFactory(
name="Root",
organization=organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=organization,
)
factories.TeamAccessFactory(user=user, team=second_team, role=role)

with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.patch(
f"/resource-server/v1.0/teams/{first_team.pk}/",
{
"name": "New name",
},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)

assert response.status_code == HTTP_403_FORBIDDEN
assert response.json() == {
"detail": "You do not have permission to perform this action."
}

first_team.refresh_from_db()
assert first_team.name == "First"


@pytest.mark.parametrize("role", ["administrator", "member", "owner"])
def test_api_teams_update_child_team(client, force_login_via_resource_server, role):
"""
Belonging to a team should NOT grant authorization to update
another child team.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()

root_team = factories.TeamFactory(
name="Root",
organization=organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=organization,
)
factories.TeamAccessFactory(user=user, team=first_team, role=role)

with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.patch(
f"/resource-server/v1.0/teams/{second_team.pk}/",
{
"name": "New name",
},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)

assert response.status_code == HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}

second_team.refresh_from_db()
assert second_team.name == "Second"

0 comments on commit e7202a1

Please sign in to comment.