1
0
mirror of https://github.com/go-gitea/gitea.git synced 2024-09-01 14:56:30 +00:00
gitea/modules/queue/queue_bytefifo.go
zeripath 382101ecc7
In disk_channel queues synchronously push to disk on shutdown () ()
Partial Backport of 

Instead of using an asynchronous goroutine to push to disk on shutdown
just close the datachan and immediately push to the disk.

Prevents messages of incompletely flushed queues.

Signed-off-by: Andrew Thornton <art27@cantab.net>

Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
2022-02-22 20:08:35 +08:00

319 lines
8.2 KiB
Go

// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"fmt"
"sync"
"time"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
Name string
WaitOnEmpty bool
}
var _ Queue = &ByteFIFOQueue{}
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
type ByteFIFOQueue struct {
*WorkerPool
byteFIFO ByteFIFO
typ Type
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
terminateCtxCancel context.CancelFunc
exemplar interface{}
workers int
name string
lock sync.Mutex
waitOnEmpty bool
pushed chan struct{}
}
// NewByteFIFOQueue creates a new ByteFIFOQueue
func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) {
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
if err != nil {
return nil, err
}
config := configInterface.(ByteFIFOQueueConfiguration)
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
return &ByteFIFOQueue{
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
byteFIFO: byteFIFO,
typ: typ,
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
terminateCtxCancel: terminateCtxCancel,
exemplar: exemplar,
workers: config.Workers,
name: config.Name,
waitOnEmpty: config.WaitOnEmpty,
pushed: make(chan struct{}, 1),
}, nil
}
// Name returns the name of this queue
func (q *ByteFIFOQueue) Name() string {
return q.name
}
// Push pushes data to the fifo
func (q *ByteFIFOQueue) Push(data Data) error {
return q.PushFunc(data, nil)
}
// PushFunc pushes data to the fifo
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
return err
}
if q.waitOnEmpty {
defer func() {
select {
case q.pushed <- struct{}{}:
default:
}
}()
}
return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
}
// IsEmpty checks if the queue is empty
func (q *ByteFIFOQueue) IsEmpty() bool {
q.lock.Lock()
defer q.lock.Unlock()
if !q.WorkerPool.IsEmpty() {
return false
}
return q.byteFIFO.Len(q.terminateCtx) == 0
}
// Run runs the bytefifo queue
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("%s: %s Starting", q.typ, q.name)
_ = q.AddWorkers(q.workers, 0)
log.Trace("%s: %s Now running", q.typ, q.name)
q.readToChan()
<-q.shutdownCtx.Done()
log.Trace("%s: %s Waiting til done", q.typ, q.name)
q.Wait()
log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
q.CleanUp(q.terminateCtx)
q.terminateCtxCancel()
}
const maxBackOffTime = time.Second * 3
func (q *ByteFIFOQueue) readToChan() {
// handle quick cancels
select {
case <-q.shutdownCtx.Done():
// tell the pool to shutdown.
q.baseCtxCancel()
return
default:
}
// Default backoff values
backOffTime := time.Millisecond * 100
loop:
for {
err := q.doPop()
if err == errQueueEmpty {
log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
select {
case <-q.pushed:
// reset backOffTime
backOffTime = 100 * time.Millisecond
continue loop
case <-q.shutdownCtx.Done():
// Oops we've been shutdown whilst waiting
// Make sure the worker pool is shutdown too
q.baseCtxCancel()
return
}
}
// Reset the backOffTime if there is no error or an unmarshalError
if err == nil || err == errUnmarshal {
backOffTime = 100 * time.Millisecond
}
if err != nil {
// Need to Backoff
select {
case <-q.shutdownCtx.Done():
// Oops we've been shutdown whilst backing off
// Make sure the worker pool is shutdown too
q.baseCtxCancel()
return
case <-time.After(backOffTime):
// OK we've waited - so backoff a bit
backOffTime += backOffTime / 2
if backOffTime > maxBackOffTime {
backOffTime = maxBackOffTime
}
continue loop
}
}
select {
case <-q.shutdownCtx.Done():
// Oops we've been shutdown
// Make sure the worker pool is shutdown too
q.baseCtxCancel()
return
default:
continue loop
}
}
}
var (
errQueueEmpty = fmt.Errorf("empty queue")
errEmptyBytes = fmt.Errorf("empty bytes")
errUnmarshal = fmt.Errorf("failed to unmarshal")
)
func (q *ByteFIFOQueue) doPop() error {
q.lock.Lock()
defer q.lock.Unlock()
bs, err := q.byteFIFO.Pop(q.shutdownCtx)
if err != nil {
if err == context.Canceled {
q.baseCtxCancel()
return err
}
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
return err
}
if len(bs) == 0 {
if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 {
return errQueueEmpty
}
return errEmptyBytes
}
data, err := unmarshalAs(bs, q.exemplar)
if err != nil {
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
return errUnmarshal
}
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
q.WorkerPool.Push(data)
return nil
}
// Shutdown processing from this queue
func (q *ByteFIFOQueue) Shutdown() {
log.Trace("%s: %s Shutting down", q.typ, q.name)
select {
case <-q.shutdownCtx.Done():
return
default:
}
q.shutdownCtxCancel()
log.Debug("%s: %s Shutdown", q.typ, q.name)
}
// IsShutdown returns a channel which is closed when this Queue is shutdown
func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} {
return q.shutdownCtx.Done()
}
// Terminate this queue and close the queue
func (q *ByteFIFOQueue) Terminate() {
log.Trace("%s: %s Terminating", q.typ, q.name)
q.Shutdown()
select {
case <-q.terminateCtx.Done():
return
default:
}
if log.IsDebug() {
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx))
}
q.terminateCtxCancel()
if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
}
log.Debug("%s: %s Terminated", q.typ, q.name)
}
// IsTerminated returns a channel which is closed when this Queue is terminated
func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} {
return q.terminateCtx.Done()
}
var _ UniqueQueue = &ByteFIFOUniqueQueue{}
// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
type ByteFIFOUniqueQueue struct {
ByteFIFOQueue
}
// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) {
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
if err != nil {
return nil, err
}
config := configInterface.(ByteFIFOQueueConfiguration)
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
return &ByteFIFOUniqueQueue{
ByteFIFOQueue: ByteFIFOQueue{
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
byteFIFO: byteFIFO,
typ: typ,
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
terminateCtxCancel: terminateCtxCancel,
exemplar: exemplar,
workers: config.Workers,
name: config.Name,
},
}, nil
}
// Has checks if the provided data is in the queue
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
if !assignableTo(data, q.exemplar) {
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
return false, err
}
return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs)
}