Skip to content

Import path: gitlab.soludian.com/soludian/fountain/libs/fsync

fsync

go
import "gitlab.soludian.com/soludian/fountain/libs/fsync"

Index

Constants

go
const (
    // Constant for an unbounded queue
    Unbounded               = math.MaxInt
    DefaultQueueSize        = Unbounded
    DefaultNonBlocking      = false
    LinkedBufferInitialSize = 1024
    LinkedBufferMaxCapacity = 100 * 1024
)

go
const KPackageName = "fsync"

Variables

go
var (
    ErrQueueFull             = errors.New("queue is full")
    ErrQueueEmpty            = errors.New("queue is empty")
    ErrPoolStopped           = errors.New("pool stopped")
    ErrMaxConcurrencyReached = errors.New("max concurrency reached")
)

go
var ErrContextCanceled = errors.New("context canceled")

go
var ErrGroupStopped = errors.New("task group stopped")

go
var ErrPanic = errors.New("task panicked")

go
var GetFountainInstance = Lib.GetFountainInstance

go
var GetFountainManager = Lib.GetFountainManager

go
var InstallFountainInstance = Lib.InstallFountainInstance

go
var InstallFountainInstanceOnce = Lib.InstallFountainInstanceOnce

go
var InstallFountainInstances = Lib.InstallFountainInstances

go
var Lib = lib_3rd.NewLib(newPoolFromConfig, lib_3rd.WithDefaultConfigFunc[config, Pool](DefaultConfig), lib_3rd.WithStatsCollectors[config, Pool](statsCollector))

go
var WithConfigKey = Lib.WithConfigKey

func InstallDefaultPool

go
func InstallDefaultPool(p Pool)

InstallDefaultPool replaces the default pool with a configured pool. Only the first call takes effect (sync.Once). Automatically called by the first InstallFountainInstance invocation. Pool cũ sẽ được dừng trong goroutine riêng để không chặn quá trình khởi động.

func WithConfigNonBlocking

go
func WithConfigNonBlocking(nonBlocking bool) lib_3rd.Option[config]

func WithConfigPanicRecovery

go
func WithConfigPanicRecovery(enable bool) lib_3rd.Option[config]

func WithConfigQueueSize

go
func WithConfigQueueSize(size int) lib_3rd.Option[config]

func WithEnvironment

go
func WithEnvironment(env string) lib_3rd.Option[config]

func WithLogger

go
func WithLogger(logger ...flog.FlogInf) lib_3rd.Option[config]

func WithMaxConcurrency

go
func WithMaxConcurrency(n int) lib_3rd.Option[config]

func WithName

go
func WithName(name string) lib_3rd.Option[config]

type Option

go
type Option func(*pool)

func WithContext

go
func WithContext(ctx context.Context) Option

WithContext sets the context for the pool.

func WithNonBlocking

go
func WithNonBlocking(nonBlocking bool) Option

WithNonBlocking sets the pool to be non-blocking when the queue is full. This option is only effective when the queue size is set.

func WithQueueSize

go
func WithQueueSize(size int) Option

WithQueueSize sets the max number of elements that can be queued in the pool.

func WithoutPanicRecovery

go
func WithoutPanicRecovery() Option

WithoutPanicRecovery disables panic interception inside worker goroutines. When this option is enabled, panics inside tasks will propagate just like regular goroutines.

type Pool

Represents a pool of goroutines that can execute tasks concurrently.

