Skip to content

Import path: gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fnat

fnat

go
import "gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fnat"

Index

Constants

reserved header cho NATSMarshaler để gửi UUID

go
const FountainBrokersUUIDHdr = "_brokers_message_uuid"

go
const KPackageName = "fmqtt"

go
const PackageName = "fnat"

TermSignal nếu duration này được trả về, event sẽ bị term

go
const TermSignal = time.Duration(-1)

Variables

go
var GetFountainInstance = Lib.GetFountainInstance

go
var GetFountainManager = Lib.GetFountainManager

Sử dụng khi config instance ở dạng key:value; Nếu config instance ở dạng key:array thì sử dụng hàm InstallFountainInstances Nếu config ở dạng key:array thì sẽ chỉ install config phần tử đầu tiên mà thôi

Install with config format <key>:<value>; eg: fnat:<value>

Usage:

config.yaml:

	fnat:
	  name: default_name
	  ...

	code.go

	fnat.InstallFountainInstance()

 fnat.WithConfigKey("fnat").InstallFountainInstance()
go
var InstallFountainInstance = Lib.InstallFountainInstance

Sử dụng khi config instance ở dạng key:value; Nếu config ở dạng key:array thì sẽ chỉ install config phần tử đầu tiên mà thôi

Install with config format <key>:<value>; eg: fnat:<value>

Usage:

config.yaml:

	fnat:
	  name: default_name
	  ...

	code.go

	fnat.InstallFountainInstanceOnce()

 fnat.WithConfigKey("fnat").InstallFountainInstanceOnce()
go
var InstallFountainInstanceOnce = Lib.InstallFountainInstanceOnce

Sử dụng khi config instance ở dạng key:array<value>; Sẽ luôn cố gắng khởi tạo kể cả khi config ở dạng key:value

Install with config format <key>:array<value>; eg: fnat:array<value>

Usage:

config.yaml:

fnat:
  - name: default_name
    ...

code.go

fnat.InstallFountainInstances()

fnat.WithConfigKey("fnat").InstallFountainInstances()
go
var InstallFountainInstances = Lib.InstallFountainInstances

Truy cập thẳng tới bộ quản lý thư viện

go
var Lib = lib_3rd.NewLib(newClient, lib_3rd.WithDefaultConfigFunc[Config, client](DefaultConfig))

go
var WithConfigKey = Lib.WithConfigKey

func WithAddr

go
func WithAddr(addr string) lib_3rd.Option[Config]

func WithAllowReconnect

go
func WithAllowReconnect(allowReconnect bool) lib_3rd.Option[Config]

func WithAsyncErrorCB

go
func WithAsyncErrorCB(cb nats.ErrHandler) lib_3rd.Option[Config]

func WithClosedCB

go
func WithClosedCB(cb nats.ConnHandler) lib_3rd.Option[Config]

func WithConfig

go
func WithConfig(conf *Config) lib_3rd.Option[Config]

WithAddrs set addresses

func WithDisconnectedErrCB

go
func WithDisconnectedErrCB(cb nats.ConnErrHandler) lib_3rd.Option[Config]

func WithDiscoveredServersCB

go
func WithDiscoveredServersCB(cb nats.ConnHandler) lib_3rd.Option[Config]

func WithEnvironment

go
func WithEnvironment(env string) lib_3rd.Option[Config]

func WithLogger

go
func WithLogger(logger flog.FlogInf) lib_3rd.Option[Config]

func WithMaxReconnect

go
func WithMaxReconnect(max int) lib_3rd.Option[Config]

func WithName

go
func WithName(name string) lib_3rd.Option[Config]

func WithNatOpts

go
func WithNatOpts(opts ...nats.Option) lib_3rd.Option[Config]

func WithNkeyFilePath

go
func WithNkeyFilePath(path string) lib_3rd.Option[Config]

func WithPingInterval

go
func WithPingInterval(interval time.Duration) lib_3rd.Option[Config]

func WithReconnectWait

go
func WithReconnectWait(wait time.Duration) lib_3rd.Option[Config]

func WithReconnectedCB

go
func WithReconnectedCB(cb nats.ConnHandler) lib_3rd.Option[Config]

func WithTimeout

go
func WithTimeout(timeout time.Duration) lib_3rd.Option[Config]

func WithToken

go
func WithToken(token string) lib_3rd.Option[Config]

func WithUserInfo

go
func WithUserInfo(username, password string) lib_3rd.Option[Config]

func WithUserJWT

go
func WithUserJWT(userCB nats.UserJWTHandler, sigCB nats.SignatureHandler) lib_3rd.Option[Config]

type Config

