# Source code: ldaptor
#
# test_server.py

"""
Test cases for the ldaptor.protocols.ldap.ldapserver module.
"""

from twisted.trial import unittest
from twisted.trial.util import deferredResult
from twisted.internet import protocol, address
from twisted.python import components
from ldaptor import inmemory, interfaces, schema
from ldaptor.protocols.ldap import ldapserver, ldapclient, ldaperrors, fetchschema
from ldaptor.protocols import pureldap, pureber
from twisted.test import proto_helpers
from ldaptor.test import util, test_schema

class LDAPServerTest(unittest.TestCase):
    """
    Exercise LDAPServer by feeding it encoded requests and comparing the
    bytes it writes to its transport with the expected encoded responses.
    """

    # DNs of the fixture entries created in setUp.
    STUFF_DN = 'ou=stuff,dc=example,dc=com'
    THINGIE_DN = 'cn=thingie,ou=stuff,dc=example,dc=com'
    ANOTHER_DN = 'cn=another,ou=stuff,dc=example,dc=com'

    # Salted SHA hash of the password "secret".
    SECRET_HASH = '{SSHA}yVLLj62rFf3kDAbzwEU0zYAVvbWrze8='

    def setUp(self):
        """Create the in-memory tree and a server wired to a string transport."""
        root = inmemory.ReadOnlyInMemoryLDAPEntry(
            dn='dc=example,dc=com',
            attributes={'dc': 'example'})
        stuff = root.addChild(
            rdn='ou=stuff',
            attributes={'objectClass': ['a', 'b'], 'ou': ['stuff']})
        thingie = stuff.addChild(
            rdn='cn=thingie',
            attributes={'objectClass': ['a', 'b'], 'cn': ['thingie']})
        another = stuff.addChild(
            rdn='cn=another',
            attributes={'objectClass': ['a', 'b'], 'cn': ['another']})
        self.root = root
        self.stuff = stuff
        self.thingie = thingie
        self.another = another

        self.server = ldapserver.LDAPServer()
        self.server.factory = self.root
        self.server.transport = proto_helpers.StringTransport()
        self.server.connectionMade()

    # -- helpers (leading underscore keeps them out of test collection) --

    def _receive(self, operation, **messageArgs):
        """Wrap operation in an LDAPMessage and hand its string form to the server."""
        envelope = pureldap.LDAPMessage(operation, **messageArgs)
        self.server.dataReceived(str(envelope))

    def _assertReplied(self, msgId, *operations):
        """
        Assert the transport holds exactly the given response operations,
        each wrapped in an LDAPMessage with id msgId, in the given order.
        With no operations, the transport must be empty.
        """
        encoded = [str(pureldap.LDAPMessage(op, id=msgId))
                   for op in operations]
        self.assertEquals(self.server.transport.value(), ''.join(encoded))

    def _stuffEntry(self):
        """Expected search result entry for ou=stuff."""
        return pureldap.LDAPSearchResultEntry(
            objectName=self.STUFF_DN,
            attributes=[('objectClass', ['a', 'b']),
                        ('ou', ['stuff'])])

    def _thingieEntry(self):
        """Expected search result entry for cn=thingie."""
        return pureldap.LDAPSearchResultEntry(
            objectName=self.THINGIE_DN,
            attributes=[('objectClass', ['a', 'b']),
                        ('cn', ['thingie'])])

    def _anotherEntry(self):
        """Expected search result entry for cn=another."""
        return pureldap.LDAPSearchResultEntry(
            objectName=self.ANOTHER_DN,
            attributes=[('objectClass', ['a', 'b']),
                        ('cn', ['another'])])

    # -- bind / unbind --

    def test_bind(self):
        self._receive(pureldap.LDAPBindRequest(), id=4)
        self._assertReplied(4, pureldap.LDAPBindResponse(resultCode=0))

    def test_bind_success(self):
        self.thingie['userPassword'] = [self.SECRET_HASH]
        self._receive(
            pureldap.LDAPBindRequest(dn=self.THINGIE_DN, auth='secret'),
            id=4)
        self._assertReplied(
            4,
            pureldap.LDAPBindResponse(resultCode=0,
                                      matchedDN=self.THINGIE_DN))

    def test_bind_invalidCredentials_badPassword(self):
        self._receive(
            pureldap.LDAPBindRequest(dn=self.THINGIE_DN, auth='invalid'),
            id=734)
        self._assertReplied(
            734,
            pureldap.LDAPBindResponse(
                resultCode=ldaperrors.LDAPInvalidCredentials.resultCode))

    def test_bind_invalidCredentials_nonExisting(self):
        self._receive(
            pureldap.LDAPBindRequest(dn='cn=non-existing,dc=example,dc=com',
                                     auth='invalid'),
            id=78)
        self._assertReplied(
            78,
            pureldap.LDAPBindResponse(
                resultCode=ldaperrors.LDAPInvalidCredentials.resultCode))

    def test_bind_badVersion_1_anonymous(self):
        self._receive(pureldap.LDAPBindRequest(version=1), id=32)
        self._assertReplied(
            32,
            pureldap.LDAPBindResponse(
                resultCode=ldaperrors.LDAPProtocolError.resultCode,
                errorMessage='Version 1 not supported'))

    def test_bind_badVersion_2_anonymous(self):
        self._receive(pureldap.LDAPBindRequest(version=2), id=32)
        self._assertReplied(
            32,
            pureldap.LDAPBindResponse(
                resultCode=ldaperrors.LDAPProtocolError.resultCode,
                errorMessage='Version 2 not supported'))

    def test_bind_badVersion_4_anonymous(self):
        self._receive(pureldap.LDAPBindRequest(version=4), id=32)
        self._assertReplied(
            32,
            pureldap.LDAPBindResponse(
                resultCode=ldaperrors.LDAPProtocolError.resultCode,
                errorMessage='Version 4 not supported'))

    def test_bind_badVersion_4_nonExisting(self):
        # TODO make a test just like this one that would pass authentication
        # if version was correct, to ensure we don't leak that info either.
        self._receive(
            pureldap.LDAPBindRequest(version=4,
                                     dn='cn=non-existing,dc=example,dc=com',
                                     auth='invalid'),
            id=11)
        self._assertReplied(
            11,
            pureldap.LDAPBindResponse(
                resultCode=ldaperrors.LDAPProtocolError.resultCode,
                errorMessage='Version 4 not supported'))

    def test_unbind(self):
        self._receive(pureldap.LDAPUnbindRequest(), id=7)
        # An unbind must not produce any response bytes.
        self._assertReplied(7)

    # -- search --

    def test_search_outOfTree(self):
        self._receive(
            pureldap.LDAPSearchRequest(baseObject='dc=invalid'),
            id=2)
        self._assertReplied(
            2,
            pureldap.LDAPSearchResultDone(
                resultCode=ldaperrors.LDAPNoSuchObject.resultCode))

    def test_search_matchAll_oneResult(self):
        self._receive(
            pureldap.LDAPSearchRequest(baseObject=self.THINGIE_DN),
            id=2)
        self._assertReplied(
            2,
            self._thingieEntry(),
            pureldap.LDAPSearchResultDone(resultCode=0))

    def test_search_matchAll_manyResults(self):
        self._receive(
            pureldap.LDAPSearchRequest(baseObject=self.STUFF_DN),
            id=2)
        self._assertReplied(
            2,
            self._stuffEntry(),
            self._anotherEntry(),
            self._thingieEntry(),
            pureldap.LDAPSearchResultDone(resultCode=0))

    def test_search_scope_oneLevel(self):
        self._receive(
            pureldap.LDAPSearchRequest(
                baseObject=self.STUFF_DN,
                scope=pureldap.LDAP_SCOPE_singleLevel),
            id=2)
        self._assertReplied(
            2,
            self._thingieEntry(),
            self._anotherEntry(),
            pureldap.LDAPSearchResultDone(resultCode=0))

    def test_search_scope_wholeSubtree(self):
        self._receive(
            pureldap.LDAPSearchRequest(
                baseObject=self.STUFF_DN,
                scope=pureldap.LDAP_SCOPE_wholeSubtree),
            id=2)
        self._assertReplied(
            2,
            self._stuffEntry(),
            self._anotherEntry(),
            self._thingieEntry(),
            pureldap.LDAPSearchResultDone(resultCode=0))

    def test_search_scope_baseObject(self):
        self._receive(
            pureldap.LDAPSearchRequest(
                baseObject=self.STUFF_DN,
                scope=pureldap.LDAP_SCOPE_baseObject),
            id=2)
        self._assertReplied(
            2,
            self._stuffEntry(),
            pureldap.LDAPSearchResultDone(resultCode=0))

    def test_rootDSE(self):
        self._receive(
            pureldap.LDAPSearchRequest(
                baseObject='',
                scope=pureldap.LDAP_SCOPE_baseObject,
                filter=pureldap.LDAPFilter_present('objectClass')),
            id=2)
        rootDSE = pureldap.LDAPSearchResultEntry(
            objectName='',
            attributes=[
                ('supportedLDAPVersion', ['3']),
                ('namingContexts', ['dc=example,dc=com']),
                ('supportedExtension',
                 [pureldap.LDAPPasswordModifyRequest.oid.value]),
                ])
        self._assertReplied(
            2,
            rootDSE,
            pureldap.LDAPSearchResultDone(
                resultCode=ldaperrors.Success.resultCode))

    # -- delete --

    def test_delete(self):
        self._receive(pureldap.LDAPDelRequest(str(self.thingie.dn)), id=2)
        self._assertReplied(2, pureldap.LDAPDelResponse(resultCode=0))
        # Only the sibling entry may remain under ou=stuff.
        remaining = deferredResult(self.stuff.children())
        self.assertEquals(remaining, [self.another])

    # -- unknown requests and controls --

    def test_unknownRequest(self):
        # make server miss one of the handle_* attributes
        # without having to modify the LDAPServer class
        class MockServer(ldapserver.LDAPServer):
            handle_LDAPBindRequest = property()
        self.server.__class__ = MockServer
        self._receive(pureldap.LDAPBindRequest(), id=2)
        self._assertReplied(
            2,
            pureldap.LDAPExtendedResponse(
                resultCode=ldaperrors.LDAPProtocolError.resultCode,
                responseName='1.3.6.1.4.1.1466.20036',
                errorMessage='Unknown request'))

    def test_control_unknown_critical(self):
        self._receive(
            pureldap.LDAPBindRequest(),
            id=2,
            controls=[('42.42.42.42', True, None)])
        self._assertReplied(
            2,
            pureldap.LDAPBindResponse(
                resultCode=ldaperrors.LDAPUnavailableCriticalExtension.resultCode,
                errorMessage='Unknown control 42.42.42.42'))

    def test_control_unknown_nonCritical(self):
        self.thingie['userPassword'] = [self.SECRET_HASH]
        self._receive(
            pureldap.LDAPBindRequest(dn=self.THINGIE_DN, auth='secret'),
            controls=[('42.42.42.42', False, None)],
            id=4)
        self._assertReplied(
            4,
            pureldap.LDAPBindResponse(resultCode=0,
                                      matchedDN=self.THINGIE_DN))


