Prevent deadlock in TestPersistableChannelQueue (#17717)

* Prevent deadlock in TestPersistableChannelQueue

There is a potential deadlock in TestPersistableChannelQueue due to attempting to
shutdown the test queue before it is ready.

Signed-off-by: Andrew Thornton <art27@cantab.net>

* prevent npe

Signed-off-by: Andrew Thornton <art27@cantab.net>
This commit is contained in:
zeripath 2021-11-19 01:13:25 +00:00 committed by GitHub
parent 72b0882a45
commit a85e75b2b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -18,6 +18,9 @@ func TestPersistableChannelQueue(t *testing.T) {
handleChan := make(chan *testData) handleChan := make(chan *testData)
handle := func(data ...Data) { handle := func(data ...Data) {
for _, datum := range data { for _, datum := range data {
if datum == nil {
continue
}
testDatum := datum.(*testData) testDatum := datum.(*testData)
handleChan <- testDatum handleChan <- testDatum
} }
@ -42,13 +45,26 @@ func TestPersistableChannelQueue(t *testing.T) {
}, &testData{}) }, &testData{})
assert.NoError(t, err) assert.NoError(t, err)
readyForShutdown := make(chan struct{})
readyForTerminate := make(chan struct{})
go queue.Run(func(shutdown func()) { go queue.Run(func(shutdown func()) {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
select {
case <-readyForShutdown:
default:
close(readyForShutdown)
}
queueShutdown = append(queueShutdown, shutdown) queueShutdown = append(queueShutdown, shutdown)
}, func(terminate func()) { }, func(terminate func()) {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
select {
case <-readyForTerminate:
default:
close(readyForTerminate)
}
queueTerminate = append(queueTerminate, terminate) queueTerminate = append(queueTerminate, terminate)
}) })
@ -74,6 +90,7 @@ func TestPersistableChannelQueue(t *testing.T) {
err = queue.Push(test1) err = queue.Push(test1)
assert.Error(t, err) assert.Error(t, err)
<-readyForShutdown
// Now shutdown the queue // Now shutdown the queue
lock.Lock() lock.Lock()
callbacks := make([]func(), len(queueShutdown)) callbacks := make([]func(), len(queueShutdown))
@ -97,6 +114,7 @@ func TestPersistableChannelQueue(t *testing.T) {
} }
// terminate the queue // terminate the queue
<-readyForTerminate
lock.Lock() lock.Lock()
callbacks = make([]func(), len(queueTerminate)) callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate) copy(callbacks, queueTerminate)
@ -123,13 +141,26 @@ func TestPersistableChannelQueue(t *testing.T) {
}, &testData{}) }, &testData{})
assert.NoError(t, err) assert.NoError(t, err)
readyForShutdown = make(chan struct{})
readyForTerminate = make(chan struct{})
go queue.Run(func(shutdown func()) { go queue.Run(func(shutdown func()) {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
select {
case <-readyForShutdown:
default:
close(readyForShutdown)
}
queueShutdown = append(queueShutdown, shutdown) queueShutdown = append(queueShutdown, shutdown)
}, func(terminate func()) { }, func(terminate func()) {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
select {
case <-readyForTerminate:
default:
close(readyForTerminate)
}
queueTerminate = append(queueTerminate, terminate) queueTerminate = append(queueTerminate, terminate)
}) })
@ -141,6 +172,7 @@ func TestPersistableChannelQueue(t *testing.T) {
assert.Equal(t, test2.TestString, result4.TestString) assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt) assert.Equal(t, test2.TestInt, result4.TestInt)
<-readyForShutdown
lock.Lock() lock.Lock()
callbacks = make([]func(), len(queueShutdown)) callbacks = make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown) copy(callbacks, queueShutdown)
@ -148,6 +180,7 @@ func TestPersistableChannelQueue(t *testing.T) {
for _, callback := range callbacks { for _, callback := range callbacks {
callback() callback()
} }
<-readyForTerminate
lock.Lock() lock.Lock()
callbacks = make([]func(), len(queueTerminate)) callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate) copy(callbacks, queueTerminate)