BMa @sdZdgZddlZddlZddlZddlZddlZddlZyddlZWne k rdZYnXddl m Z ddl m Z ddl m Z ddl mZdd l mZdd l mZdd l mZdd l mZdd lmZddlmZddZeedrLddZn ddZGddde jZGdddejejZGdddeZGdddeZ GdddeZ!dS)zEvent loop using a selector and related classes. A selector is a "notify-when-ready" multiplexer. For a subclass which also includes support for signal handling, see the unix_events sub-module. BaseSelectorEventLoopN) base_events)compat) constants)events)futures) selectors) transports)sslproto) coroutine)loggerc CsAy|j|}Wntk r+dSYnXt|j|@SdS)NF)get_keyKeyErrorboolr)selectorfdZeventkeyrZd?d@Z edAdBZ!dCdDZ"dEdFZ#dGdHZ$dIdJZ%dKdLZ&dMdNZ'dOdPZ(S)QrzJSelector event loop. See events.EventLoop for API specification. Ncsatj|dkr%tj}tjd|jj||_|j t j |_ dS)NzUsing selector: %s) super__init__r ZDefaultSelectorr debug __class____name__ _selector_make_self_pipeweakrefWeakValueDictionary _transports)selfr)r!rrr<s     zBaseSelectorEventLoop.__init__extraservercCst||||||S)N)_SelectorSocketTransport)r(rprotocolwaiterr)r*rrr_make_socket_transportFsz,BaseSelectorEventLoop._make_socket_transport server_sideFserver_hostnamec Cs{tjs:|j||||d|d|d|d|Stj||||||} t||| d|d|| jS)Nr/r0r)r*)r Z_is_sslproto_available_make_legacy_ssl_transportZ SSLProtocolr+Z_app_transport) r(rawsockr, sslcontextr-r/r0r)r*Z ssl_protocolrrr_make_ssl_transportKs     z)BaseSelectorEventLoop._make_ssl_transportc Cs"t||||||||| S)N)_SelectorSslTransport) r(r2r,r3r-r/r0r)r*rrrr1Zsz0BaseSelectorEventLoop._make_legacy_ssl_transportcCst||||||S)N)_SelectorDatagramTransport)r(rr,addressr-r)rrr_make_datagram_transportds z.BaseSelectorEventLoop._make_datagram_transportcsh|jrtd|jr(dS|jtj|jdk rd|jjd|_dS)Nz!Cannot close a running event loop)Z is_running RuntimeError is_closed_close_self_pipercloser#)r()r!rrr<is      zBaseSelectorEventLoop.closecCs tdS)N)NotImplementedError)r(rrr _socketpairtsz!BaseSelectorEventLoop._socketpaircCsU|j|jj|jjd|_|jjd|_|jd8_dS)Nr)_remove_reader_ssockfilenor<_csock _internal_fds)r(rrrr;ws     z&BaseSelectorEventLoop._close_self_pipecCsg|j\|_|_|jjd|jjd|jd7_|j|jj|jdS)NFr)r>r@rB setblockingrC _add_readerrA_read_from_self)r(rrrr$s z%BaseSelectorEventLoop._make_self_pipecCsdS)Nr)r(datarrr_process_self_datasz(BaseSelectorEventLoop._process_self_datac Cs_xXy*|jjd}|sP|j|Wqtk rDwYqtk rVPYqXqWdS)Ni)r@recvrHInterruptedErrorBlockingIOError)r(rGrrrrFs  z%BaseSelectorEventLoop._read_from_selfc Cs[|j}|dk rWy|jdWn.tk rV|jrRtjdddYnXdS)Nsz3Fail to write a null byte into the self-pipe socketexc_infoT)rBsendOSError_debugr r )r(Zcsockrrr_write_to_selfs     z$BaseSelectorEventLoop._write_to_selfdcCs,|j|j|j|||||dS)N)rErA_accept_connection)r(protocol_factoryrr3r*backlogrrr_start_servingsz$BaseSelectorEventLoop._start_servingc Cs[xTt|D]F}yB|j\}}|jrGtjd||||jdWntttfk rvdSYq t k r} z| j t j t j t j t jfkr |jddd| d|i|j|j|jtj|j|||||nWYdd} ~ Xq Xd|i} |j||| ||} |j| q WdS)Nz#%r got a new connection from %r: %rFmessagez&socket.accept() out of system resource exceptionrpeername)rangeacceptrOr r rDrKrJConnectionAbortedErrorrNerrnoZEMFILEZENFILEZENOBUFSZENOMEMcall_exception_handlerr?rAZ call_laterrZACCEPT_RETRY_DELAYrU_accept_connection2Z create_task) r(rSrr3r*rT_connaddrexcr)rZrrrrRs4         z(BaseSelectorEventLoop._accept_connectionc cs$d}d}y|}|j}|rZ|j|||d|ddd|d|}n$|j||d|d|d|}y |EdHWn|jYnXWnytk r} zY|jr ddd| i} |dk r|| d <|dk r|| d <|j| WYdd} ~ XnXdS) Nr-r/Tr)r*rVz3Error on transport creation for incoming connectionrWr, transport) create_futurer4r.r< ExceptionrOr]) r(rSr`r)r3r*r,rcr-rbcontextrrrr^s4            z)BaseSelectorEventLoop._accept_connection2c CsNy|j|}Wntk r%Yn%X|jsJtdj||dS)Nz.File descriptor {!r} is used by transport {!r})r'r is_closingr9format)r(rrcrrr_ensure_fd_no_transports  z-BaseSelectorEventLoop._ensure_fd_no_transportc Gs|jtj|||}y|jj|}Wn1tk rh|jj|tj|dfYnSX|j|j }\}}|jj ||tjB||f|dk r|j dS)N) _check_closedrHandler#rrregisterr EVENT_READrGmodifycancel) r(rcallbackargshandlermaskreaderwriterrrrrEs    z!BaseSelectorEventLoop._add_readerc Cs|jrdSy|jj|}Wntk r>dSYn{X|j|j}\}}|tjM}|s|jj|n|jj ||d|f|dk r|j dSdSdS)NFT) r:r#rrrrGr rm unregisterrnro)r(rrrsrtrurrrr?s     z$BaseSelectorEventLoop._remove_readerc Gs|jtj|||}y|jj|}Wn1tk rh|jj|tjd|fYnSX|j|j }\}}|jj ||tjB||f|dk r|j dS)N) rjrrkr#rrrlr EVENT_WRITErGrnro) r(rrprqrrrrsrtrurrr _add_writer(s    z!BaseSelectorEventLoop._add_writerc Cs|jrdSy|jj|}Wntk r>dSYn{X|j|j}\}}|tjM}|s|jj|n|jj |||df|dk r|j dSdSdS)zRemove a writer callback.FNT) r:r#rrrrGr rwrvrnro)r(rrrsrtrurrr_remove_writer7s     z$BaseSelectorEventLoop._remove_writercGs |j||j|||S)zAdd a reader callback.)rirE)r(rrprqrrr add_readerNs z BaseSelectorEventLoop.add_readercCs|j||j|S)zRemove a reader callback.)rir?)r(rrrr remove_readerSs z#BaseSelectorEventLoop.remove_readercGs |j||j|||S)zAdd a writer callback..)rirx)r(rrprqrrr add_writerXs z BaseSelectorEventLoop.add_writercCs|j||j|S)zRemove a writer callback.)riry)r(rrrr remove_writer]s z#BaseSelectorEventLoop.remove_writercCsM|jr'|jdkr'td|j}|j|d|||S)zReceive data from the socket. The return value is a bytes object representing the data received. The maximum amount of data to be received at once is specified by nbytes. This method is a coroutine. rzthe socket must be non-blockingF)rO gettimeout ValueErrorrd _sock_recv)r(rnfutrrr sock_recvbs   zBaseSelectorEventLoop.sock_recvcCs|j}|r|j||jr/dSy|j|}Wnhttfk r{|j||j|d||Yn?tk r}z|j |WYdd}~XnX|j |dS)NT) rAr{ cancelledrIrKrJrzrre set_exception set_result)r(r registeredrrrrGrbrrrrqs   # z BaseSelectorEventLoop._sock_recvcCsc|jr'|jdkr'td|j}|rR|j|d||n |jd|S)aSend data to the socket. The socket must be connected to a remote socket. This method continues to send data from data until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully processed by the receiving end of the connection. This method is a coroutine. rzthe socket must be non-blockingFN)rOr~rrd _sock_sendallr)r(rrGrrrr sock_sendalls    z"BaseSelectorEventLoop.sock_sendallcCs|j}|r|j||jr/dSy|j|}WnSttfk rbd}Yn6tk r}z|j|dSWYdd}~XnX|t|kr|j dn5|r||d}|j ||j |d||dS)NrT) rAr}rrMrKrJrerlenrr|r)r(rrrrGrrrbrrrrs"     z#BaseSelectorEventLoop._sock_sendallccs|jr'|jdkr'tdttd sI|jtjkrtj|d|jd|j d|}|j s|EdH|j d\}}}}}|j }|j ||||EdHS)zTConnect to a remote socket at address. This method is a coroutine. rzthe socket must be non-blockingAF_UNIXrrloopN)rOr~rhasattrrrrrZ_ensure_resolvedrdoneresultrd _sock_connect)r(rr7Zresolvedr_rrrr sock_connects "!   z"BaseSelectorEventLoop.sock_connectcCs|j}y|j|Wnttfk ro|jtj|j||j||j |||Yn?t k r}z|j |WYdd}~XnX|j ddS)N) rAZconnectrKrJZadd_done_callback functoolspartial_sock_connect_doner|_sock_connect_cbrerr)r(rrr7rrbrrrrs   z#BaseSelectorEventLoop._sock_connectcCs|j|dS)N)r})r(rrrrrrsz(BaseSelectorEventLoop._sock_connect_donecCs|jrdSy>|jtjtj}|dkrMt|d|fWnIttfk rhYn?tk r}z|j |WYdd}~XnX|j ddS)NrzConnect call failed %s) rZ getsockoptrZ SOL_SOCKETZSO_ERRORrNrKrJrerr)r(rrr7errrbrrrrs   z&BaseSelectorEventLoop._sock_connect_cbcCsJ|jr'|jdkr'td|j}|j|d||S)a|Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new socket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. This method is a coroutine. rzthe socket must be non-blockingF)rOr~rrd _sock_accept)r(rrrrr sock_accepts   z!BaseSelectorEventLoop.sock_acceptcCs|j}|r|j||jr/dSy#|j\}}|jdWnettfk r|j||j|d|YnEt k r}z|j |WYdd}~XnX|j ||fdS)NFT) rAr{rrZrDrKrJrzrrerr)r(rrrrr`r7rbrrrrs     z"BaseSelectorEventLoop._sock_acceptcCsx|D]\}}|j|j}\}}|tj@rk|dk rk|jr^|j|n |j||tj@r|dk r|jr|j|q|j|qWdS)N) fileobjrGr rmZ _cancelledr?Z _add_callbackrwry)r(Z event_listrrsrrtrurrr_process_events s   z%BaseSelectorEventLoop._process_eventscCs!|j|j|jdS)N)r?rAr<)r(rrrr _stop_servingsz#BaseSelectorEventLoop._stop_serving))r" __module__ __qualname____doc__rr.r4r1r8r<r>r;r$rHrFrPrUrRr r^rirEr?rxryrzr{r|r}rrrrrrrrrrrrrr)r!rr6sV          (#                  cseZdZdZeZdZddfddZddZdd Z d d Z d d Z ddZ ddZ ejrddZdddZddZddZddZS)_SelectorTransportiNc stj||||jd<|j|jdz<%s> )r!r"rappendrr_loopr:rr#r rmrwget_write_buffer_sizejoin)r(inforstatebufsizerrr__repr__>s*        z_SelectorTransport.__repr__cCs|jddS)N) _force_close)r(rrrabortZsz_SelectorTransport.abortcCs ||_dS)N)r)r(r,rrr set_protocol]sz_SelectorTransport.set_protocolcCs|jS)N)r)r(rrr get_protocol`sz_SelectorTransport.get_protocolcCs|jS)N)r)r(rrrrgcsz_SelectorTransport.is_closingcCsn|jr dSd|_|jj|j|jsj|jd7_|jj|j|jj|jddS)NTr) rrr?rrrry call_soon_call_connection_lost)r(rrrr<fs   z_SelectorTransport.closecCs4|jdk r0tjd|t|jjdS)Nzunclosed transport %r)rwarningswarnResourceWarningr<)r(rrr__del__tsz_SelectorTransport.__del__zFatal error on transportc Csyt|tjr=|jjrhtjd||ddn+|jjd|d|d|d|ji|j |dS)Nz%r: %srLTrVrWrcr,) isinstancerZ_FATAL_ERROR_IGNOREr get_debugr r r]rr)r(rbrVrrr _fatal_errorys z_SelectorTransport._fatal_errorcCs|jr dS|jr6|jj|jj|j|js[d|_|jj|j|jd7_|jj|j |dS)NTr) rrclearrryrrr?rr)r(rbrrrrs     z_SelectorTransport._force_closec Csuz|jr|jj|Wd|jjd|_d|_d|_|j}|dk rp|jd|_XdS)N)rrZconnection_lostrr<rrZ_detach)r(rbr*rrrrs        z(_SelectorTransport._call_connection_lostcCs t|jS)N)rr)r(rrrrsz(_SelectorTransport.get_write_buffer_sizei)r"rrmax_size bytearrayrrrrrrrrgr<rZPY34rrrrrrr)r!rrs         rcseZdZdddfddZddZddZdd Zd d Zd d ZddZ ddZ S)r+Ncstj|||||d|_d|_t|j|jj|jj ||jj|jj |j |j |dk r|jjt j|ddS)NF)rr_eof_pausedrrrrrconnection_maderEr _read_readyr_set_result_unless_cancelled)r(rrr,r-r)r*)r!rrrs    z!_SelectorSocketTransport.__init__cCsi|jrtd|jr*tdd|_|jj|j|jjretjd|dS)Nz#Cannot pause_reading() when closingzAlready pausedTz%r pauses reading) rr9rrr?rrr r )r(rrr pause_readings     z&_SelectorSocketTransport.pause_readingcCsg|jstdd|_|jr+dS|jj|j|j|jjrctj d|dS)Nz Not pausedFz%r resumes reading) rr9rrrErrrr r )r(rrrresume_readings    z'_SelectorSocketTransport.resume_readingcCs|jr dSy|jj|j}WnLttfk r@Yntk rt}z|j|dWYdd}~XnnX|r|jj |nT|j j rt j d||jj}|r|j j|jn |jdS)Nz$Fatal read error on socket transportz%r received EOF)rrrIrrKrJrerr data_receivedrrr r eof_receivedr?rr<)r(rGrb keep_openrrrrs  #z$_SelectorSocketTransport._read_readycCsNt|tttfs1tdt|j|jrFtd|sPdS|j r|j t j krxt j d|j d7_ dS|js0y|jj|}WnPttfk rYnStk r}z|j|ddSWYdd}~XnX||d}|sdS|jj|j|j|jj||jdS)Nz1data argument must be a bytes-like object, not %rz%Cannot call write() after write_eof()zsocket.send() raised exception.rz%Fatal write error on socket transport)rbytesr memoryview TypeErrorrr"rr9rr!LOG_THRESHOLD_FOR_CONNLOST_WRITESr warningrrrMrKrJrerrrxr _write_readyextend_maybe_pause_protocol)r(rGrrbrrrwrites4     z_SelectorSocketTransport.writecCs |jr dSy|jj|j}Wnlttfk r@Yntk r}z5|jj|j |jj |j |dWYdd}~XnrX|r|jd|=|j |js|jj|j |j r|jdn|jr|jjtjdS)Nz%Fatal write error on socket transport)rrrMrrKrJrerryrrr_maybe_resume_protocolrrrshutdownrSHUT_WR)r(rrbrrrr s&  #    z%_SelectorSocketTransport._write_readycCs6|jr dSd|_|js2|jjtjdS)NT)rrrrrr)r(rrr write_eof"s    z"_SelectorSocketTransport.write_eofcCsdS)NTr)r(rrr can_write_eof)sz&_SelectorSocketTransport.can_write_eof) r"rrrrrrrrrrrr)r!rr+s   #  r+cseZdZeZdddddfddZdddZddZd d Zd d Z d dZ ddZ ddZ ddZ S)r5NFc stdkrtd|s0tj||}d|ddi} |rY| rY|| d<|j|| } tj|| ||| d|_||_||_ ||_ d|_ |j j d||jjrtjd||jj} nd} |j| dS)Nzstdlib ssl module not availabler/Zdo_handshake_on_connectFr0r3z%r starts SSL handshake)sslr9r Z_create_transport_contextZ wrap_socketrrr_server_hostname_waiter _sslcontextrrupdaterrr r time _on_handshake) r(rr2r,r3r-r/r0r)r*Z wrap_kwargsZsslsock start_time)r!rrr1s*          z_SelectorSslTransport.__init__cCs^|jdkrdS|jjsQ|dk rA|jj|n|jjdd|_dS)N)rrrr)r(rbrrr_wakeup_waiterUs z$_SelectorSslTransport._wakeup_waiterc%Csy|jjWntjk rH|jj|j|j|dSYntjk r||jj |j|j|dSYnt k r}z|jj rt j d|dd|jj|j|jj|j|jj|j|t|trdSWYdd}~XnX|jj|j|jj|j|jj}t|jds|jr|jjtjkrytj||jWnhtk r}zH|jj rt j d|dd|jj|j|dSWYdd}~XnX|jjd|d|jjd|jjd |jd |_d |_ |jj|j|j!d|_"|jj#|j$j%||jj#|j|jj r|jj&|}t j'd ||d dS) Nz%r: SSL handshake failedrLTZcheck_hostnamez1%r: SSL handshake failed on matching the hostnamepeercertcipher compressionZ ssl_objectFz%r: SSL handshake took %.1f msg@@)(rZ do_handshakerSSLWantReadErrorrrErrSSLWantWriteErrorrx BaseExceptionrr rr?ryr<rrreZ getpeercertrrrZ verify_modeZ CERT_NONEZmatch_hostnamerrrr_read_wants_write_write_wants_readrrrrrrr )r(rrbrZdtrrrr_sb               z#_SelectorSslTransport._on_handshakecCsi|jrtd|jr*tdd|_|jj|j|jjretjd|dS)Nz#Cannot pause_reading() when closingzAlready pausedTz%r pauses reading) rr9rrr?rrr r )r(rrrrs     z#_SelectorSslTransport.pause_readingcCsg|jstdd|_|jr+dS|jj|j|j|jjrctj d|dS)Nz Not pausedFz%r resumes reading) rr9rrrErrrr r )r(rrrrs    z$_SelectorSslTransport.resume_readingcCsr|jr dS|jrKd|_|j|jrK|jj|j|jy|jj|j }Wnt t t j fk rYnt jk rd|_|jj|j|jj|j|jYntk r}z|j|dWYdd}~XnmX|r|jj|nSzE|jjr=tjd||jj}|r_tjdWd|jXdS)NFTz!Fatal read error on SSL transportz%r received EOFz?returning true from eof_received() has no effect when using ssl)rrrrrrxrrrIrrKrJrrrrr?rerrrrr r rrr<)r(rGrbrrrrrs4      #z!_SelectorSslTransport._read_readycCs|jr dS|jrTd|_|j|jp8|jsT|jj|j|j|jrAy|j j |j}Wnt t t jfk rd}Ynt jk rd}|jj|jd|_YnYtk r*}z9|jj|j|jj|j|ddSWYdd}~XnX|rA|jd|=|j|js}|jj|j|jr}|jddS)NFrTz"Fatal write error on SSL transport)rrrrrrrErrrrMrKrJrrrryrrerrrr)r(rrbrrrrs8           z"_SelectorSslTransport._write_readycCst|tttfs1tdt|j|s;dS|jrv|jtj krct j d|jd7_dS|j s|j j|j|j|j j||jdS)Nz1data argument must be a bytes-like object, not %rzsocket.send() raised exception.r)rrrrrrr"rrrr rrrrxrrrr)r(rGrrrrs   z_SelectorSslTransport.writecCsdS)NFr)r(rrrrsz#_SelectorSslTransport.can_write_eof)r"rrrrrrrrrrrrrrr)r!rr5-s " ?  " # r5csgeZdZejZdddfddZddZddZddd Z d d Z S) r6Ncstj||||||_|jj|jj||jj|jj|j|j |dk r|jjt j |ddS)N) rr_addressrrrrrErrrr)r(rrr,r7r-r))r!rrrs  z#_SelectorDatagramTransport.__init__cCstdd|jDS)Ncss!|]\}}t|VqdS)N)r).0rGr_rrr 'szC_SelectorDatagramTransport.get_write_buffer_size..)sumr)r(rrrr&sz0_SelectorDatagramTransport.get_write_buffer_sizecCs|jr dSy|jj|j\}}Wnttfk rFYn|tk rz}z|jj|WYdd}~XnHt k r}z|j |dWYdd}~XnX|jj ||dS)Nz&Fatal read error on datagram transport) rrZrecvfromrrKrJrNrerror_receivedrerZdatagram_received)r(rGrarbrrrr)s "#z&_SelectorDatagramTransport._read_readycCst|tttfs1tdt|j|s;dS|jro|d|jfkrotd|jf|j r|jr|j t j krt j d|j d7_ dS|jsy7|jr|jj|n|jj||dSWnttfk r&|jj|j|jYnqtk r^}z|jj|dSWYdd}~Xn9tk r}z|j|ddSWYdd}~XnX|jjt||f|jdS)Nz1data argument must be a bytes-like object, not %rz#Invalid address: must be None or %szsocket.send() raised exception.rz'Fatal write error on datagram transport)rrrrrrr"rrrrrr rrrrMsendtorKrJrrxr _sendto_readyrNrrrerrr)r(rGrarbrrrr7s<    z!_SelectorDatagramTransport.sendtocCs:x|jr|jj\}}y3|jr@|jj|n|jj||Wqttfk r|jj||fPYqt k r}z|j j |dSWYdd}~Xqt k r}z|j |ddSWYdd}~XqXqW|j|js6|jj|j|jr6|jddS)Nz'Fatal write error on datagram transport)rpopleftrrrMrrKrJ appendleftrNrrrerrrryrrr)r(rGrarbrrrr^s*      z(_SelectorDatagramTransport._sendto_ready) r"rr collectionsdequerrrrrrrr)r!rr6s    'r6)"r__all__rr\rrrr%r ImportErrorrrrrrr r r Z coroutinesr logr rrrZ BaseEventLooprZ_FlowControlMixinZ Transportrr+r5r6rrrrs@