Skip to content

Import path: gitlab.soludian.com/soludian/fountain/libs/brokers

brokers

go
import "gitlab.soludian.com/soludian/fountain/libs/brokers"

Index

Variables

go
var (
    // ErrOutputInNoPublisherProcess xảy ra khi một hàm xử lý trả về một số message trong một trình xử lý không có publisher.
    // todo: có thể thay đổi chữ ký hàm xử lý trong trình xử lý no-publisher loại bỏ khả năng này
    ErrOutputInNoPublisherProcess = errors.New("returned output messages in a process without publisher")
)

func ProcessNameFromCtx

go
func ProcessNameFromCtx(ctx context.Context) string

ProcessNameFromCtx trả về tên của trình xử lý message trong pipeline đã xử lý message.

func PublishTopicFromCtx

go
func PublishTopicFromCtx(ctx context.Context) string

PublishTopicFromCtx trả về topic mà message sẽ được publish bởi pipeline.

func PublisherNameFromCtx

go
func PublisherNameFromCtx(ctx context.Context) string

PublisherNameFromCtx trả về tên của loại publisher message đã publish message trong pipeline. Ví dụ, đối với Kafka, nó sẽ là `kafka.Publisher`.

func SubscribeTopicFromCtx

go
func SubscribeTopicFromCtx(ctx context.Context) string

SubscribeTopicFromCtx trả về topic từ đó message đã được nhận trong pipeline.

func SubscriberNameFromCtx

go
func SubscriberNameFromCtx(ctx context.Context) string

SubscriberNameFromCtx trả về tên của loại subscriber message đã subscribe message trong pipeline. Ví dụ, đối với Kafka, nó sẽ là `kafka.Subscriber`.

type DuplicateProcessNameError

DuplicateProcessNameError được gửi trong một panic khi cố gắng thêm một trình xử lý thứ hai với cùng tên.

go
type DuplicateProcessNameError struct {
    ProcessName string
}

func (DuplicateProcessNameError) Error

go
func (d DuplicateProcessNameError) Error() string

type Message

Message là đơn vị chuyển giao cơ bản. Messages được phát ra bởi Publishers và nhận bởi Subscribers.

Một publisher có thể sửa đổi message trong quá trình gửi tin, ví dụ như có thể thay đổi metadata. Tránh sửa đổi message song song với việc gửi tin, vì nó có thể dẫn đến leak data races. Nói chung, một message nên được chuyển đến một Publish duy nhất và sau đó được coi là bất biến. Nếu cần, sử dụng phương thức Copy để tạo một message mới.

go
type Message struct {
    // UUID là định danh duy nhất của message. Nó chỉ được sử dụng để gỡ lỗi.
    // UUID có thể trống.
    UUID string

    // Metadata chứa metadata của message.
    //
    // Có thể được sử dụng để lưu trữ dữ liệu mà không cần giải mã toàn bộ payload.
    // Nó tương tự như headers của yêu cầu HTTP.
    //
    // Metadata được mã hóa và sẽ được lưu vào PubSub.
    Metadata Metadata

    // Payload là payload của message.
    Payload Payload
    // contains filtered or unexported fields
}

func NewMessage

go
func NewMessage(uuid string, payload Payload) *Message

NewMessage tạo và trả về cấu trúc Message với các thông số đầu vào được cung cấp. Tham số:

  • uuid: chuỗi đại diện cho UUID của message.
  • payload: đối tượng Payload chứa dữ liệu của message.

Trả về:

  • Cấu trúc Message với các trường được khởi tạo và sẵn sàng sử dụng.

func (*Message) Ack

go
func (m *Message) Ack() bool

Ack gửi xác nhận của message.

Ack không chặn. Ack là idempotent. Trả về false, nếu Nack đã được gửi.

func (*Message) Acked

go
func (m *Message) Acked() <-chan struct{}

Acked trả về kênh được đóng khi xác nhận được gửi.

Usage:

select {
case <-message.Acked():
	// nhận được ack
case <-message.Nacked():
	// nhận được nack
}

func (*Message) Context

go
func (m *Message) Context() context.Context

Context trả về context của message. Để thay đổi context, sử dụng SetContext.

Context trả về luôn không nil; nó mặc định là context nền.

func (*Message) Copy

go
func (m *Message) Copy() *Message

Copy sao chép tất cả message mà không có Acks/Nacks. Context không được truyền đến bản sao.

