Private
Server IP : 195.201.23.43  /  Your IP : 3.140.184.21
Web Server : Apache
System : Linux webserver2.vercom.be 5.4.0-192-generic #212-Ubuntu SMP Fri Jul 5 09:47:39 UTC 2024 x86_64
User : kdecoratie ( 1041)
PHP Version : 7.1.33-63+ubuntu20.04.1+deb.sury.org+1
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/twisted/python/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/twisted/python/threadpool.py
# -*- test-case-name: twisted.test.test_threadpool -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
twisted.python.threadpool: a pool of threads to which we dispatch tasks.

In most cases you can just use C{reactor.callInThread} and friends
instead of creating a thread pool directly.
"""

from __future__ import division, absolute_import

import threading

from twisted._threads import pool as _pool
from twisted.python import log, context
from twisted.python.failure import Failure
from twisted.python._oldstyle import _oldStyle


# Unique module-level sentinel object.  Nothing in the visible part of this
# module references it; presumably it is retained so that external code
# importing C{WorkerStop} keeps working -- verify before removing.
WorkerStop = object()



@_oldStyle
class ThreadPool:
    """
    This class (hopefully) generalizes the functionality of a pool of threads
    to which work can be dispatched.

    L{callInThread} and L{stop} should only be called from a single thread.

    @ivar min: The minimum number of threads, as last set by the constructor
        or by L{ThreadPool.adjustPoolsize}.
    @type min: L{int}

    @ivar max: The maximum number of threads, as last set by the constructor
        or by L{ThreadPool.adjustPoolsize}.
    @type max: L{int}

    @ivar joined: Set to C{True} by L{ThreadPool.stop} and back to C{False}
        by L{ThreadPool.start}.  While it is C{True}, newly submitted work is
        silently discarded.
    @type joined: L{bool}

    @ivar started: Whether or not the thread pool is currently running.
    @type started: L{bool}

    @ivar name: The name given to this pool, used when naming its threads.
    @type name: native L{str} or L{None}

    @ivar threads: List of workers currently running in this thread pool.
    @type threads: L{list}

    @ivar _pool: A hook for testing.
    @type _pool: callable compatible with L{_pool}
    """
    min = 5
    max = 20
    joined = False
    started = False
    # NOTE: this class-level default is rebound by the C{workers} property
    # defined further down in the class body, so the value 0 is never
    # observable on the finished class.
    workers = 0
    name = None

    threadFactory = threading.Thread
    # C{threading.currentThread} is a deprecated camelCase alias (it emits a
    # DeprecationWarning when called on Python 3.10+); C{current_thread} is
    # the equivalent spelling and has been available since Python 2.6.
    currentThread = staticmethod(threading.current_thread)
    _pool = staticmethod(_pool)

    def __init__(self, minthreads=5, maxthreads=20, name=None):
        """
        Create a new threadpool.

        No threads are started here; see L{ThreadPool.start}.

        @param minthreads: minimum number of threads in the pool
        @type minthreads: L{int}

        @param maxthreads: maximum number of threads in the pool
        @type maxthreads: L{int}

        @param name: The name to give this threadpool; visible in log messages.
        @type name: native L{str}
        """
        assert minthreads >= 0, 'minimum is negative'
        assert minthreads <= maxthreads, 'minimum is greater than maximum'
        self.min = minthreads
        self.max = maxthreads
        self.name = name
        self.threads = []

        def trackingThreadFactory(*a, **kw):
            # Create the thread through the (overridable) factory, give it a
            # generated name, and remember it so that stop() can join it.
            thread = self.threadFactory(*a, name=self._generateName(), **kw)
            self.threads.append(thread)
            return thread

        def currentLimit():
            # Until the pool has been started no worker may be created;
            # afterwards the limit is whatever C{max} currently is.
            if not self.started:
                return 0
            return self.max

        self._team = self._pool(currentLimit, trackingThreadFactory)


    @property
    def workers(self):
        """
        For legacy compatibility purposes, return a total number of workers.

        @return: the current number of workers, both idle and busy (but not
            those that have been quit by L{ThreadPool.adjustPoolsize})
        @rtype: L{int}
        """
        stats = self._team.statistics()
        return stats.idleWorkerCount + stats.busyWorkerCount


    @property
    def working(self):
        """
        For legacy compatibility purposes, return the number of busy workers as
        expressed by a list the length of that number.

        @return: the number of workers currently processing a work item.
        @rtype: L{list} of L{None}
        """
        return [None] * self._team.statistics().busyWorkerCount


    @property
    def waiters(self):
        """
        For legacy compatibility purposes, return the number of idle workers as
        expressed by a list the length of that number.

        @return: the number of workers currently alive (with an allocated
            thread) but waiting for new work.
        @rtype: L{list} of L{None}
        """
        return [None] * self._team.statistics().idleWorkerCount


    @property
    def _queue(self):
        """
        For legacy compatibility purposes, return an object with a C{qsize}
        method that indicates the amount of work not yet allocated to a worker.

        @return: an object with a C{qsize} method.
        """
        class NotAQueue(object):
            # The instance parameter is deliberately called C{q} so that
            # C{self} inside the method body still refers to the enclosing
            # ThreadPool.
            def qsize(q):
                """
                Pretend to be a Python threading Queue and return the
                number of as-yet-unconsumed tasks.

                @return: the amount of backlogged work not yet dispatched to a
                    worker.
                @rtype: L{int}
                """
                return self._team.statistics().backloggedWorkCount
        return NotAQueue()

    q = _queue                  # Yes, twistedchecker, I want a single-letter
                                # attribute name.


    def start(self):
        """
        Start the threadpool.

        Marks the pool as running, brings the number of workers up to the
        configured minimum, and then asks for additional workers to cover any
        work that was queued before the pool was started.
        """
        self.joined = False
        self.started = True
        # Start some threads.
        self.adjustPoolsize()
        # Work submitted before start() has been waiting with no workers to
        # run it; request enough workers to pick it up.
        backlog = self._team.statistics().backloggedWorkCount
        if backlog:
            self._team.grow(backlog)


    def startAWorker(self):
        """
        Increase the number of available workers for the thread pool by 1, up
        to the maximum allowed by L{ThreadPool.max}.
        """
        self._team.grow(1)


    def _generateName(self):
        """
        Generate a name for a new pool thread.

        The name combines the pool's name (or its C{id} when it has no name)
        with the current worker count.

        @return: A distinctive name for the thread.
        @rtype: native L{str}
        """
        return "PoolThread-%s-%s" % (self.name or id(self), self.workers)


    def stopAWorker(self):
        """
        Decrease the number of available workers by 1, by quitting one as soon
        as it's idle.
        """
        self._team.shrink(1)


    def __setstate__(self, state):
        """
        Restore a pickled pool by re-running the constructor with the saved
        configuration; the restored pool is not started.

        @param state: the dictionary produced by L{ThreadPool.__getstate__}.
        @type state: L{dict}
        """
        setattr(self, "__dict__", state)
        # State saved by older versions has no 'name' key; in that case
        # self.name resolves to the class-level default of None, which is
        # what the constructor was previously called with.
        ThreadPool.__init__(self, self.min, self.max, self.name)


    def __getstate__(self):
        """
        Return the picklable configuration of this pool.

        Only the size limits and the name are saved; threads, queued work and
        the running state are not.

        @return: a dictionary with the keys C{'min'}, C{'max'} and C{'name'}.
        @rtype: L{dict}
        """
        state = {}
        state['min'] = self.min
        state['max'] = self.max
        # Keep the name so that a restored pool still names its threads the
        # same way.
        state['name'] = self.name
        return state


    def callInThread(self, func, *args, **kw):
        """
        Call a callable object in a separate thread.

        If C{func} raises, the resulting failure is logged.

        @param func: callable object to be called in separate thread

        @param args: positional arguments to be passed to C{func}

        @param kw: keyword args to be passed to C{func}
        """
        self.callInThreadWithCallback(None, func, *args, **kw)


    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        """
        Call a callable object in a separate thread and call C{onResult} with
        the return value, or a L{twisted.python.failure.Failure} if the
        callable raises an exception.

        The callable is allowed to block, but the C{onResult} function must not
        block and should perform as little work as possible.

        A typical action for C{onResult} for a threadpool used with a Twisted
        reactor would be to schedule a L{twisted.internet.defer.Deferred} to
        fire in the main reactor thread using C{.callFromThread}.  Note that
        C{onResult} is called inside the separate thread, not inside the
        reactor thread.

        If the pool has been stopped (see L{ThreadPool.stop}) the call is
        silently discarded: C{func} is never run and C{onResult} is never
        called.

        @param onResult: a callable with the signature C{(success, result)}.
            If the callable returns normally, C{onResult} is called with
            C{(True, result)} where C{result} is the return value of the
            callable.  If the callable throws an exception, C{onResult} is
            called with C{(False, failure)}.

            Optionally, C{onResult} may be L{None}, in which case it is not
            called at all.

        @param func: callable object to be called in separate thread

        @param args: positional arguments to be passed to C{func}

        @param kw: keyword arguments to be passed to C{func}
        """
        if self.joined:
            return
        # Capture the submitting thread's context now, so that func can be
        # run under it later in the worker thread.
        ctx = context.theContextTracker.currentContext().contexts[-1]

        def inContext():
            # The bare except is deliberate: whatever the work raises is
            # captured as a Failure and reported through onResult or the log.
            try:
                result = inContext.theWork()
                ok = True
            except:
                result = Failure()
                ok = False

            # Drop the references as soon as they are no longer needed.
            inContext.theWork = None
            if inContext.onResult is not None:
                inContext.onResult(ok, result)
                inContext.onResult = None
            elif not ok:
                log.err(result)

        # Avoid closing over func, ctx, args, kw so that we can carefully
        # manage their lifecycle.  See
        # test_threadCreationArgumentsCallInThreadWithCallback.
        inContext.theWork = lambda: context.call(ctx, func, *args, **kw)
        inContext.onResult = onResult

        self._team.do(inContext)


    def stop(self):
        """
        Shutdown the threads in the threadpool.

        Marks the pool as joined and no longer started, tells the underlying
        team to quit, and then blocks until every thread recorded in
        C{self.threads} has been joined.
        """
        self.joined = True
        self.started = False
        self._team.quit()
        for thread in self.threads:
            thread.join()


    def adjustPoolsize(self, minthreads=None, maxthreads=None):
        """
        Adjust the number of available threads by setting C{min} and C{max} to
        new values.

        If the pool is running, workers are quit or started so that the total
        falls within the new limits; otherwise only the limits are recorded.

        @param minthreads: The new value for L{ThreadPool.min}.  If L{None},
            the current minimum is kept.

        @param maxthreads: The new value for L{ThreadPool.max}.  If L{None},
            the current maximum is kept.
        """
        if minthreads is None:
            minthreads = self.min
        if maxthreads is None:
            maxthreads = self.max

        assert minthreads >= 0, 'minimum is negative'
        assert minthreads <= maxthreads, 'minimum is greater than maximum'

        self.min = minthreads
        self.max = maxthreads
        if not self.started:
            return

        # Kill off some threads if we have too many.
        if self.workers > self.max:
            self._team.shrink(self.workers - self.max)
        # Start some threads if we have too few.
        if self.workers < self.min:
            self._team.grow(self.min - self.workers)


    def dumpStats(self):
        """
        Dump some plain-text informational messages to the log about the state
        of this L{ThreadPool}.

        Note that the line labelled C{workers} reports the busy workers
        (L{ThreadPool.working}) and the line labelled C{total} reports
        C{self.threads}.
        """
        log.msg('waiters: %s' % (self.waiters,))
        log.msg('workers: %s' % (self.working,))
        log.msg('total: %s' % (self.threads,))
Private