Import path:
gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fnat
fnat
import "gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fnat"Index
- Constants
- Variables
- func WithAddr(addr string) lib_3rd.Option[Config]
- func WithAllowReconnect(allowReconnect bool) lib_3rd.Option[Config]
- func WithAsyncErrorCB(cb nats.ErrHandler) lib_3rd.Option[Config]
- func WithClosedCB(cb nats.ConnHandler) lib_3rd.Option[Config]
- func WithConfig(conf *Config) lib_3rd.Option[Config]
- func WithDisconnectedErrCB(cb nats.ConnErrHandler) lib_3rd.Option[Config]
- func WithDiscoveredServersCB(cb nats.ConnHandler) lib_3rd.Option[Config]
- func WithEnvironment(env string) lib_3rd.Option[Config]
- func WithLogger(logger flog.FlogInf) lib_3rd.Option[Config]
- func WithMaxReconnect(max int) lib_3rd.Option[Config]
- func WithName(name string) lib_3rd.Option[Config]
- func WithNatOpts(opts ...nats.Option) lib_3rd.Option[Config]
- func WithNkeyFilePath(path string) lib_3rd.Option[Config]
- func WithPingInterval(interval time.Duration) lib_3rd.Option[Config]
- func WithReconnectWait(wait time.Duration) lib_3rd.Option[Config]
- func WithReconnectedCB(cb nats.ConnHandler) lib_3rd.Option[Config]
- func WithTimeout(timeout time.Duration) lib_3rd.Option[Config]
- func WithToken(token string) lib_3rd.Option[Config]
- func WithUserInfo(username, password string) lib_3rd.Option[Config]
- func WithUserJWT(userCB nats.UserJWTHandler, sigCB nats.SignatureHandler) lib_3rd.Option[Config]
- type Config
- type Connection
- type ConsumerStreamConfigurator
- type Delay
- type DurableCalculator
- type GobMarshaler
- type JSONMarshaler
- type JetStreamConfig
- type Marshaler
- type MaxRetryDelay
- type Message
- type NATSMarshaler
- type Publisher
- type PublisherStream
- type ResourceInitializer
- type StaticDelay
- type StreamConfigurator
- type SubjectCalculator
- type SubjectDetail
- type Subscriber
- type SubscriberStream
Constants
reserved header cho NATSMarshaler để gửi UUID
const FountainBrokersUUIDHdr = "_brokers_message_uuid"const KPackageName = "fmqtt"const PackageName = "fnat"TermSignal nếu duration này được trả về, event sẽ bị term
const TermSignal = time.Duration(-1)Variables
var GetFountainInstance = Lib.GetFountainInstancevar GetFountainManager = Lib.GetFountainManagerSử dụng khi config instance ở dạng key:value; Nếu config instance ở dạng key:array thì sử dụng hàm InstallFountainInstances Nếu config ở dạng key:array thì sẽ chỉ install config phần tử đầu tiên mà thôi
Install with config format <key>:<value>; eg: fnat:<value>
Usage:
config.yaml:
fnat:
name: default_name
...
code.go
fnat.InstallFountainInstance()
fnat.WithConfigKey("fnat").InstallFountainInstance()var InstallFountainInstance = Lib.InstallFountainInstanceSử dụng khi config instance ở dạng key:value; Nếu config ở dạng key:array thì sẽ chỉ install config phần tử đầu tiên mà thôi
Install with config format <key>:<value>; eg: fnat:<value>
Usage:
config.yaml:
fnat:
name: default_name
...
code.go
fnat.InstallFountainInstanceOnce()
fnat.WithConfigKey("fnat").InstallFountainInstanceOnce()var InstallFountainInstanceOnce = Lib.InstallFountainInstanceOnceSử dụng khi config instance ở dạng key:array<value>; Sẽ luôn cố gắng khởi tạo kể cả khi config ở dạng key:value
Install with config format <key>:array<value>; eg: fnat:array<value>
Usage:
config.yaml:
fnat:
- name: default_name
...
code.go
fnat.InstallFountainInstances()
fnat.WithConfigKey("fnat").InstallFountainInstances()var InstallFountainInstances = Lib.InstallFountainInstancesTruy cập thẳng tới bộ quản lý thư viện
var Lib = lib_3rd.NewLib(newClient, lib_3rd.WithDefaultConfigFunc[Config, client](DefaultConfig))var WithConfigKey = Lib.WithConfigKeyfunc WithAddr
func WithAddr(addr string) lib_3rd.Option[Config]func WithAllowReconnect
func WithAllowReconnect(allowReconnect bool) lib_3rd.Option[Config]func WithAsyncErrorCB
func WithAsyncErrorCB(cb nats.ErrHandler) lib_3rd.Option[Config]func WithClosedCB
func WithClosedCB(cb nats.ConnHandler) lib_3rd.Option[Config]func WithConfig
func WithConfig(conf *Config) lib_3rd.Option[Config]WithAddrs set addresses
func WithDisconnectedErrCB
func WithDisconnectedErrCB(cb nats.ConnErrHandler) lib_3rd.Option[Config]func WithDiscoveredServersCB
func WithDiscoveredServersCB(cb nats.ConnHandler) lib_3rd.Option[Config]func WithEnvironment
func WithEnvironment(env string) lib_3rd.Option[Config]func WithLogger
func WithLogger(logger flog.FlogInf) lib_3rd.Option[Config]func WithMaxReconnect
func WithMaxReconnect(max int) lib_3rd.Option[Config]func WithName
func WithName(name string) lib_3rd.Option[Config]func WithNatOpts
func WithNatOpts(opts ...nats.Option) lib_3rd.Option[Config]func WithNkeyFilePath
func WithNkeyFilePath(path string) lib_3rd.Option[Config]func WithPingInterval
func WithPingInterval(interval time.Duration) lib_3rd.Option[Config]func WithReconnectWait
func WithReconnectWait(wait time.Duration) lib_3rd.Option[Config]func WithReconnectedCB
func WithReconnectedCB(cb nats.ConnHandler) lib_3rd.Option[Config]func WithTimeout
func WithTimeout(timeout time.Duration) lib_3rd.Option[Config]func WithToken
func WithToken(token string) lib_3rd.Option[Config]func WithUserInfo
func WithUserInfo(username, password string) lib_3rd.Option[Config]func WithUserJWT
func WithUserJWT(userCB nats.UserJWTHandler, sigCB nats.SignatureHandler) lib_3rd.Option[Config]type Config
type Config struct {
lib_3rd.BaseConfig `conf:",squash"`
Addr string `conf:"addr" json:"addr,omitempty"` // Using `,` to split addrs
Username string `conf:"username" json:"username,omitempty"`
Password string `conf:"password" json:"password,omitempty"`
AllowReconnect bool `conf:"allow_reconnect" json:"allow_reconnect,omitempty"`
MaxReconnect int `conf:"max_reconnect" json:"max_reconnect,omitempty"`
ReconnectWait time.Duration `conf:"reconnect_wait" json:"reconnect_wait,omitempty"` // example: "10s"
Timeout time.Duration `conf:"timeout" json:"timeout,omitempty"` // example: "10s"
PingInterval time.Duration `conf:"ping_interval" json:"ping_interval,omitempty"` // example: "10s"
NkeyFilePath string `conf:"nkey_file_path" json:"nkey_file_path,omitempty"`
Token string `conf:"token" json:"token,omitempty"`
NatsOptions []nats.Option `conf:"-"`
// Marshaler là marshaler dùng để chuyển đổi message giữa brokers và wire formats
Marshaler Marshaler `conf:"-"`
JetStreamConfig *JetStreamConfig `conf:"jetstream_config" json:"jetstream_config,omitempty"`
// SubjectCalculator là một hàm được sử dụng để chuyển đổi một chủ đề thành một mảng các chủ đề khi tạo (mặc định là chủ đề là Chính và queueGroupPrefix là QueueGroup)
SubjectCalculator SubjectCalculator `conf:"-"`
SubscriberConfig subscriberConfig `conf:"subscriber_config" json:"subscriber_config,omitempty"`
}func DefaultConfig
func DefaultConfig() *Configfunc (*Config) Validate
func (conf *Config) Validate() errortype Connection
type Connection interface {
// QueueSubscribe subscribes to a NATS subject, equivalent to default Subscribe if queuegroup not supplied.
QueueSubscribe(string, string, nats.MsgHandler) (*nats.Subscription, error)
// PublishMsg sends the provided NATS message to the broker.
PublishMsg(*nats.Msg) error
// Drain will end all active subscription interest and attempt to wait for in-flight messages to process before closing.
Drain() error
// Close will close the connection
Close()
}type ConsumerStreamConfigurator
type ConsumerStreamConfigurator func(string, string) jetstream.ConsumerConfigtype Delay
type Delay interface {
// WaitTime trả về time.Duration mà cần chờ.
// retryNum là số lần WaitTime được gọi cho
// message cụ thể
WaitTime(retryNum uint64) time.Duration
}type DurableCalculator
type DurableCalculator = func(string, string) stringtype GobMarshaler
GobMarshaler là marshaller sử dụng Gob để mã hóa brokers messages.
type GobMarshaler struct{}func (GobMarshaler) Marshal
func (GobMarshaler) Marshal(topic string, msg *brokers.Message) (*nats.Msg, error)Marshal chuyển đổi một brokers message thành định dạng gob.
func (GobMarshaler) Unmarshal
func (GobMarshaler) Unmarshal(natsMsg *nats.Msg) (*brokers.Message, error)Unmarshal trích xuất một brokers message từ một nats message.
type JSONMarshaler
JSONMarshaler sử dụng encoding/json để mã hóa brokers messages.
type JSONMarshaler struct{}func (JSONMarshaler) Marshal
func (JSONMarshaler) Marshal(topic string, msg *brokers.Message) (*nats.Msg, error)Marshal chuyển đổi một brokers message thành định dạng JSON.
func (JSONMarshaler) Unmarshal
func (JSONMarshaler) Unmarshal(natsMsg *nats.Msg) (*brokers.Message, error)Unmarshal trích xuất một brokers message từ một nats message.
type JetStreamConfig
JetStreamConfig chứa các thiết lập cấu hình cụ thể để chạy ở chế độ JetStream
type JetStreamConfig struct {
// Disabled kiểm soát việc có sử dụng semantics JetStream hay không
Disabled bool
// AutoProvision chỉ ra rằng ứng dụng nên tạo stream được cấu hình nếu thiếu trên broker
AutoProvision bool
// ConnectOptions chứa các tùy chọn cụ thể của JetStream để sử dụng khi thiết lập context
ConnectOptions []nats.JSOpt
// SubscribeOptions chứa các tùy chọn để sử dụng khi thiết lập subscriptions
SubscribeOptions []nats.SubOpt
// PublishOptions chứa các tùy chọn để gửi trong mỗi thao tác publish
PublishOptions []nats.PubOpt
// TrackMsgId sử dụng tùy chọn Nats.MsgId với msg UUID để ngăn ngừa trùng lặp (cần cho xử lý exactly once)
TrackMsgId bool
// AckAsync cho phép xác nhận không đồng bộ
AckAsync bool
// DurablePrefix là tiền tố được sử dụng để tạo tên durable từ topic.
//
// Theo mặc định, tiền tố sẽ được sử dụng riêng để tạo tên durable. Điều này chỉ cho phép sử dụng
// một subscription duy nhất cho mỗi cấu hình. Để linh hoạt hơn, cung cấp một DurableCalculator
// sẽ nhận tiền tố durable + topic.
//
// Subscriptions cũng có thể chỉ định một “durable name” sẽ tồn tại sau khi client khởi động lại.
// Durable subscriptions khiến server theo dõi số thứ tự message cuối cùng được xác nhận
// cho một client và durable name. Khi client khởi động lại/resubscribe,
// và sử dụng cùng một client ID và durable name, server sẽ tiếp tục gửi bắt đầu
// từ message chưa được xác nhận sớm nhất cho subscription durable này.
//
// Làm điều này khiến server JetStream theo dõi
// message cuối cùng được xác nhận cho ClientID + Durable đó.
DurablePrefix string
// DurableCalculator là một hàm tùy chỉnh được sử dụng để tạo tên durable từ topic + durable prefix
DurableCalculator DurableCalculator
}func (JetStreamConfig) CalculateDurableName
func (c JetStreamConfig) CalculateDurableName(topic string) stringfunc (JetStreamConfig) ShouldAutoProvision
func (c JetStreamConfig) ShouldAutoProvision() boolShouldAutoProvision kiểm tra xem có nên tự động tạo tài nguyên (stream) hay không. Hàm này trả về true nếu cấu hình không bị vô hiệu hóa và cờ AutoProvision được bật.
Usage:
config := JetStreamConfig{Disabled: false, AutoProvision: true}
shouldProvision := config.ShouldAutoProvision()
fmt.Println(shouldProvision) // Output: truetype Marshaler
Marshaler cung cấp các hàm mã hóa cho transport
type Marshaler interface {
// Marshal chuyển đổi một brokers message thành định dạng wire của NATS.
Marshal(topic string, msg *brokers.Message) (*nats.Msg, error)
// Unmarshal tạo ra một brokers message từ định dạng wire của NATS.
Unmarshal(*nats.Msg) (*brokers.Message, error)
}type MaxRetryDelay
MaxRetryDelay delay trả về cùng một time.Duration đến một giới hạn trước khi gửi term
type MaxRetryDelay struct {
StaticDelay
// contains filtered or unexported fields
}func NewMaxRetryDelay
func NewMaxRetryDelay(delay time.Duration, retryLimit uint64) MaxRetryDelayfunc (MaxRetryDelay) WaitTime
func (s MaxRetryDelay) WaitTime(retryNum uint64) time.Durationtype Message
type Message struct {
Subject string `json:"topic"`
Reply string `json:"reply"`
ObjectJSON string `json:"object_json"`
}func NAT2Message
func NAT2Message(msg *nats.Msg) *Messagefunc (Message) ToNAT
func (o Message) ToNAT() *nats.Msgtype NATSMarshaler
NATSMarshaler sử dụng NATS header để mã hóa trực tiếp giữa brokers và NATS formats. UUID của brokers được lưu trữ tại _brokers_message_uuid
type NATSMarshaler struct{}func (*NATSMarshaler) Marshal
func (*NATSMarshaler) Marshal(topic string, msg *brokers.Message) (*nats.Msg, error)Marshal chuyển đổi một brokers message thành định dạng JSON.
func (*NATSMarshaler) Unmarshal
func (*NATSMarshaler) Unmarshal(natsMsg *nats.Msg) (*brokers.Message, error)Unmarshal trích xuất một brokers message từ một nats message.
type Publisher
Publisher cung cấp triển khai nats cho các thao tác publish của brokers
type Publisher struct {
// contains filtered or unexported fields
}func (*Publisher) Close
func (p *Publisher) Close() errorClose đóng publisher và kết nối bên dưới
func (*Publisher) Publish
func (p *Publisher) Publish(topic string, messages ...*brokers.Message) errorPublish gửi message đến NATS.
Publish sẽ không trả về cho đến khi nhận được ack từ JetStream. Khi một trong các message bị lỗi khi gửi - hàm sẽ bị gián đoạn.
type PublisherStream
PublisherStream cung cấp một implement brokers publisher cho NATS JetStream
type PublisherStream struct {
// contains filtered or unexported fields
}func (*PublisherStream) Close
func (p *PublisherStream) Close() errorClose đóng publisher và kết nối cơ bản của nó
func (*PublisherStream) Publish
func (p *PublisherStream) Publish(topic string, messages ...*brokers.Message) errorPublish gửi các brokers messages được cung cấp đến topic đã cho.
type ResourceInitializer
type ResourceInitializer func(ctx context.Context, js jetstream.JetStream, topic string) (jetstream.Consumer, func(context.Context, flog.FlogInf), error)func EphemeralConsumerStream
func EphemeralConsumerStream() ResourceInitializerEphemeralConsumerStream xây dựng một callback để tạo một consumer, trả về một hàm sẽ được dùng để xóa consumer do broker quản lý.
func ExistingConsumerStream
func ExistingConsumerStream(consumerNamer ConsumerStreamConfigurator, group string) ResourceInitializerExistingConsumerStream dùng để kết nối tới stream/consumer đã tồn tại với tên topic cho trước - sẽ không cố gắng tạo bất kỳ tài nguyên nào do broker quản lý. Nó nhận một hàm để chuyển đổi topic thành tên consumer, truyền nil sẽ gọi hành vi mặc định consumerName := fmt.Sprintf("brokers__%s", topic)
func GroupedConsumerStream
func GroupedConsumerStream(groupName string) ResourceInitializerGroupedConsumerStream xây dựng một callback để tạo một consumer trong group cho trước. Hàm đóng không được trả về vì một subscription duy nhất trong group không thể biết khi nào consumer hỗ trợ nên bị xóa.
type StaticDelay
StaticDelay delay luôn trả về cùng một time.Duration
type StaticDelay struct {
Delay time.Duration
}func NewStaticDelay
func NewStaticDelay(delay time.Duration) StaticDelayfunc (StaticDelay) WaitTime
func (s StaticDelay) WaitTime(retryNum uint64) time.Durationtype StreamConfigurator
type StreamConfigurator func(string) jetstream.StreamConfigtype SubjectCalculator
SubjectCalculator là một hàm dùng để tính toán nats subject(s) cho một topic.
type SubjectCalculator func(queueGroupPrefix, topic string) *SubjectDetailtype SubjectDetail
SubjectDetail chứa chi tiết jetstream subject (primary + tất cả các subject bổ sung) cùng với tên durable và queue group cho một brokers topic.
type SubjectDetail struct {
Primary string
Additional []string
QueueGroup string
}func DefaultSubjectCalculator
func DefaultSubjectCalculator(queueGroupPrefix, topic string) *SubjectDetailfunc (*SubjectDetail) All
func (s *SubjectDetail) All() []stringAll kết hợp primary và tất cả các subject bổ sung để sử dụng bởi jetstream client khi tạo.
type Subscriber
Subscriber cung cấp triển khai nats cho các hoạt động subscribe của brokers
type Subscriber struct {
// contains filtered or unexported fields
}func (*Subscriber) Close
func (s *Subscriber) Close() errorClose đóng publisher và kết nối cơ bản. Nó sẽ cố gắng chờ đợi các message đang xử lý hoàn thành.
func (*Subscriber) Subscribe
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)Subscribe subscribe các message từ JetStream.
func (*Subscriber) SubscribeInitialize
func (s *Subscriber) SubscribeInitialize(topic string) errorSubscribeInitialize cung cấp một cách để đảm bảo stream cho một topic tồn tại trước khi subscribe
type SubscriberStream
SubscriberStream cung cấp một interface brokers subscriber cho NATS JetStream
type SubscriberStream struct {
// contains filtered or unexported fields
}func (*SubscriberStream) Close
func (s *SubscriberStream) Close() errorClose đóng subscriber và báo hiệu để đóng bất kỳ subscription nào nó đã tạo cùng với kết nối cơ bản.
func (*SubscriberStream) Subscribe
func (s *SubscriberStream) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)Subscribe thiết lập một JetStream subscription cho topic đã cho.
func (*SubscriberStream) SubscribeInitialize
func (s *SubscriberStream) SubscribeInitialize(topic string) errorSubscribeInitialize cung cấp một cách để đảm bảo stream cho một topic tồn tại trước khi subscribe
Generated by gomarkdoc