Logo Search packages:      
Sourcecode: ldaptor version File versions

ldifdelta.py

from twisted.python.failure import Failure
from twisted.internet import error
from ldaptor.protocols.ldap import ldifprotocol
from ldaptor import delta, entry

WAIT_FOR_CHANGETYPE = 'WAIT_FOR_CHANGETYPE'
WAIT_FOR_MOD_SPEC = 'WAIT_FOR_MOD_SPEC'
IN_MOD_SPEC = 'IN_MOD_SPEC'
IN_ADD_ENTRY = 'IN_ADD_ENTRY'
IN_DELETE = 'IN_DELETE'

00012 class LDIFDeltaMissingChangeTypeError(ldifprotocol.LDIFParseError):
    """LDIF delta entry has no changetype."""
    pass

00016 class LDIFDeltaUnknownModificationError(ldifprotocol.LDIFParseError):
    """LDIF delta modification has unknown mod-spec."""
    pass

00020 class LDIFDeltaModificationMissingEndDashError(ldifprotocol.LDIFParseError):
    """LDIF delta modification has no ending dash."""
    pass

00024 class LDIFDeltaModificationDifferentAttributeTypeError(ldifprotocol.LDIFParseError):
    """The attribute type for the change is not the as in the mod-spec header line."""
    pass

00028 class LDIFDeltaAddMissingAttributesError(ldifprotocol.LDIFParseError):
    """Add operation needs to have atleast one attribute type and value."""
    pass

00032 class LDIFDeltaDeleteHasJunkAfterChangeTypeError(ldifprotocol.LDIFParseError):
    """Delete operation takes no attribute types or values."""
    pass

class LDIFDelta(ldifprotocol.LDIF):
    def state_WAIT_FOR_DN(self, line):
        super(LDIFDelta, self).state_WAIT_FOR_DN(line)
        if self.mode == ldifprotocol.IN_ENTRY:
            self.mode = WAIT_FOR_CHANGETYPE

    def state_WAIT_FOR_CHANGETYPE(self, line):
        assert self.dn is not None, 'self.dn must be set when in entry'
        assert self.data is not None, 'self.data must be set when in entry'

        if line == '':
            raise LDIFDeltaMissingChangeTypeError, self.dn

        key, val = self._parseLine(line)

        if key != 'changetype':
            raise LDIFDeltaMissingChangeTypeError, (self.dn, key, val)

        if val == 'modify':
            self.modifications = []
            self.mode = WAIT_FOR_MOD_SPEC
        elif val == 'add':
            self.mode = IN_ADD_ENTRY
        elif val == 'delete':
            self.mode = IN_DELETE
        elif val == 'modrdn' or val == 'moddn':
            raise NotImplementedError #TODO

    MOD_SPEC_TO_DELTA = {
        'add': delta.Add,
        'delete': delta.Delete,
        'replace': delta.Replace,
        }

    def state_WAIT_FOR_MOD_SPEC(self, line):
        if line == '':
            # end of entry
            self.mode = ldifprotocol.WAIT_FOR_DN
            m = delta.ModifyOp(dn=self.dn,
                               modifications=self.modifications)
            self.dn = None
            self.data = None
            self.modifications = None
            self.gotEntry(m)
            return

        key, val = self._parseLine(line)

        if key not in self.MOD_SPEC_TO_DELTA:
            raise LDIFDeltaUnknownModificationError, \
                  (self.dn, key)

        self.mod_spec = key
        self.mod_spec_attr = val
        self.mod_spec_data = []
        self.mode = IN_MOD_SPEC

    def state_IN_MOD_SPEC(self, line):
        if line == '':
            raise LDIFDeltaModificationMissingEndDashError

        if line == '-':
            mod = self.MOD_SPEC_TO_DELTA[self.mod_spec]
            de = mod(self.mod_spec_attr, self.mod_spec_data)
            self.modifications.append(de)
            del self.mod_spec
            del self.mod_spec_attr
            del self.mod_spec_data
            self.mode = WAIT_FOR_MOD_SPEC
            return

        key, val = self._parseLine(line)

        if key != self.mod_spec_attr:
            raise LDIFDeltaModificationDifferentAttributeTypeError, \
                  (key, self.mod_spec_attr)

        self.mod_spec_data.append(val)

    def state_IN_ADD_ENTRY(self, line):
        assert self.dn is not None, 'self.dn must be set when in entry'
        assert self.data is not None, 'self.data must be set when in entry'

        if line == '':
            # end of entry
            if not self.data:
                raise LDIFDeltaAddMissingAttributesError, \
                      self.dn
            self.mode = ldifprotocol.WAIT_FOR_DN
            o = delta.AddOp(entry.BaseLDAPEntry(dn=self.dn,
                                                attributes=self.data))
            self.dn = None
            self.data = None
            self.gotEntry(o)
            return

        key, val = self._parseLine(line)

        if not key in self.data:
            self.data[key] = []

        self.data[key].append(val)

    def state_IN_DELETE(self, line):
        assert self.dn is not None, 'self.dn must be set when in entry'

        if line == '':
            # end of entry
            self.mode = ldifprotocol.WAIT_FOR_DN
            o = delta.DeleteOp(dn=self.dn)
            self.dn = None
            self.data = None
            self.gotEntry(o)
            return

        raise LDIFDeltaDeleteHasJunkAfterChangeTypeError, \
              (self.dn, line)

def fromLDIFFile(f):
    """Read LDIF data from a file."""
    from twisted.trial import util

    p = LDIFDelta()
    l = []
    p.gotEntry = l.append
    while 1:
        data = f.read()
        if not data:
            break
        p.dataReceived(data)
    p.connectionLost(Failure(error.ConnectionDone()))

    return l

Generated by  Doxygen 1.6.0   Back to index