go
type Config struct {
    lib_3rd.BaseConfig `conf:",squash"`
    Addr               string        `conf:"addr" json:"addr,omitempty"` // Using `,` to split addrs
    Username           string        `conf:"username" json:"username,omitempty"`
    Password           string        `conf:"password" json:"password,omitempty"`
    AllowReconnect     bool          `conf:"allow_reconnect" json:"allow_reconnect,omitempty"`
    MaxReconnect       int           `conf:"max_reconnect" json:"max_reconnect,omitempty"`
    ReconnectWait      time.Duration `conf:"reconnect_wait" json:"reconnect_wait,omitempty"` // example: "10s"
    Timeout            time.Duration `conf:"timeout" json:"timeout,omitempty"`               // example: "10s"
    PingInterval       time.Duration `conf:"ping_interval" json:"ping_interval,omitempty"`   // example: "10s"
    NkeyFilePath       string        `conf:"nkey_file_path" json:"nkey_file_path,omitempty"`
    Token              string        `conf:"token" json:"token,omitempty"`

    NatsOptions []nats.Option `conf:"-"`

    // Marshaler là marshaler dùng để chuyển đổi message giữa brokers và wire formats
    Marshaler Marshaler `conf:"-"`

    JetStreamConfig *JetStreamConfig `conf:"jetstream_config" json:"jetstream_config,omitempty"`

    // SubjectCalculator là một hàm được sử dụng để chuyển đổi một chủ đề thành một mảng các chủ đề khi tạo (mặc định là chủ đề là Chính và queueGroupPrefix là QueueGroup)
    SubjectCalculator SubjectCalculator `conf:"-"`

    SubscriberConfig subscriberConfig `conf:"subscriber_config" json:"subscriber_config,omitempty"`
}

func DefaultConfig

go
func DefaultConfig() *Config

func (*Config) Validate

go
func (conf *Config) Validate() error

type Connection

go
type Connection interface {
    // QueueSubscribe subscribes to a NATS subject, equivalent to default Subscribe if queuegroup not supplied.
    QueueSubscribe(string, string, nats.MsgHandler) (*nats.Subscription, error)
    // PublishMsg sends the provided NATS message to the broker.
    PublishMsg(*nats.Msg) error
    // Drain will end all active subscription interest and attempt to wait for in-flight messages to process before closing.
    Drain() error
    // Close will close the connection
    Close()
}

type ConsumerStreamConfigurator

go
type ConsumerStreamConfigurator func(string, string) jetstream.ConsumerConfig

type Delay

go
type Delay interface {
    // WaitTime trả về time.Duration mà cần chờ.
    // retryNum là số lần WaitTime được gọi cho
    // message cụ thể
    WaitTime(retryNum uint64) time.Duration
}

type DurableCalculator

go
type DurableCalculator = func(string, string) string

type GobMarshaler

GobMarshaler là marshaller sử dụng Gob để mã hóa brokers messages.

go
type GobMarshaler struct{}

func (GobMarshaler) Marshal

go
func (GobMarshaler) Marshal(topic string, msg *brokers.Message) (*nats.Msg, error)

Marshal chuyển đổi một brokers message thành định dạng gob.

func (GobMarshaler) Unmarshal

go
func (GobMarshaler) Unmarshal(natsMsg *nats.Msg) (*brokers.Message, error)

Unmarshal trích xuất một brokers message từ một nats message.

type JSONMarshaler

JSONMarshaler sử dụng encoding/json để mã hóa brokers messages.

go
type JSONMarshaler struct{}

func (JSONMarshaler) Marshal

go
func (JSONMarshaler) Marshal(topic string, msg *brokers.Message) (*nats.Msg, error)

Marshal chuyển đổi một brokers message thành định dạng JSON.

func (JSONMarshaler) Unmarshal

go
func (JSONMarshaler) Unmarshal(natsMsg *nats.Msg) (*brokers.Message, error)

Unmarshal trích xuất một brokers message từ một nats message.

type JetStreamConfig

JetStreamConfig chứa các thiết lập cấu hình cụ thể để chạy ở chế độ JetStream

