Import path:
gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fka
fka
import "gitlab.soludian.com/soludian/fountain/libs/brokers/providers/fka"Index
- Constants
- Variables
- func DefaultSaramaSubscriberConfig() *sarama.Config
- func DefaultSaramaSyncPublisherConfig() *sarama.Config
- func MessageKeyFromCtx(ctx context.Context) ([]byte, bool)
- func MessagePartitionFromCtx(ctx context.Context) (int32, bool)
- func MessagePartitionOffsetFromCtx(ctx context.Context) (int64, bool)
- func MessageTimestampFromCtx(ctx context.Context) (time.Time, bool)
- func WithAddrs(addrs ...string) lib_3rd.Option[config]
- func WithConfig(conf *config) lib_3rd.Option[config]
- func WithEnvironment(env string) lib_3rd.Option[config]
- func WithLogger(logger ...flog.FlogInf) lib_3rd.Option[config]
- func WithName(name string) lib_3rd.Option[config]
- func WithNumPartitions(numPartitions int32) lib_3rd.Option[config]
- func WithReplicationFactor(replicationFactor int16) lib_3rd.Option[config]
- type DefaultMarshaler
- type GeneratePartitionKey
- type Marshaler
- type OtelSaramaTracer
- func (t OtelSaramaTracer) WrapConsumer(c sarama.Consumer) sarama.Consumer
- func (t OtelSaramaTracer) WrapConsumerGroupHandler(h sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler
- func (t OtelSaramaTracer) WrapPartitionConsumer(pc sarama.PartitionConsumer) sarama.PartitionConsumer
- func (t OtelSaramaTracer) WrapSyncProducer(cfg *sarama.Config, p sarama.SyncProducer) sarama.SyncProducer
- type PartitionOffset
- type Publisher
- type SaramaTracer
- type Subscriber
Constants
const KPackageName = "fka"NoSleep có thể được đặt cho subscriberConfig.NackResendSleep và subscriberConfig.ReconnectRetrySleep.
const NoSleep time.Duration = -1const PackageName = "fka"const UUIDHeaderKey = "_fka_message_uuid"Variables
var GetFountainInstance = Lib.GetFountainInstancevar GetFountainManager = Lib.GetFountainManagerSử dụng khi config instance ở dạng key:value; Nếu config instance ở dạng key:array thì sử dụng hàm InstallFountainInstances Nếu config ở dạng key:array thì sẽ chỉ install config phần tử đầu tiên mà thôi
Install with config format <key>:<value>; eg: fka:<value>
Usage:
config.yaml:
fka:
name: default_name
...
code.go
fka.InstallFountainInstance()
fka.WithConfigKey("fka").InstallFountainInstance()var InstallFountainInstance = Lib.InstallFountainInstanceSử dụng khi config instance ở dạng key:array<value>; Sẽ luôn cố gắng khởi tạo kể cả khi config ở dạng key:value
Install with config format <key>:array<value>; eg: fka:array<value>
Usage:
config.yaml:
fka:
- name: default_name
...
code.go
fka.InstallFountainInstances()
fka.WithConfigKey("fka").InstallFountainInstances()var InstallFountainInstances = Lib.InstallFountainInstancesTruy cập thẳng tới bộ quản lý thư viện
var Lib = lib_3rd.NewLib(newClient, lib_3rd.WithDefaultConfigFunc[config, client](DefaultConfig))var WithConfigKey = Lib.WithConfigKeyfunc DefaultSaramaSubscriberConfig
func DefaultSaramaSubscriberConfig() *sarama.ConfigDefaultSaramaSubscriberConfig creates default Sarama config used by brokers.
Custom config can be passed to NewSubscriber and NewPublisher.
saramaConfig := DefaultSaramaSubscriberConfig()
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriberConfig.OverwriteSaramaConfig = saramaConfig
subscriber := NewSubscriber(subscriberConfig)func DefaultSaramaSyncPublisherConfig
func DefaultSaramaSyncPublisherConfig() *sarama.Configfunc MessageKeyFromCtx
func MessageKeyFromCtx(ctx context.Context) ([]byte, bool)MessageKeyFromCtx trả về khóa nội bộ Kafka của message đã xử lý
func MessagePartitionFromCtx
func MessagePartitionFromCtx(ctx context.Context) (int32, bool)MessagePartitionFromCtx trả về phân vùng Kafka của message đã nhận
func MessagePartitionOffsetFromCtx
func MessagePartitionOffsetFromCtx(ctx context.Context) (int64, bool)MessagePartitionOffsetFromCtx trả về độ lệch phân vùng Kafka của message đã nhận
func MessageTimestampFromCtx
func MessageTimestampFromCtx(ctx context.Context) (time.Time, bool)MessageTimestampFromCtx trả về dấu thời gian nội bộ Kafka của message đã nhận
func WithAddrs
func WithAddrs(addrs ...string) lib_3rd.Option[config]func WithConfig
func WithConfig(conf *config) lib_3rd.Option[config]WithAddrs set addresses
func WithEnvironment
func WithEnvironment(env string) lib_3rd.Option[config]func WithLogger
func WithLogger(logger ...flog.FlogInf) lib_3rd.Option[config]func WithName
func WithName(name string) lib_3rd.Option[config]func WithNumPartitions
func WithNumPartitions(numPartitions int32) lib_3rd.Option[config]func WithReplicationFactor
func WithReplicationFactor(replicationFactor int16) lib_3rd.Option[config]type DefaultMarshaler
type DefaultMarshaler struct{}func (DefaultMarshaler) Marshal
func (DefaultMarshaler) Marshal(topic string, msg *brokers.Message) (*sarama.ProducerMessage, error)Marshal chuyển đổi thông điệp của brokers.message sang Kafka message.
func (DefaultMarshaler) Unmarshal
func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*brokers.Message, error)Unmarshal chuyển đổi Kafka message sang thông điệp của brokers.message.
type GeneratePartitionKey
type GeneratePartitionKey func(topic string, msg *brokers.Message) (string, error)type Marshaler
Marshaler chuyển đổi thông điệp của brokers.message sang Kafka message và ngược lại.
type Marshaler interface {
Marshal(topic string, msg *brokers.Message) (*sarama.ProducerMessage, error)
Unmarshal(*sarama.ConsumerMessage) (*brokers.Message, error)
}func NewWithPartitioningMarshaler
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerNewWithPartitioningMarshaler tạo một Marshaler mới với phân vùng.
type OtelSaramaTracer
type OtelSaramaTracer struct{}func (OtelSaramaTracer) WrapConsumer
func (t OtelSaramaTracer) WrapConsumer(c sarama.Consumer) sarama.Consumerfunc (OtelSaramaTracer) WrapConsumerGroupHandler
func (t OtelSaramaTracer) WrapConsumerGroupHandler(h sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandlerfunc (OtelSaramaTracer) WrapPartitionConsumer
func (t OtelSaramaTracer) WrapPartitionConsumer(pc sarama.PartitionConsumer) sarama.PartitionConsumerfunc (OtelSaramaTracer) WrapSyncProducer
func (t OtelSaramaTracer) WrapSyncProducer(cfg *sarama.Config, p sarama.SyncProducer) sarama.SyncProducertype PartitionOffset
type PartitionOffset map[int32]int64type Publisher
type Publisher struct {
// contains filtered or unexported fields
}func (*Publisher) Close
func (p *Publisher) Close() errorfunc (*Publisher) Publish
func (p *Publisher) Publish(topic string, msgs ...*brokers.Message) errorPublish publishes message to Kafka.
Publish is blocking and wait for ack from Kafka. When one of messages delivery fails - function is interrupted.
type SaramaTracer
type SaramaTracer interface {
WrapConsumer(sarama.Consumer) sarama.Consumer
WrapPartitionConsumer(sarama.PartitionConsumer) sarama.PartitionConsumer
WrapConsumerGroupHandler(sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler
WrapSyncProducer(*sarama.Config, sarama.SyncProducer) sarama.SyncProducer
}func NewOtelSaramaTracer
func NewOtelSaramaTracer() SaramaTracertype Subscriber
type Subscriber struct {
// contains filtered or unexported fields
}func (*Subscriber) Close
func (s *Subscriber) Close() errorfunc (*Subscriber) PartitionOffset
func (s *Subscriber) PartitionOffset(topic string) (PartitionOffset, error)func (*Subscriber) Subscribe
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *brokers.Message, error)Subscribe subscribers for messages in Kafka.
There are multiple subscribers spawned
func (*Subscriber) SubscribeInitialize
func (s *Subscriber) SubscribeInitialize(topic string) (err error)Generated by gomarkdoc