func (*Message) Equals

go
func (m *Message) Equals(toCompare *Message) bool

S sánh hai messages. Acks/Nacks không được so sánh.

func (*Message) Nack

go
func (m *Message) Nack() bool

Nack gửi xác nhận lỗi khi xử lý message.

Nack không chặn. Nack là idempotent. Trả về false, nếu Ack đã được gửi.

func (*Message) Nacked

go
func (m *Message) Nacked() <-chan struct{}

Nacked trả về kênh được đóng khi xác nhận tiêu cực được gửi.

Usage:

select {
case <-message.Acked():
	// nhận được ack
case <-message.Nacked():
	// nhận được nack
}

func (*Message) SetContext

go
func (m *Message) SetContext(ctx context.Context)

SetContext đặt context được cung cấp cho message.

type Messages

Messages là một list các messages.

go
type Messages []*Message

func (Messages) IDs

go
func (m Messages) IDs() []string

IDs trả về toàn bộ ids của các messages.

type Metadata

Metadata được gửi kèm với mỗi message để cung cấp ngữ cảnh bổ sung mà không cần giải mã payload của message.

go
type Metadata map[string]string

func (Metadata) Get

go
func (m Metadata) Get(key string) string

Get trả về giá trị metadata cho khóa được cung cấp. Nếu khóa không được tìm thấy, một chuỗi rỗng sẽ được trả về.

func (Metadata) Set

go
func (m Metadata) Set(key, value string)

Set đặt khóa metadata thành giá trị.

type NoPublishProcessor

NoPublishProcessor là một lựa chọn thay thế cho Processor, không tạo ra bất kỳ message nào.

go
type NoPublishProcessor func(msg *Message) error

type Payload

Payload là payload của Message.

go
type Payload []byte

type Pipeline

Pipeline chịu trách nhiệm xử lý message từ subscriber bằng cách sử dụng các hàm xử lý đã cung cấp.

Nếu hàm xử lý trả về một message, message sẽ được publish với publisher. Có thể sử dụng middlewares để bọc các trình xử lý với logic chung như ghi nhật ký, đo lường, v.v.

go
type Pipeline struct {
    // contains filtered or unexported fields
}

func NewPipeline

go
func NewPipeline(opts ...option) *Pipeline

NewPipeline tạo một Pipeline mới với cấu hình đã cho. pipeline được dùng để điều hướng message từ subscriber đến các trình xử lý.

Vd: AddProcesses: nhận message từ topic trong subscriber, xử lý message qua process và gửi kết quả đến publisher với topic khác.

func (*Pipeline) AddMiddleware

go
func (r *Pipeline) AddMiddleware(m ...ProcessMiddleware)

AddMiddleware thêm một middleware mới vào pipeline.

Thứ tự của middleware quan trọng. Middleware được thêm vào đầu tiên sẽ được thực thi trước.

func (*Pipeline) AddPlugin

go
func (r *Pipeline) AddPlugin(p ...PipelinePlugin) *Pipeline

AddPlugin thêm một plugin mới vào pipeline. Các plugin được thực thi trong quá trình khởi động của pipeline.

Một plugin có thể, ví dụ, đóng pipeline sau khi SIGINT hoặc SIGTERM được gửi đến quá trình (plugin SignalsProcess).

func (*Pipeline) AddPublisherDecorators

go
func (r *Pipeline) AddPublisherDecorators(dec ...PublisherDecorator) *Pipeline

AddPublisherDecorators bọc publisher của pipeline. Decorator đầu tiên là lớp trong cùng nhất, tức là gọi publisher gốc.

func (*Pipeline) AddSubscriberDecorators

go
func (r *Pipeline) AddSubscriberDecorators(dec ...SubscriberDecorator) *Pipeline

AddSubscriberDecorators bọc subscriber của pipeline. Decorator đầu tiên là lớp trong cùng nhất, tức là gọi subscriber gốc.

func (*Pipeline) Close

go
func (r *Pipeline) Close() error

Close đóng pipeline một cách nhẹ nhàng với thời gian chờ được cung cấp trong cấu hình.

func (*Pipeline) IsClosed

go
func (r *Pipeline) IsClosed() bool

func (*Pipeline) IsRunning

go
func (r *Pipeline) IsRunning() bool

IsRunning trả về true khi pipeline đang chạy.

