Skip to content

Import path: gitlab.soludian.com/soludian/fountain/libs/brokers/pubsub/gochannel

gochannel

go
import "gitlab.soludian.com/soludian/fountain/libs/brokers/pubsub/gochannel"

Index

type FanOut

FanOut là một component nhận messages từ subscriber và chuyển chúng đến tất cả publishers. Hiệu quả là, messages được "nhân lên".

Một trường hợp sử dụng điển hình của FanOut là có một external subscription và nhiều workers bên trong process.

Cần gọi phương thức AddSubscription cho tất cả các topics mà muốn lắng nghe. Điều này cần được thực hiện *trước* khi bắt đầu FanOut.

FanOut cung cấp interface Subscriber tiêu chuẩn.

go
type FanOut struct {
    // contains filtered or unexported fields
}

func NewFanOut

go
func NewFanOut(subscriber brokers.Subscriber) *FanOut

NewFanOut tạo một FanOut mới.

func (*FanOut) AddSubscription

go
func (f *FanOut) AddSubscription(topic string)

AddSubscription thêm một internal subscription cho topic đã cho. Cần gọi phương thức này với tất cả các topics mà muốn lắng nghe, trước khi FanOut được bắt đầu. AddSubscription là idempotent.

func (*FanOut) Close

go
func (f *FanOut) Close() error

Close đóng Pub/Sub nội bộ của FanOut.

func (*FanOut) IsClosed

go
func (f *FanOut) IsClosed() bool

func (*FanOut) Run

go
func (f *FanOut) Run(ctx context.Context) error

Run chạy FanOut.

func (*FanOut) Running

go
func (f *FanOut) Running() chan struct{}

Running đóng khi FanOut đang chạy.

func (*FanOut) Subscribe

go
func (f *FanOut) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)

Subscribe bắt đầu subscription đến Pub/Sub nội bộ của FanOut.

type GoChannel

GoChannel là triển khai Pub/Sub đơn giản nhất. Nó dựa trên các kênh của Golang được gửi trong quá trình.

GoChannel không có trạng thái toàn cục, điều đó có nghĩa là cần sử dụng cùng một instance để Publishing và Subscribing!

Khi GoChannel là persistent, thứ tự của các message không được đảm bảo.

go
type GoChannel struct {
    // contains filtered or unexported fields
}

func NewGoChannel

go
func NewGoChannel(opts ...Option) *GoChannel

NewGoChannel tạo GoChannel Pub/Sub mới.

GoChannel này không persistent. Điều đó có nghĩa là nếu gửi một message đến một topic mà không có subscriber nào đăng ký, message đó sẽ bị loại bỏ.

func (*GoChannel) Close

go
func (g *GoChannel) Close() error

Close đóng GoChannel Pub/Sub.

func (*GoChannel) Publish

go
func (g *GoChannel) Publish(topic string, messages ...*brokers.Message) error

Publish trong GoChannel KHÔNG chặn cho đến khi tất cả các consumer tiêu thụ. Các message sẽ được gửi trong nền.

Các message có thể được lưu trữ hoặc không, tùy thuộc vào thuộc tính persistent.

func (*GoChannel) Subscribe

go
func (g *GoChannel) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)

Subscribe trả về kênh mà tất cả các message đã publish được gửi đến. Các message không được lưu trữ. Nếu không có subscriber và message được sản xuất, nó sẽ biến mất.

Không có hỗ trợ nhóm consumer, mỗi consumer sẽ nhận mọi message đã sản xuất.

type Option

go
type Option func(*pubSubConfig)

func WithBlockPublishUntilSubscriberAck

go
func WithBlockPublishUntilSubscriberAck(block ...bool) Option

func WithLogger

go
func WithLogger(logger ...flog.FlogInf) Option

func WithOutputChannelBuffer

go
func WithOutputChannelBuffer(buffer int64) Option

func WithPersistent

go
func WithPersistent(persistent ...bool) Option

Generated by gomarkdoc