go
type Pool interface {

    // Submits a task to the pool without waiting for it to complete.
    // The pool will not accept new tasks after it has been stopped.
    // If the pool has been stopped, this method will return ErrPoolStopped.
    Go(task func()) error

    // Submits a task to the pool and returns a future that can be used to wait for the task to complete.
    // The pool will not accept new tasks after it has been stopped.
    // If the pool has been stopped, the returned future will resolve to ErrPoolStopped.
    Submit(task func()) Task

    // Submits a task to the pool and returns a future that can be used to wait for the task to complete.
    // The task function must return an error.
    // The pool will not accept new tasks after it has been stopped.
    // If the pool has been stopped, the returned future will resolve to ErrPoolStopped.
    SubmitErr(task func() error) Task

    // Attempts to submit a task to the pool and returns a future that can be used to wait for the task to complete
    // and a boolean indicating whether the task was submitted successfully.
    // The pool will not accept new tasks after it has been stopped.
    // If the pool has been stopped, the returned future will resolve to ErrPoolStopped.
    TrySubmit(task func()) (Task, bool)

    // Attempts to submit a task to the pool and returns a future that can be used to wait for the task to complete
    // and a boolean indicating whether the task was submitted successfully.
    // The task function must return an error.
    // The pool will not accept new tasks after it has been stopped.
    // If the pool has been stopped, the returned future will resolve to ErrPoolStopped.
    TrySubmitErr(task func() error) (Task, bool)

    // Creates a new subpool with the specified maximum concurrency and options.
    NewSubpool(maxConcurrency int, options ...Option) Pool

    // Creates a new task group.
    NewGroup() TaskGroup

    // Creates a new task group with the specified context.
    NewGroupContext(ctx context.Context) TaskGroup
    // contains filtered or unexported methods
}

func DefaultPool

go
func DefaultPool() Pool

DefaultPool returns the current default pool instance.

func NewPool

go
func NewPool(maxConcurrency int, options ...Option) Pool

NewPool creates a new pool with the given maximum concurrency and options. The new maximum concurrency must be greater than or equal to 0 (0 means no limit).

func NewSubpool

go
func NewSubpool(maxConcurrency int) Pool

NewSubpool creates a new subpool with the default pool.

type Result

Result is deprecated. Use ResultTask instead. This interface is maintained for backward compatibility.

Deprecated: Use ResultTask instead.

go
type Result[R any] interface {

    // Done returns a channel that is closed when the task is complete or has failed.
    Done() <-chan struct{}

    // Wait waits for the task to complete and returns the result and any error that occurred.
    Wait() (R, error)
}

type ResultPool

ResultPool is a pool that can be used to submit tasks that return a result.

go
type ResultPool[R any] interface {

    // Submits a task to the pool and returns a future that can be used to wait for the task to complete and get the result.
    // The pool will not accept new tasks after it has been stopped.
    // If the pool has been stopped, this method will return ErrPoolStopped.
    Submit(task func() R) ResultTask[R]

    // Submits a task to the pool and returns a future that can be used to wait for the task to complete and get the result.
    // The task function must return a result and an error.
    // The pool will not accept new tasks after it has been stopped.
    // If the pool has been stopped, this method will return ErrPoolStopped.
    SubmitErr(task func() (R, error)) ResultTask[R]

    // Attempts to submit a task to the pool and returns a future that can be used to wait for the task to complete
    // and a boolean indicating whether the task was submitted successfully.
    // The pool will not accept new tasks after it has been stopped.
    // If the pool has been stopped, this method will return ErrPoolStopped.
    TrySubmit(task func() R) (ResultTask[R], bool)

    // Attempts to submit a task to the pool and returns a future that can be used to wait for the task to complete
    // and a boolean indicating whether the task was submitted successfully.
    // The task function must return a result and an error.
    // The pool will not accept new tasks after it has been stopped.
    // If the pool has been stopped, this method will return ErrPoolStopped.
    TrySubmitErr(task func() (R, error)) (ResultTask[R], bool)

    // Creates a new subpool with the specified maximum concurrency and options.
    NewSubpool(maxConcurrency int, options ...Option) ResultPool[R]

    // Creates a new task group.
    NewGroup() ResultTaskGroup[R]

    // Creates a new task group with the specified context.
    NewGroupContext(ctx context.Context) ResultTaskGroup[R]
    // contains filtered or unexported methods
}

func NewResultPool

go
func NewResultPool[R any](maxConcurrency int, options ...Option) ResultPool[R]

NewResultPool creates a new result pool with the given maximum concurrency and options. Result pools are generic pools that can be used to submit tasks that return a result. The new maximum concurrency must be greater than or equal to 0 (0 means no limit).

type ResultTask

ResultTask represents a task that yields a result. If the task fails, the error can be retrieved.

go
type ResultTask[R any] interface {

    // Done returns a channel that is closed when the task is complete or has failed.
    Done() <-chan struct{}

    // Wait waits for the task to complete and returns the result and any error that occurred.
    Wait() (R, error)
}

type ResultTaskGroup

