forked from go-gitea/gitea
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Unique Queue infrastructure and move TestPullRequests to this (go…
…-gitea#9856) * Upgrade levelqueue to version 0.2.0 This adds functionality for Unique Queues * Add UniqueQueue interface and functions to create them * Add UniqueQueue implementations * Move TestPullRequests over to use UniqueQueue * Reduce code duplication * Add bytefifos * Ensure invalid types are logged * Fix close race in PersistableChannelQueue Shutdown
- Loading branch information
Showing
29 changed files
with
1,934 additions
and
500 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
// Copyright 2020 The Gitea Authors. All rights reserved. | ||
// Use of this source code is governed by a MIT-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package queue | ||
|
||
// ByteFIFO defines a FIFO that takes a byte array | ||
type ByteFIFO interface { | ||
// Len returns the length of the fifo | ||
Len() int64 | ||
// PushFunc pushes data to the end of the fifo and calls the callback if it is added | ||
PushFunc(data []byte, fn func() error) error | ||
// Pop pops data from the start of the fifo | ||
Pop() ([]byte, error) | ||
// Close this fifo | ||
Close() error | ||
} | ||
|
||
// UniqueByteFIFO defines a FIFO that Uniques its contents | ||
type UniqueByteFIFO interface { | ||
ByteFIFO | ||
// Has returns whether the fifo contains this data | ||
Has(data []byte) (bool, error) | ||
} | ||
|
||
var _ (ByteFIFO) = &DummyByteFIFO{} | ||
|
||
// DummyByteFIFO represents a dummy fifo | ||
type DummyByteFIFO struct{} | ||
|
||
// PushFunc returns nil | ||
func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error { | ||
return nil | ||
} | ||
|
||
// Pop returns nil | ||
func (*DummyByteFIFO) Pop() ([]byte, error) { | ||
return []byte{}, nil | ||
} | ||
|
||
// Close returns nil | ||
func (*DummyByteFIFO) Close() error { | ||
return nil | ||
} | ||
|
||
// Len is always 0 | ||
func (*DummyByteFIFO) Len() int64 { | ||
return 0 | ||
} | ||
|
||
var _ (UniqueByteFIFO) = &DummyUniqueByteFIFO{} | ||
|
||
// DummyUniqueByteFIFO represents a dummy unique fifo | ||
type DummyUniqueByteFIFO struct { | ||
DummyByteFIFO | ||
} | ||
|
||
// Has always returns false | ||
func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) { | ||
return false, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
// Copyright 2020 The Gitea Authors. All rights reserved. | ||
// Use of this source code is governed by a MIT-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package queue | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"code.gitea.io/gitea/modules/log" | ||
) | ||
|
||
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue | ||
type ByteFIFOQueueConfiguration struct { | ||
WorkerPoolConfiguration | ||
Workers int | ||
Name string | ||
} | ||
|
||
var _ (Queue) = &ByteFIFOQueue{} | ||
|
||
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool | ||
type ByteFIFOQueue struct { | ||
*WorkerPool | ||
byteFIFO ByteFIFO | ||
typ Type | ||
closed chan struct{} | ||
terminated chan struct{} | ||
exemplar interface{} | ||
workers int | ||
name string | ||
lock sync.Mutex | ||
} | ||
|
||
// NewByteFIFOQueue creates a new ByteFIFOQueue | ||
func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) { | ||
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
config := configInterface.(ByteFIFOQueueConfiguration) | ||
|
||
return &ByteFIFOQueue{ | ||
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), | ||
byteFIFO: byteFIFO, | ||
typ: typ, | ||
closed: make(chan struct{}), | ||
terminated: make(chan struct{}), | ||
exemplar: exemplar, | ||
workers: config.Workers, | ||
name: config.Name, | ||
}, nil | ||
} | ||
|
||
// Name returns the name of this queue | ||
func (q *ByteFIFOQueue) Name() string { | ||
return q.name | ||
} | ||
|
||
// Push pushes data to the fifo | ||
func (q *ByteFIFOQueue) Push(data Data) error { | ||
return q.PushFunc(data, nil) | ||
} | ||
|
||
// PushFunc pushes data to the fifo | ||
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { | ||
if !assignableTo(data, q.exemplar) { | ||
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) | ||
} | ||
bs, err := json.Marshal(data) | ||
if err != nil { | ||
return err | ||
} | ||
return q.byteFIFO.PushFunc(bs, fn) | ||
} | ||
|
||
// IsEmpty checks if the queue is empty | ||
func (q *ByteFIFOQueue) IsEmpty() bool { | ||
q.lock.Lock() | ||
defer q.lock.Unlock() | ||
if !q.WorkerPool.IsEmpty() { | ||
return false | ||
} | ||
return q.byteFIFO.Len() == 0 | ||
} | ||
|
||
// Run runs the bytefifo queue | ||
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { | ||
atShutdown(context.Background(), q.Shutdown) | ||
atTerminate(context.Background(), q.Terminate) | ||
log.Debug("%s: %s Starting", q.typ, q.name) | ||
|
||
go func() { | ||
_ = q.AddWorkers(q.workers, 0) | ||
}() | ||
|
||
go q.readToChan() | ||
|
||
log.Trace("%s: %s Waiting til closed", q.typ, q.name) | ||
<-q.closed | ||
log.Trace("%s: %s Waiting til done", q.typ, q.name) | ||
q.Wait() | ||
|
||
log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
atTerminate(ctx, cancel) | ||
q.CleanUp(ctx) | ||
cancel() | ||
} | ||
|
||
func (q *ByteFIFOQueue) readToChan() { | ||
for { | ||
select { | ||
case <-q.closed: | ||
// tell the pool to shutdown. | ||
q.cancel() | ||
return | ||
default: | ||
q.lock.Lock() | ||
bs, err := q.byteFIFO.Pop() | ||
if err != nil { | ||
q.lock.Unlock() | ||
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) | ||
time.Sleep(time.Millisecond * 100) | ||
continue | ||
} | ||
|
||
if len(bs) == 0 { | ||
q.lock.Unlock() | ||
time.Sleep(time.Millisecond * 100) | ||
continue | ||
} | ||
|
||
data, err := unmarshalAs(bs, q.exemplar) | ||
if err != nil { | ||
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) | ||
q.lock.Unlock() | ||
time.Sleep(time.Millisecond * 100) | ||
continue | ||
} | ||
|
||
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) | ||
q.WorkerPool.Push(data) | ||
q.lock.Unlock() | ||
} | ||
} | ||
} | ||
|
||
// Shutdown processing from this queue | ||
func (q *ByteFIFOQueue) Shutdown() { | ||
log.Trace("%s: %s Shutting down", q.typ, q.name) | ||
q.lock.Lock() | ||
select { | ||
case <-q.closed: | ||
default: | ||
close(q.closed) | ||
} | ||
q.lock.Unlock() | ||
log.Debug("%s: %s Shutdown", q.typ, q.name) | ||
} | ||
|
||
// Terminate this queue and close the queue | ||
func (q *ByteFIFOQueue) Terminate() { | ||
log.Trace("%s: %s Terminating", q.typ, q.name) | ||
q.Shutdown() | ||
q.lock.Lock() | ||
select { | ||
case <-q.terminated: | ||
q.lock.Unlock() | ||
return | ||
default: | ||
} | ||
close(q.terminated) | ||
q.lock.Unlock() | ||
if log.IsDebug() { | ||
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) | ||
} | ||
if err := q.byteFIFO.Close(); err != nil { | ||
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) | ||
} | ||
log.Debug("%s: %s Terminated", q.typ, q.name) | ||
} | ||
|
||
var _ (UniqueQueue) = &ByteFIFOUniqueQueue{} | ||
|
||
// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo | ||
type ByteFIFOUniqueQueue struct { | ||
ByteFIFOQueue | ||
} | ||
|
||
// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue | ||
func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) { | ||
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
config := configInterface.(ByteFIFOQueueConfiguration) | ||
|
||
return &ByteFIFOUniqueQueue{ | ||
ByteFIFOQueue: ByteFIFOQueue{ | ||
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), | ||
byteFIFO: byteFIFO, | ||
typ: typ, | ||
closed: make(chan struct{}), | ||
terminated: make(chan struct{}), | ||
exemplar: exemplar, | ||
workers: config.Workers, | ||
name: config.Name, | ||
}, | ||
}, nil | ||
} | ||
|
||
// Has checks if the provided data is in the queue | ||
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { | ||
if !assignableTo(data, q.exemplar) { | ||
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) | ||
} | ||
bs, err := json.Marshal(data) | ||
if err != nil { | ||
return false, err | ||
} | ||
return q.byteFIFO.(UniqueByteFIFO).Has(bs) | ||
} |
Oops, something went wrong.