go
type JetStreamConfig struct {
    // Disabled kiểm soát việc có sử dụng semantics JetStream hay không
    Disabled bool

    // AutoProvision chỉ ra rằng ứng dụng nên tạo stream được cấu hình nếu thiếu trên broker
    AutoProvision bool

    // ConnectOptions chứa các tùy chọn cụ thể của JetStream để sử dụng khi thiết lập context
    ConnectOptions []nats.JSOpt

    // SubscribeOptions chứa các tùy chọn để sử dụng khi thiết lập subscriptions
    SubscribeOptions []nats.SubOpt

    // PublishOptions chứa các tùy chọn để gửi trong mỗi thao tác publish
    PublishOptions []nats.PubOpt

    // TrackMsgId sử dụng tùy chọn Nats.MsgId với msg UUID để ngăn ngừa trùng lặp (cần cho xử lý exactly once)
    TrackMsgId bool

    // AckAsync cho phép xác nhận không đồng bộ
    AckAsync bool

    // DurablePrefix là tiền tố được sử dụng để tạo tên durable từ topic.
    //
    // Theo mặc định, tiền tố sẽ được sử dụng riêng để tạo tên durable. Điều này chỉ cho phép sử dụng
    // một subscription duy nhất cho mỗi cấu hình. Để linh hoạt hơn, cung cấp một DurableCalculator
    // sẽ nhận tiền tố durable + topic.
    //
    // Subscriptions cũng có thể chỉ định một “durable name” sẽ tồn tại sau khi client khởi động lại.
    // Durable subscriptions khiến server theo dõi số thứ tự message cuối cùng được xác nhận
    // cho một client và durable name. Khi client khởi động lại/resubscribe,
    // và sử dụng cùng một client ID và durable name, server sẽ tiếp tục gửi bắt đầu
    // từ message chưa được xác nhận sớm nhất cho subscription durable này.
    //
    // Làm điều này khiến server JetStream theo dõi
    // message cuối cùng được xác nhận cho ClientID + Durable đó.
    DurablePrefix string

    // DurableCalculator là một hàm tùy chỉnh được sử dụng để tạo tên durable từ topic + durable prefix
    DurableCalculator DurableCalculator
}

func (JetStreamConfig) CalculateDurableName

go
func (c JetStreamConfig) CalculateDurableName(topic string) string

func (JetStreamConfig) ShouldAutoProvision

go
func (c JetStreamConfig) ShouldAutoProvision() bool

ShouldAutoProvision kiểm tra xem có nên tự động tạo tài nguyên (stream) hay không. Hàm này trả về true nếu cấu hình không bị vô hiệu hóa và cờ AutoProvision được bật.

Usage:

config := JetStreamConfig{Disabled: false, AutoProvision: true}
shouldProvision := config.ShouldAutoProvision()
fmt.Println(shouldProvision) // Output: true

type Marshaler

Marshaler cung cấp các hàm mã hóa cho transport

go
type Marshaler interface {
    // Marshal chuyển đổi một brokers message thành định dạng wire của NATS.
    Marshal(topic string, msg *brokers.Message) (*nats.Msg, error)
    // Unmarshal tạo ra một brokers message từ định dạng wire của NATS.
    Unmarshal(*nats.Msg) (*brokers.Message, error)
}

type MaxRetryDelay

MaxRetryDelay delay trả về cùng một time.Duration đến một giới hạn trước khi gửi term

go
type MaxRetryDelay struct {
    StaticDelay
    // contains filtered or unexported fields
}

func NewMaxRetryDelay

go
func NewMaxRetryDelay(delay time.Duration, retryLimit uint64) MaxRetryDelay

func (MaxRetryDelay) WaitTime

go
func (s MaxRetryDelay) WaitTime(retryNum uint64) time.Duration

type Message

go
type Message struct {
    Subject    string `json:"topic"`
    Reply      string `json:"reply"`
    ObjectJSON string `json:"object_json"`
}

func NAT2Message

go
func NAT2Message(msg *nats.Msg) *Message

func (Message) ToNAT

go
func (o Message) ToNAT() *nats.Msg

type NATSMarshaler

NATSMarshaler sử dụng NATS header để mã hóa trực tiếp giữa brokers và NATS formats. UUID của brokers được lưu trữ tại _brokers_message_uuid

go
type NATSMarshaler struct{}

func (*NATSMarshaler) Marshal

go
func (*NATSMarshaler) Marshal(topic string, msg *brokers.Message) (*nats.Msg, error)

Marshal chuyển đổi một brokers message thành định dạng JSON.

func (*NATSMarshaler) Unmarshal

go
func (*NATSMarshaler) Unmarshal(natsMsg *nats.Msg) (*brokers.Message, error)

Unmarshal trích xuất một brokers message từ một nats message.

type Publisher

Publisher cung cấp triển khai nats cho các thao tác publish của brokers

go
type Publisher struct {
    // contains filtered or unexported fields
}

func (*Publisher) Close

go
func (p *Publisher) Close() error

Close đóng publisher và kết nối bên dưới

func (*Publisher) Publish

go
func (p *Publisher) Publish(topic string, messages ...*brokers.Message) error

Publish gửi message đến NATS.

