Skip to content

Import path: gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fmqp

fmqp

go
import "gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fmqp"

Index

Constants

go
const DefaultMessageUUIDHeaderKey = "_brokers_message_uuid"

go
const (
    KDefaultContentType string = "application/json"
)

go
const KPackageName = "fmqp"

Variables

go
var GetFountainInstance = Lib.GetFountainInstance

go
var GetFountainManager = Lib.GetFountainManager

Sử dụng khi config instance ở dạng key:value. Nếu config instance ở dạng key:array thì sử dụng hàm InstallFountainInstances; nếu vẫn dùng hàm này với config ở dạng key:array thì sẽ chỉ install config của phần tử đầu tiên mà thôi.

Install with config format <key>:<value>; eg: fmqp:<value>

Usage:

config.yaml:

	fmqp:
	  name: default_name
	  ...

code.go:

	fmqp.InstallFountainInstance()

	fmqp.WithConfigKey("fmqp").InstallFountainInstance()
go
var InstallFountainInstance = Lib.InstallFountainInstance

Sử dụng khi config instance ở dạng key:array<value>; Sẽ luôn cố gắng khởi tạo kể cả khi config ở dạng key:value

Install with config format <key>:array<value>; eg: fmqp:array<value>

Usage:

config.yaml:

fmqp:
  - name: default_name
    ...

code.go

fmqp.InstallFountainInstances()

fmqp.WithConfigKey("fmqp").InstallFountainInstances()
go
var InstallFountainInstances = Lib.InstallFountainInstances

Truy cập thẳng tới bộ quản lý thư viện

go
var Lib = lib_3rd.NewLib(newClient, lib_3rd.WithDefaultConfigFunc[config, Client](DefaultConfig))

go
var WithConfigKey = Lib.WithConfigKey

func GenerateQueueNameTopicName

go
func GenerateQueueNameTopicName(topic string) string

GenerateQueueNameTopicName generates queueName equal to the topic.

func WithAmqpConfig

go
func WithAmqpConfig(amqpConfig *amqp.Config) lib_3rd.Option[config]

WithAmqpConfig sets the AMQP configuration

func WithConfig

go
func WithConfig(conf *config) lib_3rd.Option[config]

WithConfig sets the configuration

func WithConsumeConfig

go
func WithConsumeConfig(consumeConfig ConsumeConfig) lib_3rd.Option[config]

WithConsumeConfig sets the consume configuration

func WithDSN

go
func WithDSN(dsn string) lib_3rd.Option[config]

WithDSN sets the DSN

func WithEnvironment

go
func WithEnvironment(env string) lib_3rd.Option[config]

func WithExchangeConfig

go
func WithExchangeConfig(exchangeConfig ExchangeConfig) lib_3rd.Option[config]

WithExchangeConfig sets the exchange configuration

func WithLogger

go
func WithLogger(logger ...flog.FlogInf) lib_3rd.Option[config]

func WithMarshaler

go
func WithMarshaler(marshaler Marshaler) lib_3rd.Option[config]

WithMarshaler sets the marshaler

func WithName

go
func WithName(name string) lib_3rd.Option[config]

func WithPublishConfig

go
func WithPublishConfig(publishConfig PublishConfig) lib_3rd.Option[config]

WithPublishConfig sets the publish configuration

func WithQueueBindConfig

go
func WithQueueBindConfig(queueBindConfig QueueBindConfig) lib_3rd.Option[config]

WithQueueBindConfig sets the queue bind configuration

func WithQueueConfig

go
func WithQueueConfig(queueConfig QueueConfig) lib_3rd.Option[config]

WithQueueConfig sets the queue configuration

func WithReconnectConfig

go
func WithReconnectConfig(reconnectConfig *ReconnectConfig) lib_3rd.Option[config]

WithReconnectConfig sets the reconnect configuration

func WithTLSConfig

go
func WithTLSConfig(tlsConfig *tls.Config) lib_3rd.Option[config]

WithTLSConfig sets the TLS configuration

func WithTopologyBuilder

go
func WithTopologyBuilder(topologyBuilder TopologyBuilder) lib_3rd.Option[config]

WithTopologyBuilder sets the topology builder

type ChannelInf

