Import path:
gitlab.soludian.com/soludian/fountain/libs/brokers
brokers
import "gitlab.soludian.com/soludian/fountain/libs/brokers"Index
- Variables
- func ProcessNameFromCtx(ctx context.Context) string
- func PublishTopicFromCtx(ctx context.Context) string
- func PublisherNameFromCtx(ctx context.Context) string
- func SubscribeTopicFromCtx(ctx context.Context) string
- func SubscriberNameFromCtx(ctx context.Context) string
- type DuplicateProcessNameError
- type Message
- func NewMessage(uuid string, payload Payload) *Message
- func (m *Message) Ack() bool
- func (m *Message) Acked() <-chan struct{}
- func (m *Message) Context() context.Context
- func (m *Message) Copy() *Message
- func (m *Message) Equals(toCompare *Message) bool
- func (m *Message) Nack() bool
- func (m *Message) Nacked() <-chan struct{}
- func (m *Message) SetContext(ctx context.Context)
- type Messages
- type Metadata
- type NoPublishProcessor
- type Payload
- type Pipeline
- func NewPipeline(opts ...option) *Pipeline
- func (r *Pipeline) AddMiddleware(m ...ProcessMiddleware)
- func (r *Pipeline) AddPlugin(p ...PipelinePlugin) *Pipeline
- func (r *Pipeline) AddPublisherDecorators(dec ...PublisherDecorator) *Pipeline
- func (r *Pipeline) AddSubscriberDecorators(dec ...SubscriberDecorator) *Pipeline
- func (r *Pipeline) Close() error
- func (r *Pipeline) IsClosed() bool
- func (r *Pipeline) IsRunning() bool
- func (r *Pipeline) Logger() flog.FlogInf
- func (r *Pipeline) NewProcess(processName string) *Process
- func (r *Pipeline) Processes() map[string]Processor
- func (r *Pipeline) Run(ctx context.Context) (err error)
- func (r *Pipeline) RunProcesses(ctx context.Context) error
- func (r *Pipeline) Running() chan struct{}
- type PipelinePlugin
- type Process
- func (p *Process) AddMiddleware(m ...ProcessMiddleware) *Process
- func (p *Process) Build() *Process
- func (p *Process) From(subscribeTopic string, subscriber Subscriber) *Process
- func (p *Process) Func(processor Processor) *Process
- func (p *Process) NoPublishFunc(noPublishProcessor NoPublishProcessor) *Process
- func (p *Process) Started() chan struct{}
- func (p *Process) Stop()
- func (p *Process) Stopped() chan struct{}
- func (p *Process) To(publishTopic string, publisher Publisher) *Process
- func (p *Process) WithFunc(processor Processor) *Process
- func (p *Process) WithNoPublishFunc(noPublishProcessor NoPublishProcessor) *Process
- type ProcessMiddleware
- type Processor
- type Publisher
- type PublisherDecorator
- type SubscribeInitializer
- type Subscriber
- type SubscriberDecorator
Variables
var (
// ErrOutputInNoPublisherProcess xảy ra khi một hàm xử lý trả về một số message trong một trình xử lý không có publisher.
// todo: có thể thay đổi chữ ký hàm xử lý trong trình xử lý no-publisher loại bỏ khả năng này
ErrOutputInNoPublisherProcess = errors.New("returned output messages in a process without publisher")
)func ProcessNameFromCtx
func ProcessNameFromCtx(ctx context.Context) stringProcessNameFromCtx trả về tên của trình xử lý message trong pipeline đã xử lý message.
func PublishTopicFromCtx
func PublishTopicFromCtx(ctx context.Context) stringPublishTopicFromCtx trả về topic mà message sẽ được publish bởi pipeline.
func PublisherNameFromCtx
func PublisherNameFromCtx(ctx context.Context) stringPublisherNameFromCtx trả về tên của loại publisher message đã publish message trong pipeline. Ví dụ, đối với Kafka, nó sẽ là `kafka.Publisher`.
func SubscribeTopicFromCtx
func SubscribeTopicFromCtx(ctx context.Context) stringSubscribeTopicFromCtx trả về topic từ đó message đã được nhận trong pipeline.
func SubscriberNameFromCtx
func SubscriberNameFromCtx(ctx context.Context) stringSubscriberNameFromCtx trả về tên của loại subscriber message đã subscribe message trong pipeline. Ví dụ, đối với Kafka, nó sẽ là `kafka.Subscriber`.
type DuplicateProcessNameError
DuplicateProcessNameError được gửi trong một panic khi cố gắng thêm một trình xử lý thứ hai với cùng tên.
type DuplicateProcessNameError struct {
ProcessName string
}func (DuplicateProcessNameError) Error
func (d DuplicateProcessNameError) Error() stringtype Message
Message là đơn vị chuyển giao cơ bản. Messages được phát ra bởi Publishers và nhận bởi Subscribers.
Một publisher có thể sửa đổi message trong quá trình gửi tin, ví dụ như có thể thay đổi metadata. Tránh sửa đổi message song song với việc gửi tin, vì nó có thể dẫn đến leak data races. Nói chung, một message nên được chuyển đến một Publish duy nhất và sau đó được coi là bất biến. Nếu cần, sử dụng phương thức Copy để tạo một message mới.
type Message struct {
// UUID là định danh duy nhất của message. Nó chỉ được sử dụng để gỡ lỗi.
// UUID có thể trống.
UUID string
// Metadata chứa metadata của message.
//
// Có thể được sử dụng để lưu trữ dữ liệu mà không cần giải mã toàn bộ payload.
// Nó tương tự như headers của yêu cầu HTTP.
//
// Metadata được mã hóa và sẽ được lưu vào PubSub.
Metadata Metadata
// Payload là payload của message.
Payload Payload
// contains filtered or unexported fields
}func NewMessage
func NewMessage(uuid string, payload Payload) *MessageNewMessage tạo và trả về cấu trúc Message với các thông số đầu vào được cung cấp. Tham số:
- uuid: chuỗi đại diện cho UUID của message.
- payload: đối tượng Payload chứa dữ liệu của message.
Trả về:
- Cấu trúc Message với các trường được khởi tạo và sẵn sàng sử dụng.
func (*Message) Ack
func (m *Message) Ack() boolAck gửi xác nhận của message.
Ack không chặn. Ack là idempotent. Trả về false, nếu Nack đã được gửi.
func (*Message) Acked
func (m *Message) Acked() <-chan struct{}Acked trả về kênh được đóng khi xác nhận được gửi.
Usage:
select {
case <-message.Acked():
// nhận được ack
case <-message.Nacked():
// nhận được nack
}func (*Message) Context
func (m *Message) Context() context.ContextContext trả về context của message. Để thay đổi context, sử dụng SetContext.
Context trả về luôn không nil; nó mặc định là context nền.
func (*Message) Copy
func (m *Message) Copy() *MessageCopy sao chép tất cả message mà không có Acks/Nacks. Context không được truyền đến bản sao.
func (*Message) Equals
func (m *Message) Equals(toCompare *Message) boolS sánh hai messages. Acks/Nacks không được so sánh.
func (*Message) Nack
func (m *Message) Nack() boolNack gửi xác nhận lỗi khi xử lý message.
Nack không chặn. Nack là idempotent. Trả về false, nếu Ack đã được gửi.
func (*Message) Nacked
func (m *Message) Nacked() <-chan struct{}Nacked trả về kênh được đóng khi xác nhận tiêu cực được gửi.
Usage:
select {
case <-message.Acked():
// nhận được ack
case <-message.Nacked():
// nhận được nack
}func (*Message) SetContext
func (m *Message) SetContext(ctx context.Context)SetContext đặt context được cung cấp cho message.
type Messages
Messages là một list các messages.
type Messages []*Messagefunc (Messages) IDs
func (m Messages) IDs() []stringIDs trả về toàn bộ ids của các messages.
type Metadata
Metadata được gửi kèm với mỗi message để cung cấp ngữ cảnh bổ sung mà không cần giải mã payload của message.
type Metadata map[string]stringfunc (Metadata) Get
func (m Metadata) Get(key string) stringGet trả về giá trị metadata cho khóa được cung cấp. Nếu khóa không được tìm thấy, một chuỗi rỗng sẽ được trả về.
func (Metadata) Set
func (m Metadata) Set(key, value string)Set đặt khóa metadata thành giá trị.
type NoPublishProcessor
NoPublishProcessor là một lựa chọn thay thế cho Processor, không tạo ra bất kỳ message nào.
type NoPublishProcessor func(msg *Message) errortype Payload
Payload là payload của Message.
type Payload []bytetype Pipeline
Pipeline chịu trách nhiệm xử lý message từ subscriber bằng cách sử dụng các hàm xử lý đã cung cấp.
Nếu hàm xử lý trả về một message, message sẽ được publish với publisher. Có thể sử dụng middlewares để bọc các trình xử lý với logic chung như ghi nhật ký, đo lường, v.v.
type Pipeline struct {
// contains filtered or unexported fields
}func NewPipeline
func NewPipeline(opts ...option) *PipelineNewPipeline tạo một Pipeline mới với cấu hình đã cho. pipeline được dùng để điều hướng message từ subscriber đến các trình xử lý.
Vd: AddProcesses: nhận message từ topic trong subscriber, xử lý message qua process và gửi kết quả đến publisher với topic khác.
func (*Pipeline) AddMiddleware
func (r *Pipeline) AddMiddleware(m ...ProcessMiddleware)AddMiddleware thêm một middleware mới vào pipeline.
Thứ tự của middleware quan trọng. Middleware được thêm vào đầu tiên sẽ được thực thi trước.
func (*Pipeline) AddPlugin
func (r *Pipeline) AddPlugin(p ...PipelinePlugin) *PipelineAddPlugin thêm một plugin mới vào pipeline. Các plugin được thực thi trong quá trình khởi động của pipeline.
Một plugin có thể, ví dụ, đóng pipeline sau khi SIGINT hoặc SIGTERM được gửi đến quá trình (plugin SignalsProcess).
func (*Pipeline) AddPublisherDecorators
func (r *Pipeline) AddPublisherDecorators(dec ...PublisherDecorator) *PipelineAddPublisherDecorators bọc publisher của pipeline. Decorator đầu tiên là lớp trong cùng nhất, tức là gọi publisher gốc.
func (*Pipeline) AddSubscriberDecorators
func (r *Pipeline) AddSubscriberDecorators(dec ...SubscriberDecorator) *PipelineAddSubscriberDecorators bọc subscriber của pipeline. Decorator đầu tiên là lớp trong cùng nhất, tức là gọi subscriber gốc.
func (*Pipeline) Close
func (r *Pipeline) Close() errorClose đóng pipeline một cách nhẹ nhàng với thời gian chờ được cung cấp trong cấu hình.
func (*Pipeline) IsClosed
func (r *Pipeline) IsClosed() boolfunc (*Pipeline) IsRunning
func (r *Pipeline) IsRunning() boolIsRunning trả về true khi pipeline đang chạy.
Cảnh báo: vì lý do lịch sử, phương thức này không nhận biết về việc đóng pipeline. Nếu muốn biết liệu pipeline đã đóng hay chưa, hãy sử dụng IsClosed.
func (*Pipeline) Logger
func (r *Pipeline) Logger() flog.FlogInffunc (*Pipeline) NewProcess
func (r *Pipeline) NewProcess(processName string) *ProcessNewProcess khởi tạo một process mới với tên processName. Nếu tên process đã tồn tại trong pipeline thì sẽ báo lỗi trùng tên.
processName: Tên của process. Trả về: Con trỏ đến đối tượng Process.
Usage:
process := pipeline.NewProcess("NewProcess")
// Full example
process.NewProcess("NewProcess").From("topic", subscriber).NoPublishFunc(noPublisherProcessMessage).Build()
process.NewProcess("NewProcess").From("topic", subscriber).Func(processMessage).To("new_topic", publisher).Build()func (*Pipeline) Processes
func (r *Pipeline) Processes() map[string]ProcessorProcesses trả về tất cả các trình xử lý đã subscribe.
func (*Pipeline) Run
func (r *Pipeline) Run(ctx context.Context) (err error)Run chạy tất cả các plugin và trình xử lý và bắt đầu subscribe vào topics đã cung cấp. Lời gọi này sẽ chặn trong khi pipeline đang chạy.
Hàm này nên được chạy trong một goroutine.
Khi tất cả các trình xử lý đã dừng (vd: vì các subscribe đã bị đóng), pipeline cũng sẽ dừng.
Để dừng Run() nên gọi Close() trên pipeline. ctx sẽ được truyền đến tất cả các subscriber.
Khi tất cả các trình xử lý đã dừng (vd: vì kết nối đã bị đóng), Run() cũng sẽ dừng.
Usage:
go func() {
err = pipeline.Run(context.Background())
if err != nil {
panic(err)
}
}()func (*Pipeline) RunProcesses
func (r *Pipeline) RunProcesses(ctx context.Context) errorRunProcesses chạy tất cả các trình xử lý đã được thêm sau Run(). RunProcesses là idempotent (có tính đồng nhất), vì vậy có thể được gọi nhiều lần một cách an toàn.
func (*Pipeline) Running
func (r *Pipeline) Running() chan struct{}Running được đóng khi pipeline đang chạy. Nói cách khác: có thể chờ đợi cho đến khi pipeline đang chạy bằng cách sử dụng
fmt.Println("Bắt đầu pipeline")
go r.Run(ctx)
<- r.Running()
fmt.Println("Pipeline đang chạy")Cảnh báo: vì lý do lịch sử, kênh này không nhận biết về việc đóng pipeline - kênh sẽ được đóng nếu pipeline đã chạy và đóng.
type PipelinePlugin
PipelinePlugin là hàm được thực thi khi Pipeline khởi động.
type PipelinePlugin func(*Pipeline) errortype Process
Process xử lý Messages.
type Process struct {
// contains filtered or unexported fields
}func (*Process) AddMiddleware
func (p *Process) AddMiddleware(m ...ProcessMiddleware) *ProcessAddMiddleware thêm middleware mới vào process được chỉ định trong pipeline.
Thứ tự của middleware rất quan trọng. Middleware được thêm vào đầu tiên sẽ được thực thi trước.
func (*Process) Build
func (p *Process) Build() *ProcessBuild tạo và thêm process vào pipeline sau khi cấu hình hoàn tất.
Trả về: Trả về đối tượng Process hiện tại.
Usage:
// Khởi tạo một process mới và đăng ký thông tin:
process := pipeline.NewProcess("NewProcess").
From("mySubscribeTopic", mySubscriber). // Lắng nghe từ "mySubscribeTopic"
Handle(myProcessor). // Đặt hàm xử lý
To("myPublishTopic", myPublisher). // Gửi thông điệp đến "myPublishTopic"
Build() // Xây dựng process và thêm vào pipeline
// Khởi tạo process chỉ lắng nghe mà không phát ra thông điệp:
process := pipeline.NewProcess("MyNoPublishProcess").
From("mySubscribeTopic", mySubscriber). // Lắng nghe từ "mySubscribeTopic"
NoPublishFunc(myNoPublishProcessor). // Đặt hàm xử lý mà không phát ra thông điệp
Build() // Xây dựng process và thêm vào pipelinefunc (*Process) From
func (p *Process) From(subscribeTopic string, subscriber Subscriber) *ProcessFrom đặt thông tin về subscriber và subscribeTopic cho process.
subscribeTopic: Tên của topic mà process sẽ lắng nghe. subscriber: Subscriber sẽ đăng ký vào topic đó. Trả về: Trả về đối tượng Process hiện tại để tiếp tục cấu hình.
Usage:
process := pipeline.NewProcess("MyPipeline").From("mySubscribeTopic", mySubscriber)func (*Process) Func
func (p *Process) Func(processor Processor) *ProcessHandle đặt hàm xử lý (processor) cho process.
processor: Hàm xử lý sẽ được gọi khi có thông điệp từ topic. Trả về: Trả về đối tượng Process hiện tại để tiếp tục cấu hình.
Usage:
process := pipeline.NewProcess("NewProcess").
From("mySubscribeTopic", mySubscriber).
Func(myProcessor).
To("myPublishTopic", myPublisher)func (*Process) NoPublishFunc
func (p *Process) NoPublishFunc(noPublishProcessor NoPublishProcessor) *ProcessNoPublishFunc đặt hàm xử lý không có publish cho process.
noPublishProcessor: Hàm xử lý không gửi thông điệp ra ngoài. Trả về: Trả về đối tượng Process hiện tại.
Usage:
process := pipeline.NewProcess("NewProcess").NoPublishFunc(myNoPublishProcessor)func (*Process) Started
func (p *Process) Started() chan struct{}Started trả về kênh được dừng khi process đang chạy.
func (*Process) Stop
func (p *Process) Stop()Stop dừng process. Stop là không đồng bộ. Có thể kiểm tra xem process đã dừng chưa bằng hàm Stopped().
func (*Process) Stopped
func (p *Process) Stopped() chan struct{}Stopped trả về kênh được dừng khi process đã dừng.
func (*Process) To
func (p *Process) To(publishTopic string, publisher Publisher) *ProcessTo đặt thông tin về publisher và publishTopic cho process.
publishTopic: Tên của topic mà process sẽ gửi thông điệp. publisher: Publisher sẽ phát thông điệp đến topic. Trả về: Trả về đối tượng Process hiện tại để tiếp tục cấu hình.
Usage:
process := pipeline.NewProcess("NewProcess").
From("mySubscribeTopic", mySubscriber).
To("myPublishTopic", myPublisher)func (*Process) WithFunc
func (p *Process) WithFunc(processor Processor) *ProcessWithFunc đặt hàm xử lý tùy chỉnh cho process.
processor: Hàm xử lý sẽ được gọi khi có thông điệp từ topic. Trả về: Trả về đối tượng Process hiện tại.
Usage:
process := pipeline.NewProcess("NewProcess").WithFunc(myProcessor)func (*Process) WithNoPublishFunc
func (p *Process) WithNoPublishFunc(noPublishProcessor NoPublishProcessor) *ProcessWithNoPublishFunc đặt hàm xử lý không có publish tùy chỉnh cho process.
noPublishProcessor: Hàm xử lý không gửi thông điệp ra ngoài. Trả về: Trả về đối tượng Process hiện tại.
Usage:
process := pipeline.NewProcess("NewProcess").WithNoPublishFunc(myNoPublishProcessor)type ProcessMiddleware
ProcessMiddleware cho phép chúng ta viết những thứ như decorators cho Processor. Nó có thể thực thi một cái gì đó trước khi xử lý (ví dụ: sửa đổi message đã xử lý) hoặc sau đó (sửa đổi message đã tạo, ack/nack trên message đã xử lý, xử lý lỗi, ghi nhật ký, v.v.).
Nó có thể được đính kèm vào pipeline bằng cách sử dụng phương thức `AddMiddleware`.
Ví dụ:
func ExampleMiddleware(h message.Processor) message.Processor {
return func(message *message.Message) ([]*message.Message, error) {
fmt.Println("thực thi trước khi xử lý")
producedMessages, err := h(message)
fmt.Println("thực thi sau khi xử lý")
return producedMessages, err
}
}type ProcessMiddleware func(h Processor) Processortype Processor
Processor là hàm được gọi khi nhận được message.
msg.Ack() được gọi tự động khi Processor không trả về lỗi. Khi Processor trả về lỗi, msg.Nack() được gọi. Khi msg.Ack() được gọi trong trình xử lý và Processor trả về lỗi, msg.Nack() sẽ không được gửi vì Ack đã được gửi.
Các Processor được thực thi song song khi nhận được nhiều message (vì msg.Ack() đã được gửi trong Processor hoặc Subscriber hỗ trợ nhiều consumers).
type Processor func(msg *Message) ([]*Message, error)PassthroughProcessor là một trình xử lý chuyển tiếp message không thay đổi từ subscriber đến publisher.
var PassthroughProcessor Processor = func(msg *Message) ([]*Message, error) {
return []*Message{msg}, nil
}type Publisher
Publisher của Pub/Sub.
type Publisher interface {
// Publish gửi các thông điệp được cung cấp tới topic đã cho.
//
// Publish có thể đồng bộ hoặc không đồng bộ - tùy thuộc vào triển khai.
//
// Hầu hết các triển khai publisher không hỗ trợ gửi thông điệp atomic.
// Điều này có nghĩa là nếu việc gửi một trong các thông điệp thất bại, các thông điệp tiếp theo sẽ không được gửi.
//
// Publish không hoạt động với một Context duy nhất.
// Sử dụng phương thức Context() của mỗi thông điệp thay thế.
//
// Publish cần sử dụng thread safe.
Publish(topic string, messages ...*Message) error
// Close nên xả các thông điệp chưa gửi nếu publisher là không đồng bộ.
Close() error
}type PublisherDecorator
PublisherDecorator bọc publisher cơ bản, thêm một số chức năng.
type PublisherDecorator func(pub Publisher) (Publisher, error)func MessageTransformPublisherDecorator
func MessageTransformPublisherDecorator(transform func(*Message)) PublisherDecoratorMessageTransformPublisherDecorator tạo một publisher decorator gọi transform trên mỗi message đi qua publisher.
type SubscribeInitializer
SubscribeInitializer được sử dụng để khởi tạo các subscriber.
type SubscribeInitializer interface {
// SubscribeInitialize có thể được gọi để khởi tạo subscribe trước khi xử lý.
// Khi gọi Subscribe trước Publish, SubscribeInitialize không cần thiết.
//
// Không phải mọi Pub/Sub đều yêu cầu khởi tạo này, và nó có thể là tùy chọn để cải thiện hiệu suất v.v.
// Để biết chi tiết về chức năng SubscribeInitialize, vui lòng kiểm tra Pub/Sub.
//
// Việc triển khai SubscribeInitialize không bắt buộc.
SubscribeInitialize(topic string) error
}type Subscriber
Subscriber của Pub/Sub.
type Subscriber interface {
// Subscribe trả về một kênh đầu ra là các thông điệp từ topic đã cung cấp.
// Kênh sẽ được đóng sau khi Close() được gọi trên subscriber.
//
// Để nhận thông điệp tiếp theo, `Ack()` phải được gọi trên thông điệp đã nhận.
// Nếu xử lý thông điệp thất bại và thông điệp cần được gửi lại, `Nack()` nên được gọi thay thế.
//
// Khi ctx được cung cấp bị hủy, subscriber đóng subscribe và kênh đầu ra.
// ctx được cung cấp được truyền tới tất cả các thông điệp được tạo ra.
// Khi Nack hoặc Ack được gọi trên thông điệp, context của thông điệp bị hủy.
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
// Close đóng tất cả các subscribe với các kênh đầu ra của chúng và xả các offset v.v. khi cần thiết.
Close() error
}type SubscriberDecorator
SubscriberDecorator bọc subscriber cơ bản, thêm một số chức năng.
type SubscriberDecorator func(sub Subscriber) (Subscriber, error)func MessageTransformSubscriberDecorator
func MessageTransformSubscriberDecorator(transform func(*Message)) SubscriberDecoratorMessageTransformSubscriberDecorator tạo một subscriber decorator gọi transform trên mỗi message đi qua subscriber.
Generated by gomarkdoc