Skip to content

Import path: gitlab.soludian.com/soludian/fountain/libs/brokers/providers/sse

sse

go
import "gitlab.soludian.com/soludian/fountain/libs/brokers/providers/sse"

Index

Constants

go
const (
    HeaderUUID     = "message-uuid"
    HeaderMetadata = "message-metadata"
    ProviderName   = "http"
)

go
const PackageName = "sse"

Variables

go
var (
    // ErrPublisherClosed xảy ra khi cố gắng publish tới một topic trong khi publisher đã đóng hoặc đang đóng.
    ErrPublisherClosed = errors.New("publisher is closed")
    ErrNoMarshalFunc   = errors.New("marshal function is missing")
    ErrErrorResponse   = errors.New("server responded with error status")
)

func DefaultErrorHandler

go
func DefaultErrorHandler(w http.ResponseWriter, r *http.Request, err error)

DefaultErrorHandler ghi phản hồi lỗi JSON cùng với mã Lỗi Máy chủ Nội bộ (500).

func DefaultMarshalMessageFunc

go
func DefaultMarshalMessageFunc(url string, msg *brokers.Message) (*http.Request, error)

DefaultMarshalMessageFunc chuyển đổi message thành một HTTP POST request. Nó mã hóa UUID và Metadata trong các header của request.

func DefaultUnmarshalMessageFunc

go
func DefaultUnmarshalMessageFunc(topic string, req *http.Request) (*brokers.Message, error)

DefaultUnmarshalMessageFunc lấy UUID và Metadata từ request headers, như được mã hóa bởi DefaultMarshalMessageFunc.

func SetResponseStatusCode

go
func SetResponseStatusCode(m *brokers.Message, code int) *brokers.Message

SetResponseStatusCode đặt một http status code cho message đã cho.

func StatusCodeFromContext

go
func StatusCodeFromContext(ctx context.Context, otherwise int) int

StatusCodeFromContext trả về status code từ context.

func WithResponseStatusCode

go
func WithResponseStatusCode(ctx context.Context, code int) context.Context

WithResponseStatusCode trả về một context mới với status code.

type HandleErrorFunc

go
type HandleErrorFunc func(w http.ResponseWriter, r *http.Request, err error)

type JSONSSEMarshaler

go
type JSONSSEMarshaler struct{}

func (JSONSSEMarshaler) Marshal

go
func (j JSONSSEMarshaler) Marshal(ctx context.Context, payload any) (ServerSentEvent, error)

type MarshalMessageFunc

MarshalMessageFunc chuyển đổi message thành một HTTP request để gửi tới url chỉ định.

go
type MarshalMessageFunc func(url string, msg *brokers.Message) (*http.Request, error)

type Publisher

go
type Publisher struct {
    // contains filtered or unexported fields
}

func NewPublisher

go
func NewPublisher(opts ...publisherOption) *Publisher

NewPublisher tạo một Publisher mới. Nó publish các message nhận được dưới dạng HTTP requests. URL, method và payload của request được xác định bởi MarshalMessageFunc đã cấu hình.

func (*Publisher) Close

go
func (p *Publisher) Close() error

func (*Publisher) Publish

go
func (p *Publisher) Publish(topic string, messages ...*brokers.Message) error

type SSEMarshaler

go
type SSEMarshaler interface {
    Marshal(ctx context.Context, payload any) (ServerSentEvent, error)
}

type SSERouter

SSERouter là router xử lý Server-Sent Events.

go
type SSERouter struct {
    // contains filtered or unexported fields
}

func NewSSERouter

go
func NewSSERouter(opts ...sseRouterOption) SSERouter

NewSSERouter tạo một SSERouter mới.

func (SSERouter) AddHandler

go
func (r SSERouter) AddHandler(topic string, streamAdapter StreamAdapter) http.HandlerFunc

AddHandler bắt đầu một process mới cho một topic nhất định.

func (SSERouter) Close

go
func (r SSERouter) Close() error

Close dừng SSERouter.

func (SSERouter) Run

go
func (r SSERouter) Run(ctx context.Context) error

Run bắt đầu SSERouter.

func (SSERouter) Running

go
func (r SSERouter) Running() chan struct{}

Running đóng khi SSERouter đang chạy.

type ServerSentEvent

go
type ServerSentEvent struct {
    Event string
    Data  []byte
}

type StreamAdapter

go
type StreamAdapter interface {
    // InitialStreamResponse trả về phản hồi luồng (request) đầu tiên gửi lại cho client.
    // Bất kỳ lỗi nào xảy ra cần được xử lý và ghi vào `w`.
    // Trả về `ok` bằng false sẽ kết thúc xử lý yêu cầu HTTP.
    InitialStreamResponse(w http.ResponseWriter, r *http.Request) (response any, ok bool)
    // NextStreamResponse trả về phản hồi luồng tiếp theo gửi lại cho client.
    // Thông thường điều này liên quan đến việc kiểm tra một số loại ID mô hình được trích xuất từ `msg`.
    // Phản hồi chỉ được gửi lại cho client nếu `ok` là true.
    // Bất kỳ lỗi nào xảy ra cần được:
    //    1) ghi lại và bỏ qua, trả về (nil, false)
    //    2) gửi lại cho client, trả về (errorStruct, true)
    NextStreamResponse(r *http.Request, msg *brokers.Message) (response any, ok bool)
}

type StringSSEMarshaler

go
type StringSSEMarshaler struct{}

func (StringSSEMarshaler) Marshal

go
func (s StringSSEMarshaler) Marshal(ctx context.Context, payload any) (ServerSentEvent, error)

type Subscriber

Subscriber có thể subscribe các HTTP requests và tạo brokers' messages dựa trên chúng.

go
type Subscriber struct {
    // contains filtered or unexported fields
}

func NewSubscriber

go
func NewSubscriber(addr string, opts ...subscriberOption) *Subscriber

NewSubscriber tạo Subscriber mới.

addr là địa chỉ TCP để lắng nghe

logger là flog.FlogInf.

func (*Subscriber) Addr

go
func (s *Subscriber) Addr() net.Addr

Addr trả về địa chỉ server hoặc nil nếu server không chạy.

func (*Subscriber) Close

go
func (s *Subscriber) Close() error

func (*Subscriber) StartHTTPServer

go
func (s *Subscriber) StartHTTPServer() error

StartHTTPServer khởi động http server. Cần được gọi sau khi tất cả các Subscribe calls đã hoàn thành. Giống như http.Server.Serve(), nó trả về http.ErrServerClosed sau khi server đã đóng. https://golang.org/pkg/net/http/#Server.Serve

func (*Subscriber) Subscribe

go
func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *brokers.Message, error)

Subscribe thêm HTTP process sẽ lắng nghe tại url được cung cấp cho các messages.

Subscribe cần được gọi trước `StartHTTPServer`.

Khi request được gửi, nó sẽ chờ `Ack`. Khi Ack được nhận, status HTTP 200 sẽ được gửi. Khi Nack được gửi, status HTTP 500 sẽ được gửi.

type UnmarshalMessageFunc

go
type UnmarshalMessageFunc func(topic string, request *http.Request) (*brokers.Message, error)

Generated by gomarkdoc