Logo Search packages:      
Sourcecode: zope-pas version File versions  Download package

ZODBGroupManager.py

##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: ZODBGroupManager

$Id: ZODBGroupManager.py 65456 2006-02-25 18:51:23Z jens $
"""
from Acquisition import aq_parent
from AccessControl import ClassSecurityInfo
from Globals import InitializeClass
from BTrees.OOBTree import OOBTree
from Products.PageTemplates.PageTemplateFile import PageTemplateFile

from Products.PluggableAuthService.interfaces.plugins \
    import IGroupEnumerationPlugin
from Products.PluggableAuthService.interfaces.plugins \
    import IGroupsPlugin

from Products.PluggableAuthService.permissions import ManageGroups
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.utils import Interface

00035 class IZODBGroupManager(Interface):
    """ Marker interface.
    """

manage_addZODBGroupManagerForm = PageTemplateFile(
    'www/zgAdd', globals(), __name__='manage_addZODBGroupManagerForm' )

def addZODBGroupManager( dispatcher, id, title=None, REQUEST=None ):
    """ Add a ZODBGroupManager to a Pluggable Auth Service. """

    zgm = ZODBGroupManager(id, title)
    dispatcher._setObject(zgm.getId(), zgm)

    if REQUEST is not None:
        REQUEST['RESPONSE'].redirect(
                                '%s/manage_workspace'
                                '?manage_tabs_message='
                                'ZODBGroupManager+added.'
                            % dispatcher.absolute_url())

00055 class ZODBGroupManager( BasePlugin ):

    """ PAS plugin for managing groups, and groups of groups in the ZODB
    """
    meta_type = 'ZODB Group Manager'

    security = ClassSecurityInfo()

    def __init__(self, id, title=None):

        self._id = self.id = id
        self.title = title
        self._groups = OOBTree()
        self._principal_groups = OOBTree()

    #
    #   IGroupEnumerationPlugin implementation
    #
    security.declarePrivate( 'enumerateGroups' )
00074     def enumerateGroups( self
                        , id=None
                        , title=None
                        , exact_match=False
                        , sort_by=None
                        , max_results=None
                        , **kw
                        ):

        """ See IGroupEnumerationPlugin.
        """
        group_info = []
        group_ids = []
        plugin_id = self.getId()

        if isinstance( id, str ):
            id = [ id ]

        if isinstance( title, str ):
            title = [ title ]

        if exact_match and ( id or title ):

            if id:
                group_ids.extend( id )
            elif title:
                group_ids.extend( title )

        if group_ids:
            group_filter = None

        else:   # Searching
            group_ids = self.listGroupIds()
            group_filter = _ZODBGroupFilter( id, title, **kw )

        for group_id in group_ids:

            if self._groups.get( group_id, None ):
                e_url = '%s/manage_groups' % self.getId()
                p_qs = 'group_id=%s' % group_id
                m_qs = 'group_id=%s&assign=1' % group_id

                info = {}
                info.update( self._groups[ group_id ] )

                info[ 'pluginid' ] = plugin_id
                info[ 'properties_url' ] = '%s?%s' % ( e_url, p_qs )
                info[ 'members_url' ] = '%s?%s' % ( e_url, m_qs )

                info[ 'id' ] = '%s%s' % (self.prefix, info['id'])

                if not group_filter or group_filter( info ):
                    group_info.append( info )

        return tuple( group_info )

    #
    #   IGroupsPlugin implementation
    #
    security.declarePrivate( 'getGroupsForPrincipal' )
00134     def getGroupsForPrincipal( self, principal, request=None ):

        """ See IGroupsPlugin.
        """
        unadorned = self._principal_groups.get( principal.getId(), () )
        return tuple(['%s%s' % (self.prefix, x) for x in unadorned])

    #
    #   (notional)IZODBGroupManager interface
    #
    security.declareProtected( ManageGroups, 'listGroupIds' )
00145     def listGroupIds( self ):

        """ -> ( group_id_1, ... group_id_n )
        """
        return self._groups.keys()

    security.declareProtected( ManageGroups, 'listGroupInfo' )
00152     def listGroupInfo( self ):

        """ -> ( {}, ...{} )

        o Return one mapping per group, with the following keys:

          - 'id' 
        """
        return self._groups.values()

    security.declareProtected( ManageGroups, 'getGroupInfo' )
00163     def getGroupInfo( self, group_id ):

        """ group_id -> {}
        """
        return self._groups[ group_id ]

    security.declarePrivate( 'addGroup' )
00170     def addGroup( self, group_id, title=None, description=None ):

        """ Add 'group_id' to the list of groups managed by this object.

        o Raise KeyError on duplicate.
        """
        if self._groups.get( group_id ) is not None:
            raise KeyError, 'Duplicate group ID: %s' % group_id

        self._groups[ group_id ] = { 'id' : group_id
                                   , 'title' : title
                                   , 'description' : description
                                   }

    security.declarePrivate( 'updateGroup' )
00185     def updateGroup( self, group_id, title, description ):

        """ Update properties for 'group_id'

        o Raise KeyError if group_id doesn't already exist.
        """
        self._groups[ group_id ].update({ 'title' : title
                                        , 'description' : description
                                        })
        self._groups[ group_id ] = self._groups[ group_id ]

    security.declarePrivate( 'removeGroup' )
00197     def removeGroup( self, group_id ):

        """ Remove 'role_id' from the list of roles managed by this
            object, removing assigned members from it before doing so.

        o Raise KeyError if 'group_id' doesn't already exist.
        """
        for principal_id in self._principal_groups.keys():
            self.removePrincipalFromGroup( principal_id, group_id )
        del self._groups[ group_id ]

    #
    #   Group assignment API
    #
    security.declareProtected( ManageGroups, 'listAvailablePrincipals' )
00212     def listAvailablePrincipals( self, group_id, search_id ):

        """ Return a list of principal IDs to that can belong to the group.

        o If supplied, 'search_id' constrains the principal IDs;  if not,
          return empty list.

        o Omit principals with existing assignments.
        """
        result = []

        if search_id:  # don't bother searching if no criteria

            parent = aq_parent( self )

            for info in parent.searchPrincipals( max_results=20
                                               , sort_by='id'
                                               , id=search_id
                                               , exact_match=False
                                               ):
                id = info[ 'id' ]
                title = info.get( 'title', id )
                if ( group_id not in self._principal_groups.get( id, () )
                 and group_id != id ):
                    result.append( ( id, title ) )

        return result

    security.declareProtected( ManageGroups, 'listAssignedPrincipals' )
00241     def listAssignedPrincipals( self, group_id ):

        """ Return a list of principal IDs belonging to a group.
        """
        result = []

        for k, v in self._principal_groups.items():
            if group_id in v:
                # should be one and only one mapping to 'k'

                parent = aq_parent( self )
                info = parent.searchPrincipals( id=k, exact_match=True )
                assert( len( info ) in ( 0, 1 ) )
                if len( info ) == 0:
                    title = '<%s: not found>' % k
                else:
                    title = info[0].get( 'title', k )
                result.append( ( k, title ) )

        return result

    security.declareProtected( ManageGroups, 'addPrincipalToGroup' )
00263     def addPrincipalToGroup( self, principal_id, group_id ):

        """ Add a principal to a group.

        o Return a boolean indicating whether a new assignment was created.

        o Raise KeyError if 'group_id' is unknown.
        """
        group_info = self._groups[ group_id ] # raise KeyError if unknown!

        current = self._principal_groups.get( principal_id, () )
        already = group_id in current

        if not already:
            new = current + ( group_id, )
            self._principal_groups[ principal_id ] = new

        return not already

    security.declareProtected( ManageGroups, 'removePrincipalFromGroup' )
00283     def removePrincipalFromGroup( self, principal_id, group_id ):

        """ Remove a prinicpal from from a group.

        o Return a boolean indicating whether the principal was already 
          a member of the group.

        o Raise KeyError if 'group_id' is unknown.

        o Ignore requests to remove a principal if not already a member
          of the group.
        """
        group_info = self._groups[ group_id ] # raise KeyError if unknown!

        current = self._principal_groups.get( principal_id, () )
        new = tuple( [ x for x in current if x != group_id ] )
        already = current != new

        if already:
            self._principal_groups[ principal_id ] = new

        return already

    #
    #   ZMI
    #
    manage_options = ( ( { 'label': 'Groups', 
                           'action': 'manage_groups', }
                         ,
                       )
                     + BasePlugin.manage_options
                     )

    security.declarePublic( 'manage_widgets' )
    manage_widgets = PageTemplateFile( 'www/zuWidgets'
                                     , globals()
                                     , __name__='manage_widgets'
                                     )

    security.declareProtected( ManageGroups, 'manage_groups' )
    manage_groups = PageTemplateFile( 'www/zgGroups'
                                    , globals()
                                    , __name__='manage_groups'
                                    )

    security.declareProtected( ManageGroups, 'manage_twoLists' )
    manage_twoLists = PageTemplateFile( '../www/two_lists'
                                      , globals()
                                      , __name__='manage_twoLists'
                                      )

    security.declareProtected( ManageGroups, 'manage_addGroup' )
00335     def manage_addGroup( self
                       , group_id
                       , title=None
                       , description=None
                       , RESPONSE=None
                       ):
        """ Add a group via the ZMI.
        """
        self.addGroup( group_id, title, description )

        message = 'Group+added'

        if RESPONSE is not None:
            RESPONSE.redirect( '%s/manage_groups?manage_tabs_message=%s'
                             % ( self.absolute_url(), message )
                             )

    security.declareProtected( ManageGroups, 'manage_updateGroup' )
00353     def manage_updateGroup( self
                          , group_id
                          , title
                          , description
                          , RESPONSE=None
                          ):
        """ Update a group via the ZMI.
        """
        self.updateGroup( group_id, title, description )

        message = 'Group+updated'

        if RESPONSE is not None:
            RESPONSE.redirect( '%s/manage_groups?manage_tabs_message=%s'
                             % ( self.absolute_url(), message )
                             )

    security.declareProtected( ManageGroups, 'manage_removeGroups' )
00371     def manage_removeGroups( self
                           , group_ids
                           , RESPONSE=None
                           ):
        """ Remove one or more groups via the ZMI.
        """
        group_ids = filter( None, group_ids )

        if not group_ids:
            message = 'no+groups+selected'

        else:

            for group_id in group_ids:
                self.removeGroup( group_id )

            message = 'Groups+removed'

        if RESPONSE is not None:
            RESPONSE.redirect( '%s/manage_groups?manage_tabs_message=%s'
                             % ( self.absolute_url(), message )
                             )

    security.declareProtected( ManageGroups, 'manage_addPrincipalsToGroup' )
00395     def manage_addPrincipalsToGroup( self
                                   , group_id
                                   , principal_ids
                                   , RESPONSE=None
                                   ):
        """ Add one or more principals to a group via the ZMI.
        """
        assigned = []

        for principal_id in principal_ids:
            if self.addPrincipalToGroup( principal_id, group_id ):
                assigned.append( principal_id )

        if not assigned:
            message = 'Principals+already+members+of+%s' % group_id
        else:
            message = '%s+added+to+%s' % ( '+'.join( assigned )
                                         , group_id
                                         )

        if RESPONSE is not None:
            RESPONSE.redirect( ( '%s/manage_groups?group_id=%s&assign=1'
                               + '&manage_tabs_message=%s'
                               ) % ( self.absolute_url(), group_id, message )
                             )

    security.declareProtected( ManageGroups
                             , 'manage_removePrincipalsFromGroup' 
                             )
00424     def manage_removePrincipalsFromGroup( self
                                        , group_id
                                        , principal_ids
                                        , RESPONSE=None
                                        ):
        """ Remove one or more principals from a group via the ZMI.
        """
        removed = []

        for principal_id in principal_ids:
            if self.removePrincipalFromGroup( principal_id, group_id ):
                removed.append( principal_id )

        if not removed:
            message = 'Principals+not+in+group+%s' % group_id
        else:
            message = 'Principals+%s+removed+from+%s' % ( '+'.join( removed )
                                                        , group_id
                                                        )

        if RESPONSE is not None:
            RESPONSE.redirect( ( '%s/manage_groups?group_id=%s&assign=1'
                               + '&manage_tabs_message=%s'
                               ) % ( self.absolute_url(), group_id, message )
                             )

classImplements( ZODBGroupManager
               , IZODBGroupManager
               , IGroupEnumerationPlugin
               , IGroupsPlugin
               )

InitializeClass( ZODBGroupManager )

class _ZODBGroupFilter:

    def __init__( self
                , id=None
                , title=None
                , **kw
                ):

        self._filter_ids = id
        self._filter_titles = title

    def __call__( self, group_info ):

        if self._filter_ids:

            key = 'id'
            to_test = self._filter_ids

        elif self._filter_titles:

            key = 'title'
            to_test = self._filter_titles

        else:
            return 1 # TODO:  try using 'kw'

        value = group_info.get( key )

        if not value:
            return 0

        for contained in to_test:
            if value.lower().find( contained.lower() ) >= 0:
                return 1

        return 0

Generated by  Doxygen 1.6.0   Back to index