Add multi-session auth and changelog tracking

这个提交包含在:
cryptocommuniums-afk
2026-03-15 17:30:19 +08:00
父节点 c4ec397ed3
当前提交 a9ea94fb78
修改 27 个文件,包含 1280 行新增89 行删除

查看文件

@@ -44,6 +44,15 @@ const (
ArchiveFailed ArchiveStatus = "failed"
)
type PreviewStatus string
const (
PreviewIdle PreviewStatus = "idle"
PreviewProcessing PreviewStatus = "processing"
PreviewReady PreviewStatus = "ready"
PreviewFailed PreviewStatus = "failed"
)
type PlaybackInfo struct {
WebMURL string `json:"webmUrl,omitempty"`
MP4URL string `json:"mp4Url,omitempty"`
@@ -77,6 +86,7 @@ type Session struct {
Title string `json:"title"`
Status SessionStatus `json:"status"`
ArchiveStatus ArchiveStatus `json:"archiveStatus"`
PreviewStatus PreviewStatus `json:"previewStatus"`
Format string `json:"format"`
MimeType string `json:"mimeType"`
QualityPreset string `json:"qualityPreset"`
@@ -85,11 +95,13 @@ type Session struct {
ReconnectCount int `json:"reconnectCount"`
UploadedSegments int `json:"uploadedSegments"`
UploadedBytes int64 `json:"uploadedBytes"`
PreviewSegments int `json:"previewSegments"`
DurationMS int64 `json:"durationMs"`
LastError string `json:"lastError,omitempty"`
CreatedAt string `json:"createdAt"`
UpdatedAt string `json:"updatedAt"`
FinalizedAt string `json:"finalizedAt,omitempty"`
PreviewUpdatedAt string `json:"previewUpdatedAt,omitempty"`
StreamConnected bool `json:"streamConnected"`
LastStreamAt string `json:"lastStreamAt,omitempty"`
Playback PlaybackInfo `json:"playback"`
@@ -159,7 +171,7 @@ func newSessionStore(rootDir string) (*sessionStore, error) {
if err := os.MkdirAll(store.public, 0o755); err != nil {
return nil, err
}
if err := store.load(); err != nil {
if err := store.refreshFromDisk(); err != nil {
return nil, err
}
for _, session := range store.sessions {
@@ -168,12 +180,13 @@ func newSessionStore(rootDir string) (*sessionStore, error) {
return store, nil
}
func (s *sessionStore) load() error {
func (s *sessionStore) loadSessionsFromDisk() (map[string]*Session, error) {
pattern := filepath.Join(s.rootDir, "sessions", "*", "session.json")
files, err := filepath.Glob(pattern)
if err != nil {
return err
return nil, err
}
sessions := make(map[string]*Session, len(files))
for _, file := range files {
body, readErr := os.ReadFile(file)
if readErr != nil {
@@ -183,8 +196,19 @@ func (s *sessionStore) load() error {
if unmarshalErr := json.Unmarshal(body, &session); unmarshalErr != nil {
continue
}
s.sessions[session.ID] = &session
sessions[session.ID] = &session
}
return sessions, nil
}
func (s *sessionStore) refreshFromDisk() error {
sessions, err := s.loadSessionsFromDisk()
if err != nil {
return err
}
s.mu.Lock()
defer s.mu.Unlock()
s.sessions = sessions
return nil
}
@@ -228,6 +252,7 @@ func (s *sessionStore) createSession(input CreateSessionRequest) (*Session, erro
Title: strings.TrimSpace(input.Title),
Status: StatusCreated,
ArchiveStatus: ArchiveIdle,
PreviewStatus: PreviewIdle,
Format: defaultString(input.Format, "webm"),
MimeType: defaultString(input.MimeType, "video/webm"),
QualityPreset: defaultString(input.QualityPreset, "balanced"),
@@ -295,13 +320,20 @@ func (s *sessionStore) updateSession(id string, update func(*Session) error) (*S
return cloneSession(session), nil
}
func (s *sessionStore) listFinalizingSessions() []*Session {
func (s *sessionStore) listProcessableSessions() []*Session {
s.mu.RLock()
defer s.mu.RUnlock()
items := make([]*Session, 0, len(s.sessions))
for _, session := range s.sessions {
if len(session.Segments) == 0 {
continue
}
if session.ArchiveStatus == ArchiveQueued || session.ArchiveStatus == ArchiveProcessing {
items = append(items, cloneSession(session))
continue
}
if session.PreviewSegments < len(session.Segments) && session.PreviewStatus != PreviewProcessing {
items = append(items, cloneSession(session))
}
}
return items
@@ -315,6 +347,10 @@ func newMediaServer(store *sessionStore) *mediaServer {
return &mediaServer{store: store}
}
func (m *mediaServer) refreshSessionsForRead() error {
return m.store.refreshFromDisk()
}
func (m *mediaServer) routes() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/media/health", m.handleHealth)
@@ -359,6 +395,10 @@ func (m *mediaServer) handleSession(w http.ResponseWriter, r *http.Request) {
}
sessionID := parts[0]
if len(parts) == 1 && r.Method == http.MethodGet {
if err := m.refreshSessionsForRead(); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
session, err := m.store.getSession(sessionID)
if err != nil {
writeError(w, http.StatusNotFound, err.Error())
@@ -402,6 +442,10 @@ func (m *mediaServer) handleSession(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
return
}
if err := m.refreshSessionsForRead(); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
session, err := m.store.getSession(sessionID)
if err != nil {
writeError(w, http.StatusNotFound, err.Error())
@@ -632,7 +676,11 @@ func runWorkerLoop(ctx context.Context, store *sessionStore, interval time.Durat
case <-ctx.Done():
return
case <-ticker.C:
sessions := store.listFinalizingSessions()
if err := store.refreshFromDisk(); err != nil {
log.Printf("[worker] failed to refresh session store: %v", err)
continue
}
sessions := store.listProcessableSessions()
for _, session := range sessions {
if err := processSession(store, session.ID); err != nil {
log.Printf("[worker] failed to process session %s: %v", session.ID, err)
@@ -643,6 +691,42 @@ func runWorkerLoop(ctx context.Context, store *sessionStore, interval time.Durat
}
func processSession(store *sessionStore, sessionID string) error {
current, err := store.getSession(sessionID)
if err != nil {
return err
}
if current.ArchiveStatus == ArchiveQueued || current.ArchiveStatus == ArchiveProcessing {
return processFinalArchive(store, sessionID)
}
if current.PreviewSegments < len(current.Segments) {
return processRollingPreview(store, sessionID)
}
return nil
}
func processRollingPreview(store *sessionStore, sessionID string) error {
session, err := store.updateSession(sessionID, func(session *Session) error {
if session.PreviewStatus == PreviewProcessing {
return errors.New("preview already processing")
}
session.PreviewStatus = PreviewProcessing
session.LastError = ""
return nil
})
if err != nil {
if strings.Contains(err.Error(), "preview already processing") {
return nil
}
return err
}
return buildPlaybackArtifacts(store, session, false)
}
func processFinalArchive(store *sessionStore, sessionID string) error {
session, err := store.updateSession(sessionID, func(session *Session) error {
if session.ArchiveStatus == ArchiveProcessing {
return errors.New("already processing")
@@ -668,12 +752,22 @@ func processSession(store *sessionStore, sessionID string) error {
return errors.New("no uploaded segments found")
}
return buildPlaybackArtifacts(store, session, true)
}
func buildPlaybackArtifacts(store *sessionStore, session *Session, finalize bool) error {
sessionID := session.ID
publicDir := store.publicDir(sessionID)
if err := os.MkdirAll(publicDir, 0o755); err != nil {
return err
}
outputWebM := filepath.Join(publicDir, "recording.webm")
outputMP4 := filepath.Join(publicDir, "recording.mp4")
baseName := "preview"
if finalize {
baseName = "recording"
}
outputWebM := filepath.Join(publicDir, baseName+".webm")
outputMP4 := filepath.Join(publicDir, baseName+".mp4")
listFile := filepath.Join(store.sessionDir(sessionID), "concat.txt")
inputs := make([]string, 0, len(session.Segments))
@@ -684,23 +778,23 @@ func processSession(store *sessionStore, sessionID string) error {
inputs = append(inputs, filepath.Join(store.segmentsDir(sessionID), segment.Filename))
}
if err := writeConcatList(listFile, inputs); err != nil {
return markArchiveError(store, sessionID, err)
return markProcessingError(store, sessionID, err, finalize)
}
if len(inputs) == 1 {
body, copyErr := os.ReadFile(inputs[0])
if copyErr != nil {
return markArchiveError(store, sessionID, copyErr)
return markProcessingError(store, sessionID, copyErr, finalize)
}
if writeErr := os.WriteFile(outputWebM, body, 0o644); writeErr != nil {
return markArchiveError(store, sessionID, writeErr)
return markProcessingError(store, sessionID, writeErr, finalize)
}
} else {
copyErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c", "copy", outputWebM)
if copyErr != nil {
reencodeErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c:v", "libvpx-vp9", "-b:v", "1800k", "-c:a", "libopus", outputWebM)
if reencodeErr != nil {
return markArchiveError(store, sessionID, fmt.Errorf("concat failed: %w / %v", copyErr, reencodeErr))
return markProcessingError(store, sessionID, fmt.Errorf("concat failed: %w / %v", copyErr, reencodeErr), finalize)
}
}
}
@@ -712,7 +806,7 @@ func processSession(store *sessionStore, sessionID string) error {
webmInfo, webmStatErr := os.Stat(outputWebM)
if webmStatErr != nil {
return markArchiveError(store, sessionID, webmStatErr)
return markProcessingError(store, sessionID, webmStatErr, finalize)
}
var mp4Size int64
var mp4URL string
@@ -720,27 +814,41 @@ func processSession(store *sessionStore, sessionID string) error {
mp4Size = info.Size()
mp4URL = fmt.Sprintf("/media/assets/sessions/%s/recording.mp4", sessionID)
}
_, err = store.updateSession(sessionID, func(session *Session) error {
session.ArchiveStatus = ArchiveCompleted
session.Status = StatusArchived
session.Playback = PlaybackInfo{
WebMURL: fmt.Sprintf("/media/assets/sessions/%s/recording.webm", sessionID),
MP4URL: mp4URL,
WebMSize: webmInfo.Size(),
MP4Size: mp4Size,
Ready: true,
PreviewURL: fmt.Sprintf("/media/assets/sessions/%s/recording.webm", sessionID),
}
previewURL := fmt.Sprintf("/media/assets/sessions/%s/%s.webm", sessionID, baseName)
if mp4URL != "" {
previewURL = mp4URL
}
_, updateErr := store.updateSession(sessionID, func(session *Session) error {
session.Playback.PreviewURL = previewURL
session.PreviewSegments = len(inputs)
session.PreviewUpdatedAt = time.Now().UTC().Format(time.RFC3339)
session.PreviewStatus = PreviewReady
session.LastError = ""
if finalize {
session.ArchiveStatus = ArchiveCompleted
session.Status = StatusArchived
session.Playback = PlaybackInfo{
WebMURL: fmt.Sprintf("/media/assets/sessions/%s/recording.webm", sessionID),
MP4URL: mp4URL,
WebMSize: webmInfo.Size(),
MP4Size: mp4Size,
Ready: true,
PreviewURL: previewURL,
}
}
return nil
})
return err
return updateErr
}
func markArchiveError(store *sessionStore, sessionID string, err error) error {
func markProcessingError(store *sessionStore, sessionID string, err error, finalize bool) error {
_, _ = store.updateSession(sessionID, func(session *Session) error {
session.ArchiveStatus = ArchiveFailed
session.Status = StatusFailed
session.PreviewStatus = PreviewFailed
if finalize {
session.ArchiveStatus = ArchiveFailed
session.Status = StatusFailed
}
session.LastError = err.Error()
return nil
})

查看文件

@@ -1,6 +1,7 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"os"
@@ -128,3 +129,130 @@ func TestProcessSessionArchivesPlayback(t *testing.T) {
t.Fatalf("expected webm playback url, got %#v", archived.Playback)
}
}
func TestRefreshFromDiskPicksUpSessionsCreatedAfterWorkerStartup(t *testing.T) {
tempDir := t.TempDir()
workerStore, err := newSessionStore(tempDir)
if err != nil {
t.Fatalf("newSessionStore(worker): %v", err)
}
if got := len(workerStore.listProcessableSessions()); got != 0 {
t.Fatalf("expected no processable sessions at startup, got %d", got)
}
appStore, err := newSessionStore(tempDir)
if err != nil {
t.Fatalf("newSessionStore(app): %v", err)
}
session, err := appStore.createSession(CreateSessionRequest{UserID: "1", Title: "Queued Session"})
if err != nil {
t.Fatalf("createSession: %v", err)
}
if err := os.WriteFile(filepath.Join(appStore.segmentsDir(session.ID), "000000.webm"), []byte("segment"), 0o644); err != nil {
t.Fatalf("write segment: %v", err)
}
if _, err := appStore.updateSession(session.ID, func(current *Session) error {
current.Segments = append(current.Segments, SegmentMeta{
Sequence: 0,
Filename: "000000.webm",
DurationMS: 60000,
SizeBytes: 7,
ContentType: "video/webm",
})
current.ArchiveStatus = ArchiveQueued
current.Status = StatusFinalizing
return nil
}); err != nil {
t.Fatalf("updateSession: %v", err)
}
if err := workerStore.refreshFromDisk(); err != nil {
t.Fatalf("refreshFromDisk: %v", err)
}
processable := workerStore.listProcessableSessions()
if len(processable) != 1 {
t.Fatalf("expected worker to pick up queued session after refresh, got %d", len(processable))
}
if processable[0].ID != session.ID {
t.Fatalf("expected session %s, got %s", session.ID, processable[0].ID)
}
}
func TestHandleSessionGetRefreshesSessionStateFromDisk(t *testing.T) {
tempDir := t.TempDir()
serverStore, err := newSessionStore(tempDir)
if err != nil {
t.Fatalf("newSessionStore(server): %v", err)
}
server := newMediaServer(serverStore)
writerStore, err := newSessionStore(tempDir)
if err != nil {
t.Fatalf("newSessionStore(writer): %v", err)
}
session, err := writerStore.createSession(CreateSessionRequest{UserID: "1", Title: "Fresh Session"})
if err != nil {
t.Fatalf("createSession: %v", err)
}
if _, err := writerStore.updateSession(session.ID, func(current *Session) error {
current.Status = StatusFinalizing
current.ArchiveStatus = ArchiveQueued
return nil
}); err != nil {
t.Fatalf("queue session: %v", err)
}
getReq := httptest.NewRequest(http.MethodGet, "/media/sessions/"+session.ID, nil)
getRes := httptest.NewRecorder()
server.routes().ServeHTTP(getRes, getReq)
if getRes.Code != http.StatusOK {
t.Fatalf("expected get session 200, got %d", getRes.Code)
}
var queuedResponse struct {
Session Session `json:"session"`
}
if err := json.NewDecoder(getRes.Body).Decode(&queuedResponse); err != nil {
t.Fatalf("decode queued response: %v", err)
}
if queuedResponse.Session.ArchiveStatus != ArchiveQueued {
t.Fatalf("expected queued archive status, got %s", queuedResponse.Session.ArchiveStatus)
}
if _, err := writerStore.updateSession(session.ID, func(current *Session) error {
current.Status = StatusArchived
current.ArchiveStatus = ArchiveCompleted
current.Playback = PlaybackInfo{
WebMURL: "/media/assets/sessions/" + session.ID + "/recording.webm",
Ready: true,
}
return nil
}); err != nil {
t.Fatalf("complete session: %v", err)
}
refreshReq := httptest.NewRequest(http.MethodGet, "/media/sessions/"+session.ID, nil)
refreshRes := httptest.NewRecorder()
server.routes().ServeHTTP(refreshRes, refreshReq)
if refreshRes.Code != http.StatusOK {
t.Fatalf("expected refreshed get session 200, got %d", refreshRes.Code)
}
var completedResponse struct {
Session Session `json:"session"`
}
if err := json.NewDecoder(refreshRes.Body).Decode(&completedResponse); err != nil {
t.Fatalf("decode completed response: %v", err)
}
if completedResponse.Session.ArchiveStatus != ArchiveCompleted {
t.Fatalf("expected completed archive status, got %s", completedResponse.Session.ArchiveStatus)
}
if !completedResponse.Session.Playback.Ready {
t.Fatalf("expected playback ready after refresh")
}
}