ISc@sdZddlZddlZddlZddlZddlZddlZddlZddlZej dfkrddl j Z n ddl Z ddl mZddl mZddl mZddl mZddl mZdd l mZdd l mZdd l mZdd l mZd ZdZejeZdefdYZdefdYZdefdYZ defdYZ!dS(sCore connection objectsiNi(t __version__(tcallback(tchannel(t credentials(t exceptions(tframe(t heartbeat(tutils(tspecs[Pika: Write buffer exceeded warning threshold at %i bytes and an estimated %i frames behindsPika Python Client Libraryt ParameterscBs eZdZeZdZdZejZ dZ dZ dZ dZ dZdZdZeZiZd ZdZd Zd Zd Zd ZdZdZdZdZdZdZdZdZ dZ!dZ"dZ#dZ$dZ%dZ&RS(sBase connection parameters class definition :param str DEFAULT_HOST: 'localhost' :param int DEFAULT_PORT: 5672 :param str DEFAULT_VIRTUAL_HOST: '/' :param str DEFAULT_USERNAME: 'guest' :param str DEFAULT_PASSWORD: 'guest' :param int DEFAULT_HEARTBEAT_INTERVAL: 0 :param int DEFAULT_CHANNEL_MAX: 0 :param int DEFAULT_FRAME_MAX: pika.spec.FRAME_MAX_SIZE :param str DEFAULT_LOCALE: 'en_US' :param int DEFAULT_CONNECTION_ATTEMPTS: 1 :param int|float DEFAULT_RETRY_DELAY: 2.0 :param int|float DEFAULT_SOCKET_TIMEOUT: 0.25 :param bool DEFAULT_SSL: False :param dict DEFAULT_SSL_OPTIONS: {} :param int DEFAULT_SSL_PORT: 5671 :param bool DEFAULT_BACKPRESSURE_DETECTION: False iit localhostten_UStguesti(g@g?i't/cCs|j|_|j|_|j|_|j|_|j|j |j |_ |j |_ |j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_dS(N(tDEFAULT_VIRTUAL_HOSTt virtual_hosttDEFAULT_BACKPRESSURE_DETECTIONtbackpressure_detectiontDEFAULT_CHANNEL_MAXt channel_maxtDEFAULT_CONNECTION_ATTEMPTStconnection_attemptst _credentialstDEFAULT_USERNAMEtDEFAULT_PASSWORDRtDEFAULT_FRAME_MAXt frame_maxtDEFAULT_HEARTBEAT_INTERVALRt DEFAULT_HOSTthosttDEFAULT_LOCALEtlocalet DEFAULT_PORTtporttDEFAULT_RETRY_DELAYt retry_delayt DEFAULT_SSLtssltDEFAULT_SSL_OPTIONSt ssl_optionstDEFAULT_SOCKET_TIMEOUTtsocket_timeout(tself((s3/usr/lib/python2.7/site-packages/pika/connection.pyt__init__Hs             cCs)d|jj|j|j|j|jfS(sERepresent the info about the instance. :rtype: str s+<%s host=%s port=%s virtual_host=%s ssl=%s>(t __class__t__name__RR!RR%(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt__repr__YscCstj||S(sReturn a plain credentials object for the specified username and password. :param str username: The username to use :param str password: The password to use :rtype: pika_credentials.PlainCredentials (tpika_credentialstPlainCredentials(R*tusernametpassword((s3/usr/lib/python2.7/site-packages/pika/connection.pyRcs cCs"t|tstdntS(sValidate that the backpressure detection option is a bool. :param bool backpressure_detection: The backpressure detection value :rtype: bool :raises: TypeError s%backpressure detection must be a bool(t isinstancetboolt TypeErrortTrue(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_backpressurenscCsIt|tstdn|dks6|dkrEtdntS(sValidate that the channel_max value is an int :param int channel_max: The value to validate :rtype: bool :raises: TypeError :raises: ValueError schannel_max must be an intiis$channel_max must be <= 65535 and > 0(R3tintR5t ValueErrorR6(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_channel_maxzs cCs=t|tstdn|dkr9tdntS(sValidate that the channel_max value is an int :param int connection_attempts: The value to validate :rtype: bool :raises: TypeError :raises: ValueError s"connection_attempts must be an intis'connection_attempts must be None or > 0(R3R8R5R9R6(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_connection_attemptss  cCs>x$tjD]}t||r tSq WtdtjdS(sValidate the credentials passed in are using a valid object type. :param pika.credentials.Credentials credentials: Credentials to validate :rtype: bool :raises: TypeError s)Credentials must be an object of type: %rN(R/t VALID_TYPESR3R6R5(R*Rtcredential_type((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_credentialss cCsXt|tstdn|tjkr9tjn|tjkrTtjnt S(s Validate that the frame_max value is an int and does not exceed the maximum frame size and is not less than the frame min size. :param int frame_max: The value to validate :rtype: bool :raises: TypeError :raises: InvalidMinimumFrameSize sframe_max must be an int( R3R8R5RtFRAME_MIN_SIZERtInvalidMinimumFrameSizetFRAME_MAX_SIZEtInvalidMaximumFrameSizeR6(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_frame_maxs   cCs=t|tstdn|dkr9tdntS(sValidate that the heartbeat_interval value is an int :param int heartbeat_interval: The value to validate :rtype: bool :raises: TypeError :raises: ValueError sheartbeat must be an intisheartbeat_interval must >= 0(R3R8R5R9R6(R*theartbeat_interval((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_heartbeat_intervals  cCs"t|tstdntS(sValidate that the host value is an str :param str|unicode host: The value to validate :rtype: bool :raises: TypeError s!host must be a str or unicode str(R3t basestringR5R6(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_hostscCs"t|tstdntS(sValidate that the locale value is an str :param str locale: The value to validate :rtype: bool :raises: TypeError slocale must be a str(R3RFR5R6(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_localescCs"t|tstdntS(sValidate that the port value is an int :param int port: The value to validate :rtype: bool :raises: TypeError sport must be an int(R3R8R5R6(R*R!((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_portscCs7tt|tt|tgs3tdntS(sValidate that the retry_delay value is an int or float :param int|float retry_delay: The value to validate :rtype: bool :raises: TypeError s"retry_delay must be a float or int(tanyR3R8tfloatR5R6(R*R#((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_retry_delayscCsRtt|tt|tgs3tdn|dksNtdntS(sValidate that the socket_timeout value is an int or float :param int|float socket_timeout: The value to validate :rtype: bool :raises: TypeError s%socket_timeout must be a float or intissocket_timeout must be > 0(RJR3R8RKR5R9R6(R*R)((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_socket_timeouts  cCs"t|tstdntS(sValidate the SSL toggle is a bool :param bool ssl: The SSL enabled/disabled value :rtype: bool :raises: TypeError sssl must be a bool(R3R4R5R6(R*R%((s3/usr/lib/python2.7/site-packages/pika/connection.pyt _validate_sslscCs/t|t r+|dk r+tdntS(sValidate the SSL options value is a dictionary. :param dict|None ssl_options: SSL Options to validate :rtype: bool :raises: TypeError s'ssl_options must be either None or dictN(R3tdicttNoneR5R6(R*R'((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_ssl_optionsscCs"t|tstdntS(sValidate that the virtual_host value is an str :param str virtual_host: The value to validate :rtype: bool :raises: TypeError svirtual_host must be a str(R3RFR5R6(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_validate_virtual_hosts('R-t __module__t__doc__tFalseRRRRRARRRRRR R"R(R$R&tDEFAULT_SSL_PORTRRR+R.RR7R:R;R>RCRERGRHRIRLRMRNRQRR(((s3/usr/lib/python2.7/site-packages/pika/connection.pyR "sD         tConnectionParameterscBsAeZdZdddddddddddddddZRS(sConnection parameters object that is passed into the connection adapter upon construction. :param str host: Hostname or IP Address to connect to :param int port: TCP port to connect to :param str virtual_host: RabbitMQ virtual host to use :param pika.credentials.Credentials credentials: auth credentials :param int channel_max: Maximum number of channels to allow :param int frame_max: The maximum byte size for an AMQP frame :param int heartbeat_interval: How often to send heartbeats :param bool ssl: Enable SSL :param dict ssl_options: Arguments passed to ssl.wrap_socket as :param int connection_attempts: Maximum number of retry attempts :param int|float retry_delay: Time to wait in seconds, before the next :param int|float socket_timeout: Use for high latency networks :param str locale: Set the locale value :param bool backpressure_detection: Toggle backpressure detection cCsEtt|j|s4|j|j|j}n|rU|j|rU||_n|dk r||j |r|||_ n|r|j |r||_ n|r|j |r||_n|dk r|j|r||_n|dk r |j|r ||_n| r-|j| r-| |_n|dk rT|j|rT||_n|dk r{|j|r{||_n| r|j| r| pt|_n| dk r|j| r| |_n| dk r|j| r| |_n| dk r|j | r| |_!n|dk rA|j"|rA||_#ndS(sCreate a new ConnectionParameters instance. :param str host: Hostname or IP Address to connect to :param int port: TCP port to connect to :param str virtual_host: RabbitMQ virtual host to use :param pika.credentials.Credentials credentials: auth credentials :param int channel_max: Maximum number of channels to allow :param int frame_max: The maximum byte size for an AMQP frame :param int heartbeat_interval: How often to send heartbeats :param bool ssl: Enable SSL :param dict ssl_options: Arguments passed to ssl.wrap_socket :param int connection_attempts: Maximum number of retry attempts :param int|float retry_delay: Time to wait in seconds, before the next :param int|float socket_timeout: Use for high latency networks :param str locale: Set the locale value :param bool backpressure_detection: Toggle backpressure detection N($tsuperRWR+RRRRGRRPRIR!RRRR>RR:RRCRRHRRERRNR%RQROR'R;RRLR#RMR)R7R(R*RR!RRRRRDR%R'RR#R)RR((s3/usr/lib/python2.7/site-packages/pika/connection.pyR+@sH!                 N(R-RSRTRPR+(((s3/usr/lib/python2.7/site-packages/pika/connection.pyRW,st URLParameterscBs eZdZdZdZRS(s^Connect to RabbitMQ via an AMQP URL in the format:: amqp://username:password@host:port/[?query-string] Ensure that the virtual host is URI encoded when specified. For example if you are using the default "/" virtual host, the value should be `%2f`. Valid query string values are: - backpressure_detection: Toggle backpressure detection, possible values are `t` or `f` - channel_max: Override the default maximum channel count value - connection_attempts: Specify how many times pika should try and reconnect before it gives up - frame_max: Override the default maximum frame size for communication - heartbeat_interval: Specify the number of seconds between heartbeat frames to ensure that the link between RabbitMQ and your application is up - locale: Override the default `en_US` locale value - ssl: Toggle SSL, possible values are `t`, `f` - ssl_options: Arguments passed to :meth:`ssl.wrap_socket` - retry_delay: The number of seconds to sleep before attempting to connect on connection failure. - socket_timeout: Override low level socket timeout value :param str url: The AMQP URL to connect to cCs$tt|j|j|dS(sUCreate a new URLParameters instance. :param str url: The URL value N(RXRYR+t _process_url(R*turl((s3/usr/lib/python2.7/site-packages/pika/connection.pyR+sc Cs|dd!dkr$d|d}ntj|}|jdkrNt|_n|j|jro|j|_n|js|jr|jr|jn|j |_qn!|j |jr|j|_n|j dk rt j|j |j|_nt|jdkr|j|_n@|jjd}tj|d}|j|rW||_ntj|j}x~|jD]p}||jd||<||jrt||||d=Z?d>Z@d?ZAeBd@ZCdYdAZDdBZEdCZFdDZGdEZHdFZIdGZJdHZKdIZLdJZMdKZNdLZOdMZPdNZQdYdYdOZRdPZSdQZTdRZUdSZVdTZWdYdUZXdVZYdWZZdXZ[RS(Zs%This is the core class that implements communication with RabbitMQ. This class should not be invoked directly but rather through the use of an adapter such as SelectConnection or BlockingConnection. :param pika.connection.Parameters parameters: Connection parameters :param method on_open_callback: Called when the connection is opened :param method on_open_error_callback: Called if the connection cant be opened :param method on_close_callback: Called when the connection is closed t_on_connection_backpressuret_on_connection_closedt_on_connection_errort_on_connection_openiiiiiiicCstj|_|jjd|j|p-|jt|rJ|j|n|r`|j|n|plt |_ |j |j dS(sConnection initialization expects an object that has implemented the Parameters class and a callback function to notify when we have successfully connected to the AMQP Broker. Available Parameters classes are the ConnectionParameters class and URLParameters class. :param pika.connection.Parameters parameters: Connection parameters :param method on_open_callback: Called when the connection is opened :param method on_open_error_callback: Called if the connection cant be opened :param method on_close_callback: Called when the connection is closed iN( RtCallbackManagert callbackstaddtON_CONNECTION_ERRORRxRUtadd_on_open_callbacktadd_on_close_callbackRWtparamst_init_connection_statetconnect(R*t parameterston_open_callbackton_open_error_callbackton_close_callback((s3/usr/lib/python2.7/site-packages/pika/connection.pyR+0s  cCs |jjd|j|tdS(sCall method "callback" when pika believes backpressure is being applied. :param method callback_method: The method to call iN(R{R|tON_CONNECTION_BACKPRESSURERU(R*tcallback_method((s3/usr/lib/python2.7/site-packages/pika/connection.pytadd_backpressure_callbackZscCs |jjd|j|tdS(sAdd a callback notification when the connection has closed. The callback will be passed the connection, the reply_code (int) and the reply_text (str), if sent by the remote server. :param method callback_method: Callback to call on close iN(R{R|tON_CONNECTION_CLOSEDRU(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyRdscCs |jjd|j|tdS(sAdd a callback notification when the connection has opened. :param method callback_method: Callback to call when open iN(R{R|tON_CONNECTION_OPENRU(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyR~nscCsE|r%|jjd|j|jn|jjd|j|tdS(sfAdd a callback notification when the connection can not be opened. The callback method should accept the connection object that could not connect, and an optional error message. :param method callback_method: Callback to call when can't connect :param bool remove_default: Remove default exception raising callback iN(R{tremoveR}RxR|RU(R*Rtremove_default((s3/usr/lib/python2.7/site-packages/pika/connection.pytadd_on_open_error_callbackvs  cCs tdS(s+Adapters should override to call the callback after the specified number of seconds have elapsed, using a timer, or a thread, or similar. :param int deadline: The number of seconds to wait to call callback :param method callback_method: The callback method N(tNotImplementedError(R*tdeadlineR((s3/usr/lib/python2.7/site-packages/pika/connection.pyt add_timeouts cCsW|s|j}n|j|||j|<|j||j|j|j|S(sCreate a new channel with the next available channel number or pass in a channel number to use. Must be non-zero if you would like to specify but it is recommended that you let Pika manage the channel numbers. :param method on_open_callback: The callback when the channel is opened :param int channel_number: The channel number to use, defaults to the next available. :rtype: pika.channel.Channel (t_next_channel_numbert_create_channelt _channelst_add_channel_callbackstopen(R*Rtchannel_number((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs   isNormal shutdowncCs~|js|jrdS|jr2|j||n|j|jtjd||||f|_|jsz|j ndS(sDisconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel. :param int reply_code: The code number for the close :param str reply_text: The text reason for the close NsClosing connection (%s): %s( t is_closingt is_closedt_has_open_channelst_close_channelst_set_connection_statetCONNECTION_CLOSINGtLOGGERtinfotclosingt_on_close_ready(R*t reply_codet reply_text((s3/usr/lib/python2.7/site-packages/pika/connection.pytcloses   cCs|j|j|j}|s,|jS|jd8_tjd|j|jrtjd|jj |j |jj |j n>|j j d|j||||jj|_|j|jdS(sInvoke if trying to reconnect to a RabbitMQ server. Constructing the Connection object should connect on its own. is#Could not connect, %i attempts leftsRetrying in %i secondsiN(RtCONNECTION_INITt_adapter_connectt _on_connectedtremaining_connection_attemptsRtwarningRRR#RRR{tprocessR}RtCONNECTION_CLOSED(R*terror((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs     cCs tdS(sAdapters should override to call the callback after the specified number of seconds have elapsed, using a timer, or a thread, or similar. :param method callback_method: The callback to remove a timeout for N(R(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pytremove_timeoutsi cCs ||_dS(sAlter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback. :param int value: The multiplier value to set N(t _backpressure(R*tvalue((s3/usr/lib/python2.7/site-packages/pika/connection.pytset_backpressure_multiplierscCs|j|jkS(sK Returns a boolean reporting the current connection state. (tconnection_stateR(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyRscCs|j|jkS(sK Returns a boolean reporting the current connection state. (RR(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyRscCs|j|jkS(sK Returns a boolean reporting the current connection state. (RtCONNECTION_OPEN(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pytis_openscCs|jjdtS(seSpecifies if the server supports basic.nack on the active connection. :rtype: bool s basic.nack(tserver_capabilitiestgetRU(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt basic_nackscCs|jjdtS(sSpecifies if the server supports consumer cancel notification on the active connection. :rtype: bool tconsumer_cancel_notify(RRRU(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyR scCs|jjdtS(srSpecifies if the active connection supports exchange to exchange bindings. :rtype: bool texchange_exchange_bindings(RRRU(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs cCs|jjdtS(scSpecifies if the active connection can use publisher confirmations. :rtype: bool tpublisher_confirms(RRRU(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyR!scCs tdS(stSubclasses should override to set up the outbound socket connection. :raises: NotImplementedError N(R(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyR.scCs tdS(sSubclasses should override this to cause the underlying transport (socket) to close. :raises: NotImplementedError N(R(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_adapter_disconnect6scCs#|jj|tjj|jdS(sAdd the appropriate callbacks for the specified channel number. :param int channel_number: The channel number for the callbacks N(R{R|RtChanneltCloseOkt_on_channel_closeok(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyR?s  cCs#|jjdtjj|jdS(s_Add a callback for when a Connection.Start frame is received from the broker. iN(R{R|RRutStartt_on_connection_start(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_add_connection_start_callbackIscCs#|jjdtjj|jdS(s<Add a callback for when a Connection.Tune frame is received.iN(R{R|RRutTunet_on_connection_tune(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_add_connection_tune_callbackPscCs|j|7_dS(srAppend the bytes to the frame buffer. :param str value: The bytes to append to the frame buffer N(t _frame_buffer(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_append_frame_bufferTscCs|jjptjS(sReturn the suggested buffer size from the connection state/tune or the default if that is None. :rtype: int (RRRRA(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt _buffer_size\scCsG|jj|jjftjdd!krCtjtj|ndS(sInvoked when starting a connection to make sure it's a supported protocol. :param pika.frame.Method value: The frame to check :raises: ProtocolVersionMismatch iiN( tmethodt version_majort version_minorRtPROTOCOL_VERSIONRtProtocolVersionMismatchRtProtocolHeader(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_check_for_protocol_mismatchfs cCsTitd6dtjd6itd6td6td6td6td6d 6d d 6td 6S( sHReturn the client properties dictionary. :rtype: dict tproducts Python %stplatformtauthentication_failure_closes basic.nacksconnection.blockedRRt capabilitiessSee http://pika.rtfd.orgt informationtversion(tPRODUCTRtpython_versionR6R(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_client_propertiesss   cCsz|jrjxj|jjD]J}|j|jrI|j|j||q|j|=|jj|qWn t|_dS(sClose the open channels with the specified reply_code and reply_text. :param int reply_code: The code for why the channels are being closed :param str reply_text: The text reason for why the channels are closing N(RRRkRR{tcleanupRO(R*RRR((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs  cCst||p|p|S(sPass in two values, if a is 0, return b otherwise if b is 0, return a. If neither case matches return the smallest value. :param int a: The first value :param int b: The second value :rtype: int (tmin(R*tatb((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_combines cCstjdtdS(s>Attempt to connect to RabbitMQ :rtype: bool s1This method is deprecated, use Connection.connectN(twarningstwarntDeprecationWarning(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_connects cCstj|||S(sCreate a new channel using the specified channel number and calling back the method specified by on_open_callback :param int channel_number: The channel number to use :param method on_open_callback: The callback when the channel is opened (RR(R*RR((s3/usr/lib/python2.7/site-packages/pika/connection.pyRscCsT|jjdk rP|jjdkrPtjd|jjtj||jjSdS(sCreate a heartbeat checker instance if there is a heartbeat interval set. :rtype: pika.heartbeat.Heartbeat isCreating a HeartbeatChecker: %rN(RRRPRtdebugtHeartbeatChecker(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_create_heartbeat_checkers$  cCsn|j|jkrW|j|r=|j|j|jjntjd||jdS|j|jj|S(sDeliver the frame to the channel specified in the frame. :param pika.frame.Method value: The frame to deliver s'Received %r for non-existing channel %iN( RRt_is_basic_deliver_framet_reject_out_of_band_deliveryRt delivery_tagRRt_handle_content_frame(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_deliver_frame_to_channels   cCs|j|j}tg|jD]}t|^q}|||jkrtjt|t |||j j d|j |ndS(sAttempt to calculate if TCP backpressure is being applied due to our outbound buffer being larger than the average frame size over a window of frames. iN( t bytes_sentt frames_senttsumtoutbound_bufferRdRRRtBACKPRESSURE_WARNINGR8R{RR(R*tavg_frame_sizeRt buffer_size((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_detect_backpressures ( cCs|jr|jndS(s*If the connection is not closed, close it.N(RR(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_ensure_closeds cCs tdS(sAdapters should override to flush the contents of outbound_buffer out along the socket. :raises: NotImplementedError N(R(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_flush_outboundscCs|jjtjtjS(saCalculate the maximum amount of bytes that can be in a body frame. :rtype: int (RRRtFRAME_HEADER_SIZEtFRAME_END_SIZE(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_get_body_frame_max_lengthscCsY|jjj|j\}}|s?tj|jjjn|jjj||fS(sGet credentials for authentication. :param pika.frame.MethodFrame method_frame: The Connection.Start frame :rtype: tuple(str, str) (RRt response_forRRtAuthenticationErrortTYPEterase_credentials(R*t method_framet auth_typetresponse((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_get_credentialss cCs0tg|jjD]}|j|j^qS(sBReturns true if channels are open. :rtype: bool (RJRRkR(R*tnum((s3/usr/lib/python2.7/site-packages/pika/connection.pyRscCs|jj|j|jS(sReturn true if there are any callbacks pending for the specified frame. :param pika.frame.Method value: The frame to check :rtype: bool (R{tpendingRR(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_has_pending_callbacksscCs|j|jd|_tjg|_t|_t |_ |j j |_ d|_d|_d|_d|_d|_d|_d|_|jdS(sInitialize or reset all of the internal state variables for a given connection. On disconnect or reconnect all of the state needs to be wiped. ii s Not specifiedN(is Not specified(RRRPtserver_propertiest collectionstdequeRtbytesRRORRRRRtbytes_receivedRtframes_receivedRRRR(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs          cCst|tjjS(sReturns true if the frame is a Basic.Deliver :param pika.frame.Method frame_value: The frame to check :rtype: bool (R3RtBasictDeliver(R*t frame_value((s3/usr/lib/python2.7/site-packages/pika/connection.pyRAscCs |s tSt|jtjjS(sReturns true if the frame is a Connection.Close frame. :param pika.frame.Method value: The frame to check :rtype: bool (RUR3RRRutClose(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_is_connection_close_frameJscCst|tjS(sReturns true if the frame is a method frame. :param pika.frame.Frame value: The frame to evaluate :rtype: bool (R3RtMethod(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_is_method_frameUscCst|tjS(sMReturns True if it's a protocol header frame. :rtype: bool (R3RR(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_is_protocol_header_frame^scCs|jjptj}t|j|kr9tjngt|jj pTdgD])}|d|jj krX|d^qXdS(s^Return the next available channel number or raise on exception. :rtype: int ii( RRRt MAX_CHANNELSRdRRtNoFreeChannelstsortedRk(R*tlimittx((s3/usr/lib/python2.7/site-packages/pika/connection.pyRfs %cCs\y|j|j=Wn$tk r7tjd|jnX|jrX|j rX|jndS(sRemove the channel from the dict of channels when Channel.CloseOk is sent. :param pika.frame.Method method_frame: The response sChannel %r not in channelsN(RRtKeyErrorRRRRR(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyRrs  cCs<|jrtjddS|j|jd|jddS(sCalled when the Connection is in a state that it can close after a close has been requested. This happens, for example, when all of the channels are closed that were open when the close request was made. sInvoked while already closedNii(RRRt_send_connection_closeR(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs  cCs'|j|j|jtjdS(slInvoked when the socket is connected and it's time to start speaking AMQP with the broker. N(RtCONNECTION_PROTOCOLt _send_frameRR(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyRscCs|r3|j|r3|jj|jjf|_n|jrL|jjn|s_|jn|j|jd|jddS(sCalled when the connection is closed remotely. The from_adapter value will be true if the connection adapter has been disconnected from the broker and the method was invoked directly instead of by receiving a Connection.Close frame. :param pika.frame.Method: The Connection.Close frame :param bool from_adapter: Called by the connection adapter iiN( R RRRRRtstopRt_on_disconnect(R*Rt from_adapter((s3/usr/lib/python2.7/site-packages/pika/connection.pyRws    cCstj|p|jjdS(s{Default behavior when the connecting connection can not connect. :raises: exceptions.AMQPConnectionError N(RtAMQPConnectionErrorRR(R*tconnection_unusedt error_message((s3/usr/lib/python2.7/site-packages/pika/connection.pyRxs cCs^|jj|_|jjdtjj|j|j|j |jj d|j ||dS(s This is called once we have tuned the connection with the server and called the Connection.Open on the server and it has replied with Connection.Ok. iN( Rt known_hostsR{R|RRuR RwRRRR(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyRyscCsi|j|j|j|r+tjn|j||j||j|j|j |dS(sThis is called as a callback once we have received a Connection.Start from the server. :param pika.frame.Method method_frame: The frame received :raises: UnexpectedFrameError N( RtCONNECTION_STARTR RtUnexpectedFrameErrorRt_set_server_informationRt_send_connection_start_okR(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs    cCs|j|j|j|jj|jj|j_|j|jj|jj|j_|j|jj|jj|j_|j|_ |j |_|j |j dS(sFOnce the Broker sends back a Connection.Tune, we will set our tuning variables that have been returned to us and kick off the Heartbeat monitor if required, send our TuneOk and then the Connection. Open rpc call on channel 0. :param pika.frame.Method method_frame: The frame received N( RtCONNECTION_TUNERRRRRRRt_body_max_lengthRt_send_connection_tune_okt_send_connection_open(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs  cCsW|j|xC|jrR|j\}}|s5dS|j||j|qWdS(sThis is called by our Adapter, passing in the data from the socket. As long as we have buffer try and map out frame data. :param str data_in: The data that is available to read N(RRt _read_framet_trim_frame_buffert_process_frame(R*tdata_intconsumed_countR((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_on_data_availables   cCstjd|jj|jj|||j|jxa|jjD]P}||jkr`qEnt j |t j j ||}|j|j|qEW|j|||jdS(sInvoke passing in the reply_code and reply_text from internal methods to the adapter. Called from on_connection_closed and Heartbeat timeouts. :param str reply_code: The numeric close code :param str reply_text: The text close reason s,Disconnected from RabbitMQ at %s:%i (%s): %sN(RRRRR!RRRRkRR RRR t _on_closet$_process_connection_closed_callbackst_remove_connection_callbacks(R*RRRR((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs   cCsE|j|rA|j|rA|jj|j|j||tStS(sProcess the callbacks for the frame if the frame is a method frame and if it has any callbacks pending. :param pika.frame.Method frame_value: The frame to process :rtype: bool (R RR{RRRR6RU(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_process_callbacksscCs&|jjd|j||||dS(sProcess any callbacks that should be called when the connection is closed. :param str reason_code: The numeric code from RabbitMQ for the close :param str reason_text: The text reason fro closing iN(R{RR(R*t reason_codet reason_text((s3/usr/lib/python2.7/site-packages/pika/connection.pyR-.scCs|jdkrdS|jd7_|j|r5dSt|tjrp|jr`|jjqtj dn|j dkr|j |ndS(sProcess an inbound frame from the socket. :param frame_value: The frame to process :type frame_value: pika.frame.Frame | pika.frame.Method iNis4Received heartbeat frame without a heartbeat checker( t frame_typeRR/R3Rt HeartbeatRtreceivedRRRR(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyR(9s cCstj|jS(soTry and read from the frame buffer and decode a frame. :rtype tuple: (int, pika.frame.Frame) (Rt decode_frameR(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyR&WscCs3tjd|||j|tjj|dS(sReject a delivery on the specified channel number and delivery tag because said channel no longer exists. :param int channel_number: The channel number :param int delivery_tag: The delivery tag s0Rejected out-of-band delivery on channel %i (%s)N(RRt _send_methodRRtReject(R*RR((s3/usr/lib/python2.7/site-packages/pika/connection.pyR_s  cCs|jjt||dS(sRemove the specified method_frame callback if it is set for the specified channel number. :param int channel_number: The channel number to remove the callback on :param pika.object.Method: The method frame for the callback N(R{Rtstr(R*RR((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_remove_callbackkscCs%x|D]}|j||qWdS(sRemove the callbacks for the specified channel number and list of method frames. :param int channel_number: The channel number to remove the callback on :param list method_frames: The method frames for the callback N(R9(R*Rt method_framesR((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_remove_callbacksus cCs/|jdtjjtjjtjjgdS(s'Remove all callbacks for the connectioniN(R;RRuR RtOpen(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyR.s cCs|r%t|t r%tdn|rstj|sItdnx'|D]}|jj|||qPWn|j||dS(sMake an RPC call for the given callback, channel number and method. acceptable_replies lists out what responses we'll process from the server with the specified callback. :param int channel_number: The channel number for the RPC call :param pika.object.Method method_frame: The method frame to call :param method callback_method: The callback for the RPC response :param list acceptable_replies: The replies this RPC call expects s)acceptable_replies should be list or Nones,callback should be None, function or method.N(R3tlistR5Rt is_callableR{R|R6(R*RRRtacceptable_repliestreply((s3/usr/lib/python2.7/site-packages/pika/connection.pyt_rpcs  cCs;|jdtjj||dd|jtjjgdS(sSend a Connection.Close method frame. :param int reply_code: The reason for the close :param str reply_text: The text reason for the close iN(RARRuR RwR(R*RR((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs!cCs>|jdtjj|jjdt|jtjjgdS(sSend a Connection.Open frameitinsistN( RARRuR<RRR6RytOpenOk(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyR%scCs2|jdtjj|j|||jjdS(sSend a Connection.StartOk frame :param str authentication_type: The auth type value :param str response: The encoded value to send iN(R6RRutStartOkRRR(R*tauthentication_typeR((s3/usr/lib/python2.7/site-packages/pika/connection.pyR!scCs8|jdtjj|jj|jj|jjdS(sSend a Connection.TuneOk frameiN(R6RRutTuneOkRRRR(R*((s3/usr/lib/python2.7/site-packages/pika/connection.pyR$s cCs|jrtjddS|j}|jt|7_|jd7_|jj||j |j j r}|j ndS(sThis appends the fully generated frame to send to the broker to the output buffer which will be then sent via the connection adapter. :param frame_value: The frame to write :type frame_value: pika.frame.Frame|pika.frame.ProtocolHeader s#Attempted to send frame when closedNi( RRtcriticaltmarshalRRdRRtappendRRRR(R*Rtmarshaled_frame((s3/usr/lib/python2.7/site-packages/pika/connection.pyRs     c Cs|jtj||t|ts,dSt|d}|jtj|||d|drttj t ||j }xmt d|D]Y}||j }||j }||kr|}n|jtj ||d||!qWndS(sOConstructs a RPC method frame and then sends it to the broker. :param int channel_number: The channel number for the frame :param pika.object.Method method_frame: The method frame to send :param tuple content: If set, is a content frame, is tuple of properties and body. Nii(RRR R3ttupleRdtHeaderR8tmathtceilRKR#trangetBody( R*RRtcontenttlengthtchunkstchunktstarttend((s3/usr/lib/python2.7/site-packages/pika/connection.pyR6s   "    cCs ||_dS(seSet the connection state. :param int connection_state: The connection state to set N(R(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyRscCsM|jj|_|jjdt|_t|jdrI|jd=ndS(sSet the server properties and capabilities :param spec.connection.Start method_frame: The Connection.Start frame RN(RRRRORthasattr(R*R((s3/usr/lib/python2.7/site-packages/pika/connection.pyR s  cCs#|j||_|j|7_dS(sTrim the leading N bytes off the frame buffer and increment the counter that keeps track of how many bytes have been read/used from the socket. :param int byte_count: The number of bytes consumed N(RR(R*t byte_count((s3/usr/lib/python2.7/site-packages/pika/connection.pyR'sN(\R-RSRTRRR}RRRRRR"RRRPR+RRR~R6RRRRRRRtpropertyRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR R R RRRRRURwRxRyRRR+RR/R-R(R&RR9R;R.RARR%R!R$RR6RR R'(((s3/usr/lib/python2.7/site-packages/pika/connection.pyRus  &                   (                  ("RTRntsysRtloggingRMRRgRt version_infot urllib.parsetparseRatpikaRRRRR/RRRRRRRt getLoggerR-RtobjectR RWRYRu(((s3/usr/lib/python2.7/site-packages/pika/connection.pyts8          _