Cảnh báo: vì lý do lịch sử, phương thức này không nhận biết về việc đóng pipeline. Nếu muốn biết liệu pipeline đã đóng hay chưa, hãy sử dụng IsClosed.

func (*Pipeline) Logger

go
func (r *Pipeline) Logger() flog.FlogInf

func (*Pipeline) NewProcess

go
func (r *Pipeline) NewProcess(processName string) *Process

NewProcess khởi tạo một process mới với tên processName. Nếu tên process đã tồn tại trong pipeline thì sẽ báo lỗi trùng tên.

processName: Tên của process. Trả về: Con trỏ đến đối tượng Process.

Usage:

process := pipeline.NewProcess("NewProcess")
// Full example
process.NewProcess("NewProcess").From("topic", subscriber).NoPublishFunc(noPublisherProcessMessage).Build()
process.NewProcess("NewProcess").From("topic", subscriber).Func(processMessage).To("new_topic", publisher).Build()

func (*Pipeline) Processes

go
func (r *Pipeline) Processes() map[string]Processor

Processes trả về tất cả các trình xử lý đã subscribe.

func (*Pipeline) Run

go
func (r *Pipeline) Run(ctx context.Context) (err error)

Run chạy tất cả các plugin và trình xử lý và bắt đầu subscribe vào topics đã cung cấp. Lời gọi này sẽ chặn trong khi pipeline đang chạy.

Hàm này nên được chạy trong một goroutine.

Khi tất cả các trình xử lý đã dừng (vd: vì các subscribe đã bị đóng), pipeline cũng sẽ dừng.

Để dừng Run() nên gọi Close() trên pipeline. ctx sẽ được truyền đến tất cả các subscriber.

Khi tất cả các trình xử lý đã dừng (vd: vì kết nối đã bị đóng), Run() cũng sẽ dừng.

Usage:

go func() {
    err = pipeline.Run(context.Background())
    if err != nil {
        panic(err)
    }
}()

func (*Pipeline) RunProcesses

go
func (r *Pipeline) RunProcesses(ctx context.Context) error

RunProcesses chạy tất cả các trình xử lý đã được thêm sau Run(). RunProcesses là idempotent (có tính đồng nhất), vì vậy có thể được gọi nhiều lần một cách an toàn.

func (*Pipeline) Running

go
func (r *Pipeline) Running() chan struct{}

Running được đóng khi pipeline đang chạy. Nói cách khác: có thể chờ đợi cho đến khi pipeline đang chạy bằng cách sử dụng

fmt.Println("Bắt đầu pipeline")
go r.Run(ctx)
<- r.Running()
fmt.Println("Pipeline đang chạy")

Cảnh báo: vì lý do lịch sử, kênh này không nhận biết về việc đóng pipeline - kênh sẽ được đóng nếu pipeline đã chạy và đóng.

type PipelinePlugin

PipelinePlugin là hàm được thực thi khi Pipeline khởi động.

go
type PipelinePlugin func(*Pipeline) error

type Process

Process xử lý Messages.

go
type Process struct {
    // contains filtered or unexported fields
}

func (*Process) AddMiddleware

go
func (p *Process) AddMiddleware(m ...ProcessMiddleware) *Process

AddMiddleware thêm middleware mới vào process được chỉ định trong pipeline.

Thứ tự của middleware rất quan trọng. Middleware được thêm vào đầu tiên sẽ được thực thi trước.

func (*Process) Build

go
func (p *Process) Build() *Process

Build tạo và thêm process vào pipeline sau khi cấu hình hoàn tất.

Trả về: Trả về đối tượng Process hiện tại.

Usage:

// Khởi tạo một process mới và đăng ký thông tin:
 process := pipeline.NewProcess("NewProcess").
	 From("mySubscribeTopic", mySubscriber). // Lắng nghe từ "mySubscribeTopic"
	 Handle(myProcessor).                  // Đặt hàm xử lý
	 To("myPublishTopic", myPublisher).      // Gửi thông điệp đến "myPublishTopic"
	 Build()                                 // Xây dựng process và thêm vào pipeline

 // Khởi tạo process chỉ lắng nghe mà không phát ra thông điệp:
 process := pipeline.NewProcess("MyNoPublishProcess").
     From("mySubscribeTopic", mySubscriber).   // Lắng nghe từ "mySubscribeTopic"
     NoPublishFunc(myNoPublishProcessor).  // Đặt hàm xử lý mà không phát ra thông điệp
     Build()                                   // Xây dựng process và thêm vào pipeline

