From 8981f6d0fc375c75ff5a2c1b6eb9e33c8dbaf722 Mon Sep 17 00:00:00 2001
From: Giteabot <teabot@gitea.io>
Date: Wed, 28 Jun 2023 19:39:23 -0400
Subject: [PATCH] Fix content holes in Actions task logs file (#25560) (#25566)

Backport #25560 by @wolfogre

Fix #25451.

Bugfixes:
- When stopping the zombie or endless tasks, set `LogInStorage` to true
after transferring the file to storage. It was missing, it could write
to a nonexistent file in DBFS because `LogInStorage` was false.
- Always update `ActionTask.Updated` when there's a new state reported
by the runner, even if there's no change. This is to avoid the task
being judged as a zombie task.

Enhancement:
- Support `Stat()` for DBFS file.
- `WriteLogs` refuses to write if it could result in content holes.

Co-authored-by: Jason Song <i@wolfogre.com>
---
 models/actions/task.go          |  9 +++++++++
 models/dbfs/dbfile.go           | 18 ++++++++++++++++++
 models/dbfs/dbfs.go             | 29 +++++++++++++++++++++++++++++
 models/dbfs/dbfs_test.go        | 13 +++++++++++++
 modules/actions/log.go          | 18 +++++++++++++++++-
 services/actions/clear_tasks.go | 18 +++++++++++++-----
 6 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/models/actions/task.go b/models/actions/task.go
index 66cd2bbea1..fc63114d28 100644
--- a/models/actions/task.go
+++ b/models/actions/task.go
@@ -346,6 +346,9 @@ func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error {
 	return err
 }
 
+// UpdateTaskByState updates the task by the state.
+// It will always update the task if the state is not final, even there is no change.
+// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
 func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionTask, error) {
 	stepStates := map[int64]*runnerv1.StepState{}
 	for _, v := range state.Steps {
@@ -386,6 +389,12 @@ func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionT
 		}, nil); err != nil {
 			return nil, err
 		}