go
// ChannelInf wraps an AMQP channel together with its delivery-confirmation state.
type ChannelInf interface {
    // GetChannel returns the underlying AMQP channel.
    GetChannel() *amqp.Channel
    // DeliveryConfirmationEnabled reports whether delivery confirmation of published messages is enabled.
    DeliveryConfirmationEnabled() bool
    // Delivered waits until a delivery confirmation is received from the AMQP server and returns true
    // if the delivery succeeded, false otherwise. If delivery confirmation is not enabled, it returns
    // true immediately.
    Delivered() bool
    // Close closes the channel.
    Close() error
}

type Client

go
// Client is the AMQP client of this package. It embeds *amqp.Connection, and it is
// itself embedded by Publisher and Subscriber.
type Client struct {
    // Embedded AMQP connection; also reachable through GetConnection.
    *amqp.Connection
    // contains filtered or unexported fields
}

func (*Client) Close

go
func (c *Client) Close() error

func (*Client) Closed

go
func (c *Client) Closed() bool

func (*Client) Connected

go
func (c *Client) Connected() chan struct{}

func (*Client) GetConnection

go
func (c *Client) GetConnection() *amqp.Connection

func (*Client) IsConnected

go
func (c *Client) IsConnected() bool

func (*Client) NewDurablePubSubConfig

go
func (c *Client) NewDurablePubSubConfig(generateQueueName QueueNameGenerator) *config

func (*Client) NewDurableQueueConfig

go
func (c *Client) NewDurableQueueConfig() *config

func (*Client) NewNonDurablePubSubConfig

go
func (c *Client) NewNonDurablePubSubConfig(generateQueueName QueueNameGenerator) *config

func (*Client) NewNonDurableQueueConfig

go
func (c *Client) NewNonDurableQueueConfig() *config

func (*Client) NewPublisher

go
func (c *Client) NewPublisher(opts ...lib_3rd.Option[config]) *Publisher

func (*Client) NewSubscriber

go
func (c *Client) NewSubscriber(opts ...lib_3rd.Option[config]) *Subscriber

type ConnectionConfig

go
// ConnectionConfig groups the settings used to connect to the AMQP server.
type ConnectionConfig struct {
    // AmqpURI is the AMQP URI of the server — presumably the address that is dialed; TODO confirm.
    AmqpURI string

    // TLSConfig is an optional TLS configuration (see WithTLSConfig).
    TLSConfig  *tls.Config
    // AmqpConfig is an optional low-level AMQP configuration (see WithAmqpConfig).
    AmqpConfig *amqp.Config

    // Reconnect holds the reconnect backoff settings (see ReconnectConfig).
    Reconnect *ReconnectConfig
}

type ConsumeConfig

go
// ConsumeConfig is the consume configuration applied when subscribing.
type ConsumeConfig struct {
    // When true, the message will not be requeued when it is nacked.
    NoRequeueOnNack bool `conf:"no_requeue_on_nack" json:"no_requeue_on_nack,omitempty"`

    // The consumer is identified by a string that is unique and scoped for all
    // consumers on this channel.  If you wish to eventually cancel the consumer, use
    // the same non-empty identifier in Channel.Cancel.  An empty string will cause
    // the library to generate a unique identity.  The consumer identity will be
    // included in every Delivery in the ConsumerTag field.
    Consumer string `conf:"consumer" json:"consumer,omitempty"`

    // When exclusive is true, the server will ensure that this is the sole consumer
    // from this queue. When exclusive is false, the server will fairly distribute
    // deliveries across multiple consumers.
    Exclusive bool `conf:"exclusive" json:"exclusive,omitempty"`

    // The noLocal flag is not supported by RabbitMQ.
    NoLocal bool `conf:"no_local" json:"no_local,omitempty"`

    // When noWait is true, do not wait for the server to confirm the request and
    // immediately begin deliveries.  If it is not possible to consume, a channel
    // exception will be raised and the channel will be closed.
    NoWait bool `conf:"no_wait" json:"no_wait,omitempty"`

    // QosConfig holds the prefetch (QoS) settings for the consumer.
    // NOTE(review): the `squash` tag suggests its keys are read at the same level as
    // the fields above rather than under a nested key — confirm with the config loader.
    QosConfig QosConfig `conf:",squash"`

    // Optional arguments can be provided that have specific semantics for the queue
    // or server. Not loaded from the config file (`conf:"-"`).
    Arguments amqp.Table `conf:"-"`
}

type DefaultTopologyBuilder

go
// DefaultTopologyBuilder is the default TopologyBuilder; it provides the BuildTopology
// and ExchangeDeclare methods and carries no state.
type DefaultTopologyBuilder struct{}

func (*DefaultTopologyBuilder) BuildTopology