func (*Process) From

go
func (p *Process) From(subscribeTopic string, subscriber Subscriber) *Process

From đặt thông tin về subscriber và subscribeTopic cho process.

subscribeTopic: Tên của topic mà process sẽ lắng nghe. subscriber: Subscriber sẽ đăng ký vào topic đó. Trả về: Trả về đối tượng Process hiện tại để tiếp tục cấu hình.

Usage:

process := pipeline.NewProcess("MyPipeline").From("mySubscribeTopic", mySubscriber)

func (*Process) Func

go
func (p *Process) Func(processor Processor) *Process

Handle đặt hàm xử lý (processor) cho process.

processor: Hàm xử lý sẽ được gọi khi có thông điệp từ topic. Trả về: Trả về đối tượng Process hiện tại để tiếp tục cấu hình.

Usage:

process := pipeline.NewProcess("NewProcess").
            From("mySubscribeTopic", mySubscriber).
            Func(myProcessor).
            To("myPublishTopic", myPublisher)

func (*Process) NoPublishFunc

go
func (p *Process) NoPublishFunc(noPublishProcessor NoPublishProcessor) *Process

NoPublishFunc đặt hàm xử lý không có publish cho process.

noPublishProcessor: Hàm xử lý không gửi thông điệp ra ngoài. Trả về: Trả về đối tượng Process hiện tại.

Usage:

process := pipeline.NewProcess("NewProcess").NoPublishFunc(myNoPublishProcessor)

func (*Process) Started

go
func (p *Process) Started() chan struct{}

Started trả về kênh được dừng khi process đang chạy.

func (*Process) Stop

go
func (p *Process) Stop()

Stop dừng process. Stop là không đồng bộ. Có thể kiểm tra xem process đã dừng chưa bằng hàm Stopped().

func (*Process) Stopped

go
func (p *Process) Stopped() chan struct{}

Stopped trả về kênh được dừng khi process đã dừng.

func (*Process) To

go
func (p *Process) To(publishTopic string, publisher Publisher) *Process

To đặt thông tin về publisher và publishTopic cho process.

publishTopic: Tên của topic mà process sẽ gửi thông điệp. publisher: Publisher sẽ phát thông điệp đến topic. Trả về: Trả về đối tượng Process hiện tại để tiếp tục cấu hình.

Usage:

process := pipeline.NewProcess("NewProcess").
            From("mySubscribeTopic", mySubscriber).
            To("myPublishTopic", myPublisher)

func (*Process) WithFunc

go
func (p *Process) WithFunc(processor Processor) *Process

WithFunc đặt hàm xử lý tùy chỉnh cho process.

processor: Hàm xử lý sẽ được gọi khi có thông điệp từ topic. Trả về: Trả về đối tượng Process hiện tại.

Usage:

process := pipeline.NewProcess("NewProcess").WithFunc(myProcessor)

func (*Process) WithNoPublishFunc

go
func (p *Process) WithNoPublishFunc(noPublishProcessor NoPublishProcessor) *Process

WithNoPublishFunc đặt hàm xử lý không có publish tùy chỉnh cho process.

noPublishProcessor: Hàm xử lý không gửi thông điệp ra ngoài. Trả về: Trả về đối tượng Process hiện tại.

Usage:

process := pipeline.NewProcess("NewProcess").WithNoPublishFunc(myNoPublishProcessor)

type ProcessMiddleware

ProcessMiddleware cho phép chúng ta viết những thứ như decorators cho Processor. Nó có thể thực thi một cái gì đó trước khi xử lý (ví dụ: sửa đổi message đã xử lý) hoặc sau đó (sửa đổi message đã tạo, ack/nack trên message đã xử lý, xử lý lỗi, ghi nhật ký, v.v.).

Nó có thể được đính kèm vào pipeline bằng cách sử dụng phương thức `AddMiddleware`.

Ví dụ:

func ExampleMiddleware(h message.Processor) message.Processor {
	return func(message *message.Message) ([]*message.Message, error) {
		fmt.Println("thực thi trước khi xử lý")
		producedMessages, err := h(message)
		fmt.Println("thực thi sau khi xử lý")

		return producedMessages, err
	}
}
go
type ProcessMiddleware func(h Processor) Processor

