a dz@s~dZddlZddlmZddlZddlZddlZddlm Z ddl Z ddl Z ddl m Z ddlZddlZddlZe ZdaGdddZd d Ze ed Zd ZGd ddeZGdddZddZGdddeZGdddeZGdddeZ Gddde Z!ddZ"ddZ#d-dd Z$d!d"Z%Gd#d$d$e j&Z'da(da)d%d&Z*d'd(Z+Gd)d*d*ej,Z-Gd+d,d,ej.Z/dS).z"Brian Quinlan (brian@sweetapp.com)N)_base)Queue)partialFc@s,eZdZddZddZddZddZd S) _ThreadWakeupcCsd|_tjdd\|_|_dS)NF)Zduplex)_closedmpZPipe_reader_writerselfr ?/opt/alt/python39/lib64/python3.9/concurrent/futures/process.py__init__Csz_ThreadWakeup.__init__cCs$|js d|_|j|jdSNT)rr closerr r r r rGs z_ThreadWakeup.closecCs|js|jddS)N)rr Z send_bytesr r r r wakeupMsz_ThreadWakeup.wakeupcCs |js|jr|jqdSN)rrZpollZ recv_bytesr r r r clearQs z_ThreadWakeup.clearN)__name__ __module__ __qualname__rrrrr r r r rBsrcCs@datt}|D]\}}|q|D]\}}|q*dSr)_global_shutdownlist_threads_wakeupsitemsrjoin)r_ thread_wakeuptr r r _python_exitWs     r =c@seZdZddZddZdS)_RemoteTracebackcCs ||_dSrtb)r r%r r r rwsz_RemoteTraceback.__init__cCs|jSrr$r r r r __str__ysz_RemoteTraceback.__str__N)rrrrr&r r r r r#vsr#c@seZdZddZddZdS)_ExceptionWithTracebackcCs8tt|||}d|}||_d|j_d||_dS)Nz """ %s""") tracebackformat_exceptiontyperexc __traceback__r%)r r,r%r r r r}s  z _ExceptionWithTraceback.__init__cCst|j|jffSr) _rebuild_excr,r%r r r r __reduce__sz"_ExceptionWithTraceback.__reduce__N)rrrrr/r r r r r'|sr'cCst||_|Sr)r# __cause__)r,r%r r r r.s r.c@seZdZddZdS) _WorkItemcCs||_||_||_||_dSr)futurefnargskwargs)r r2r3r4r5r r r rsz_WorkItem.__init__Nrrrrr r r r r1sr1c@seZdZdddZdS) _ResultItemNcCs||_||_||_dSr)work_id exceptionresult)r r8r9r:r r r rsz_ResultItem.__init__)NNr6r r r r r7sr7c@seZdZddZdS) _CallItemcCs||_||_||_||_dSr)r8r3r4r5)r r8r3r4r5r r r rsz_CallItem.__init__Nr6r r r r r;sr;cs*eZdZdfdd ZfddZZS) _SafeQueuercs&||_||_||_tj||ddS)N)ctx)pending_work_items shutdown_lockrsuperr)r max_sizer=r>r?r __class__r r rsz_SafeQueue.__init__cst|trtt|||j}tdd||_ |j |j d}|j |jWdn1sj0Y|dur|j|nt||dS)Nz """ {}"""r() isinstancer;r)r*r+r-r#formatrr0r>popr8r?rrr2 set_exceptionr@_on_queue_feeder_error)r eobjr% work_itemrBr r rHs (z!_SafeQueue._on_queue_feeder_error)r)rrrrrH __classcell__r r rBr r<sr<cgs,t|}tt||}|s dS|VqdSr)ziptuple itertoolsislice) chunksize iterablesitchunkr r r _get_chunkss rUcsfdd|DS)Ncsg|] }|qSr r ).0r4r3r r rz"_process_chunk..r )r3rTr rWr _process_chunks rYc Cs`z|t|||dWnBtyZ}z*t||j}|t||dWYd}~n d}~00dS)N)r:r9r9)putr7 BaseExceptionr'r-) result_queuer8r:r9rIr,r r r _sendback_results   r^c Cs|dur:z ||Wn$ty8tjjdddYdS0|jdd}|dur`|tdSz|j|j i|j }Wn@ty}z(t ||j }t ||j|dWYd}~nd}~00t ||j|d~~q:dS)NzException in initializer:T)exc_infoblockrZ)r:)r\rZLOGGERZcriticalgetr[osgetpidr3r4r5r'r-r^r8) call_queuer] initializerinitargsZ call_itemrrIr,r r r _process_workers$     &ricsleZdZfddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ ddZ ZS)_ExecutorManagerThreadcsf|j|_|j|_|j|jfdd}t|||_|j|_|j |_ |j |_ |j |_|j|_tdS)NcSs<tjd||Wdn1s.0YdS)Nz?Executor collected: triggering callback for QueueManager wakeup)rutildebugr)rrr?r r r weakref_cbs z3_ExecutorManagerThread.__init__..weakref_cb)_executor_manager_thread_wakeupr_shutdown_lockr?weakrefrefexecutor_reference _processes processes _call_queuere _result_queuer] _work_idswork_ids_queue_pending_work_itemsr>r@r)r executorrmrBr r rs  z_ExecutorManagerThread.__init__cCs||\}}}|r(||dS|durX||~|}|durV|j~|r||j s| dSqdSr) add_call_item_to_queuewait_result_broken_or_wakeupterminate_brokenprocess_result_itemrr_idle_worker_semaphorereleaseis_shutting_downflag_executor_shutting_downr>join_executor_internals)r result_item is_brokencauserzr r r run:s"   z_ExecutorManagerThread.runcCs~|jrdSz|jjdd}Wntjy6YdS0|j|}|jrn|jj t ||j |j |j ddq|j|=qqdS)NFr`T)reZfullrxrbqueueEmptyr>r2Zset_running_or_notify_cancelr[r;r3r4r5)r r8rKr r r r{Zs"    z-_ExecutorManagerThread.add_call_item_to_queuec Cs|jj}|jj}||g}ddt|jD}tj||}d}d}d}||vrz| }d}Wqt y} z t t | | | j}WYd} ~ qd} ~ 00n ||vrd}|j|jWdn1s0Y|||fS)NcSsg|] }|jqSr )sentinelrVpr r r rX{rzG_ExecutorManagerThread.wait_result_broken_or_wakeup..TF)r]rrrrtvaluesrZ connectionwaitZrecvr\r)r*r+r-r?r) r Z result_readerZ wakeup_readerZreadersZworker_sentinelsZreadyrrrrIr r r r|qs&,(z3_ExecutorManagerThread.wait_result_broken_or_wakeupcCsrt|tr2|j|}||jsn|dSn<|j|jd}|durn|jr`|j |jn|j |j dSr) rDintrtrFrrr>r8r9r2rGZ set_resultr:)r rrrKr r r r~s  z*_ExecutorManagerThread.process_result_itemcCs|}tp|dup|jSr)rrr_shutdown_thread)r rzr r r rs z'_ExecutorManagerThread.is_shutting_downcCs|}|dur d|_d|_d}td}|durHtdd|d|_|jD]\}}|j |~qR|j |j D] }|q~|dS)NzKA child process terminated abruptly, the process pool is not usable anymoreTz^A process in the process pool was terminated abruptly while the future was running or pending.z ''' r(z''')rr_brokenrBrokenProcessPoolr#rr0r>rr2rGrrtrZ terminater)r rrzZbper8rKrr r r r}s"   z'_ExecutorManagerThread.terminate_brokencCs|}|dur|d|_|jr|i}|jD]\}}|js*|||<q*||_z|jWqLt j yrYqvYqL0qLd|_dS)NTF) rrr_cancel_pending_futuresr>rr2ZcancelrxZ get_nowaitrr)r rzZnew_pending_work_itemsr8rKr r r rs   z2_ExecutorManagerThread.flag_executor_shutting_downc Csl|}d}||krh|dkrht||D]8}z|jd|d7}Wq,tjybYq Yq,0q,q dS)Nrr!)get_n_children_aliverangereZ put_nowaitrZFull)r Zn_children_to_stopZn_sentinels_sentir r r shutdown_workerss   z'_ExecutorManagerThread.shutdown_workerscCsh||j|j|j|jWdn1sB0Y|jD] }|qVdSr) rrerZ join_threadr?rrtrrr rr r r rs  (z._ExecutorManagerThread.join_executor_internalscCstdd|jDS)Ncss|]}|VqdSr)Zis_aliverr r r rz>_ExecutorManagerThread.get_n_children_alive..)sumrtrr r r r r sz+_ExecutorManagerThread.get_n_children_alive)rrrrrr{r|r~rr}rrrrrLr r rBr rjs +  & rjc Cshtrtrttdaztd}Wnttfy:YdS0|dkrHdS|dkrTdSd|attdS)NTSC_SEM_NSEMS_MAXz@system provides too few semaphores (%d available, 256 necessary))_system_limits_checked_system_limitedNotImplementedErrorrcsysconfAttributeError ValueError)Z nsems_maxr r r _check_system_limitss rccs&|D]}||r|VqqdSr)reverserF)iterableZelementr r r _chain_from_iterable_of_lists,src@s eZdZdS)rN)rrrr r r r r8srcs~eZdZdddZddZddZd d Zd d Zd dZe j jj e_ dddfdd Z dddddZ e j j j e _ ZS)ProcessPoolExecutorNr cCsJt|dur6tpd|_tjdkrntt|j|_n8|dkrHtdn tjdkrh|tkrhtdt||_|dur~t }||_ |j j dddk|_ |durt|std ||_||_d|_i|_d|_t|_td|_d|_d|_i|_d|_t|_|jt }t!||j |j|j|jd |_"d |j"_#|$|_%t&'|_(dS) Nr!Zwin32rz"max_workers must be greater than 0zmax_workers must be <= F)Z allow_noneforkzinitializer must be a callable)rAr=r>r?rT))rrc cpu_count _max_workerssysplatformmin_MAX_WINDOWS_WORKERSrrZ get_context _mp_contextZget_start_method#_safe_to_dynamically_spawn_childrencallable TypeError _initializer _initargs_executor_manager_threadrsr threadingZLockroZ Semaphorerr _queue_countryrrrnEXTRA_QUEUED_CALLSr<ruZ _ignore_epipeZ SimpleQueuervrrrw)r Z max_workersZ mp_contextrfrgZ queue_sizer r r r@sZ         zProcessPoolExecutor.__init__cCs<|jdur8|js|t||_|j|jt|j<dSr)rr_launch_processesrjstartrnrr r r r _start_executor_manager_threads   z2ProcessPoolExecutor._start_executor_manager_threadcCs2|jjddrdSt|j}||jkr.|dS)NF)Zblocking)racquirelenrsr_spawn_process)r Z process_countr r r _adjust_process_counts   z)ProcessPoolExecutor._adjust_process_countcCs$tt|j|jD] }|qdSr)rrrsrr)r rr r r rsz%ProcessPoolExecutor._launch_processescCs8|jjt|j|j|j|jfd}|||j|j <dS)N)targetr4) rZProcessrirurvrrrrspidrr r r rsz"ProcessPoolExecutor._spawn_processcOs|j|jrt|j|jr&tdtr2tdt}t||||}||j |j <|j |j |j d7_ |j |jr|||WdS1s0YdS)Nz*cannot schedule new futures after shutdownz6cannot schedule new futures after interpreter shutdownr!)rorrr RuntimeErrorrrZFuturer1ryrrwr[rnrrrr)r r3r4r5fwr r r submits"   zProcessPoolExecutor.submitr!)timeoutrQcs:|dkrtdtjtt|t|d|i|d}t|S)Nr!zchunksize must be >= 1.rQ)r)rr@maprrYrUr)r r3rrQrRZresultsrBr r rs zProcessPoolExecutor.mapTF)cancel_futurescCs|j0||_d|_|jdur(|jWdn1s<0Y|jdur^|r^|jd|_d|_|jdur|r|j d|_d|_ d|_dSr) rorrrnrrrrurvrrs)r rrr r r shutdowns (  zProcessPoolExecutor.shutdown)NNNr )T)rrrrrrrrrrExecutor__doc__rrrLr r rBr r?s U   r)NN)0 __author__rcZconcurrent.futuresrrZmultiprocessingrZmultiprocessing.connectionZmultiprocessing.queuesrrrp functoolsrrOrr)WeakKeyDictionaryrrrr Z_register_atexitrr Exceptionr#r'r.objectr1r7r;r<rUrYr^riZThreadrjrrrrZBrokenExecutorrrrr r r r .sN       )