Server IP : 195.201.23.43 / Your IP : 18.221.182.97 Web Server : Apache System : Linux webserver2.vercom.be 5.4.0-192-generic #212-Ubuntu SMP Fri Jul 5 09:47:39 UTC 2024 x86_64 User : kdecoratie ( 1041) PHP Version : 7.1.33-63+ubuntu20.04.1+deb.sury.org+1 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals, MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /lib/python3/dist-packages/twisted/conch/client/ |
Upload File : |
# -*- test-case-name: twisted.conch.test.test_default -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Accesses the key agent for user authentication.

Maintainer: Paul Swartz
"""

import os

from twisted.conch.ssh import agent, channel, keys
from twisted.internet import protocol, reactor
from twisted.python import log


class SSHAgentClient(agent.SSHAgentClient):
    """
    Agent client that remembers the public key blobs returned by the agent
    and hands them out one at a time.
    """

    def __init__(self):
        agent.SSHAgentClient.__init__(self)
        # Public key blobs not yet handed out by getPublicKey().
        self.blobs = []

    def getPublicKeys(self):
        """
        Request the identities held by the agent and store their blobs.

        @return: the Deferred returned by C{requestIdentities()}, with
            L{_cbPublicKeys} added as a callback.
        """
        return self.requestIdentities().addCallback(self._cbPublicKeys)

    def _cbPublicKeys(self, blobcomm):
        """
        Store the first element of each entry of C{blobcomm} in
        C{self.blobs}, replacing any previous contents.
        """
        log.msg('got %i public keys' % len(blobcomm))
        self.blobs = [x[0] for x in blobcomm]

    def getPublicKey(self):
        """
        Return a L{Key} from the first blob in C{self.blobs}, if any, or
        return L{None}.
        """
        if self.blobs:
            return keys.Key.fromString(self.blobs.pop(0))
        return None


class SSHAgentForwardingChannel(channel.SSHChannel):
    """
    Channel that relays data between the remote side and the local agent
    socket named by the C{SSH_AUTH_SOCK} environment variable.
    """

    # Class-level default so closed() is safe even when the local agent
    # connection was never established (connect failed or still pending).
    local = None

    def channelOpen(self, specificData):
        """
        Start connecting to the local agent socket.

        Data received before the connection is ready is accumulated in
        C{self.buf}; a failed connection closes this channel.

        @raise KeyError: if C{SSH_AUTH_SOCK} is not set in the environment.
        """
        # Initialise the buffer before connecting so it exists no matter
        # when the connection callback fires.
        # NOTE(review): the base channel class may also keep an attribute
        # named `buf` -- confirm the two uses cannot interfere.
        self.buf = b''
        cc = protocol.ClientCreator(reactor, SSHAgentForwardingLocal)
        d = cc.connectUNIX(os.environ['SSH_AUTH_SOCK'])
        d.addCallback(self._cbGotLocal)
        d.addErrback(lambda x: self.loseConnection())

    def _cbGotLocal(self, local):
        """
        Wire the channel and the local agent connection together, then
        forward anything that was buffered while connecting.
        """
        self.local = local
        # From now on bypass the buffering dataReceived below.
        self.dataReceived = self.local.transport.write
        self.local.dataReceived = self.write
        if self.buf:
            pending, self.buf = self.buf, b''
            self.local.transport.write(pending)

    def dataReceived(self, data):
        """
        Buffer C{data} until the local agent connection is available.
        """
        self.buf += data

    def closed(self):
        """
        Drop the local agent connection, if one was established.
        """
        if self.local:
            self.local.loseConnection()
            self.local = None


class SSHAgentForwardingLocal(protocol.Protocol):
    """
    Protocol for the local agent connection; its C{dataReceived} is
    replaced by L{SSHAgentForwardingChannel._cbGotLocal}.
    """
    pass