class TestSchema(unittest.TestCase):
    """
    Check that fetchschema.fetch retrieves the attribute types and object
    classes published by an LDAPServer backed by an in-memory tree.
    """

    def setUp(self):
        """
        Build a tree whose dc=example,dc=com entry points at a cn=schema
        subentry, then create a server protocol for it and an LDAPClient.
        """
        db = inmemory.ReadOnlyInMemoryLDAPEntry('', {})
        com = db.addChild('dc=com',
                          {'objectClass': ['dcObject'],
                           'dc': ['com'],
                           })
        com.addChild('dc=example',
                     {'objectClass': ['dcObject'],
                      'dc': ['example'],
                      # Names the entry that holds the schema definitions.
                      'subschemaSubentry': ['cn=schema'],
                      })
        db.addChild('cn=schema',
                    {'objectClass': ['TODO'],
                     'cn': ['schema'],
                     'attributeTypes': [test_schema.AttributeType_KnownValues.knownValues[0][0]],
                     'objectClasses': [test_schema.OBJECTCLASSES['organization'],
                                       test_schema.OBJECTCLASSES['organizationalUnit'],
                                       ],
                     })

        class LDAPServerFactory(protocol.ServerFactory):
            protocol = ldapserver.LDAPServer
            def __init__(self, root):
                self.root = root

        # Let the factory be adapted to IConnectedLDAPEntry by handing out
        # its root entry.
        components.registerAdapter(lambda x: x.root,
                                   LDAPServerFactory,
                                   interfaces.IConnectedLDAPEntry)
        serverFactory = LDAPServerFactory(db)

        self.client = ldapclient.LDAPClient()
        # The port is passed as an integer, as IPv4Address documents.
        server = serverFactory.buildProtocol(address.IPv4Address('TCP', 'localhost', 1024))
        # NOTE(review): presumably connects server and client back-to-back
        # in memory -- confirm against ldaptor.test.util.
        util.returnConnected(server, self.client)

    def testSimple(self):
        """The fetched schema matches what was stored under cn=schema."""
        d = fetchschema.fetch(self.client, 'dc=example,dc=com')
        (attributeTypes, objectClasses) = util.pumpingDeferredResult(d)

        self.failUnlessEqual([str(x) for x in attributeTypes],
                             [str(schema.AttributeTypeDescription(x)) for x in [
            test_schema.AttributeType_KnownValues.knownValues[0][0],
            ]])

        self.failUnlessEqual([str(x) for x in objectClasses],
                             [str(schema.ObjectClassDescription(x)) for x in [
            test_schema.OBJECTCLASSES['organization'],
            test_schema.OBJECTCLASSES['organizationalUnit'],
            ]])

# Generated by Doxygen 1.6.0