type Processor

Processor là hàm được gọi khi nhận được message.

msg.Ack() được gọi tự động khi Processor không trả về lỗi. Khi Processor trả về lỗi, msg.Nack() được gọi. Khi msg.Ack() được gọi trong trình xử lý và Processor trả về lỗi, msg.Nack() sẽ không được gửi vì Ack đã được gửi.

Các Processor được thực thi song song khi nhận được nhiều message (vì msg.Ack() đã được gửi trong Processor hoặc Subscriber hỗ trợ nhiều consumers).

go
type Processor func(msg *Message) ([]*Message, error)

PassthroughProcessor là một trình xử lý chuyển tiếp message không thay đổi từ subscriber đến publisher.

go
var PassthroughProcessor Processor = func(msg *Message) ([]*Message, error) {
    return []*Message{msg}, nil
}

type Publisher

Publisher của Pub/Sub.

go
type Publisher interface {
    // Publish gửi các thông điệp được cung cấp tới topic đã cho.
    //
    // Publish có thể đồng bộ hoặc không đồng bộ - tùy thuộc vào triển khai.
    //
    // Hầu hết các triển khai publisher không hỗ trợ gửi thông điệp atomic.
    // Điều này có nghĩa là nếu việc gửi một trong các thông điệp thất bại, các thông điệp tiếp theo sẽ không được gửi.
    //
    // Publish không hoạt động với một Context duy nhất.
    // Sử dụng phương thức Context() của mỗi thông điệp thay thế.
    //
    // Publish cần sử dụng thread safe.
    Publish(topic string, messages ...*Message) error
    // Close nên xả các thông điệp chưa gửi nếu publisher là không đồng bộ.
    Close() error
}

type PublisherDecorator

PublisherDecorator bọc publisher cơ bản, thêm một số chức năng.

go
type PublisherDecorator func(pub Publisher) (Publisher, error)

func MessageTransformPublisherDecorator

go
func MessageTransformPublisherDecorator(transform func(*Message)) PublisherDecorator

MessageTransformPublisherDecorator tạo một publisher decorator gọi transform trên mỗi message đi qua publisher.

type SubscribeInitializer

SubscribeInitializer được sử dụng để khởi tạo các subscriber.

go
type SubscribeInitializer interface {
    // SubscribeInitialize có thể được gọi để khởi tạo subscribe trước khi xử lý.
    // Khi gọi Subscribe trước Publish, SubscribeInitialize không cần thiết.
    //
    // Không phải mọi Pub/Sub đều yêu cầu khởi tạo này, và nó có thể là tùy chọn để cải thiện hiệu suất v.v.
    // Để biết chi tiết về chức năng SubscribeInitialize, vui lòng kiểm tra Pub/Sub.
    //
    // Việc triển khai SubscribeInitialize không bắt buộc.
    SubscribeInitialize(topic string) error
}

type Subscriber

Subscriber của Pub/Sub.

go
type Subscriber interface {
    // Subscribe trả về một kênh đầu ra là các thông điệp từ topic đã cung cấp.
    // Kênh sẽ được đóng sau khi Close() được gọi trên subscriber.
    //
    // Để nhận thông điệp tiếp theo, `Ack()` phải được gọi trên thông điệp đã nhận.
    // Nếu xử lý thông điệp thất bại và thông điệp cần được gửi lại, `Nack()` nên được gọi thay thế.
    //
    // Khi ctx được cung cấp bị hủy, subscriber đóng subscribe và kênh đầu ra.
    // ctx được cung cấp được truyền tới tất cả các thông điệp được tạo ra.
    // Khi Nack hoặc Ack được gọi trên thông điệp, context của thông điệp bị hủy.
    Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
    // Close đóng tất cả các subscribe với các kênh đầu ra của chúng và xả các offset v.v. khi cần thiết.
    Close() error
}

type SubscriberDecorator

SubscriberDecorator bọc subscriber cơ bản, thêm một số chức năng.

go
type SubscriberDecorator func(sub Subscriber) (Subscriber, error)

func MessageTransformSubscriberDecorator

go
func MessageTransformSubscriberDecorator(transform func(*Message)) SubscriberDecorator

MessageTransformSubscriberDecorator tạo một subscriber decorator gọi transform trên mỗi message đi qua subscriber.

Generated by gomarkdoc