RabbitMQ. Parte 3. Comprender las colas y enlaces

Queue(cola): una estructura de datos en el disco o en la RAM que almacena enlaces a mensajes y entrega sus copias consumers(a los consumidores). QueueEs un proceso de Erlang con un estado (donde los mensajes en sí pueden almacenarse en caché). 1 mil colas pueden ocupar unos 80 Mb.


Binding (enlace): una regla que le dice al intercambiador cuáles de las colas deben recibir mensajes.


Tabla de contenido



Colas temporales


Si la cola se crea con el conjunto de parámetros autoDelete, entonces esta cola adquiere la capacidad de eliminarse automáticamente . Estas colas generalmente se crean cuando el primer cliente se conecta y se eliminan cuando todos los clientes se desconectan.


exclusive, . , /, . exclusive , autoDelete .


:


  • ,
  • binding churn. , / . .


durable, /. Queue.Delete.


Highly Available


HA RabbitMQ. , , .


- HA , , HA . - , .


HA .


rabbitmq_26


:


  • HA . HA HA RabbitMQ (2-3 )


RPC . Queue.Declare, :



RabbitMQ.Client:


// ...
channel.QueueDeclare(
    queue: "my_queue",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null
);
// ...

  • queue — , .
  • durable — true, /
  • exclusive — true,
  • autoDelete — true,
  • arguments — . .

arguments

  • x-message-ttl(x-message-time-to-live) — . x-message-ttl, , . x-message-ttl . . . x-message-ttl, .
  • x-expires — . . , , Basic.Cancel . , Basic.Get. , . , .
  • x-max-length — . ,

rabbitmq_25


  • x-max-lenght-bytes — . ( )
  • x-overflow — . : drop-head ( ) reject-publish. drop-head, . reject-publish,
  • x-dead-letter-exchange — exchange, ,
  • x-dead-letter-routing-key —
  • x-max-priority — 255 (RabbitMQ 3.5.0 ). , . ,
  • x-queue-mode — . . . , , ,
  • x-queue-master-locator — ,
  • x-ha-policy — HA . all, . nodes,
  • x-ha-nodes — , HA

rabbitmq_10


, RPC Queue.DeclareOk. ( Queue.Declare), Channel.Close OperationInterruptedException, .


Queue.Declare . , , , .


Queue.Declare , Channel.Close OperationInterruptedException, 403 .


, >= 10 , , GC , , .


Queue


RabbitMQ guest (username: guest password: guest). , guest . Queues Add a new queue. :


rabbitmq_21


Add queues, .


rabbitmq_22


. , consumers, / , .


Binding


RPC . Queue.Bind, :



RabbitMQ.Client:


//...
channel.QueueBind(
    queue: queueName,
    exchange: "my_exchange",
    routingKey: "my_key",
    arguments: null
);
//...

  • queue —
  • exchange —
  • routingKey —
  • arguments —

rabbitmq_11


, RPC Queue.BindOk.


Binding


RabbitMQ guest (username: guest password: guest). , guest . Queues my_queue. bindings:


rabbitmq_23


Bind, :


rabbitmq_24


Code


En esta sección, describimos la cola y el enlace usando el código C #, por lo que si necesitábamos desarrollar una biblioteca. Quizás sea útil para la percepción.


public interface IQueue
    {        
        string Name { get; }

        /// <summary>
        ///       true,  queue   . 
        ///            
        ///       /. 
        ///       false,  queue     , 
        ///      /  
        /// </summary>
        bool IsDurable { get; }

        /// <summary>
        ///        true,  
        ///          
        ///       consumer-
        /// </summary>
        bool IsExclusive { get; }

        /// <summary>
        ///      . 
        ///       ,    .
        /// </summary>
        bool IsAutoDelete { get; }

        /// <summary>
        ///      
        /// </summary>
        IDictionary<string, object> Arguments { get; }
    }

public class Queue : IQueue
    {
        public Queue(
             string name, 
             bool isDurable = true, 
             bool isExclusive = false, 
             bool isAutoDelete = false, 
             IDictionary<string, object> arguments = null)
        {
            Name = name ??
                throw new ArgumentNullException(name, $"{name} must not be null");

            IsDurable = isDurable;
            IsExclusive = isExclusive;
            IsAutoDelete = isAutoDelete;
            Arguments = arguments ?? new Dictionary<string, object>();
        }

        public string Name { get; }
        public bool IsDurable { get; }
        public bool IsExclusive { get; }
        public bool IsAutoDelete { get; }
        public IDictionary<string, object> Arguments { get; }
    }

public static class QueueMode
    {       
        public const string Default = "default";
        /// <summary>
        ///      .     
        ///          ,   
        ///       
        /// </summary>
        public const string Lazy = "lazy";
    }

public interface IBinding
    {
        /// <summary>
        ///     ,    
        /// </summary>
        IExchange Exchange { get; }

        /// <summary>
        ///      
        /// </summary>
        string RoutingKey { get; }

        /// <summary>
        ///      
        /// </summary>
        IDictionary<string, object> Arguments { get; }
    }

public class Binding : IBinding
    {
        public Binding(
             IExchange exchange, 
             string routingKey, 
             IDictionary<string, object> arguments)
        {
            Exchange = exchange;
            RoutingKey = routingKey;
            Arguments = arguments;
        }

        public IExchange Exchange { get; }
        public string RoutingKey { get; }
        public IDictionary<string, object> Arguments { get; }
    }

All Articles