go
func (builder *DefaultTopologyBuilder) BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, opts ...lib_3rd.Option[config]) error

func (DefaultTopologyBuilder) ExchangeDeclare

go
func (builder DefaultTopologyBuilder) ExchangeDeclare(channel *amqp.Channel, exchangeName string, opts ...lib_3rd.Option[config]) error

type ExchangeConfig

go
// ExchangeConfig is the exchange configuration (see WithExchangeConfig).
type ExchangeConfig struct {
    // Each exchange belongs to one of a set of exchange kinds/types implemented by
    // the server. The exchange types define the functionality of the exchange - i.e.
    // how messages are routed through it. Once an exchange is declared, its type
    // cannot be changed.  The common types are "direct", "fanout", "topic" and
    // "headers".
    Type string `conf:"type" json:"type,omitempty"`

    // GenerateName generates the exchange name from the topic provided to the
    // Publish or Subscribe method.
    //
    // Exchange names starting with "amq." are reserved for pre-declared and
    // standardized exchanges. The client MAY declare an exchange starting with
    // "amq." if the passive option is set, or the exchange already exists.  Names can
    // consist of a non-empty sequence of letters, digits, hyphen, underscore,
    // period, or colon.
    GenerateName func(topic string) string `conf:"-"`

    // Exchanges declared as `internal` do not accept publishings. Internal
    // exchanges are useful when you wish to implement inter-exchange topologies
    // that should not be exposed to users of the broker.
    Internal bool `conf:"internal" json:"internal,omitempty"`

    // Exclusive queues are only accessible by the connection that declares them and
    // will be deleted when the connection closes.  Channels on other connections
    // will receive an error when attempting to declare, bind, consume, purge or
    // delete a queue with the same name.
    //
    // NOTE(review): this description is worded for queues; confirm how Exclusive
    // is applied to an exchange.
    Exclusive bool `conf:"exclusive" json:"exclusive,omitempty"`

    // Durable and Non-Auto-Deleted queues will survive server restarts and remain
    // when there are no remaining consumers or bindings.  Persistent publishings will
    // be restored in this queue on server restart.  These queues are only able to be
    // bound to durable exchanges.
    //
    // NOTE(review): this description is worded for queues although the field
    // belongs to the exchange configuration — confirm and reword.
    Durable bool `conf:"durable" json:"durable,omitempty"`

    // Non-Durable and Auto-Deleted exchanges will be deleted when there are no
    // remaining bindings and not restored on server restart.  This lifetime is
    // useful for temporary topologies that should not pollute the virtual host on
    // failure or after the consumers have completed.
    //
    // Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
    // running including when there are no remaining bindings.  This is useful for
    // temporary topologies that may have long delays between bindings.
    AutoDeleted bool `conf:"auto_deleted" json:"auto_deleted,omitempty"`

    // When noWait is true, the queue will assume to be declared on the server.  A
    // channel exception will arrive if the conditions are met for existing queues
    // or attempting to modify an existing queue from a different connection.
    //
    // NOTE(review): worded for queues; presumably the same applies to the exchange
    // declaration — confirm.
    NoWait bool `conf:"no_wait" json:"no_wait,omitempty"`

    // Optional amqp.Table of arguments that are specific to the server's implementation of
    // the exchange can be sent for exchange types that require extra parameters.
    Arguments amqp.Table `conf:"-"`
}

type Marshaler

Marshaler chuyển đổi message của brokers thành amqp.Publishing và ngược lại từ amqp.Delivery thành message của brokers.

go
// Marshaler converts a brokers message into an amqp.Publishing and, in the other
// direction, an amqp.Delivery into a brokers message.
type Marshaler interface {
    // Marshal builds the amqp.Publishing to send for msg.
    Marshal(msg *brokers.Message) (amqp.Publishing, error)
    // Unmarshal builds the brokers message for a received amqp.Delivery.
    Unmarshal(amqpMsg amqp.Delivery) (*brokers.Message, error)
}

type MarshalerCorrelating

MarshalerCorrelating sẽ truyền UUID qua AMQP native correlation ID thay vì dưới dạng header

