118118 upload_synapse_s3 ,
119119)
120120from synapseclient .core .dozer import doze
121-
121+ from typing import Union
122122
123123PRODUCTION_ENDPOINTS = {
124124 "repoEndpoint" : "https://repo-prod.prod.sagebase.org/repo/v1" ,
@@ -613,7 +613,12 @@ def invalidateAPIKey(self):
613613 self .restDELETE ("/secretKey" , endpoint = self .authEndpoint )
614614
615615 @memoize
616- def getUserProfile (self , id = None , sessionToken = None , refresh = False ):
616+ def getUserProfile (
617+ self ,
618+ id : Union [str , int , UserProfile , TeamMember ] = None ,
619+ sessionToken : str = None ,
620+ refresh : bool = False ,
621+ ) -> UserProfile :
617622 """
618623 Get the details about a Synapse user.
619624 Retrieves information on the current user if 'id' is omitted.
@@ -688,6 +693,22 @@ def _get_certified_passing_record(self, userid: int) -> dict:
688693 response = self .restGET (f"/user/{ userid } /certifiedUserPassingRecord" )
689694 return response
690695
696+ def _get_user_bundle (self , userid : int , mask : int ) -> dict :
697+ """Retrieve the user bundle for the given user.
698+
699+ :params userid: Synapse user Id
700+ :params mask: Bit field indicating which components to include in the bundle.
701+
702+ :returns: Synapse User Bundle
703+ https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/UserBundle.html
704+ """
705+ try :
706+ response = self .restGET (f"/user/{ userid } /bundle?mask={ mask } " )
707+ except SynapseHTTPError as ex :
708+ if ex .response .status_code == 404 :
709+ return None
710+ return response
711+
691712 def is_certified (self , user : typing .Union [str , int ]) -> bool :
692713 """Determines whether a Synapse user is a certified user.
693714
@@ -2056,7 +2077,7 @@ def _storeACL(self, entity, acl):
20562077 else :
20572078 return self .restPOST (uri , json .dumps (acl ))
20582079
2059- def _getUserbyPrincipalIdOrName (self , principalId = None ):
2080+ def _getUserbyPrincipalIdOrName (self , principalId : str = None ):
20602081 """
20612082 Given either a string, int or None finds the corresponding user where None implies PUBLIC
20622083
@@ -2087,7 +2108,11 @@ def _getUserbyPrincipalIdOrName(self, principalId=None):
20872108 "Unknown Synapse user (%s). %s." % (principalId , supplementalMessage )
20882109 )
20892110
2090- def getPermissions (self , entity , principalId = None ):
2111+ def getPermissions (
2112+ self ,
2113+ entity : Union [Entity , Evaluation , str , collections .abc .Mapping ],
2114+ principalId : str = None ,
2115+ ):
20912116 """Get the permissions that a user or group has on an Entity.
20922117
20932118 :param entity: An Entity or Synapse ID to lookup
@@ -2098,15 +2123,36 @@ def getPermissions(self, entity, principalId=None):
20982123 or an empty array
20992124
21002125 """
2101- # TODO: what if user has permissions by membership in a group?
2102- principalId = self ._getUserbyPrincipalIdOrName (principalId )
2126+ principal_id = self ._getUserbyPrincipalIdOrName (principalId )
21032127 acl = self ._getACL (entity )
2128+
2129+ team_list = self ._find_teams_for_principal (principal_id )
2130+ team_ids = [int (team .id ) for team in team_list ]
2131+ effective_permission_set = set ()
2132+
2133+ # This user_profile_bundle is being used to verify that the principal_id is a registered user of the system
2134+ user_profile_bundle = self ._get_user_bundle (principal_id , 1 )
2135+
2136+ # Loop over all permissions in the returned ACL and add it to the effective_permission_set
2137+ # if the principalId in the ACL matches
2138+ # 1) the one we are looking for,
2139+ # 2) a team the entity is a member of,
2140+ # 3) PUBLIC
2141+ # 4) A user_profile_bundle exists for the principal_id
21042142 for permissions in acl ["resourceAccess" ]:
2105- if "principalId" in permissions and permissions ["principalId" ] == int (
2106- principalId
2143+ if "principalId" in permissions and (
2144+ permissions ["principalId" ] == principal_id
2145+ or permissions ["principalId" ] in team_ids
2146+ or permissions ["principalId" ] == PUBLIC
2147+ or (
2148+ permissions ["principalId" ] == AUTHENTICATED_USERS
2149+ and user_profile_bundle is not None
2150+ )
21072151 ):
2108- return permissions ["accessType" ]
2109- return []
2152+ effective_permission_set = effective_permission_set .union (
2153+ permissions ["accessType" ]
2154+ )
2155+ return list (effective_permission_set )
21102156
21112157 def setPermissions (
21122158 self ,
@@ -3104,6 +3150,18 @@ def _findTeam(self, name):
31043150 for result in self ._GET_paginated ("/teams?fragment=%s" % name ):
31053151 yield Team (** result )
31063152
3153+ def _find_teams_for_principal (self , principal_id : str ) -> typing .Iterator [Team ]:
3154+ """
3155+ Retrieve a list of teams for the matching principal ID. If the principalId that is passed in is a team itself,
3156+ or not found, this will return a generator that yields no results.
3157+
3158+ :param principal_id: Identifier of a user or group.
3159+
3160+ :return: A generator that yields objects of type :py:class:`synapseclient.team.Team`
3161+ """
3162+ for result in self ._GET_paginated (f"/user/{ principal_id } /team" ):
3163+ yield Team (** result )
3164+
31073165 def getTeam (self , id ):
31083166 """
31093167 Finds a team with a given ID or name.
0 commit comments