ResultTaskGroup represents a group of tasks that can be executed concurrently. As opposed to TaskGroup, the tasks in a ResultTaskGroup yield a result. The group can be waited on to block until all tasks have completed. If any of the tasks return an error, the group will return the first error encountered.

go
type ResultTaskGroup[O any] interface {

    // Submits a task to the group.
    Submit(tasks ...func() O) ResultTaskGroup[O]

    // Submits a task to the group that can return an error.
    SubmitErr(tasks ...func() (O, error)) ResultTaskGroup[O]

    // Waits for all tasks in the group to complete and returns the results of each task in the order they were submitted.
    // If any of the tasks return an error, the group will return the first error encountered.
    // If the context is cancelled, the group will return the context error.
    // If the group is stopped, the group will return ErrGroupStopped.
    // If a task is running when the context is cancelled or the group is stopped, the task will be allowed to complete before returning.
    Wait() ([]O, error)

    // Returns a channel that is closed when all tasks in the group have completed, a task returns an error, or the group is stopped.
    Done() <-chan struct{}

    // Stops the group and cancels all remaining tasks. Running tasks are not interrupted.
    Stop()
}

type SingleflightGroup

SingleflightGroup represents a class of work and forms a namespace in which units of work can be executed with duplicate suppression. The zero value is ready to use.

go
type SingleflightGroup[T any] struct {
    // contains filtered or unexported fields
}

func (*SingleflightGroup[T]) Do

go
func (g *SingleflightGroup[T]) Do(key string, fn func() (T, error)) (v T, err error, shared bool)

Do executes and returns the results of the given function, making sure that only one execution is in-flight for a given key at a time. If a duplicate comes in, the duplicate caller waits for the original to complete and receives the same results.

func (*SingleflightGroup[T]) DoChan

go
func (g *SingleflightGroup[T]) DoChan(key string, fn func() (T, error)) <-chan SingleflightResult[T]

DoChan is like Do but returns a channel that will receive the results when they are ready. The returned channel will not be closed.

func (*SingleflightGroup[T]) Forget

go
func (g *SingleflightGroup[T]) Forget(key string)

Forget tells the SingleflightGroup to forget about a key. Future calls to Do for this key will call the function rather than waiting for an earlier call to complete.

type SingleflightResult

SingleflightResult holds the results of DoChan, so they can be passed on a channel.

go
type SingleflightResult[T any] struct {
    Val    T
    Err    error
    Shared bool
}

type Task

Task represents a task that can be waited on. If the task fails, the error can be retrieved.

go
type Task interface {

    // Done returns a channel that is closed when the task is complete or has failed.
    Done() <-chan struct{}

    // Wait waits for the task to complete and returns any error that occurred.
    Wait() error
}

func Submit

go
func Submit(task func()) Task

Submit submits a task to the default pool and returns a future that can be used to wait for the task to complete.

func SubmitErr

go
func SubmitErr(task func() error) Task

SubmitErr submits a task to the default pool and returns a future that can be used to wait for the task to complete.

type TaskGroup

TaskGroup represents a group of tasks that can be executed concurrently. The group can be waited on to block until all tasks have completed. If any of the tasks return an error, the group will return the first error encountered.

go
type TaskGroup interface {

    // Submits a task to the group.
    Submit(tasks ...func()) TaskGroup

    // Submits a task to the group that can return an error.
    SubmitErr(tasks ...func() error) TaskGroup

    // Waits for all tasks in the group to complete.
    // If any of the tasks return an error, the group will return the first error encountered.
    // If the context is cancelled, the group will return the context error.
    // If the group is stopped, the group will return ErrGroupStopped.
    // If a task is running when the context is cancelled or the group is stopped, the task will be allowed to complete before returning.
    Wait() error

    // Returns a channel that is closed when all tasks in the group have completed, a task returns an error, or the group is stopped.
    Done() <-chan struct{}

    // Stops the group and cancels all remaining tasks. Running tasks are not interrupted.
    Stop()

    // Returns the context associated with this group.
    // This context will be cancelled when either the parent context is cancelled
    // or any task in the group returns an error, whichever comes first.
    Context() context.Context
}

func NewGroup

go
func NewGroup() TaskGroup

NewGroup creates a new task group with the default pool.

Generated by gomarkdoc