Private
Server IP : 195.201.23.43  /  Your IP : 3.141.244.88
Web Server : Apache
System : Linux webserver2.vercom.be 5.4.0-192-generic #212-Ubuntu SMP Fri Jul 5 09:47:39 UTC 2024 x86_64
User : kdecoratie ( 1041)
PHP Version : 7.1.33-63+ubuntu20.04.1+deb.sury.org+1
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/twisted/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/twisted/test//test_persisted.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

from __future__ import division, absolute_import

# System Imports
import sys

from twisted.trial import unittest

try:
    import cPickle as pickle
except ImportError:
    import pickle

import io

try:
    from cStringIO import StringIO as _oldStyleCStringIO
except ImportError:
    skipStringIO = "No cStringIO available."
else:
    skipStringIO = None

try:
    import copyreg
except:
    import copy_reg as copyreg

# Twisted Imports
from twisted.persisted import styles, aot, crefutil
from twisted.python.compat import _PY3


class VersionTests(unittest.TestCase):
    def test_nullVersionUpgrade(self):
        global NullVersioned
        class NullVersioned(object):
            def __init__(self):
                self.ok = 0
        pkcl = pickle.dumps(NullVersioned())
        class NullVersioned(styles.Versioned, object):
            persistenceVersion = 1
            def upgradeToVersion1(self):
                self.ok = 1
        mnv = pickle.loads(pkcl)
        styles.doUpgrade()
        assert mnv.ok, "initial upgrade not run!"

    def test_versionUpgrade(self):
        global MyVersioned
        class MyVersioned(styles.Versioned):
            persistenceVersion = 2
            persistenceForgets = ['garbagedata']
            v3 = 0
            v4 = 0

            def __init__(self):
                self.somedata = 'xxx'
                self.garbagedata = lambda q: 'cant persist'

            def upgradeToVersion3(self):
                self.v3 += 1

            def upgradeToVersion4(self):
                self.v4 += 1
        mv = MyVersioned()
        assert not (mv.v3 or mv.v4), "hasn't been upgraded yet"
        pickl = pickle.dumps(mv)
        MyVersioned.persistenceVersion = 4
        obj = pickle.loads(pickl)
        styles.doUpgrade()
        assert obj.v3, "didn't do version 3 upgrade"
        assert obj.v4, "didn't do version 4 upgrade"
        pickl = pickle.dumps(obj)
        obj = pickle.loads(pickl)
        styles.doUpgrade()
        assert obj.v3 == 1, "upgraded unnecessarily"
        assert obj.v4 == 1, "upgraded unnecessarily"
    
    def test_nonIdentityHash(self):
        global ClassWithCustomHash
        class ClassWithCustomHash(styles.Versioned):
            def __init__(self, unique, hash):
                self.unique = unique
                self.hash = hash
            def __hash__(self):
                return self.hash
        
        v1 = ClassWithCustomHash('v1', 0)
        v2 = ClassWithCustomHash('v2', 0)

        pkl = pickle.dumps((v1, v2))
        del v1, v2
        ClassWithCustomHash.persistenceVersion = 1
        ClassWithCustomHash.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)
        v1, v2 = pickle.loads(pkl)
        styles.doUpgrade()
        self.assertEqual(v1.unique, 'v1')
        self.assertEqual(v2.unique, 'v2')
        self.assertTrue(v1.upgraded)
        self.assertTrue(v2.upgraded)
    
    def test_upgradeDeserializesObjectsRequiringUpgrade(self):
        global ToyClassA, ToyClassB
        class ToyClassA(styles.Versioned):
            pass
        class ToyClassB(styles.Versioned):
            pass
        x = ToyClassA()
        y = ToyClassB()
        pklA, pklB = pickle.dumps(x), pickle.dumps(y)
        del x, y
        ToyClassA.persistenceVersion = 1
        def upgradeToVersion1(self):
            self.y = pickle.loads(pklB)
            styles.doUpgrade()
        ToyClassA.upgradeToVersion1 = upgradeToVersion1
        ToyClassB.persistenceVersion = 1
        ToyClassB.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)

        x = pickle.loads(pklA)
        styles.doUpgrade()
        self.assertTrue(x.y.upgraded)



class VersionedSubClass(styles.Versioned):
    pass



class SecondVersionedSubClass(styles.Versioned):
    pass



class VersionedSubSubClass(VersionedSubClass):
    pass



class VersionedDiamondSubClass(VersionedSubSubClass, SecondVersionedSubClass):
    pass