go
// MarshalerCorrelating passes the message UUID through the native AMQP correlation ID
// instead of a header.
type MarshalerCorrelating struct {
    // PostProcessPublishing can be used to do additional processing on the amqp.Publishing,
    // e.g. adding a CorrelationId and ContentType:
    //
    //  fmqp.MarshalerCorrelating{
    //      PostProcessPublishing: func(publishing amqp.Publishing) amqp.Publishing {
    //          publishing.CorrelationId = "correlation"
    //          publishing.ContentType = "application/json"
    //
    //          return publishing
    //      },
    //  }
    //
    // NOTE(review): this marshaler is documented as carrying the message UUID in the
    // correlation ID; confirm that overriding CorrelationId here is intended.
    PostProcessPublishing func(amqp.Publishing) amqp.Publishing

    // When true, DeliveryMode will not be set to Persistent.
    //
    // Transient DeliveryMode means higher throughput, but messages will not be
    // restored on broker restart. The delivery mode of publishings is unrelated
    // to the durability of the queues they reside on. Transient messages will
    // not be restored to durable queues; persistent messages will be restored to
    // durable queues and lost on non-durable queues during server restart.
    NotPersistentDeliveryMode bool
}

func (MarshalerCorrelating) Marshal

go
func (cm MarshalerCorrelating) Marshal(msg *brokers.Message) (amqp.Publishing, error)

func (MarshalerCorrelating) Unmarshal

go
func (cm MarshalerCorrelating) Unmarshal(amqpMsg amqp.Delivery) (*brokers.Message, error)

type MarshalerDefault

go
// MarshalerDefault is a Marshaler that stores and reads the message UUID in an AMQP header.
type MarshalerDefault struct {
    // PostProcessPublishing can be used to do additional processing on the amqp.Publishing,
    // e.g. adding a CorrelationId and ContentType:
    //
    //  fmqp.MarshalerDefault{
    //      PostProcessPublishing: func(publishing amqp.Publishing) amqp.Publishing {
    //          publishing.CorrelationId = "correlation"
    //          publishing.ContentType = "application/json"
    //
    //          return publishing
    //      },
    //  }
    PostProcessPublishing func(amqp.Publishing) amqp.Publishing

    // When true, DeliveryMode will not be set to Persistent.
    //
    // Transient DeliveryMode means higher throughput, but messages will not be
    // restored on broker restart. The delivery mode of publishings is unrelated
    // to the durability of the queues they reside on. Transient messages will not be
    // restored to durable queues; persistent messages will be restored to
    // durable queues and lost on non-durable queues during server restart.
    NotPersistentDeliveryMode bool

    // Header used to store and read the message UUID.
    //
    // If the value is empty, DefaultMessageUUIDHeaderKey is used.
    // If the header does not exist, an empty value is passed as the message UUID.
    MessageUUIDHeaderKey string
}

func (MarshalerDefault) Marshal

go
func (d MarshalerDefault) Marshal(msg *brokers.Message) (amqp.Publishing, error)

func (MarshalerDefault) Unmarshal

go
func (d MarshalerDefault) Unmarshal(amqpMsg amqp.Delivery) (*brokers.Message, error)

type MessageQueue

MessageQueue is a JSON envelope that pairs a message type (`message_type`) with its payload (`data`).

go
// MessageQueue is a JSON envelope pairing a message type with its payload.
type MessageQueue struct {
    // MessageType is serialized under the JSON key "message_type".
    MessageType string `json:"message_type"`
    // Data is the payload of any type, serialized under the JSON key "data".
    Data        any    `json:"data"`
}

type PublishConfig

go
// PublishConfig is the publish configuration (see WithPublishConfig).
type PublishConfig struct {
    // GenerateRoutingKey generates the routing key from the topic provided to Publish.
    GenerateRoutingKey func(topic string) string `conf:"-"`

    // Publishings can be undeliverable when the mandatory flag is true and no queue is
    // bound that matches the routing key, or when the immediate flag is true and no
    // consumer on the matched queue is ready to accept the delivery.
    Mandatory bool `conf:"mandatory" json:"mandatory,omitempty"`

    // Publishings can be undeliverable when the mandatory flag is true and no queue is
    // bound that matches the routing key, or when the immediate flag is true and no
    // consumer on the matched queue is ready to accept the delivery.
    Immediate bool `conf:"immediate" json:"immediate,omitempty"`

    // With transactional enabled, all messages will be added in a transaction.
    Transactional bool `conf:"transactional" json:"transactional,omitempty"`

    // ChannelPoolSize specifies the size of a channel pool. All channels in the pool are opened when the publisher is
    // created. When a Publish operation is performed then a channel is taken from the pool to perform the operation and
    // then returned to the pool once the operation has finished. If all channels are in use then the Publish operation
    // waits until a channel is returned to the pool.
    // If this value is set to 0 (default) then channels are not pooled and a new channel is opened/closed for every
    // Publish operation.
    ChannelPoolSize int `conf:"channel_pool_size" json:"channel_pool_size,omitempty"`

    // ConfirmDelivery indicates whether the Publish function should wait until a confirmation is received from
    // the AMQP server in order to guarantee that the message is delivered. Setting this value to true may
    // negatively impact performance but will increase reliability.
    ConfirmDelivery bool `conf:"confirm_delivery" json:"confirm_delivery,omitempty"`
}

