Server IP : 195.201.23.43 / Your IP : 18.222.164.159 Web Server : Apache System : Linux webserver2.vercom.be 5.4.0-192-generic #212-Ubuntu SMP Fri Jul 5 09:47:39 UTC 2024 x86_64 User : kdecoratie ( 1041) PHP Version : 7.1.33-63+ubuntu20.04.1+deb.sury.org+1 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals, MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /lib/python3/dist-packages/requests_unixsocket/ |
Upload File : |
import socket

from requests.adapters import HTTPAdapter
from requests.compat import urlparse, unquote

try:
    import http.client as httplib
except ImportError:
    import httplib

try:
    from requests.packages import urllib3
except ImportError:
    import urllib3


# The following was adapted from some code from docker-py
# https://github.com/docker/docker-py/blob/master/docker/transport/unixconn.py
class UnixHTTPConnection(httplib.HTTPConnection, object):
    """HTTP connection whose transport is a unix domain socket."""

    def __init__(self, unix_socket_url, timeout=60):
        """Create an HTTP connection to a unix domain socket

        :param unix_socket_url: A URL with a scheme of 'http+unix' and the
        netloc is a percent-encoded path to a unix domain socket. E.g.:
        'http+unix://%2Ftmp%2Fprofilesvc.sock/status/pid'
        :param timeout: Socket timeout applied in :meth:`connect`.
        """
        super(UnixHTTPConnection, self).__init__('localhost', timeout=timeout)
        self.unix_socket_url = unix_socket_url
        self.timeout = timeout
        self.sock = None

    def __del__(self):  # base class does not have d'tor
        # getattr guards against a partially constructed instance: if the
        # base __init__ raised, ``self.sock`` was never assigned.
        sock = getattr(self, 'sock', None)
        if sock:
            sock.close()

    def connect(self):
        """Open the unix socket named by the URL's percent-encoded netloc.

        On success the connected socket is stored in ``self.sock``. If
        configuring or connecting the socket fails, the socket is closed
        before the exception propagates so no descriptor is leaked.
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            socket_path = unquote(urlparse(self.unix_socket_url).netloc)
            sock.connect(socket_path)
        except Exception:
            sock.close()
            raise
        self.sock = sock


class UnixHTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
    """Connection pool that hands out :class:`UnixHTTPConnection` objects."""

    def __init__(self, socket_path, timeout=60):
        """Create a pool for one unix socket URL.

        :param socket_path: Passed unchanged to :class:`UnixHTTPConnection`
        as its ``unix_socket_url``.
        :param timeout: Timeout handed to the pool and to each connection.
        """
        super(UnixHTTPConnectionPool, self).__init__(
            'localhost', timeout=timeout)
        self.socket_path = socket_path
        self.timeout = timeout

    def _new_conn(self):
        """Return a fresh, not yet connected, unix socket connection."""
        return UnixHTTPConnection(self.socket_path, self.timeout)


class UnixAdapter(HTTPAdapter):
    """Transport adapter that routes requests over unix domain sockets."""

    def __init__(self, timeout=60, pool_connections=25, *args, **kwargs):
        """Set up the adapter and its cache of per-URL connection pools.

        :param timeout: Timeout given to every pool this adapter creates.
        :param pool_connections: Maximum number of pools kept in the cache;
        evicted pools are closed via the container's ``dispose_func``.
        Remaining arguments are forwarded to :class:`HTTPAdapter`.
        """
        super(UnixAdapter, self).__init__(*args, **kwargs)
        self.timeout = timeout
        self.pools = urllib3._collections.RecentlyUsedContainer(
            pool_connections, dispose_func=lambda p: p.close()
        )

    def get_connection_with_tls_context(self, request, verify,
                                        proxies=None, cert=None):
        """Return the pool for ``request.url``.

        Newer requests releases (2.32.2 and later) call this hook from
        ``HTTPAdapter.send`` instead of :meth:`get_connection`. TLS settings
        (``verify``/``cert``) are ignored here; the call is simply delegated
        to :meth:`get_connection`.
        """
        return self.get_connection(request.url, proxies)

    def get_connection(self, url, proxies=None):
        """Return the cached pool for ``url``, creating it if necessary.

        :raises ValueError: if ``proxies`` has an entry for the URL's scheme.
        """
        proxies = proxies or {}
        proxy = proxies.get(urlparse(url.lower()).scheme)

        if proxy:
            raise ValueError('%s does not support specifying proxies'
                             % self.__class__.__name__)

        # Look up and insert under the container's lock so two callers
        # cannot both create a pool for the same URL.
        with self.pools.lock:
            pool = self.pools.get(url)
            if pool:
                return pool

            pool = UnixHTTPConnectionPool(url, self.timeout)
            self.pools[url] = pool

        return pool

    def request_url(self, request, proxies):
        """Return only the path portion of the request URL."""
        return request.path_url

    def close(self):
        """Drop every cached connection pool."""
        self.pools.clear()