+	} else {
+		// Force update ActionTask.Updated to avoid the task being judged as a zombie task
+		task.Updated = timeutil.TimeStampNow()
+		if err := UpdateTask(ctx, task, "updated"); err != nil {
+			return nil, err
+		}
 	}
 
 	if err := task.LoadAttributes(ctx); err != nil {
diff --git a/models/dbfs/dbfile.go b/models/dbfs/dbfile.go
index bac1cb9eb6..3650ce057e 100644
--- a/models/dbfs/dbfile.go
+++ b/models/dbfs/dbfile.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"errors"
 	"io"
+	"io/fs"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -21,6 +22,7 @@ var defaultFileBlockSize int64 = 32 * 1024
 type File interface {
 	io.ReadWriteCloser
 	io.Seeker
+	fs.File
 }
 
 type file struct {
@@ -193,10 +195,26 @@ func (f *file) Close() error {
 	return nil
 }
 
+func (f *file) Stat() (os.FileInfo, error) {
+	if f.metaID == 0 {
+		return nil, os.ErrInvalid
+	}
+
+	fileMeta, err := findFileMetaByID(f.ctx, f.metaID)
+	if err != nil {
+		return nil, err
+	}
+	return fileMeta, nil
+}
+
 func timeToFileTimestamp(t time.Time) int64 {
 	return t.UnixMicro()
 }
 
+func fileTimestampToTime(timestamp int64) time.Time {
+	return time.UnixMicro(timestamp)
+}
+
 func (f *file) loadMetaByPath() (*dbfsMeta, error) {
 	var fileMeta dbfsMeta
 	if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil {
diff --git a/models/dbfs/dbfs.go b/models/dbfs/dbfs.go
index 6b5b3beeb2..f68b4a2b70 100644
--- a/models/dbfs/dbfs.go
+++ b/models/dbfs/dbfs.go
@@ -5,7 +5,10 @@ package dbfs
 
 import (
 	"context"
+	"io/fs"
 	"os"
+	"path"
+	"time"
 
 	"code.gitea.io/gitea/models/db"
 )
@@ -100,3 +103,29 @@ func Remove(ctx context.Context, name string) error {
 	defer f.Close()
 	return f.delete()
 }
+
+var _ fs.FileInfo = (*dbfsMeta)(nil)
+
+func (m *dbfsMeta) Name() string {
+	return path.Base(m.FullPath)
+}
+
+func (m *dbfsMeta) Size() int64 {
+	return m.FileSize
+}
+
+func (m *dbfsMeta) Mode() fs.FileMode {
+	return os.ModePerm
+}
+
+func (m *dbfsMeta) ModTime() time.Time {
+	return fileTimestampToTime(m.ModifyTimestamp)
+}
+
+func (m *dbfsMeta) IsDir() bool {
+	return false
+}
+
+func (m *dbfsMeta) Sys() any {
+	return nil
+}
diff --git a/models/dbfs/dbfs_test.go b/models/dbfs/dbfs_test.go
index 300758c623..96cb1014c7 100644
--- a/models/dbfs/dbfs_test.go
+++ b/models/dbfs/dbfs_test.go
@@ -111,6 +111,19 @@ func TestDbfsBasic(t *testing.T) {
 
 	_, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY)
 	assert.Error(t, err)
+
+	// test stat
+	f, err = OpenFile(db.DefaultContext, "test/test.txt", os.O_RDWR|os.O_CREATE)
+	assert.NoError(t, err)
+	stat, err := f.Stat()
+	assert.NoError(t, err)
+	assert.EqualValues(t, "test.txt", stat.Name())
+	assert.EqualValues(t, 0, stat.Size())
+	_, err = f.Write([]byte("0123456789"))
+	assert.NoError(t, err)
+	stat, err = f.Stat()
+	assert.NoError(t, err)
+	assert.EqualValues(t, 10, stat.Size())
 }
 
 func TestDbfsReadWrite(t *testing.T) {
diff --git a/modules/actions/log.go b/modules/actions/log.go
index 3868101992..36bed931fa 100644
--- a/modules/actions/log.go
+++ b/modules/actions/log.go
@@ -29,12 +29,28 @@ const (
 )
 
 func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) {
+	flag := os.O_WRONLY
+	if offset == 0 {
+		// Create file only if offset is 0, or it could result in content holes if the file doesn't exist.
+		flag |= os.O_CREATE
+	}
 	name := DBFSPrefix + filename
-	f, err := dbfs.OpenFile(ctx, name, os.O_WRONLY|os.O_CREATE)
+	f, err := dbfs.OpenFile(ctx, name, flag)
 	if err != nil {
 		return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err)
 	}
 	defer f.Close()
+
+	stat, err := f.Stat()
+	if err != nil {
+		return nil, fmt.Errorf("dbfs Stat %q: %w", name, err)
+	}
+	if stat.Size() < offset {
+		// If the size is less than offset, refuse to write, or it could result in content holes.
+		// However, if the size is greater than offset, we can still write to overwrite the content.
+		return nil, fmt.Errorf("size of %q is less than offset", name)
+	}
+
 	if _, err := f.Seek(offset, io.SeekStart); err != nil {
 		return nil, fmt.Errorf("dbfs Seek %q: %w", name, err)
 	}
diff --git a/services/actions/clear_tasks.go b/services/actions/clear_tasks.go
index 0616a5fc0d..d2893e4f23 100644
--- a/services/actions/clear_tasks.go
+++ b/services/actions/clear_tasks.go
@@ -56,12 +56,20 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
 			return nil
 		}); err != nil {
 			log.Warn("Cannot stop task %v: %v", task.ID, err)
-			// go on
-		} else if remove, err := actions.TransferLogs(ctx, task.LogFilename); err != nil {
-			log.Warn("Cannot transfer logs of task %v: %v", task.ID, err)
-		} else {
-			remove()
+			continue
 		}
+
+		remove, err := actions.TransferLogs(ctx, task.LogFilename)
+		if err != nil {
+			log.Warn("Cannot transfer logs of task %v: %v", task.ID, err)
+			continue
+		}
+		task.LogInStorage = true
+		if err := actions_model.UpdateTask(ctx, task, "log_in_storage"); err != nil {
+			log.Warn("Cannot update task %v: %v", task.ID, err)
+			continue
+		}
+		remove()
 	}
 
 	CreateCommitStatus(ctx, jobs...)