Skip to content

Import path: gitlab.soludian.com/soludian/fountain/libs/brokers/pipeline/middleware

middleware

go
import "gitlab.soludian.com/soludian/fountain/libs/brokers/pipeline/middleware"

Index

Constants

Các khóa Metadata đánh dấu lý do và ngữ cảnh tại sao message bị coi là poison message.

go
const (
    ReasonForPoisonedKey  = "reason_poisoned"
    PoisonedTopicKey      = "topic_poisoned"
    PoisonedHandlerKey    = "process_poisoned"
    PoisonedSubscriberKey = "subscriber_poisoned"
)

CorrelationIDMetadataKey được sử dụng để lưu trữ ID tương quan trong metadata.

go
const CorrelationIDMetadataKey string = "correlation_id"

MessageHasherReadLimitMinimum chỉ định số byte ít nhất của một [brokers.Message] được sử dụng để tính toán giá trị hash của chúng bằng MessageHasher.

go
const MessageHasherReadLimitMinimum = 64

Variables

ErrInvalidPoisonQueueTopic xảy ra khi topic được cung cấp cho PoisonQueue constructor không hợp lệ.

go
var ErrInvalidPoisonQueueTopic = errors.New("invalid poison queue topic")

func CorrelationID

go
func CorrelationID(h brokers.Processor) brokers.Processor

CorrelationID thêm ID tương quan vào tất cả các message được tạo ra bởi process. ID dựa trên ID từ message nhận được bởi process.

Để CorrelationID hoạt động đúng, SetCorrelationID phải được gọi cho message đầu tiên vào hệ thống.

func Duplicator

go
func Duplicator(h brokers.Processor) brokers.Processor

Duplicator xử lý message hai lần, để đảm bảo rằng điểm cuối là idempotent.

func InstantAck

go
func InstantAck(h brokers.Processor) brokers.Processor

InstantAck khiến process xác nhận ngay lập tức message đến, bất kể có lỗi nào. Nó có thể được sử dụng để tăng thông lượng, nhưng với một cái giá: Nếu có giao hàng chính xác một lần, có thể mong đợi ít nhất một lần thay thế. Nếu có các message được sắp xếp theo thứ tự, thứ tự có thể bị phá vỡ.

func MessageCorrelationID

go
func MessageCorrelationID(message *brokers.Message) string

MessageCorrelationID trả về ID tương quan từ brokers.

func PoisonQueue

go
func PoisonQueue(pub brokers.Publisher, topic string) brokers.ProcessMiddleware

PoisonQueue cung cấp một middleware để cứu vãn các message không xử lý được và publish chúng trên một topic riêng biệt. Chuỗi middleware chính sau đó tiếp tục như bình thường.

func PoisonQueueWithFilter

go
func PoisonQueueWithFilter(pub brokers.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) brokers.ProcessMiddleware

PoisonQueueWithFilter giống như PoisonQueue, nhưng chấp nhận một hàm quyết định lỗi nào đủ điều kiện cho hàng đợi poison.

func RandomFail

go
func RandomFail(errorProbability float32) brokers.ProcessMiddleware

RandomFail làm cho process thất bại với lỗi dựa trên xác suất ngẫu nhiên. Xác suất lỗi nên nằm trong khoảng (0,1).

func RandomPanic

go
func RandomPanic(panicProbability float32) brokers.ProcessMiddleware

RandomPanic làm cho process hoảng loạn dựa trên xác suất ngẫu nhiên. Xác suất hoảng loạn nên nằm trong khoảng (0,1).

func Recoverer

go
func Recoverer(h brokers.Processor) brokers.Processor

Recoverer phục hồi từ bất kỳ panic nào trong process và thêm RecoveredPanicError với stacktrace vào bất kỳ lỗi nào được trả về từ process.

func SetCorrelationID

go
func SetCorrelationID(id string, msg *brokers.Message)

SetCorrelationID thiết lập một ID tương quan cho brokers.

SetCorrelationID nên được gọi khi message vào hệ thống. Khi message được tạo ra trong một yêu cầu (ví dụ HTTP), ID tương quan của message nên giống với ID tương quan của yêu cầu.

func Timeout

go
func Timeout(timeout time.Duration) func(brokers.Processor) brokers.Processor

Timeout làm cho process hủy context của message đến sau một khoảng thời gian xác định. Bất kỳ chức năng nào nhạy cảm với thời gian chờ của process nên lắng nghe trên msg.Context().Done() để biết khi nào cần thất bại.

type CircuitBreaker

CircuitBreaker là một middleware bọc process trong một circuit breaker. Dựa trên cấu hình, circuit breaker sẽ nhanh chóng thất bại nếu process liên tục trả về lỗi. Điều này hữu ích để ngăn chặn các lỗi lan truyền.

go
type CircuitBreaker struct {
    // contains filtered or unexported fields
}

func NewCircuitBreaker

go
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker

NewCircuitBreaker trả về một middleware CircuitBreaker mới. Tham khảo tài liệu gobreaker để biết các cài đặt có sẵn.

func (CircuitBreaker) Middleware