type Publisher

go
// Publisher publishes brokers messages to the AMQP broker. It embeds *Client and is
// created with Client.NewPublisher.
type Publisher struct {
    *Client
    // contains filtered or unexported fields
}

func (*Publisher) Close

go
func (p *Publisher) Close() error

func (*Publisher) Publish

go
func (p *Publisher) Publish(topic string, messages ...*brokers.Message) (err error)

Publish gửi các message đến AMQP broker. Publish sẽ chặn cho đến khi broker nhận và lưu message. Publish luôn an toàn với các thread.

Topic của brokers trong Publish không được ánh xạ đến topic của AMQP, nhưng tùy thuộc vào cấu hình có thể được ánh xạ đến exchange, queue hoặc routing key. Để mô tả chi tiết về ánh xạ danh pháp, vui lòng kiểm tra đoạn "Nomenclature" trong file doc.go.

type QosConfig

Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.

go
// QosConfig holds the prefetch (QoS) settings: prefetch count, prefetch size and the global flag.
type QosConfig struct {
    // With a prefetch count greater than zero, the server will deliver that many
    // messages to consumers before acknowledgments are received.  The server ignores
    // this option when consumers are started with noAck because no acknowledgments
    // are expected or sent.
    //
    // For example, setting the prefetch count to 1 tells RabbitMQ not to give more
    // than one message to a worker at a time. In other words, it does not dispatch
    // a new message to a worker until that worker has processed and acknowledged
    // the previous one; instead, it dispatches the message to the next worker that
    // is not still busy.
    PrefetchCount int `conf:"prefetch_count" json:"prefetch_count,omitempty"`

    // With a prefetch size greater than zero, the server will try to keep at least
    // that many bytes of deliveries flushed to the network before receiving
    // acknowledgments from the consumers.  This option is ignored when consumers are
    // started with noAck.
    PrefetchSize int `conf:"prefetch_size" json:"prefetch_size,omitempty"`

    // When global is true, these Qos settings apply to all existing and future
    // consumers on all channels on the same connection.  When false, the Channel.Qos
    // settings will apply to all existing and future consumers on this channel.
    //
    // Please see the RabbitMQ Consumer Prefetch documentation for an explanation of
    // how the global flag is implemented in RabbitMQ, as it differs from the
    // AMQP 0.9.1 specification in that global Qos settings are limited in scope to
    // channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).
    Global bool `conf:"global" json:"global,omitempty"`
}

type QueueBindConfig

QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.

go
// QueueBindConfig is the queue bind configuration (see WithQueueBindConfig).
type QueueBindConfig struct {
    // GenerateRoutingKey returns a routing key for the given topic — presumably the
    // binding routing key used when the queue is bound; TODO confirm.
    GenerateRoutingKey func(topic string) string `conf:"-"`

    // When noWait is false and the queue could not be bound, the channel will be
    // closed with an error.
    NoWait bool `conf:"no_wait" json:"no_wait,omitempty"`

    // Optional amqp.Table of arguments that are specific to the server's implementation of
    // the queue bind can be sent for queue bind types that require extra parameters.
    Arguments amqp.Table `conf:"-"`
}

type QueueConfig