class AybabtuTests(unittest.TestCase):
    """
    L{styles._aybabtu} gets all of classes in the inheritance hierarchy of its
    argument that are strictly between L{Versioned} and the class itself.
    """

    def test_aybabtuStrictEmpty(self):
        """
        L{styles._aybabtu} of L{Versioned} itself is an empty list.
        """
        self.assertEqual(styles._aybabtu(styles.Versioned), [])


    def test_aybabtuStrictSubclass(self):
        """
        There are no classes I{between} L{VersionedSubClass} and L{Versioned},
        so L{styles._aybabtu} returns an empty list.
        """
        self.assertEqual(styles._aybabtu(VersionedSubClass), [])


    def test_aybabtuSubsubclass(self):
        """
        With a sub-sub-class of L{Versioned}, L{styles._aybabtu} returns a list
        containing the intervening subclass.
        """
        self.assertEqual(styles._aybabtu(VersionedSubSubClass),
                         [VersionedSubClass])


    def test_aybabtuStrict(self):
        """
        For a diamond-shaped inheritance graph, L{styles._aybabtu} returns a
        list containing I{both} intermediate subclasses.
        """
        self.assertEqual(
            styles._aybabtu(VersionedDiamondSubClass),
            [VersionedSubSubClass, VersionedSubClass, SecondVersionedSubClass])



class MyEphemeral(styles.Ephemeral):

    def __init__(self, x):
        self.x = x


class EphemeralTests(unittest.TestCase):

    def test_ephemeral(self):
        o = MyEphemeral(3)
        self.assertEqual(o.__class__, MyEphemeral)
        self.assertEqual(o.x, 3)
        
        pickl = pickle.dumps(o)
        o = pickle.loads(pickl)
        
        self.assertEqual(o.__class__, styles.Ephemeral)
        self.assertFalse(hasattr(o, 'x'))


class Pickleable:

    def __init__(self, x):
        self.x = x
    
    def getX(self):
        return self.x



class NotPickleable(object):
    """
    A class that is not pickleable.
    """

    def __reduce__(self):
        """
        Raise an exception instead of pickling.
        """
        raise TypeError("Not serializable.")



class CopyRegistered(object):
    """
    A class that is pickleable only because it is registered with the
    C{copyreg} module.
    """

    def __init__(self):
        """
        Ensure that this object is normally not pickleable.
        """
        self.notPickleable = NotPickleable()



class CopyRegisteredLoaded(object):
    """
    L{CopyRegistered} after unserialization.
    """



def reduceCopyRegistered(cr):
    """
    Externally implement C{__reduce__} for L{CopyRegistered}.

    @param cr: The L{CopyRegistered} instance.

    @return: a 2-tuple of callable and argument list, in this case
        L{CopyRegisteredLoaded} and no arguments.
    """
    return CopyRegisteredLoaded, ()



copyreg.pickle(CopyRegistered, reduceCopyRegistered)

class A:
    """
    dummy class
    """
    def amethod(self):
        pass

class B:
    """
    dummy class
    """
    def bmethod(self):
        pass

def funktion():
    pass

class PicklingTests(unittest.TestCase):
    """Test pickling of extra object types."""
    
    def test_module(self):
        pickl = pickle.dumps(styles)
        o = pickle.loads(pickl)
        self.assertEqual(o, styles)


    def test_classMethod(self):
        """
        After importing L{twisted.persisted.styles}, it is possible to pickle
        classmethod objects.
        """
        pickl = pickle.dumps(Pickleable.getX)
        o = pickle.loads(pickl)
        self.assertEqual(o, Pickleable.getX)

    if sys.version_info > (3, 4):
        test_classMethod.skip = (
            "As of Python 3.4 it is no longer possible to globally change "
            "the behavior of function pickling."
        )


    def test_instanceMethod(self):
        obj = Pickleable(4)
        pickl = pickle.dumps(obj.getX)
        o = pickle.loads(pickl)
        self.assertEqual(o(), 4)
        self.assertEqual(type(o), type(obj.getX))
    
    def test_stringIO(self):
        f = _oldStyleCStringIO()
        f.write("abc")
        pickl = pickle.dumps(f)
        o = pickle.loads(pickl)
        self.assertEqual(type(o), type(f))
        self.assertEqual(f.getvalue(), "abc")

    if skipStringIO:
        test_stringIO.skip = skipStringIO



class StringIOTransitionTests(unittest.TestCase):
    """
    When pickling a cStringIO in Python 2, it should unpickle as a BytesIO or a
    StringIO in Python 3, depending on the type of its contents.
    """

    if not _PY3:
        skip = "In Python 2 we can still unpickle cStringIO as such."


    def test_unpickleBytesIO(self):
        """
        A cStringIO pickled with bytes in it will yield an L{io.BytesIO} on
        python 3.
        """
        pickledStringIWithText = (
            b"ctwisted.persisted.styles\nunpickleStringI\np0\n"
            b"(S'test'\np1\nI0\ntp2\nRp3\n."
        )
        loaded = pickle.loads(pickledStringIWithText)
        self.assertIsInstance(loaded, io.StringIO)
        self.assertEqual(loaded.getvalue(), u"test")



class EvilSourceror:
    def __init__(self, x):
        self.a = self
        self.a.b = self
        self.a.b.c = x

class NonDictState:
    def __getstate__(self):
        return self.state
    def __setstate__(self, state):
        self.state = state