Publish sẽ không trả về cho đến khi nhận được ack từ JetStream. Khi một trong các message bị lỗi khi gửi - hàm sẽ bị gián đoạn.

type PublisherStream

PublisherStream cung cấp một implement brokers publisher cho NATS JetStream

go
type PublisherStream struct {
    // contains filtered or unexported fields
}

func (*PublisherStream) Close

go
func (p *PublisherStream) Close() error

Close đóng publisher và kết nối cơ bản của nó

func (*PublisherStream) Publish

go
func (p *PublisherStream) Publish(topic string, messages ...*brokers.Message) error

Publish gửi các brokers messages được cung cấp đến topic đã cho.

type ResourceInitializer

go
type ResourceInitializer func(ctx context.Context, js jetstream.JetStream, topic string) (jetstream.Consumer, func(context.Context, flog.FlogInf), error)

func EphemeralConsumerStream

go
func EphemeralConsumerStream() ResourceInitializer

EphemeralConsumerStream xây dựng một callback để tạo một consumer, trả về một hàm sẽ được dùng để xóa consumer do broker quản lý.

func ExistingConsumerStream

go
func ExistingConsumerStream(consumerNamer ConsumerStreamConfigurator, group string) ResourceInitializer

ExistingConsumerStream dùng để kết nối tới stream/consumer đã tồn tại với tên topic cho trước - sẽ không cố gắng tạo bất kỳ tài nguyên nào do broker quản lý. Nó nhận một hàm để chuyển đổi topic thành tên consumer, truyền nil sẽ gọi hành vi mặc định consumerName := fmt.Sprintf("brokers__%s", topic)

func GroupedConsumerStream

go
func GroupedConsumerStream(groupName string) ResourceInitializer

GroupedConsumerStream xây dựng một callback để tạo một consumer trong group cho trước. Hàm đóng không được trả về vì một subscription duy nhất trong group không thể biết khi nào consumer hỗ trợ nên bị xóa.

type StaticDelay

StaticDelay delay luôn trả về cùng một time.Duration

go
type StaticDelay struct {
    Delay time.Duration
}

func NewStaticDelay

go
func NewStaticDelay(delay time.Duration) StaticDelay

func (StaticDelay) WaitTime

go
func (s StaticDelay) WaitTime(retryNum uint64) time.Duration

type StreamConfigurator

go
type StreamConfigurator func(string) jetstream.StreamConfig

type SubjectCalculator

SubjectCalculator là một hàm dùng để tính toán nats subject(s) cho một topic.

go
type SubjectCalculator func(queueGroupPrefix, topic string) *SubjectDetail

type SubjectDetail

SubjectDetail chứa chi tiết jetstream subject (primary + tất cả các subject bổ sung) cùng với tên durable và queue group cho một brokers topic.

go
type SubjectDetail struct {
    Primary    string
    Additional []string
    QueueGroup string
}

func DefaultSubjectCalculator

go
func DefaultSubjectCalculator(queueGroupPrefix, topic string) *SubjectDetail

func (*SubjectDetail) All

go
func (s *SubjectDetail) All() []string

All kết hợp primary và tất cả các subject bổ sung để sử dụng bởi jetstream client khi tạo.

type Subscriber

Subscriber cung cấp triển khai nats cho các hoạt động subscribe của brokers

go
type Subscriber struct {
    // contains filtered or unexported fields
}

func (*Subscriber) Close

go
func (s *Subscriber) Close() error

Close đóng publisher và kết nối cơ bản. Nó sẽ cố gắng chờ đợi các message đang xử lý hoàn thành.

func (*Subscriber) Subscribe

go
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)

Subscribe subscribe các message từ JetStream.

func (*Subscriber) SubscribeInitialize

go
func (s *Subscriber) SubscribeInitialize(topic string) error

SubscribeInitialize cung cấp một cách để đảm bảo stream cho một topic tồn tại trước khi subscribe

type SubscriberStream

SubscriberStream cung cấp một interface brokers subscriber cho NATS JetStream

go
type SubscriberStream struct {
    // contains filtered or unexported fields
}

func (*SubscriberStream) Close

go
func (s *SubscriberStream) Close() error

Close đóng subscriber và báo hiệu để đóng bất kỳ subscription nào nó đã tạo cùng với kết nối cơ bản.

func (*SubscriberStream) Subscribe

go
func (s *SubscriberStream) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)

Subscribe thiết lập một JetStream subscription cho topic đã cho.

func (*SubscriberStream) SubscribeInitialize

go
func (s *SubscriberStream) SubscribeInitialize(topic string) error

SubscribeInitialize cung cấp một cách để đảm bảo stream cho một topic tồn tại trước khi subscribe

Generated by gomarkdoc