Logo Search packages:      
Sourcecode: zope-pas version File versions  Download package

DomainAuthHelper.py

##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" DomainAuthHelper   Authentication Plugin for Domain authentication
"""

__doc__     = """ Authentication Plugin for Domain authentication """
__version__ = '$Revision: 69829 $'[11:-2]

# General Python imports
import socket, os, time, copy, re

# General Zope imports
from BTrees.OOBTree import OOBTree
from Globals import InitializeClass
from AccessControl import ClassSecurityInfo
from AccessControl.Permissions import manage_users
from Products.PageTemplates.PageTemplateFile import PageTemplateFile

# PluggableAuthService imports
from Products.PluggableAuthService.interfaces.plugins import \
    IAuthenticationPlugin
from Products.PluggableAuthService.interfaces.plugins import \
    IExtractionPlugin
from Products.PluggableAuthService.interfaces.plugins import \
    IRolesPlugin
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.utils import Interface

00042 class IDomainAuthHelper(Interface):
    """ Marker interface.
    """

_MATCH_EQUALS = 'equals'
_MATCH_ENDSWITH = 'endswith'
_MATCH_REGEX = 'regex'

manage_addDomainAuthHelperForm = PageTemplateFile(
    'www/daAdd', globals(), __name__='manage_addDomainAuthHelperForm' )

def manage_addDomainAuthHelper(self, id, title='', REQUEST=None):
    """ Factory method to instantiate a DomainAuthHelper """
    obj = DomainAuthHelper(id, title=title)
    self._setObject(id, obj)

    if REQUEST is not None:
        qs = 'manage_tabs_message=DomainAuthHelper+added.'
        my_url = self.absolute_url()
        REQUEST['RESPONSE'].redirect('%s/manage_workspace?%s' % (my_url, qs))


00064 class DomainAuthHelper(BasePlugin):
    """ Domain Authentication plugin for the PluggableAuthService """
    security = ClassSecurityInfo()
    meta_type = 'Domain Authentication Plugin'

    security.declareProtected(manage_users, 'manage_map')
    manage_map = PageTemplateFile('www/daMatches', globals())

    security.declareProtected(manage_users, 'manage_genericmap')
    manage_genericmap = PageTemplateFile('www/daGeneric', globals())

    manage_options = ( BasePlugin.manage_options[:1]
                     + ( { 'label'  : 'User Map'
                         , 'action' : 'manage_map'
                       # , 'help'   : ( 'PluggableAuthService'
                       #              ,'matches.stx')
                         }
                       , { 'label'  : 'Generic Map'
                         , 'action' : 'manage_genericmap'
                         }
                       )
                     + BasePlugin.manage_options[1:]
                     )


00089     def __init__(self, id, title=''):
        """ Initialize a new instance """
        self.id = id
        self.title = title
        self._domain_map = OOBTree()


    security.declarePrivate('extractCredentials')
00097     def extractCredentials(self, request):
        """ Extract credentials from 'request'.
        """
        creds = {}

        remote_host = request.get('REMOTE_HOST', '')
        if remote_host:
            creds['remote_host'] = request.get('REMOTE_HOST', '')

            try:
                creds['remote_address'] = request.getClientAddr()
            except AttributeError:
                creds['remote_address'] = request.get('REMOTE_ADDR', '')

        return creds

    security.declarePrivate('authenticateCredentials')
00114     def authenticateCredentials(self, credentials):
        """ Fulfill AuthenticationPlugin requirements """
        login = credentials.get('login', '')
        r_host = credentials.get('remote_host', '')
        r_address = credentials.get('remote_address', '')
        matches = self._findMatches(login, r_host, r_address)

        if len(matches) > 0:
            if login:
                return (login, login)
            else:
                best_match = matches[0]
                u_name = best_match.get('username', 'remote')
                return ( best_match.get('user_id', u_name)
                       , u_name
                       )

        return (None, None)


    security.declarePrivate( 'getRolesForPrincipal' )
00135     def getRolesForPrincipal(self, user, request=None):
        """ Fulfill RolesPlugin requirements """
        roles = []

        if request is None:
            # Without request there is no way I can do anything...
            return tuple(roles)

        uname = user.getUserName()

        if uname.find('Remote User') != -1:
            uname = ''

        matches = self._findMatches( uname
                                   , request.get('REMOTE_HOST', '')
                                   , request.getClientAddr()
                                   )

        # We want to grab the first match because it is the most specific
        if len(matches) > 0:
            roles = matches[0].get('roles', [])

        return tuple(roles)


    security.declarePrivate('_findMatches')
00161     def _findMatches(self, login, r_host='', r_address=''):
        """ Find the match """
        matches = []

        if not r_host and not r_address:
            return tuple(matches)
        
        all_info = list(self._domain_map.get(login, []))
        all_info.extend(self._domain_map.get(''))
        
        if not r_host:
            try:
                r_host = socket.gethostbyaddr(r_address)[0]
            except socket.herror: 
                pass

        if not r_address:
            try:
                r_address = socket.gethostbyname(r_host)
            except socket.herror :
                pass

        if not r_host and not r_address:
            return tuple(matches)

        candidates = [r_host, r_address]

        for match_info in all_info:
            m = []
            m_type = match_info['match_type']
            m_real = match_info['match_real']

            if m_type == _MATCH_EQUALS:
                m = [match_info for x in candidates if x == m_real]
            elif m_type == _MATCH_ENDSWITH:
                m = [match_info for x in candidates if x.endswith(m_real)]
            elif m_type == _MATCH_REGEX:
                m = [match_info for x in candidates if m_real.search(x)]

            matches.extend(m)

        return tuple(matches)


    security.declareProtected(manage_users, 'listMatchTypes')
00206     def listMatchTypes(self):
        """ Return a sequence of possible match types """
        return (_MATCH_EQUALS, _MATCH_ENDSWITH, _MATCH_REGEX)


    security.declareProtected(manage_users, 'listMappingsForUser')
00212     def listMappingsForUser(self, user_id=''):
        """ List the mappings for a specific user """
        result = []
        record = self._domain_map.get(user_id, [])

        for match_info in record:
            result.append( { 'match_type' : match_info['match_type']
                           , 'match_string' : match_info['match_string']
                           , 'match_id' : match_info['match_id']
                           , 'roles' : match_info['roles']
                           , 'username' : match_info['username']
                           } )
        
        return result


    security.declareProtected(manage_users, 'manage_addMapping')
00229     def manage_addMapping( self
                         , user_id=''
                         , match_type=''
                         , match_string=''
                         , username=''
                         , roles=[]
                         , REQUEST=None
                         ):
        """ Add a mapping for a user """
        msg = ''

        if match_type not in (_MATCH_EQUALS, _MATCH_ENDSWITH, _MATCH_REGEX):
            msg = 'Unknown match type %s' % match_type

        if not match_string:
            msg = 'No match string specified'

        if match_type == _MATCH_REGEX:
            try:
                re.compile(match_string, re.IGNORECASE)
            except re.error:
                msg = 'Invalid regular expression %s' % match_string

        if msg:
            if REQUEST is not None:
                return self.manage_map(manage_tabs_message=msg)

            raise ValueError, msg

        record = self._domain_map.get(user_id, [])
        if match_type == _MATCH_REGEX:
            real_match = re.compile(match_string, re.IGNORECASE)
        else:
            real_match = match_string

        match = { 'match_type' : match_type
                , 'match_string' : match_string
                , 'match_real' : real_match
                , 'match_id' : '%s_%s' % (user_id, str(time.time()))
                , 'username' : user_id or username or 'Remote User'
                , 'roles' : roles
                }

        if match not in record:
            record.append(match)
        else:
            msg = 'Match already exists'

        self._domain_map[user_id] = record

        if REQUEST is not None:
            msg = msg or 'Match added.'
            if user_id:
                return self.manage_map(manage_tabs_message=msg)
            else:
                return self.manage_genericmap(manage_tabs_message=msg)


    security.declareProtected(manage_users, 'manage_removeMappings')
00288     def manage_removeMappings(self, user_id='', match_ids=[], REQUEST=None):
        """ Remove mappings """
        msg = ''

        if len(match_ids) < 1:
            msg = 'No matches specified'

        record = self._domain_map.get(user_id, [])

        if len(record) < 1:
            msg = 'No mappings for user %s' % user_id

        if msg:
            if REQUEST is not None:
                return self.manage_map(manage_tabs_message=msg)
            else:
                return

        to_delete = [x for x in record if x['match_id'] in match_ids]

        for match in to_delete:
            record.remove(match)

        self._domain_map[user_id] = record

        if REQUEST is not None:
            msg = 'Matches deleted'
            if user_id:
                return self.manage_map(manage_tabs_message=msg)
            else:
                return self.manage_genericmap(manage_tabs_message=msg)

classImplements( DomainAuthHelper
               , IDomainAuthHelper
               , IExtractionPlugin
               , IAuthenticationPlugin
               , IRolesPlugin
               )

InitializeClass(DomainAuthHelper)


Generated by  Doxygen 1.6.0   Back to index