Import path:
gitlab.soludian.com/soludian/fountain/libs/brokers/pubsub/gochannel
gochannel
import "gitlab.soludian.com/soludian/fountain/libs/brokers/pubsub/gochannel"Index
- type FanOut
- func NewFanOut(subscriber brokers.Subscriber) *FanOut
- func (f *FanOut) AddSubscription(topic string)
- func (f *FanOut) Close() error
- func (f *FanOut) IsClosed() bool
- func (f *FanOut) Run(ctx context.Context) error
- func (f *FanOut) Running() chan struct{}
- func (f *FanOut) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)
- type GoChannel
- type Option
type FanOut
FanOut là một component nhận messages từ subscriber và chuyển chúng đến tất cả publishers. Hiệu quả là, messages được "nhân lên".
Một trường hợp sử dụng điển hình của FanOut là có một external subscription và nhiều workers bên trong process.
Cần gọi phương thức AddSubscription cho tất cả các topics mà muốn lắng nghe. Điều này cần được thực hiện *trước* khi bắt đầu FanOut.
FanOut cung cấp interface Subscriber tiêu chuẩn.
type FanOut struct {
// contains filtered or unexported fields
}func NewFanOut
func NewFanOut(subscriber brokers.Subscriber) *FanOutNewFanOut tạo một FanOut mới.
func (*FanOut) AddSubscription
func (f *FanOut) AddSubscription(topic string)AddSubscription thêm một internal subscription cho topic đã cho. Cần gọi phương thức này với tất cả các topics mà muốn lắng nghe, trước khi FanOut được bắt đầu. AddSubscription là idempotent.
func (*FanOut) Close
func (f *FanOut) Close() errorClose đóng Pub/Sub nội bộ của FanOut.
func (*FanOut) IsClosed
func (f *FanOut) IsClosed() boolfunc (*FanOut) Run
func (f *FanOut) Run(ctx context.Context) errorRun chạy FanOut.
func (*FanOut) Running
func (f *FanOut) Running() chan struct{}Running đóng khi FanOut đang chạy.
func (*FanOut) Subscribe
func (f *FanOut) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)Subscribe bắt đầu subscription đến Pub/Sub nội bộ của FanOut.
type GoChannel
GoChannel là triển khai Pub/Sub đơn giản nhất. Nó dựa trên các kênh của Golang được gửi trong quá trình.
GoChannel không có trạng thái toàn cục, điều đó có nghĩa là cần sử dụng cùng một instance để Publishing và Subscribing!
Khi GoChannel là persistent, thứ tự của các message không được đảm bảo.
type GoChannel struct {
// contains filtered or unexported fields
}func NewGoChannel
func NewGoChannel(opts ...Option) *GoChannelNewGoChannel tạo GoChannel Pub/Sub mới.
GoChannel này không persistent. Điều đó có nghĩa là nếu gửi một message đến một topic mà không có subscriber nào đăng ký, message đó sẽ bị loại bỏ.
func (*GoChannel) Close
func (g *GoChannel) Close() errorClose đóng GoChannel Pub/Sub.
func (*GoChannel) Publish
func (g *GoChannel) Publish(topic string, messages ...*brokers.Message) errorPublish trong GoChannel KHÔNG chặn cho đến khi tất cả các consumer tiêu thụ. Các message sẽ được gửi trong nền.
Các message có thể được lưu trữ hoặc không, tùy thuộc vào thuộc tính persistent.
func (*GoChannel) Subscribe
func (g *GoChannel) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)Subscribe trả về kênh mà tất cả các message đã publish được gửi đến. Các message không được lưu trữ. Nếu không có subscriber và message được sản xuất, nó sẽ biến mất.
Không có hỗ trợ nhóm consumer, mỗi consumer sẽ nhận mọi message đã sản xuất.
type Option
type Option func(*pubSubConfig)func WithBlockPublishUntilSubscriberAck
func WithBlockPublishUntilSubscriberAck(block ...bool) Optionfunc WithLogger
func WithLogger(logger ...flog.FlogInf) Optionfunc WithOutputChannelBuffer
func WithOutputChannelBuffer(buffer int64) Optionfunc WithPersistent
func WithPersistent(persistent ...bool) OptionGenerated by gomarkdoc