+e @sdZdgZddlZddlZddlZddlZddlZyddlZWnek rudZYnXddl m Z ddl m Z ddl m Z ddl m Z dd l mZdd l mZdd l mZdd l mZdd lmZddlmZddZGddde jZGdddejejZGdddeZGdddeZGdddeZdS)zEvent loop using a selector and related classes. A selector is a "notify-when-ready" multiplexer. For a subclass which also includes support for signal handling, see the unix_events sub-module. BaseSelectorEventLoopN) base_events)compat) constants)events)futures) selectors) transports)sslproto) coroutine)loggerc CsAy|j|}Wntk r+dSYnXt|j|@SdS)NF)get_keyKeyErrorboolr)selectorfdZeventkeyrd?Z d@dAZ!dBdCZ"dDdEZ#S)FrzJSelector event loop. See events.EventLoop for API specification. NcsUtj|dkr(tj}ntjd|jj||_|j dS)NzUsing selector: %s) super__init__r ZDefaultSelectorr debug __class____name__ _selector_make_self_pipe)selfr)rrrr0s    zBaseSelectorEventLoop.__init__extraservercCst||||||S)N)_SelectorSocketTransport)rsockprotocolwaiterrr rrr_make_socket_transport9sz,BaseSelectorEventLoop._make_socket_transport server_sideFserver_hostnamec Cs{tjs:|j||||d|d|d|d|Stj||||||} t||| d|d|| jS)Nr&r'rr )r Z_is_sslproto_available_make_legacy_ssl_transportZ SSLProtocolr!Z_app_transport) rrawsockr# sslcontextr$r&r'rr Z ssl_protocolrrr_make_ssl_transport>s     z)BaseSelectorEventLoop._make_ssl_transportc Cs"t||||||||| S)N)_SelectorSslTransport) rr)r#r*r$r&r'rr rrrr(Msz0BaseSelectorEventLoop._make_legacy_ssl_transportcCst||||||S)N)_SelectorDatagramTransport)rr"r#addressr$rrrr_make_datagram_transportWs z.BaseSelectorEventLoop._make_datagram_transportcsn|jrtdn|jr+dS|jtj|jdk rj|jjd|_ndS)Nz!Cannot close a running event loop)Z is_running RuntimeError is_closed_close_self_pipercloser)r)rrrr3\s     zBaseSelectorEventLoop.closecCs tdS)N)NotImplementedError)rrrr _socketpairgsz!BaseSelectorEventLoop._socketpaircCsU|j|jj|jjd|_|jjd|_|jd8_dS)Nr) remove_reader_ssockfilenor3_csock _internal_fds)rrrrr2js     z&BaseSelectorEventLoop._close_self_pipecCsg|j\|_|_|jjd|jjd|jd7_|j|jj|jdS)NFr)r5r7r9 setblockingr: add_readerr8_read_from_self)rrrrrrs z%BaseSelectorEventLoop._make_self_pipecCsdS)Nr)rdatarrr_process_self_datazsz(BaseSelectorEventLoop._process_self_datac Csbx[y-|jjd}|s"Pn|j|Wqtk rGwYqtk rYPYqXqWdS)Ni)r7recvr?InterruptedErrorBlockingIOError)rr>rrrr=}s  z%BaseSelectorEventLoop._read_from_selfc Csa|j}|dk r]y|jdWq]tk rY|jrUtjdddnYq]XndS)Nsz3Fail to write a null byte into the self-pipe socketexc_infoT)r9sendOSError_debugr r)rZcsockrrr_write_to_selfs     z$BaseSelectorEventLoop._write_to_selfcCs)|j|j|j||||dS)N)r<r8_accept_connection)rprotocol_factoryr"r*r rrr_start_servingsz$BaseSelectorEventLoop._start_servingc CsDyE|j\}}|jr7tjd|||n|jdWntttfk rbYntk r }z|j t j t j t j t j fkr|jidd6|d6|d6|j|j|jtj|j||||nWYdd}~Xn6Xi|d6}|j|||||} |j| dS)Nz#%r got a new connection from %r: %rFz&socket.accept() out of system resourcemessage exceptionsocketpeername)acceptrFr rr;rBrAConnectionAbortedErrorrEerrnoZEMFILEZENFILEZENOBUFSZENOMEMcall_exception_handlerr6r8Z call_laterrZACCEPT_RETRY_DELAYrJ_accept_connection2Z create_task) rrIr"r*r connaddrexcrrOrrrrHs0       z(BaseSelectorEventLoop._accept_connectionc cs5d}d}y|}tjd|}|r`|j|||d|ddd|d|}n$|j||d|d|d|}y |DdHWn|jYnXWntk r0} zd|jridd6| d 6} |dk r|| d modifycancel) rrcallbackargshandlermaskreaderwriterrrrr<s    z BaseSelectorEventLoop.add_readerc Cs|jrdSy|jj|}Wntk r>dSYn{X|j|j}\}}|tjM}|s|jj|n|jj ||d|f|dk r|j dSdSdS)zRemove a reader callback.FNT) r1rrrrr>r r_ unregisterr`ra)rrrrerfrgrrrr6s     z#BaseSelectorEventLoop.remove_readerc Gs|jtj|||}y|jj|}Wn1tk rh|jj|tjd|fYnVX|j|j }\}}|jj ||tjB||f|dk r|j ndS)zAdd a writer callback..N) r\rr]rrrr^r EVENT_WRITEr>r`ra) rrrbrcrdrrerfrgrrr add_writer s    z BaseSelectorEventLoop.add_writerc Cs|jrdSy|jj|}Wntk r>dSYn{X|j|j}\}}|tjM}|s|jj|n|jj |||df|dk r|j dSdSdS)zRemove a writer callback.FNT) r1rrrrr>r rirhr`ra)rrrrerfrgrrr remove_writers     z#BaseSelectorEventLoop.remove_writercCsV|jr*|jdkr*tdntjd|}|j|d|||S)zReceive data from the socket. The return value is a bytes object representing the data received. The maximum amount of data to be received at once is specified by nbytes. This method is a coroutine. rzthe socket must be non-blockingrWF)rF gettimeout ValueErrorrrY _sock_recv)rr"nfutrrr sock_recv2s zBaseSelectorEventLoop.sock_recvcCs|j}|r"|j|n|jr2dSy|j|}Wnhttfk r~|j||j|d||Yn?tk r}z|j |WYdd}~XnX|j |dS)NT) r8r6 cancelledr@rBrAr<rnrZ set_exception set_result)rrp registeredr"rorr>rVrrrrnAs  # z BaseSelectorEventLoop._sock_recvcCsl|jr*|jdkr*tdntjd|}|r[|j|d||n |jd|S)aSend data to the socket. The socket must be connected to a remote socket. This method continues to send data from data until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully processed by the receiving end of the connection. This method is a coroutine. rzthe socket must be non-blockingrWFN)rFrlrmrrY _sock_sendallrt)rr"r>rprrr sock_sendallVs  z"BaseSelectorEventLoop.sock_sendallcCs|j}|r"|j|n|jr2dSy|j|}WnSttfk red}Yn6tk r}z|j|dSWYdd}~XnX|t|kr|j dn8|r||d}n|j ||j |d||dS)NrT) r8rkrrrDrBrArZrslenrtrjrv)rrprur"r>rrorVrrrrvjs"    z#BaseSelectorEventLoop._sock_sendallcCs|jr*|jdkr*tdntjd|}ytj||Wn2tk r}z|j|WYdd}~XnX|j||||S)aConnect to a remote socket at address. The address must be already resolved to avoid the trap of hanging the entire event loop when the address requires doing a DNS lookup. For example, it must be an IP address, not an hostname, for AF_INET and AF_INET6 address families. Use getaddrinfo() to resolve the hostname asynchronously. This method is a coroutine. rzthe socket must be non-blockingrWN) rFrlrmrrYrZ_check_resolved_addressrs _sock_connect)rr"r.rperrrrr sock_connects  z"BaseSelectorEventLoop.sock_connectcCs|j}y|j|Wnttfk ro|jtj|j||j||j |||Yn?t k r}z|j |WYdd}~XnX|j ddS)N) r8ZconnectrBrAZadd_done_callback functoolspartial_sock_connect_donerj_sock_connect_cbrZrsrt)rrpr"r.rrVrrrrys    z#BaseSelectorEventLoop._sock_connectcCs|j|dS)N)rk)rrrprrrr~sz(BaseSelectorEventLoop._sock_connect_donecCs|jrdSyA|jtjtj}|dkrPt|d|fnWnIttfk rkYn?tk r}z|j |WYdd}~XnX|j ddS)NrzConnect call failed %s) rrZ getsockoptrMZ SOL_SOCKETZSO_ERRORrErBrArZrsrt)rrpr"r.rzrVrrrrs   z&BaseSelectorEventLoop._sock_connect_cbcCsS|jr*|jdkr*tdntjd|}|j|d||S)a|Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new socket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. This method is a coroutine. rzthe socket must be non-blockingrWF)rFrlrmrrY _sock_accept)rr"rprrr sock_accepts z!BaseSelectorEventLoop.sock_acceptcCs|j}|r"|j|n|jr2dSy#|j\}}|jdWnettfk r|j||j|d|YnEt k r}z|j |WYdd}~XnX|j ||fdS)NFT) r8r6rrrOr;rBrAr<rrZrsrt)rrprur"rrTr.rVrrrrs    z"BaseSelectorEventLoop._sock_acceptcCsx|D]\}}|j|j}\}}|tj@rn|dk rn|jr^|j|qn|j|n|tj@r|dk r|jr|j|q|j|qqWdS)N) fileobjr>r r_Z _cancelledr6Z _add_callbackrirk)rZ event_listrrerrfrgrrr_process_eventss  z%BaseSelectorEventLoop._process_eventscCs!|j|j|jdS)N)r6r8r3)rr"rrr _stop_servingsz#BaseSelectorEventLoop._stop_serving)$r __module__ __qualname____doc__rr%r+r(r/r3r5r2rr?r=rGrJrHr rSr<r6rjrkrqrnrwrvr{ryr~rrrrrrr)rrr*sL          !#               cseZdZdZeZdZddfddZddZdd Z d d Z d d Z e j r~ddZndddZddZddZddZS)_SelectorTransportiNc stj||||jd<|j|jdz<%s> )rrrappendrr_loopr1rrr r_riget_write_buffer_sizejoin)rinforstatebufsizerrr__repr__s*       z_SelectorTransport.__repr__cCs|jddS)N) _force_close)rrrrabort+sz_SelectorTransport.abortcCs|jS)N)r)rrrr is_closing.sz_SelectorTransport.is_closingcCs^|jr dSd|_|jj|j|jsZ|jd7_|jj|jdndS)NTr)rrr6rrr call_soon_call_connection_lost)rrrrr31s   z_SelectorTransport.closecCs7|jdk r3tjd|t|jjndS)Nzunclosed transport %r)rwarningswarnResourceWarningr3)rrrr__del__>sz_SelectorTransport.__del__zFatal error on transportcCst|tttfrF|jjrutjd||ddqun/|jji|d6|d6|d6|j d6|j |dS)Nz%r: %srCTrKrLrXr#) isinstanceBrokenPipeErrorConnectionResetErrorrPr get_debugr rrRrr)rrVrKrrr _fatal_errorCs  z_SelectorTransport._fatal_errorcCs|jr dS|jr9|jj|jj|jn|jsad|_|jj|jn|jd7_|jj|j |dS)NTr) rrclearrrkrrr6rr)rrVrrrrRs     z_SelectorTransport._force_closec Cs{z |jr|jj|nWd|jjd|_d|_d|_|j}|dk rv|jd|_nXdS)N)rrZconnection_lostrr3rrZ_detach)rrVr rrrr^s        z(_SelectorTransport._call_connection_lostcCs t|jS)N)rxr)rrrrrlsz(_SelectorTransport.get_write_buffer_sizei)rrrmax_size bytearrayrrrrrrr3rZPY34rrrrrrr)rrrs      rcseZdZdddfddZddZddZdd Zd d Zd d ZddZ ddZ S)r!Ncstj|||||d|_d|_|jj|jj||jj|jj|j |j |dk r|jjt j |dndS)NF) rr_eof_pausedrrrconnection_mader<r _read_readyr_set_result_unless_cancelled)rrWr"r#r$rr )rrrrrs   z!_SelectorSocketTransport.__init__cCsr|jrtdn|jr0tdnd|_|jj|j|jjrntjd|ndS)Nz#Cannot pause_reading() when closingzAlready pausedTz%r pauses reading) rr0rrr6rrr r)rrrr pause_readings   z&_SelectorSocketTransport.pause_readingcCsm|jstdnd|_|jr.dS|jj|j|j|jjritj d|ndS)Nz Not pausedFz%r resumes reading) rr0rrr<rrrr r)rrrrresume_readings   z'_SelectorSocketTransport.resume_readingcCsy|jj|j}WnLttfk r3Yntk rg}z|j|dWYdd}~XnqX|r|jj|nW|j j rt j d|n|jj }|r|j j|jn |jdS)Nz$Fatal read error on socket transportz%r received EOF)rr@rrBrArZrr data_receivedrrr r eof_receivedr6rr3)rr>rV keep_openrrrrs#z$_SelectorSocketTransport._read_readycCsVt|tttfs0tdt|n|jrHtdn|sRdS|jr|jt j kr}t j dn|jd7_dS|j s8y|jj|}WnPttfk rYnStk r}z|j|ddSWYdd}~XnX||d}|sdS|jj|j|jn|j j||jdS)Nz#data argument must be byte-ish (%r)z%Cannot call write() after write_eof()zsocket.send() raised exception.rz%Fatal write error on socket transport)rbytesr memoryview TypeErrortyperr0rr!LOG_THRESHOLD_FOR_CONNLOST_WRITESr warningrrrDrBrArZrrrjr _write_readyextend_maybe_pause_protocol)rr>rorVrrrwrites4   z_SelectorSocketTransport.writecCsy|jj|j}Wnlttfk r3Yntk r}z5|jj|j|jj |j |dWYdd}~Xn{X|r|jd|=n|j |js|jj|j|j r|j dq|jr|jjtjqndS)Nz%Fatal write error on socket transport)rrDrrBrArZrrkrrr_maybe_resume_protocolrrrshutdownrMSHUT_WR)rrorVrrrrs" #    z%_SelectorSocketTransport._write_readycCs9|jr dSd|_|js5|jjtjndS)NT)rrrrrMr)rrrr write_eofs    z"_SelectorSocketTransport.write_eofcCsdS)NTr)rrrr can_write_eofsz&_SelectorSocketTransport.can_write_eof) rrrrrrrrrrrrr)rrr!ps   #  r!cseZdZeZdddddfddZdddZddZd d Zd d Z d dZ ddZ ddZ ddZ S)r,NFc stdkrtdn|s6tj||}ni|d6dd6} |rd| rd|| drVrrrrrzs0     #z!_SelectorSslTransport._read_readycCs|jrMd|_|j|jp+|jsM|jj|j|jqMn|jr@y|jj |j}Wnt t t j fk rd}Ynt jk rd}|jj|jd|_YnYtk r#}z9|jj|j|jj|j|ddSWYdd}~XnX|r@|jd|=q@n|j|js|jj|j|jr|jdqndS)NFrTz"Fatal write error on SSL transport)rrrrrr<rrrrDrBrArrrrkrrZrrrr)rrorVrrrrs4          z"_SelectorSslTransport._write_readycCst|tttfs0tdt|n|s:dS|jrx|jtjkret j dn|jd7_dS|j s|j j |j|jn|j j||jdS)Nz#data argument must be byte-ish (%r)zsocket.send() raised exception.r)rrrrrrrrrr rrrrjrrrr)rr>rrrrs  z_SelectorSslTransport.writecCsdS)NFr)rrrrrsz#_SelectorSslTransport.can_write_eof)rrrrrrrrrrrrrrrr)rrr,s " ?  ! r,csgeZdZejZdddfddZddZddZddd Z d d Z S) r-Ncstj||||||_|jj|jj||jj|jj|j|j |dk r|jjt j |dndS)N) rr_addressrrrrr<rrrr)rrWr"r#r.r$r)rrrrs  z#_SelectorDatagramTransport.__init__cCstdd|jDS)Ncss!|]\}}t|VqdS)N)rx).0r>_rrr szC_SelectorDatagramTransport.get_write_buffer_size..)sumr)rrrrrsz0_SelectorDatagramTransport.get_write_buffer_sizecCsy|jj|j\}}Wnttfk r9Yn|tk rm}z|jj|WYdd}~XnHtk r}z|j |dWYdd}~XnX|jj ||dS)Nz&Fatal read error on datagram transport) rZrecvfromrrBrArErerror_receivedrZrZdatagram_received)rr>rUrVrrrrs"#z&_SelectorDatagramTransport._read_readycCst|tttfs0tdt|n|s:dS|jrq|d|jfkrqtd|jfn|jr|jr|jt j krt j dn|jd7_dS|j sy7|jr|jj|n|jj||dSWqttfk r+|jj|j|jYqtk rc}z|jj|dSWYdd}~Xqtk r}z|j|ddSWYdd}~XqXn|j jt||f|jdS)Nz#data argument must be byte-ish (%r)z#Invalid address: must be None or %szsocket.send() raised exception.rz'Fatal write error on datagram transport)rrrrrrrrmrrrr rrrrDsendtorBrArrjr _sendto_readyrErrrZrrr)rr>rUrVrrrrs<   z!_SelectorDatagramTransport.sendtocCs@x|jr|jj\}}y3|jr@|jj|n|jj||Wqttfk r|jj||fPYqt k r}z|j j |dSWYdd}~Xqt k r}z|j |ddSWYdd}~XqXqW|j|js<|jj|j|jr<|jdq<ndS)Nz'Fatal write error on datagram transport)rpopleftrrrDrrBrA appendleftrErrrZrrrrkrrr)rr>rUrVrrrrs*      z(_SelectorDatagramTransport._sendto_ready) rrr collectionsdequerrrrrrrr)rrr-s    'r-)r__all__rrQr|rMrr ImportErrorrrrrrr r r Z coroutinesr logr rZ BaseEventLooprZ_FlowControlMixinZ Transportrr!r,r-rrrrs8