Import path:
gitlab.soludian.com/soludian/fountain/libs/brokers/providers/sse
sse
import "gitlab.soludian.com/soludian/fountain/libs/brokers/providers/sse"Index
- Constants
- Variables
- func DefaultErrorHandler(w http.ResponseWriter, r *http.Request, err error)
- func DefaultMarshalMessageFunc(url string, msg *brokers.Message) (*http.Request, error)
- func DefaultUnmarshalMessageFunc(topic string, req *http.Request) (*brokers.Message, error)
- func SetResponseStatusCode(m *brokers.Message, code int) *brokers.Message
- func StatusCodeFromContext(ctx context.Context, otherwise int) int
- func WithResponseStatusCode(ctx context.Context, code int) context.Context
- type HandleErrorFunc
- type JSONSSEMarshaler
- type MarshalMessageFunc
- type Publisher
- type SSEMarshaler
- type SSERouter
- type ServerSentEvent
- type StreamAdapter
- type StringSSEMarshaler
- type Subscriber
- type UnmarshalMessageFunc
Constants
const (
HeaderUUID = "message-uuid"
HeaderMetadata = "message-metadata"
ProviderName = "http"
)const PackageName = "sse"Variables
var (
// ErrPublisherClosed xảy ra khi cố gắng publish tới một topic trong khi publisher đã đóng hoặc đang đóng.
ErrPublisherClosed = errors.New("publisher is closed")
ErrNoMarshalFunc = errors.New("marshal function is missing")
ErrErrorResponse = errors.New("server responded with error status")
)func DefaultErrorHandler
func DefaultErrorHandler(w http.ResponseWriter, r *http.Request, err error)DefaultErrorHandler ghi phản hồi lỗi JSON cùng với mã Lỗi Máy chủ Nội bộ (500).
func DefaultMarshalMessageFunc
func DefaultMarshalMessageFunc(url string, msg *brokers.Message) (*http.Request, error)DefaultMarshalMessageFunc chuyển đổi message thành một HTTP POST request. Nó mã hóa UUID và Metadata trong các header của request.
func DefaultUnmarshalMessageFunc
func DefaultUnmarshalMessageFunc(topic string, req *http.Request) (*brokers.Message, error)DefaultUnmarshalMessageFunc lấy UUID và Metadata từ request headers, như được mã hóa bởi DefaultMarshalMessageFunc.
func SetResponseStatusCode
func SetResponseStatusCode(m *brokers.Message, code int) *brokers.MessageSetResponseStatusCode đặt một http status code cho message đã cho.
func StatusCodeFromContext
func StatusCodeFromContext(ctx context.Context, otherwise int) intStatusCodeFromContext trả về status code từ context.
func WithResponseStatusCode
func WithResponseStatusCode(ctx context.Context, code int) context.ContextWithResponseStatusCode trả về một context mới với status code.
type HandleErrorFunc
type HandleErrorFunc func(w http.ResponseWriter, r *http.Request, err error)type JSONSSEMarshaler
type JSONSSEMarshaler struct{}func (JSONSSEMarshaler) Marshal
func (j JSONSSEMarshaler) Marshal(ctx context.Context, payload any) (ServerSentEvent, error)type MarshalMessageFunc
MarshalMessageFunc chuyển đổi message thành một HTTP request để gửi tới url chỉ định.
type MarshalMessageFunc func(url string, msg *brokers.Message) (*http.Request, error)type Publisher
type Publisher struct {
// contains filtered or unexported fields
}func NewPublisher
func NewPublisher(opts ...publisherOption) *PublisherNewPublisher tạo một Publisher mới. Nó publish các message nhận được dưới dạng HTTP requests. URL, method và payload của request được xác định bởi MarshalMessageFunc đã cấu hình.
func (*Publisher) Close
func (p *Publisher) Close() errorfunc (*Publisher) Publish
func (p *Publisher) Publish(topic string, messages ...*brokers.Message) errortype SSEMarshaler
type SSEMarshaler interface {
Marshal(ctx context.Context, payload any) (ServerSentEvent, error)
}type SSERouter
SSERouter là router xử lý Server-Sent Events.
type SSERouter struct {
// contains filtered or unexported fields
}func NewSSERouter
func NewSSERouter(opts ...sseRouterOption) SSERouterNewSSERouter tạo một SSERouter mới.
func (SSERouter) AddHandler
func (r SSERouter) AddHandler(topic string, streamAdapter StreamAdapter) http.HandlerFuncAddHandler bắt đầu một process mới cho một topic nhất định.
func (SSERouter) Close
func (r SSERouter) Close() errorClose dừng SSERouter.
func (SSERouter) Run
func (r SSERouter) Run(ctx context.Context) errorRun bắt đầu SSERouter.
func (SSERouter) Running
func (r SSERouter) Running() chan struct{}Running đóng khi SSERouter đang chạy.
type ServerSentEvent
type ServerSentEvent struct {
Event string
Data []byte
}type StreamAdapter
type StreamAdapter interface {
// InitialStreamResponse trả về phản hồi luồng (request) đầu tiên gửi lại cho client.
// Bất kỳ lỗi nào xảy ra cần được xử lý và ghi vào `w`.
// Trả về `ok` bằng false sẽ kết thúc xử lý yêu cầu HTTP.
InitialStreamResponse(w http.ResponseWriter, r *http.Request) (response any, ok bool)
// NextStreamResponse trả về phản hồi luồng tiếp theo gửi lại cho client.
// Thông thường điều này liên quan đến việc kiểm tra một số loại ID mô hình được trích xuất từ `msg`.
// Phản hồi chỉ được gửi lại cho client nếu `ok` là true.
// Bất kỳ lỗi nào xảy ra cần được:
// 1) ghi lại và bỏ qua, trả về (nil, false)
// 2) gửi lại cho client, trả về (errorStruct, true)
NextStreamResponse(r *http.Request, msg *brokers.Message) (response any, ok bool)
}type StringSSEMarshaler
type StringSSEMarshaler struct{}func (StringSSEMarshaler) Marshal
func (s StringSSEMarshaler) Marshal(ctx context.Context, payload any) (ServerSentEvent, error)type Subscriber
Subscriber có thể subscribe các HTTP requests và tạo brokers' messages dựa trên chúng.
type Subscriber struct {
// contains filtered or unexported fields
}func NewSubscriber
func NewSubscriber(addr string, opts ...subscriberOption) *SubscriberNewSubscriber tạo Subscriber mới.
addr là địa chỉ TCP để lắng nghe
logger là flog.FlogInf.
func (*Subscriber) Addr
func (s *Subscriber) Addr() net.AddrAddr trả về địa chỉ server hoặc nil nếu server không chạy.
func (*Subscriber) Close
func (s *Subscriber) Close() errorfunc (*Subscriber) StartHTTPServer
func (s *Subscriber) StartHTTPServer() errorStartHTTPServer khởi động http server. Cần được gọi sau khi tất cả các Subscribe calls đã hoàn thành. Giống như http.Server.Serve(), nó trả về http.ErrServerClosed sau khi server đã đóng. https://golang.org/pkg/net/http/#Server.Serve
func (*Subscriber) Subscribe
func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *brokers.Message, error)Subscribe thêm HTTP process sẽ lắng nghe tại url được cung cấp cho các messages.
Subscribe cần được gọi trước `StartHTTPServer`.
Khi request được gửi, nó sẽ chờ `Ack`. Khi Ack được nhận, status HTTP 200 sẽ được gửi. Khi Nack được gửi, status HTTP 500 sẽ được gửi.
type UnmarshalMessageFunc
type UnmarshalMessageFunc func(topic string, request *http.Request) (*brokers.Message, error)Generated by gomarkdoc