diff --git a/src/workos/authorization.py b/src/workos/authorization.py index 6e12f035..22ab0744 100644 --- a/src/workos/authorization.py +++ b/src/workos/authorization.py @@ -1,13 +1,16 @@ -from typing import Any, Dict, Optional, Protocol, Sequence +from typing import Any, Dict, Optional, Protocol, Sequence, Union from pydantic import TypeAdapter +from typing_extensions import TypedDict +from workos.types.authorization.access_evaluation import AccessEvaluation from workos.types.authorization.environment_role import ( EnvironmentRole, EnvironmentRoleList, ) from workos.types.authorization.organization_role import OrganizationRole from workos.types.authorization.permission import Permission +from workos.types.authorization.resource import Resource from workos.types.authorization.role import Role, RoleList from workos.types.list_resource import ( ListArgs, @@ -28,6 +31,31 @@ ) AUTHORIZATION_PERMISSIONS_PATH = "authorization/permissions" +AUTHORIZATION_RESOURCES_PATH = "authorization/resources" + + +class ParentResourceById(TypedDict): + parent_resource_id: str + + +class ParentResourceByExternalId(TypedDict): + parent_resource_external_id: str + parent_resource_type_slug: str + + +ParentResource = Union[ParentResourceById, ParentResourceByExternalId] + + +class CheckResourceById(TypedDict): + resource_id: str + + +class CheckResourceByExternalId(TypedDict): + resource_type: str + external_id: str + + +CheckResource = Union[CheckResourceById, CheckResourceByExternalId] _role_adapter: TypeAdapter[Role] = TypeAdapter(Role) @@ -161,6 +189,46 @@ def add_environment_role_permission( permission_slug: str, ) -> SyncOrAsync[EnvironmentRole]: ... + # Resources + + def get_resource(self, resource_id: str) -> SyncOrAsync[Resource]: ... + + def create_resource( + self, + *, + resource_type_slug: str, + organization_id: str, + external_id: str, + name: str, + parent: ParentResource, + description: Optional[str] = None, + ) -> SyncOrAsync[Resource]: ... + + def update_resource( + self, + resource_id: str, + *, + name: Optional[str] = None, + description: Optional[str] = None, + ) -> SyncOrAsync[Resource]: ... + + def delete_resource( + self, + resource_id: str, + *, + cascade_delete: Optional[bool] = None, + ) -> SyncOrAsync[None]: ... + + # Access Evaluation + + def check( + self, + organization_membership_id: str, + *, + resource: CheckResource, + relation: str, + ) -> SyncOrAsync[AccessEvaluation]: ... + class Authorization(AuthorizationModule): _http_client: SyncHTTPClient @@ -437,6 +505,104 @@ def add_environment_role_permission( return EnvironmentRole.model_validate(response) + # Resources + + def get_resource(self, resource_id: str) -> Resource: + response = self._http_client.request( + f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}", + method=REQUEST_METHOD_GET, + ) + + return Resource.model_validate(response) + + def create_resource( + self, + *, + resource_type_slug: str, + organization_id: str, + external_id: str, + name: str, + parent: ParentResource, + description: Optional[str] = None, + ) -> Resource: + json: Dict[str, Any] = { + "resource_type_slug": resource_type_slug, + "organization_id": organization_id, + "external_id": external_id, + "name": name, + **parent, + } + if description is not None: + json["description"] = description + + response = self._http_client.request( + AUTHORIZATION_RESOURCES_PATH, + method=REQUEST_METHOD_POST, + json=json, + ) + + return Resource.model_validate(response) + + def update_resource( + self, + resource_id: str, + *, + name: Optional[str] = None, + description: Optional[str] = None, + ) -> Resource: + json: Dict[str, Any] = {} + if name is not None: + json["name"] = name + if description is not None: + json["description"] = description + + response = self._http_client.request( + f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}", + method=REQUEST_METHOD_PATCH, + json=json, + ) + + return Resource.model_validate(response) + + def delete_resource( + self, + resource_id: str, + *, + cascade_delete: Optional[bool] = None, + ) -> None: + if cascade_delete is not None: + self._http_client.delete_with_body( + f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}", + json={"cascade_delete": cascade_delete}, + ) + else: + self._http_client.request( + f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}", + method=REQUEST_METHOD_DELETE, + ) + + # Access Evaluation + + def check( + self, + organization_membership_id: str, + *, + resource: CheckResource, + relation: str, + ) -> AccessEvaluation: + json: Dict[str, Any] = { + "resource": resource, + "relation": relation, + } + + response = self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/check", + method=REQUEST_METHOD_POST, + json=json, + ) + + return AccessEvaluation.model_validate(response) + class AsyncAuthorization(AuthorizationModule): _http_client: AsyncHTTPClient @@ -712,3 +878,101 @@ async def add_environment_role_permission( ) return EnvironmentRole.model_validate(response) + + # Resources + + async def get_resource(self, resource_id: str) -> Resource: + response = await self._http_client.request( + f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}", + method=REQUEST_METHOD_GET, + ) + + return Resource.model_validate(response) + + async def create_resource( + self, + *, + resource_type_slug: str, + organization_id: str, + external_id: str, + name: str, + parent: ParentResource, + description: Optional[str] = None, + ) -> Resource: + json: Dict[str, Any] = { + "resource_type_slug": resource_type_slug, + "organization_id": organization_id, + "external_id": external_id, + "name": name, + **parent, + } + if description is not None: + json["description"] = description + + response = await self._http_client.request( + AUTHORIZATION_RESOURCES_PATH, + method=REQUEST_METHOD_POST, + json=json, + ) + + return Resource.model_validate(response) + + async def update_resource( + self, + resource_id: str, + *, + name: Optional[str] = None, + description: Optional[str] = None, + ) -> Resource: + json: Dict[str, Any] = {} + if name is not None: + json["name"] = name + if description is not None: + json["description"] = description + + response = await self._http_client.request( + f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}", + method=REQUEST_METHOD_PATCH, + json=json, + ) + + return Resource.model_validate(response) + + async def delete_resource( + self, + resource_id: str, + *, + cascade_delete: Optional[bool] = None, + ) -> None: + if cascade_delete is not None: + await self._http_client.delete_with_body( + f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}", + json={"cascade_delete": cascade_delete}, + ) + else: + await self._http_client.request( + f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}", + method=REQUEST_METHOD_DELETE, + ) + + # Access Evaluation + + async def check( + self, + organization_membership_id: str, + *, + resource: CheckResource, + relation: str, + ) -> AccessEvaluation: + json: Dict[str, Any] = { + "resource": resource, + "relation": relation, + } + + response = await self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/check", + method=REQUEST_METHOD_POST, + json=json, + ) + + return AccessEvaluation.model_validate(response) diff --git a/src/workos/types/authorization/resource.py b/src/workos/types/authorization/resource.py index 917673c4..e699292b 100644 --- a/src/workos/types/authorization/resource.py +++ b/src/workos/types/authorization/resource.py @@ -1,4 +1,4 @@ -from typing import Literal, Optional +from typing import Any, Literal, Mapping, Optional from workos.types.workos_model import WorkOSModel diff --git a/tests/test_authorization_check.py b/tests/test_authorization_check.py new file mode 100644 index 00000000..8c37198b --- /dev/null +++ b/tests/test_authorization_check.py @@ -0,0 +1,110 @@ +from typing import Union + +import pytest +from tests.utils.syncify import syncify +from workos.authorization import AsyncAuthorization, Authorization + + +@pytest.mark.sync_and_async(Authorization, AsyncAuthorization) +class TestAuthorizationCheck: + @pytest.fixture(autouse=True) + def setup(self, module_instance: Union[Authorization, AsyncAuthorization]): + self.http_client = module_instance._http_client + self.authorization = module_instance + + # --- check: authorized result --- + + def test_check_authorized(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, {"authorized": True}, 200 + ) + + result = syncify( + self.authorization.check( + "om_01ABC123", + resource={"resource_id": "res_01ABC"}, + relation="viewer", + ) + ) + + assert result.authorized is True + assert request_kwargs["method"] == "post" + + # --- check: unauthorized result --- + + def test_check_unauthorized(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, {"authorized": False}, 200 + ) + + result = syncify( + self.authorization.check( + "om_01ABC123", + resource={"resource_id": "res_01ABC"}, + relation="editor", + ) + ) + + assert result.authorized is False + assert request_kwargs["method"] == "post" + + # --- check: resource by internal ID --- + + def test_check_resource_by_internal_id(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, {"authorized": True}, 200 + ) + + syncify( + self.authorization.check( + "om_01ABC123", + resource={"resource_id": "res_01ABC"}, + relation="viewer", + ) + ) + + assert request_kwargs["json"] == { + "resource": {"resource_id": "res_01ABC"}, + "relation": "viewer", + } + + # --- check: resource by external ID --- + + def test_check_resource_by_external_id(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, {"authorized": True}, 200 + ) + + syncify( + self.authorization.check( + "om_01ABC123", + resource={"resource_type": "document", "external_id": "my-doc-456"}, + relation="editor", + ) + ) + + assert request_kwargs["json"] == { + "resource": {"resource_type": "document", "external_id": "my-doc-456"}, + "relation": "editor", + } + + # --- check: URL construction --- + + def test_check_url_contains_org_membership_id( + self, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, {"authorized": True}, 200 + ) + + syncify( + self.authorization.check( + "om_01MEMBERSHIP", + resource={"resource_id": "res_01ABC"}, + relation="viewer", + ) + ) + + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01MEMBERSHIP/check" + ) diff --git a/tests/test_authorization_resource_crud.py b/tests/test_authorization_resource_crud.py new file mode 100644 index 00000000..9a23f587 --- /dev/null +++ b/tests/test_authorization_resource_crud.py @@ -0,0 +1,201 @@ +from typing import Union + +import pytest +from tests.utils.fixtures.mock_resource import MockResource +from tests.utils.syncify import syncify +from workos.authorization import AsyncAuthorization, Authorization + + +@pytest.mark.sync_and_async(Authorization, AsyncAuthorization) +class TestAuthorizationResourceCRUD: + @pytest.fixture(autouse=True) + def setup(self, module_instance: Union[Authorization, AsyncAuthorization]): + self.http_client = module_instance._http_client + self.authorization = module_instance + + @pytest.fixture + def mock_resource(self): + return MockResource(id="res_01ABC").dict() + + # --- get_resource --- + + def test_get_resource(self, mock_resource, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_resource, 200 + ) + + resource = syncify(self.authorization.get_resource("res_01ABC")) + + assert resource.id == "res_01ABC" + assert resource.object == "authorization_resource" + assert request_kwargs["method"] == "get" + assert request_kwargs["url"].endswith("/authorization/resources/res_01ABC") + + # --- create_resource --- + + def test_create_resource_required_fields_only( + self, mock_resource, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_resource, 201 + ) + + resource = syncify( + self.authorization.create_resource( + resource_type_slug="document", + organization_id="org_01EHT88Z8J8795GZNQ4ZP1J81T", + external_id="ext_123", + name="Test Resource", + parent={"parent_resource_id": "res_01PARENT"}, + ) + ) + + assert resource.id == "res_01ABC" + assert request_kwargs["method"] == "post" + assert request_kwargs["url"].endswith("/authorization/resources") + assert request_kwargs["json"] == { + "resource_type_slug": "document", + "organization_id": "org_01EHT88Z8J8795GZNQ4ZP1J81T", + "external_id": "ext_123", + "name": "Test Resource", + "parent_resource_id": "res_01PARENT", + } + + def test_create_resource_with_all_optional_fields( + self, mock_resource, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_resource, 201 + ) + + syncify( + self.authorization.create_resource( + resource_type_slug="document", + organization_id="org_01EHT88Z8J8795GZNQ4ZP1J81T", + external_id="ext_123", + name="Test Resource", + parent={"parent_resource_id": "res_01PARENT"}, + description="A test resource", + ) + ) + + assert request_kwargs["json"] == { + "resource_type_slug": "document", + "organization_id": "org_01EHT88Z8J8795GZNQ4ZP1J81T", + "external_id": "ext_123", + "name": "Test Resource", + "parent_resource_id": "res_01PARENT", + "description": "A test resource", + } + + def test_create_resource_with_parent_by_id( + self, mock_resource, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_resource, 201 + ) + + syncify( + self.authorization.create_resource( + resource_type_slug="document", + organization_id="org_01EHT88Z8J8795GZNQ4ZP1J81T", + external_id="ext_123", + name="Test Resource", + parent={"parent_resource_id": "res_01PARENT"}, + ) + ) + + assert request_kwargs["json"]["parent_resource_id"] == "res_01PARENT" + + def test_create_resource_with_parent_by_external_id( + self, mock_resource, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_resource, 201 + ) + + syncify( + self.authorization.create_resource( + resource_type_slug="document", + organization_id="org_01EHT88Z8J8795GZNQ4ZP1J81T", + external_id="ext_123", + name="Test Resource", + parent={ + "parent_resource_external_id": "ext_parent_456", + "parent_resource_type_slug": "folder", + }, + ) + ) + + assert request_kwargs["json"]["parent_resource_external_id"] == "ext_parent_456" + assert request_kwargs["json"]["parent_resource_type_slug"] == "folder" + + # --- update_resource --- + + def test_update_resource_with_name_and_description( + self, mock_resource, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_resource, 200 + ) + + resource = syncify( + self.authorization.update_resource( + "res_01ABC", + name="Updated Name", + description="Updated description", + ) + ) + + assert resource.id == "res_01ABC" + assert request_kwargs["method"] == "patch" + assert request_kwargs["url"].endswith("/authorization/resources/res_01ABC") + assert request_kwargs["json"] == { + "name": "Updated Name", + "description": "Updated description", + } + + def test_update_resource_without_optional_fields( + self, mock_resource, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_resource, 200 + ) + + syncify(self.authorization.update_resource("res_01ABC")) + + assert request_kwargs["method"] == "patch" + assert request_kwargs["json"] == {} + + # --- delete_resource --- + + def test_delete_resource_without_cascade( + self, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, + status_code=202, + headers={"content-type": "text/plain; charset=utf-8"}, + ) + + response = syncify(self.authorization.delete_resource("res_01ABC")) + + assert response is None + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith("/authorization/resources/res_01ABC") + + def test_delete_resource_with_cascade(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, + status_code=202, + headers={"content-type": "text/plain; charset=utf-8"}, + ) + + response = syncify( + self.authorization.delete_resource("res_01ABC", cascade_delete=True) + ) + + assert response is None + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith("/authorization/resources/res_01ABC") + assert request_kwargs["json"] == {"cascade_delete": True}