class AOTTests(unittest.TestCase):
    def test_simpleTypes(self):
        obj = (1, 2.0, 3j, True, slice(1, 2, 3), 'hello', u'world',
               sys.maxsize + 1, None, Ellipsis)
        rtObj = aot.unjellyFromSource(aot.jellyToSource(obj))
        self.assertEqual(obj, rtObj)


    def test_methodSelfIdentity(self):
        a = A()
        b = B()
        a.bmethod = b.bmethod
        b.a = a
        im_ = aot.unjellyFromSource(aot.jellyToSource(b)).a.bmethod
        self.assertEqual(aot._selfOfMethod(im_).__class__,
                         aot._classOfMethod(im_))


    def test_methodNotSelfIdentity(self):
        """
        If a class change after an instance has been created,
        L{aot.unjellyFromSource} shoud raise a C{TypeError} when trying to
        unjelly the instance.
        """
        a = A()
        b = B()
        a.bmethod = b.bmethod
        b.a = a
        savedbmethod = B.bmethod
        del B.bmethod
        try:
            self.assertRaises(TypeError, aot.unjellyFromSource,
                              aot.jellyToSource(b))
        finally:
            B.bmethod = savedbmethod


    def test_unsupportedType(self):
        """
        L{aot.jellyToSource} should raise a C{TypeError} when trying to jelly
        an unknown type without a C{__dict__} property or C{__getstate__}
        method.
        """
        class UnknownType(object):
            @property
            def __dict__(self):
                raise AttributeError()
        self.assertRaises(TypeError, aot.jellyToSource, UnknownType())


    def test_basicIdentity(self):
        # Anyone wanting to make this datastructure more complex, and thus this
        # test more comprehensive, is welcome to do so.
        aj = aot.AOTJellier().jellyToAO
        d = {'hello': 'world', "method": aj}
        l = [1, 2, 3,
             "he\tllo\n\n\"x world!",
             u"goodbye \n\t\u1010 world!",
             1, 1.0, 100 ** 100, unittest, aot.AOTJellier, d,
             funktion
             ]
        t = tuple(l)
        l.append(l)
        l.append(t)
        l.append(t)
        uj = aot.unjellyFromSource(aot.jellyToSource([l, l]))
        assert uj[0] is uj[1]
        assert uj[1][0:5] == l[0:5]


    def test_nonDictState(self):
        a = NonDictState()
        a.state = "meringue!"
        assert aot.unjellyFromSource(aot.jellyToSource(a)).state == a.state


    def test_copyReg(self):
        """
        L{aot.jellyToSource} and L{aot.unjellyFromSource} honor functions
        registered in the pickle copy registry.
        """
        uj = aot.unjellyFromSource(aot.jellyToSource(CopyRegistered()))
        self.assertIsInstance(uj, CopyRegisteredLoaded)


    def test_funkyReferences(self):
        o = EvilSourceror(EvilSourceror([]))
        j1 = aot.jellyToAOT(o)
        oj = aot.unjellyFromAOT(j1)

        assert oj.a is oj
        assert oj.a.b is oj.b
        assert oj.c is not oj.c.c


    def test_circularTuple(self):
        """
        L{aot.jellyToAOT} can persist circular references through tuples.
        """
        l = []
        t = (l, 4321)
        l.append(t)
        j1 = aot.jellyToAOT(l)
        oj = aot.unjellyFromAOT(j1)
        self.assertIsInstance(oj[0], tuple)
        self.assertIs(oj[0][0], oj)
        self.assertEqual(oj[0][1], 4321)



class CrefUtilTests(unittest.TestCase):
    """
    Tests for L{crefutil}.
    """

    def test_dictUnknownKey(self):
        """
        L{crefutil._DictKeyAndValue} only support keys C{0} and C{1}.
        """
        d = crefutil._DictKeyAndValue({})
        self.assertRaises(RuntimeError, d.__setitem__, 2, 3)


    def test_deferSetMultipleTimes(self):
        """
        L{crefutil._Defer} can be assigned a key only one time.
        """
        d = crefutil._Defer()
        d[0] = 1
        self.assertRaises(RuntimeError, d.__setitem__, 0, 1)


    def test_containerWhereAllElementsAreKnown(self):
        """
        A L{crefutil._Container} where all of its elements are known at
        construction time is nonsensical and will result in errors in any call
        to addDependant.
        """
        container = crefutil._Container([1, 2, 3], list)
        self.assertRaises(AssertionError,
                          container.addDependant, {}, "ignore-me")


    def test_dontPutCircularReferencesInDictionaryKeys(self):
        """
        If a dictionary key contains a circular reference (which is probably a
        bad practice anyway) it will be resolved by a
        L{crefutil._DictKeyAndValue}, not by placing a L{crefutil.NotKnown}
        into a dictionary key.
        """
        self.assertRaises(AssertionError,
                          dict().__setitem__, crefutil.NotKnown(), "value")


    def test_dontCallInstanceMethodsThatArentReady(self):
        """
        L{crefutil._InstanceMethod} raises L{AssertionError} to indicate it
        should not be called.  This should not be possible with any of its API
        clients, but is provided for helping to debug.
        """
        self.assertRaises(AssertionError,
                          crefutil._InstanceMethod(
                              "no_name", crefutil.NotKnown(), type))



testCases = [VersionTests, EphemeralTests, PicklingTests]

Private