Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feature/user_role' into gtc_2794…
Browse files Browse the repository at this point in the history
…_protect_dataset_endpoints
  • Loading branch information
dmannarino committed May 8, 2024
2 parents 98808ab + e5c8e38 commit ce39e29
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 15 deletions.
2 changes: 1 addition & 1 deletion app/authentication/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def get_user(token: str = Depends(oauth2_scheme)) -> User:
logger.info("Unauthorized user")
raise HTTPException(status_code=401, detail="Unauthorized")
else:
return User(**response.json()["data"])
return User(**response.json())


async def get_admin(user: User = Depends(get_user)) -> User:
Expand Down
20 changes: 8 additions & 12 deletions app/routes/authentication/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,10 @@ async def create_api_key(
Default keys are valid for one year
"""

user_id, user_role = user["id"], user["role"]

if api_key_data.never_expires and user_role != "ADMIN":
if api_key_data.never_expires and user.role != "ADMIN":
raise HTTPException(
status_code=400,
detail=f"Users with role {user_role} cannot set `never_expires` to True.",
detail=f"Users with role {user.role} cannot set `never_expires` to True.",
)

input_data = api_key_data.dict(by_alias=True)
Expand All @@ -86,15 +84,15 @@ async def create_api_key(

# Give a good error code/message if user is specifying an alias that exists for
# another one of his API keys.
prev_keys: List[ORMApiKey] = await api_keys.get_api_keys_from_user(user_id=user_id)
prev_keys: List[ORMApiKey] = await api_keys.get_api_keys_from_user(user_id=user.id)
for key in prev_keys:
if key.alias == api_key_data.alias:
raise HTTPException(
status_code=409,
detail="Key with specified alias already exists; use a different alias",
)

row: ORMApiKey = await api_keys.create_api_key(user_id=user_id, **input_data)
row: ORMApiKey = await api_keys.create_api_key(user_id=user.id, **input_data)

is_internal = api_key_is_internal(
api_key_data.domains, user_id=None, origin=origin, referrer=referrer
Expand Down Expand Up @@ -124,13 +122,13 @@ async def get_api_key(
User must own API Key or must be Admin to see details.
"""
user_id, role = user["id"], user["role"]

try:
row: ORMApiKey = await api_keys.get_api_key(api_key)
except RecordNotFoundError:
raise HTTPException(status_code=404, detail="The API Key does not exist.")

if role != "ADMIN" and row.user_id != user_id:
if user.role != "ADMIN" and row.user_id != user.id:
raise HTTPException(
status_code=403, detail="API Key is not associated with current user."
)
Expand All @@ -148,8 +146,7 @@ async def get_api_keys(
Default keys are valid for one year
"""
user_id = user["id"]
rows: List[ORMApiKey] = await api_keys.get_api_keys_from_user(user_id)
rows: List[ORMApiKey] = await api_keys.get_api_keys_from_user(user.id)
data = [ApiKey.from_orm(row) for row in rows]

return ApiKeysResponse(data=data)
Expand Down Expand Up @@ -191,7 +188,6 @@ async def delete_api_key(
API Key must belong to user.
"""
user_id = user["id"]
try:
row: ORMApiKey = await api_keys.get_api_key(api_key)
except RecordNotFoundError:
Expand All @@ -200,7 +196,7 @@ async def delete_api_key(
)

# TODO: we might want to allow admins to delete api keys of other users?
if not row.user_id == user_id:
if not row.user_id == user.id:
raise HTTPException(
status_code=403,
detail="The requested API key does not belong to the current user.",
Expand Down
20 changes: 18 additions & 2 deletions tests_v2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,27 @@ def submit_batch_job(self, *args, **kwargs) -> uuid.UUID:


async def get_user_mocked() -> Tuple[str, str]:
return "userid_123", "USER"
return User(
id="userid_123",
name="Ms. User",
email="[email protected]",
createdAt="2021-06-13T03:18:23.000Z",
role="USER",
applications=[],
extraUserData={},
)


async def get_admin_mocked() -> Tuple[str, str]:
return "adminid_123", "ADMIN"
return User(
id="adminid_123",
name="Sir Admin",
email="[email protected]",
createdAt="2021-06-13T03:18:23.000Z",
role="ADMIN",
applications=[],
extraUserData={},
)


async def get_manager_mocked() -> User:
Expand Down

0 comments on commit ce39e29

Please sign in to comment.