go
func (c CircuitBreaker) Middleware(h brokers.Processor) brokers.Processor

Middleware trả về middleware CircuitBreaker.

type Deduplicator

Deduplicator bỏ qua các message tương tự nếu chúng có mặt trong một ExpiringKeyRepository. Sự tương tự được xác định bởi một MessageHasher. Thời gian chờ được áp dụng cho các thao tác repository bằng cách sử dụng context.WithTimeout.

Gọi Deduplicator.Middleware cho một middleware mới hoặc [Deduplicator.Decorator] cho một [brokers.PublisherDecorator].

KeyFactory mặc định là NewMessageHasherAdler32 với giới hạn đọc được đặt thành math.MaxInt64 để gắn thẻ nhanh. Sử dụng NewMessageHasherSHA256 để giảm thiểu va chạm.

Repository mặc định là NewMapExpiringKeyRepository với một cửa sổ giữ lại một phút. Cài đặt mặc định này có hiệu suất cao nhưng **không hỗ trợ các thao tác phân tán**. Nếu bạn triển khai một ExpiringKeyRepository được hỗ trợ bởi Redis, vui lòng gửi một pull request.

Timeout mặc định là một phút. Nếu thấp hơn năm mili giây, nó được đặt thành năm mili giây.

ExpiringKeyRepository phải hết hạn các giá trị trong một khoảng thời gian nhất định. Nếu không có sự hết hạn, chỉ có một message duy nhất sẽ được gửi đi miễn là repository giữ trạng thái của nó.

go
type Deduplicator struct {
    KeyFactory MessageHasher
    Repository ExpiringKeyRepository
    Timeout    time.Duration
}

func (*Deduplicator) IsDuplicate

go
func (d *Deduplicator) IsDuplicate(m *brokers.Message) (bool, error)

IsDuplicate trả về true nếu thẻ hash của message được tính toán bằng cách sử dụng một MessageHasher đã được nhìn thấy trong cửa sổ thời gian deduplication.

func (*Deduplicator) Middleware

go
func (d *Deduplicator) Middleware(h brokers.Processor) brokers.Processor

Middleware trả về [brokers.ProcessMiddleware] mà bỏ qua các message tương tự trong một khoảng thời gian nhất định.

func (*Deduplicator) PublisherDecorator

go
func (d *Deduplicator) PublisherDecorator() brokers.PublisherDecorator

PublisherDecorator trả về một decorator mà xác nhận và bỏ qua mọi [brokers.Message] được nhận diện bởi một Deduplicator.

Decorator trả về cung cấp cùng chức năng cho một [brokers.Publisher] như Deduplicator.Middleware cho một [brokers.Pipeline].

type DelayOnError

DelayOnError là một middleware thêm metadata delay vào message nếu xảy ra lỗi.

IMPORTANT: Delay hiện tại chỉ hỗ trợ Postgres Pub/Sub! Sử dụng nó sẽ không có hiệu lực trên các Pub/Sub không hỗ trợ.

go
type DelayOnError struct {
    // InitialInterval là khoảng thời gian đầu tiên giữa các lần retry. Các khoảng thời gian tiếp theo sẽ được nhân với Multiplier.
    InitialInterval time.Duration
    // MaxInterval đặt giới hạn cho backoff theo cấp số nhân của các lần retry. Khoảng thời gian sẽ không tăng quá MaxInterval.
    MaxInterval time.Duration
    // Multiplier là hệ số mà khoảng thời gian chờ sẽ được nhân giữa các lần retry.
    Multiplier float64
}

func (*DelayOnError) Middleware

go
func (d *DelayOnError) Middleware(h brokers.Processor) brokers.Processor

type ExpiringKeyRepository

ExpiringKeyRepository là một container trạng thái để kiểm tra sự tồn tại của một key trong một khoảng thời gian nhất định. Tất cả các thao tác phải an toàn cho việc sử dụng đồng thời.

go
type ExpiringKeyRepository interface {
    // IsDuplicate trả về `true` nếu key
    // không được kiểm tra trong gần đây.
    // Key phải hết hạn trong một khoảng thời gian nhất định.
    IsDuplicate(ctx context.Context, key string) (ok bool, err error)
}

func NewMapExpiringKeyRepository

go
func NewMapExpiringKeyRepository(window time.Duration) (ExpiringKeyRepository, error)

NewMapExpiringKeyRepository trả về một bộ nhớ lưu trữ được hỗ trợ bởi một hash map thông thường được bảo vệ bởi một sync.Mutex. Trạng thái **không thể được chia sẻ hoặc đồng bộ hóa giữa các instance** theo thiết kế để đạt hiệu suất cao.

Nếu cần bỏ qua các message trùng lặp bằng cách điều phối, triển khai giao diện ExpiringKeyRepository được hỗ trợ bởi Redis hoặc tương tự.

Window chỉ định thời gian tối thiểu mà các thẻ trùng lặp được ghi nhớ. Thời gian thực có thể kéo dài lên đến 50% lâu hơn vì nó phụ thuộc vào chu kỳ dọn dẹp.

