Skip to content

Import path: gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fka

fka

go
import "gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fka"

Index

Constants

go
const KPackageName = "fka"

NoSleep có thể được đặt cho subscriberConfig.NackResendSleep và subscriberConfig.ReconnectRetrySleep.

go
const NoSleep time.Duration = -1

go
const PackageName = "fka"

go
const UUIDHeaderKey = "_fka_message_uuid"

Variables

go
var GetFountainInstance = Lib.GetFountainInstance

go
var GetFountainManager = Lib.GetFountainManager

Sử dụng khi config instance ở dạng key:value; Nếu config instance ở dạng key:array thì sử dụng hàm InstallFountainInstances Nếu config ở dạng key:array thì sẽ chỉ install config phần tử đầu tiên mà thôi

Install with config format <key>:<value>; eg: fka:<value>

Usage:

config.yaml:

	fka:
	  name: default_name
	  ...

	code.go

	fka.InstallFountainInstance()

 fka.WithConfigKey("fka").InstallFountainInstance()
go
var InstallFountainInstance = Lib.InstallFountainInstance

Sử dụng khi config instance ở dạng key:array<value>; Sẽ luôn cố gắng khởi tạo kể cả khi config ở dạng key:value

Install with config format <key>:array<value>; eg: fka:array<value>

Usage:

config.yaml:

fka:
  - name: default_name
    ...

code.go

fka.InstallFountainInstances()

fka.WithConfigKey("fka").InstallFountainInstances()
go
var InstallFountainInstances = Lib.InstallFountainInstances

Truy cập thẳng tới bộ quản lý thư viện

go
var Lib = lib_3rd.NewLib(newClient, lib_3rd.WithDefaultConfigFunc[config, client](DefaultConfig))

go
var WithConfigKey = Lib.WithConfigKey

func DefaultSaramaSubscriberConfig

go
func DefaultSaramaSubscriberConfig() *sarama.Config

DefaultSaramaSubscriberConfig creates default Sarama config used by brokers.

Custom config can be passed to NewSubscriber and NewPublisher.

saramaConfig := DefaultSaramaSubscriberConfig()
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

subscriberConfig.OverwriteSaramaConfig = saramaConfig

subscriber := NewSubscriber(subscriberConfig)

func DefaultSaramaSyncPublisherConfig

go
func DefaultSaramaSyncPublisherConfig() *sarama.Config

func MessageKeyFromCtx

go
func MessageKeyFromCtx(ctx context.Context) ([]byte, bool)

MessageKeyFromCtx trả về khóa nội bộ Kafka của message đã xử lý

func MessagePartitionFromCtx

go
func MessagePartitionFromCtx(ctx context.Context) (int32, bool)

MessagePartitionFromCtx trả về phân vùng Kafka của message đã nhận

func MessagePartitionOffsetFromCtx

go
func MessagePartitionOffsetFromCtx(ctx context.Context) (int64, bool)

MessagePartitionOffsetFromCtx trả về độ lệch phân vùng Kafka của message đã nhận

func MessageTimestampFromCtx

go
func MessageTimestampFromCtx(ctx context.Context) (time.Time, bool)

MessageTimestampFromCtx trả về dấu thời gian nội bộ Kafka của message đã nhận

func WithAddrs

go
func WithAddrs(addrs ...string) lib_3rd.Option[config]

func WithConfig

go
func WithConfig(conf *config) lib_3rd.Option[config]

WithAddrs set addresses

func WithEnvironment

go
func WithEnvironment(env string) lib_3rd.Option[config]

func WithLogger

go
func WithLogger(logger ...flog.FlogInf) lib_3rd.Option[config]

func WithName

go
func WithName(name string) lib_3rd.Option[config]

func WithNumPartitions

go
func WithNumPartitions(numPartitions int32) lib_3rd.Option[config]

func WithReplicationFactor

go
func WithReplicationFactor(replicationFactor int16) lib_3rd.Option[config]

type DefaultMarshaler

go
type DefaultMarshaler struct{}

func (DefaultMarshaler) Marshal

go
func (DefaultMarshaler) Marshal(topic string, msg *brokers.Message) (*sarama.ProducerMessage, error)

Marshal chuyển đổi thông điệp của brokers.message sang Kafka message.

func (DefaultMarshaler) Unmarshal

go
func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*brokers.Message, error)

Unmarshal chuyển đổi Kafka message sang thông điệp của brokers.message.

type GeneratePartitionKey

go
type GeneratePartitionKey func(topic string, msg *brokers.Message) (string, error)

type Marshaler

Marshaler chuyển đổi thông điệp của brokers.message sang Kafka message và ngược lại.

go
type Marshaler interface {
    Marshal(topic string, msg *brokers.Message) (*sarama.ProducerMessage, error)
    Unmarshal(*sarama.ConsumerMessage) (*brokers.Message, error)
}

func NewWithPartitioningMarshaler

go
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) Marshaler

NewWithPartitioningMarshaler tạo một Marshaler mới với phân vùng.

type OtelSaramaTracer

go
type OtelSaramaTracer struct{}

func (OtelSaramaTracer) WrapConsumer

go
func (t OtelSaramaTracer) WrapConsumer(c sarama.Consumer) sarama.Consumer

func (OtelSaramaTracer) WrapConsumerGroupHandler

go
func (t OtelSaramaTracer) WrapConsumerGroupHandler(h sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler

func (OtelSaramaTracer) WrapPartitionConsumer

go
func (t OtelSaramaTracer) WrapPartitionConsumer(pc sarama.PartitionConsumer) sarama.PartitionConsumer

func (OtelSaramaTracer) WrapSyncProducer

go
func (t OtelSaramaTracer) WrapSyncProducer(cfg *sarama.Config, p sarama.SyncProducer) sarama.SyncProducer

type PartitionOffset

go
type PartitionOffset map[int32]int64

type Publisher

go
type Publisher struct {
    // contains filtered or unexported fields
}

func (*Publisher) Close

go
func (p *Publisher) Close() error

func (*Publisher) Publish

go
func (p *Publisher) Publish(topic string, msgs ...*brokers.Message) error

Publish publishes message to Kafka.

Publish is blocking and wait for ack from Kafka. When one of messages delivery fails - function is interrupted.

type SaramaTracer

go
type SaramaTracer interface {
    WrapConsumer(sarama.Consumer) sarama.Consumer
    WrapPartitionConsumer(sarama.PartitionConsumer) sarama.PartitionConsumer
    WrapConsumerGroupHandler(sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler
    WrapSyncProducer(*sarama.Config, sarama.SyncProducer) sarama.SyncProducer
}

func NewOtelSaramaTracer

go
func NewOtelSaramaTracer() SaramaTracer

type Subscriber

go
type Subscriber struct {
    // contains filtered or unexported fields
}

func (*Subscriber) Close

go
func (s *Subscriber) Close() error

func (*Subscriber) PartitionOffset

go
func (s *Subscriber) PartitionOffset(topic string) (PartitionOffset, error)

func (*Subscriber) Subscribe

go
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)

Subscribe subscribers for messages in Kafka.

There are multiple subscribers spawned

func (*Subscriber) SubscribeInitialize

go
func (s *Subscriber) SubscribeInitialize(topic string) (err error)

Generated by gomarkdoc