go
// QueueConfig is the queue configuration (see WithQueueConfig).
type QueueConfig struct {
    // GenerateName generates the queue name from the topic provided to Subscribe.
    GenerateName QueueNameGenerator `conf:"-"`

    // Exclusive queues are only accessible by the connection that declares them and
    // will be deleted when the connection closes.  Channels on other connections
    // will receive an error when attempting to declare, bind, consume, purge or
    // delete a queue with the same name.
    Exclusive bool `conf:"exclusive" json:"exclusive,omitempty"`

    // Durable and Non-Auto-Deleted queues will survive server restarts and remain
    // when there are no remaining consumers or bindings.  Persistent publishings will
    // be restored in this queue on server restart.  These queues are only able to be
    // bound to durable exchanges.
    Durable bool `conf:"durable" json:"durable,omitempty"`

    // Non-Durable and Auto-Deleted exchanges will be deleted when there are no
    // remaining bindings and not restored on server restart.  This lifetime is
    // useful for temporary topologies that should not pollute the virtual host on
    // failure or after the consumers have completed.
    //
    // Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
    // running including when there are no remaining bindings.  This is useful for
    // temporary topologies that may have long delays between bindings.
    //
    // NOTE(review): this description is worded for exchanges although the field
    // belongs to the queue configuration — confirm and reword.
    AutoDelete bool `conf:"auto_delete" json:"auto_delete,omitempty"`

    // When noWait is true, the queue will assume to be declared on the server.  A
    // channel exception will arrive if the conditions are met for existing queues
    // or attempting to modify an existing queue from a different connection.
    NoWait bool `conf:"no_wait" json:"no_wait,omitempty"`

    // Optional amqp.Table of arguments that are specific to the server's implementation of
    // the queue can be sent for queue types that require extra parameters.
    Arguments amqp.Table `conf:"-"`
}

type QueueNameGenerator

QueueNameGenerator generates QueueName based on the topic.

go
// QueueNameGenerator generates a queue name based on the topic.
type QueueNameGenerator func(topic string) string

func GenerateQueueNameConstant

go
func GenerateQueueNameConstant(queueName string) QueueNameGenerator

GenerateQueueNameConstant generates a queue name equal to queueName, regardless of the topic.

func GenerateQueueNameTopicNameWithSuffix

go
func GenerateQueueNameTopicNameWithSuffix(suffix string) QueueNameGenerator

GenerateQueueNameTopicNameWithSuffix generates queue name equal to:

topic + "_" + suffix

type ReconnectConfig

go
// ReconnectConfig holds the backoff parameters used when reconnecting: initial interval,
// randomization factor, multiplier and maximum interval.
// NOTE(review): presumably these drive an exponential backoff between reconnect
// attempts — confirm against the reconnect loop. See DefaultReconnectConfig for defaults.
type ReconnectConfig struct {
    BackoffInitialInterval     time.Duration `conf:"backoff_initial_interval" json:"backoff_initial_interval,omitempty"`
    BackoffRandomizationFactor float64       `conf:"backoff_randomization_factor" json:"backoff_randomization_factor,omitempty"`
    BackoffMultiplier          float64       `conf:"backoff_multiplier" json:"backoff_multiplier,omitempty"`
    BackoffMaxInterval         time.Duration `conf:"backoff_max_interval" json:"backoff_max_interval,omitempty"`
}

func DefaultReconnectConfig

go
func DefaultReconnectConfig() *ReconnectConfig

type Subscriber

go
// Subscriber consumes brokers messages from the AMQP broker. It embeds *Client and is
// created with Client.NewSubscriber.
type Subscriber struct {
    *Client
    // contains filtered or unexported fields
}

func (*Subscriber) Close

go
func (s *Subscriber) Close() error

Close đóng tất cả các subscriptions với các kênh đầu ra của chúng.

func (*Subscriber) Subscribe

go
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)

Subscribe tiêu thụ messages từ AMQP broker.

Topic của brokers trong Subscribe không được ánh xạ tới topic của AMQP, nhưng tùy thuộc vào cấu hình có thể được ánh xạ tới exchange, queue hoặc routing key. Để mô tả chi tiết về ánh xạ danh pháp, vui lòng kiểm tra đoạn "Nomenclature" trong file doc.go.

func (*Subscriber) SubscribeInitialize

go
func (s *Subscriber) SubscribeInitialize(topic string) (err error)

type TopologyBuilder

TopologyBuilder chịu trách nhiệm khai báo exchange, queues và queues binding.

TopologyBuilder mặc định là DefaultTopologyBuilder. Nếu cần topology tùy chỉnh, hãy implement TopologyBuilder của riêng bạn và truyền vào qua option WithTopologyBuilder:

subscriber := client.NewSubscriber(fmqp.WithTopologyBuilder(MyProCustomBuilder{}))
go
// TopologyBuilder is responsible for declaring the exchange, the queues and the queue bindings.
// The default implementation is DefaultTopologyBuilder.
type TopologyBuilder interface {
    // BuildTopology sets up the topology for the given queue and exchange names on channel.
    BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, opts ...lib_3rd.Option[config]) error
    // ExchangeDeclare declares the exchange named exchangeName on channel.
    ExchangeDeclare(channel *amqp.Channel, exchangeName string, opts ...lib_3rd.Option[config]) error
}

Generated by gomarkdoc