type IgnoreErrors

IgnoreErrors cung cấp một middleware giúp process bỏ qua một số lỗi được whitelist rõ ràng.

go
type IgnoreErrors struct {
    // contains filtered or unexported fields
}

func NewIgnoreErrors

go
func NewIgnoreErrors(errs []error) IgnoreErrors

NewIgnoreErrors tạo một middleware IgnoreErrors mới.

func (IgnoreErrors) Middleware

go
func (i IgnoreErrors) Middleware(h brokers.Processor) brokers.Processor

Middleware trả về middleware IgnoreErrors.

type MessageHasher

MessageHasher trả về một thẻ ngắn mô tả một brokers. Thẻ nên là duy nhất cho mỗi message, nhưng tránh hoàn toàn các va chạm hash là không thực tế vì lý do hiệu suất. Được sử dụng để cung cấp năng lượng cho [Deduplicator]s.

go
type MessageHasher func(*brokers.Message) (string, error)

func NewMessageHasherAdler32

go
func NewMessageHasherAdler32(readLimit int64) MessageHasher

NewMessageHasherAdler32 tạo ra các hash của message bằng cách sử dụng Adler-32 checksum nhanh của body [brokers.Message]. Giới hạn đọc chỉ định số byte của message được sử dụng để tính toán hash.

Giới hạn thấp hơn cải thiện hiệu suất nhưng dẫn đến nhiều kết quả dương tính giả hơn. Giới hạn đọc phải lớn hơn MessageHasherReadLimitMinimum.

func NewMessageHasherFromMetadataField

go
func NewMessageHasherFromMetadataField(field string) MessageHasher

NewMessageHasherFromMetadataField tìm kiếm một giá trị hash bên trong metadata của message thay vì tính toán một cái mới. Hữu ích nếu một MessageHasher đã được áp dụng trong một [brokers.Processor] trước đó.

func NewMessageHasherSHA256

go
func NewMessageHasherSHA256(readLimit int64) MessageHasher

NewMessageHasherSHA256 tạo ra các hash của message bằng cách sử dụng hashing chậm hơn nhưng bền vững hơn của body [brokers.Message]. Giới hạn đọc chỉ định số byte của message được sử dụng để tính toán hash.

Giới hạn thấp hơn cải thiện hiệu suất nhưng dẫn đến nhiều kết quả dương tính giả hơn. Giới hạn đọc phải lớn hơn MessageHasherReadLimitMinimum.

type RecoveredPanicError

RecoveredPanicError chứa lỗi panic đã được phục hồi cùng với stacktrace.

go
type RecoveredPanicError struct {
    V          any
    Stacktrace string
}

func (RecoveredPanicError) Error

go
func (p RecoveredPanicError) Error() string

type Retry

Retry cung cấp một middleware để thử lại process nếu có lỗi xảy ra. Hành vi thử lại có thể cấu hình, với backoff lũy thừa và thời gian tối đa.

go
type Retry struct {
    // MaxRetries là số lần thử lại tối đa.
    MaxRetries int

    // InitialInterval là khoảng thời gian đầu tiên giữa các lần thử lại. Các khoảng thời gian tiếp theo sẽ được nhân lên bởi Multiplier.
    InitialInterval time.Duration
    // MaxInterval đặt giới hạn cho backoff lũy thừa của các lần thử lại. Khoảng thời gian sẽ không tăng quá MaxInterval.
    MaxInterval time.Duration
    // Multiplier là hệ số mà khoảng thời gian chờ sẽ được nhân lên giữa các lần thử lại.
    Multiplier float64
    // MaxElapsedTime đặt giới hạn thời gian cho việc thử lại. Vô hiệu hóa nếu là 0.
    MaxElapsedTime time.Duration
    // RandomizationFactor ngẫu nhiên hóa sự phân bố của các khoảng thời gian backoff trong khoảng:
    // [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
    RandomizationFactor float64

    // OnRetryHook là một hàm tùy chọn sẽ được thực thi trong mỗi lần thử lại.
    // Số lần thử lại hiện tại được truyền vào như retryNum,
    OnRetryHook func(retryNum int, delay time.Duration)

    Logger flog.FlogInf
}

func (Retry) Middleware

go
func (r Retry) Middleware(h brokers.Processor) brokers.Processor

Middleware trả về middleware Retry.

type Throttle

Throttle cung cấp một middleware giới hạn số lượng message được xử lý mỗi đơn vị thời gian. Điều này có thể được thực hiện ví dụ để ngăn chặn tải quá mức do chạy một process trên một hàng đợi dài các message chưa được xử lý.

go
type Throttle struct {
    // contains filtered or unexported fields
}

func NewThrottle

go
func NewThrottle(count int64, duration time.Duration) *Throttle

NewThrottle tạo một middleware Throttle mới. Ví dụ về thời gian và số lượng: NewThrottle(10, time.Second) cho 10 message mỗi giây

func (Throttle) Middleware

go
func (t Throttle) Middleware(h brokers.Processor) brokers.Processor

Middleware trả về middleware Throttle.

Generated by gomarkdoc