Improve live camera relay buffering

这个提交包含在:
cryptocommuniums-afk
2026-03-17 09:51:47 +08:00
父节点 63dbfd2787
当前提交 f3f7e1982c
修改 8 个文件,包含 2536 行新增1205 行删除

查看文件

@@ -53,6 +53,18 @@ const (
PreviewFailed PreviewStatus = "failed"
)
type SessionPurpose string
const (
PurposeRecording SessionPurpose = "recording"
PurposeRelay SessionPurpose = "relay"
)
const (
relayPreviewWindow = 60 * time.Second
relayCacheTTL = 30 * time.Minute
)
type PlaybackInfo struct {
WebMURL string `json:"webmUrl,omitempty"`
MP4URL string `json:"mp4Url,omitempty"`
@@ -81,35 +93,36 @@ type Marker struct {
}
type Session struct {
ID string `json:"id"`
UserID string `json:"userId"`
Title string `json:"title"`
Status SessionStatus `json:"status"`
ArchiveStatus ArchiveStatus `json:"archiveStatus"`
PreviewStatus PreviewStatus `json:"previewStatus"`
Format string `json:"format"`
MimeType string `json:"mimeType"`
QualityPreset string `json:"qualityPreset"`
FacingMode string `json:"facingMode"`
DeviceKind string `json:"deviceKind"`
ReconnectCount int `json:"reconnectCount"`
UploadedSegments int `json:"uploadedSegments"`
UploadedBytes int64 `json:"uploadedBytes"`
PreviewSegments int `json:"previewSegments"`
DurationMS int64 `json:"durationMs"`
LastError string `json:"lastError,omitempty"`
CreatedAt string `json:"createdAt"`
UpdatedAt string `json:"updatedAt"`
FinalizedAt string `json:"finalizedAt,omitempty"`
PreviewUpdatedAt string `json:"previewUpdatedAt,omitempty"`
StreamConnected bool `json:"streamConnected"`
LastStreamAt string `json:"lastStreamAt,omitempty"`
ViewerCount int `json:"viewerCount"`
LiveFrameURL string `json:"liveFrameUrl,omitempty"`
LiveFrameUpdated string `json:"liveFrameUpdatedAt,omitempty"`
Playback PlaybackInfo `json:"playback"`
Segments []SegmentMeta `json:"segments"`
Markers []Marker `json:"markers"`
ID string `json:"id"`
UserID string `json:"userId"`
Title string `json:"title"`
Purpose SessionPurpose `json:"purpose"`
Status SessionStatus `json:"status"`
ArchiveStatus ArchiveStatus `json:"archiveStatus"`
PreviewStatus PreviewStatus `json:"previewStatus"`
Format string `json:"format"`
MimeType string `json:"mimeType"`
QualityPreset string `json:"qualityPreset"`
FacingMode string `json:"facingMode"`
DeviceKind string `json:"deviceKind"`
ReconnectCount int `json:"reconnectCount"`
UploadedSegments int `json:"uploadedSegments"`
UploadedBytes int64 `json:"uploadedBytes"`
PreviewSegments int `json:"previewSegments"`
DurationMS int64 `json:"durationMs"`
LastError string `json:"lastError,omitempty"`
CreatedAt string `json:"createdAt"`
UpdatedAt string `json:"updatedAt"`
FinalizedAt string `json:"finalizedAt,omitempty"`
PreviewUpdatedAt string `json:"previewUpdatedAt,omitempty"`
StreamConnected bool `json:"streamConnected"`
LastStreamAt string `json:"lastStreamAt,omitempty"`
ViewerCount int `json:"viewerCount"`
LiveFrameURL string `json:"liveFrameUrl,omitempty"`
LiveFrameUpdated string `json:"liveFrameUpdatedAt,omitempty"`
Playback PlaybackInfo `json:"playback"`
Segments []SegmentMeta `json:"segments"`
Markers []Marker `json:"markers"`
}
func (s *Session) recomputeAggregates() {
@@ -134,6 +147,7 @@ type CreateSessionRequest struct {
QualityPreset string `json:"qualityPreset"`
FacingMode string `json:"facingMode"`
DeviceKind string `json:"deviceKind"`
Purpose string `json:"purpose"`
}
type SignalRequest struct {
@@ -157,10 +171,10 @@ type sessionStore struct {
rootDir string
public string
mu sync.RWMutex
sessions map[string]*Session
peers map[string]*webrtc.PeerConnection
viewerPeers map[string]map[string]*webrtc.PeerConnection
videoTracks map[string]*webrtc.TrackLocalStaticRTP
sessions map[string]*Session
peers map[string]*webrtc.PeerConnection
viewerPeers map[string]map[string]*webrtc.PeerConnection
videoTracks map[string]*webrtc.TrackLocalStaticRTP
}
func newSessionStore(rootDir string) (*sessionStore, error) {
@@ -213,6 +227,12 @@ func (s *sessionStore) refreshFromDisk() error {
if err != nil {
return err
}
for _, session := range sessions {
if session.Purpose == "" {
session.Purpose = PurposeRecording
}
session.recomputeAggregates()
}
s.mu.Lock()
defer s.mu.Unlock()
s.sessions = sessions
@@ -265,6 +285,7 @@ func (s *sessionStore) createSession(input CreateSessionRequest) (*Session, erro
ID: randomID(),
UserID: strings.TrimSpace(input.UserID),
Title: strings.TrimSpace(input.Title),
Purpose: SessionPurpose(defaultString(input.Purpose, string(PurposeRecording))),
Status: StatusCreated,
ArchiveStatus: ArchiveIdle,
PreviewStatus: PreviewIdle,
@@ -290,6 +311,106 @@ func (s *sessionStore) createSession(input CreateSessionRequest) (*Session, erro
return cloneSession(session), nil
}
func parseSessionTime(values ...string) time.Time {
for _, value := range values {
if strings.TrimSpace(value) == "" {
continue
}
if parsed, err := time.Parse(time.RFC3339, value); err == nil {
return parsed
}
}
return time.Time{}
}
func sortSegmentsBySequence(segments []SegmentMeta) {
sort.Slice(segments, func(i, j int) bool {
return segments[i].Sequence < segments[j].Sequence
})
}
func maxInt64(value int64, minimum int64) int64 {
if value < minimum {
return minimum
}
return value
}
func trimSegmentsToDuration(segments []SegmentMeta, maxDuration time.Duration) (kept []SegmentMeta, removed []SegmentMeta) {
if len(segments) == 0 {
return []SegmentMeta{}, []SegmentMeta{}
}
limitMS := maxDuration.Milliseconds()
total := int64(0)
startIndex := len(segments) - 1
for index := len(segments) - 1; index >= 0; index-- {
total += maxInt64(segments[index].DurationMS, 1)
startIndex = index
if total >= limitMS {
break
}
}
kept = append([]SegmentMeta(nil), segments[startIndex:]...)
removed = append([]SegmentMeta(nil), segments[:startIndex]...)
return kept, removed
}
func sessionNeedsPreview(session *Session) bool {
if len(session.Segments) == 0 {
return false
}
if session.PreviewStatus == PreviewProcessing {
return false
}
if session.PreviewStatus != PreviewReady || session.PreviewSegments < len(session.Segments) {
return true
}
previewUpdatedAt := parseSessionTime(session.PreviewUpdatedAt)
if previewUpdatedAt.IsZero() {
return true
}
for _, segment := range session.Segments {
uploadedAt := parseSessionTime(segment.UploadedAt)
if !uploadedAt.IsZero() && uploadedAt.After(previewUpdatedAt) {
return true
}
}
return false
}
func (s *sessionStore) pruneExpiredRelaySessions(maxAge time.Duration, now time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()
for id, session := range s.sessions {
if session.Purpose != PurposeRelay {
continue
}
lastActivity := parseSessionTime(session.UpdatedAt, session.LastStreamAt, session.LiveFrameUpdated, session.CreatedAt)
if lastActivity.IsZero() || now.Sub(lastActivity) < maxAge {
continue
}
delete(s.sessions, id)
delete(s.peers, id)
delete(s.viewerPeers, id)
delete(s.videoTracks, id)
if err := os.RemoveAll(s.sessionDir(id)); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
if err := os.RemoveAll(s.publicDir(id)); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
}
return nil
}
func (s *sessionStore) getSession(id string) (*Session, error) {
s.mu.RLock()
defer s.mu.RUnlock()
@@ -415,7 +536,7 @@ func (s *sessionStore) listProcessableSessions() []*Session {
items = append(items, cloneSession(session))
continue
}
if session.PreviewSegments < len(session.Segments) && session.PreviewStatus != PreviewProcessing {
if sessionNeedsPreview(session) {
items = append(items, cloneSession(session))
}
}
@@ -822,6 +943,7 @@ func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWrite
return
}
removedSegments := []SegmentMeta{}
session, err := m.store.updateSession(sessionID, func(session *Session) error {
meta := SegmentMeta{
Sequence: sequence,
@@ -842,9 +964,12 @@ func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWrite
if !found {
session.Segments = append(session.Segments, meta)
}
sort.Slice(session.Segments, func(i, j int) bool {
return session.Segments[i].Sequence < session.Segments[j].Sequence
})
sortSegmentsBySequence(session.Segments)
if session.Purpose == PurposeRelay {
var kept []SegmentMeta
kept, removedSegments = trimSegmentsToDuration(session.Segments, relayPreviewWindow)
session.Segments = kept
}
session.Status = StatusRecording
session.LastError = ""
return nil
@@ -853,6 +978,12 @@ func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWrite
writeError(w, http.StatusNotFound, err.Error())
return
}
for _, segment := range removedSegments {
segmentPath := filepath.Join(m.store.segmentsDir(sessionID), segment.Filename)
if removeErr := os.Remove(segmentPath); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) {
log.Printf("failed to remove pruned relay segment %s: %v", segmentPath, removeErr)
}
}
writeJSON(w, http.StatusAccepted, map[string]any{"session": session})
}
@@ -919,6 +1050,9 @@ func runWorkerLoop(ctx context.Context, store *sessionStore, interval time.Durat
log.Printf("[worker] failed to refresh session store: %v", err)
continue
}
if err := store.pruneExpiredRelaySessions(relayCacheTTL, time.Now().UTC()); err != nil {
log.Printf("[worker] failed to prune relay cache: %v", err)
}
sessions := store.listProcessableSessions()
for _, session := range sessions {
if err := processSession(store, session.ID); err != nil {
@@ -939,7 +1073,7 @@ func processSession(store *sessionStore, sessionID string) error {
return processFinalArchive(store, sessionID)
}
if current.PreviewSegments < len(current.Segments) {
if sessionNeedsPreview(current) {
return processRollingPreview(store, sessionID)
}
@@ -1010,9 +1144,7 @@ func buildPlaybackArtifacts(store *sessionStore, session *Session, finalize bool
listFile := filepath.Join(store.sessionDir(sessionID), "concat.txt")
inputs := make([]string, 0, len(session.Segments))
sort.Slice(session.Segments, func(i, j int) bool {
return session.Segments[i].Sequence < session.Segments[j].Sequence
})
sortSegmentsBySequence(session.Segments)
for _, segment := range session.Segments {
inputs = append(inputs, filepath.Join(store.segmentsDir(sessionID), segment.Filename))
}
@@ -1038,9 +1170,11 @@ func buildPlaybackArtifacts(store *sessionStore, session *Session, finalize bool
}
}
mp4Err := runFFmpeg("-y", "-i", outputWebM, "-c:v", "libx264", "-preset", "veryfast", "-crf", "28", "-c:a", "aac", "-movflags", "+faststart", outputMP4)
if mp4Err != nil {
log.Printf("[worker] mp4 archive generation failed for %s: %v", sessionID, mp4Err)
if finalize {
mp4Err := runFFmpeg("-y", "-i", outputWebM, "-c:v", "libx264", "-preset", "veryfast", "-crf", "28", "-c:a", "aac", "-movflags", "+faststart", outputMP4)
if mp4Err != nil {
log.Printf("[worker] mp4 archive generation failed for %s: %v", sessionID, mp4Err)
}
}
webmInfo, webmStatErr := os.Stat(outputWebM)
@@ -1049,13 +1183,15 @@ func buildPlaybackArtifacts(store *sessionStore, session *Session, finalize bool
}
var mp4Size int64
var mp4URL string
if info, statErr := os.Stat(outputMP4); statErr == nil {
mp4Size = info.Size()
mp4URL = fmt.Sprintf("/media/assets/sessions/%s/recording.mp4", sessionID)
}
previewURL := fmt.Sprintf("/media/assets/sessions/%s/%s.webm", sessionID, baseName)
if mp4URL != "" {
previewURL = mp4URL
if finalize {
if info, statErr := os.Stat(outputMP4); statErr == nil {
mp4Size = info.Size()
mp4URL = fmt.Sprintf("/media/assets/sessions/%s/recording.mp4", sessionID)
}
if mp4URL != "" {
previewURL = mp4URL
}
}
_, updateErr := store.updateSession(sessionID, func(session *Session) error {

查看文件

@@ -2,12 +2,15 @@ package main
import (
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
)
func TestMediaHealthAndSessionLifecycle(t *testing.T) {
@@ -320,3 +323,130 @@ func TestLiveFrameUploadPublishesRelayFrame(t *testing.T) {
t.Fatalf("unexpected live frame content: %q", string(body))
}
}
func TestRelaySegmentUploadKeepsOnlyLatestMinute(t *testing.T) {
store, err := newSessionStore(t.TempDir())
if err != nil {
t.Fatalf("newSessionStore: %v", err)
}
server := newMediaServer(store)
session, err := store.createSession(CreateSessionRequest{UserID: "1", Title: "Relay Buffer", Purpose: "relay"})
if err != nil {
t.Fatalf("createSession: %v", err)
}
for sequence := 0; sequence < 3; sequence += 1 {
req := httptest.NewRequest(http.MethodPost, "/media/sessions/"+session.ID+"/segments?sequence="+strconv.Itoa(sequence)+"&durationMs=30000", strings.NewReader("segment"))
req.Header.Set("Content-Type", "video/webm")
res := httptest.NewRecorder()
server.routes().ServeHTTP(res, req)
if res.Code != http.StatusAccepted {
t.Fatalf("expected segment upload 202 for sequence %d, got %d", sequence, res.Code)
}
}
current, err := store.getSession(session.ID)
if err != nil {
t.Fatalf("getSession: %v", err)
}
if current.Purpose != PurposeRelay {
t.Fatalf("expected relay purpose, got %s", current.Purpose)
}
if len(current.Segments) != 2 {
t.Fatalf("expected latest 2 relay segments to remain, got %d", len(current.Segments))
}
if current.Segments[0].Sequence != 1 || current.Segments[1].Sequence != 2 {
t.Fatalf("expected relay segments 1 and 2 to remain, got %#v", current.Segments)
}
if _, err := os.Stat(filepath.Join(store.segmentsDir(session.ID), "000000.webm")); !errors.Is(err, os.ErrNotExist) {
t.Fatalf("expected earliest relay segment to be pruned from disk, got %v", err)
}
}
func TestProcessRelayPreviewPublishesBufferedWebM(t *testing.T) {
tempDir := t.TempDir()
store, err := newSessionStore(tempDir)
if err != nil {
t.Fatalf("newSessionStore: %v", err)
}
session, err := store.createSession(CreateSessionRequest{UserID: "1", Title: "Relay Preview", Purpose: "relay"})
if err != nil {
t.Fatalf("createSession: %v", err)
}
if err := os.WriteFile(filepath.Join(store.segmentsDir(session.ID), "000000.webm"), []byte("segment"), 0o644); err != nil {
t.Fatalf("write segment: %v", err)
}
if _, err := store.updateSession(session.ID, func(current *Session) error {
current.Segments = append(current.Segments, SegmentMeta{
Sequence: 0,
Filename: "000000.webm",
DurationMS: 60000,
SizeBytes: 7,
ContentType: "video/webm",
})
current.Purpose = PurposeRelay
return nil
}); err != nil {
t.Fatalf("updateSession: %v", err)
}
if err := processRollingPreview(store, session.ID); err != nil {
t.Fatalf("processRollingPreview: %v", err)
}
current, err := store.getSession(session.ID)
if err != nil {
t.Fatalf("getSession: %v", err)
}
if current.Playback.PreviewURL == "" || !strings.HasSuffix(current.Playback.PreviewURL, "/preview.webm") {
t.Fatalf("expected relay preview webm url, got %#v", current.Playback)
}
if current.Playback.MP4URL != "" {
t.Fatalf("expected relay preview to skip mp4 generation, got %#v", current.Playback)
}
}
func TestPruneExpiredRelaySessionsRemovesOldCache(t *testing.T) {
store, err := newSessionStore(t.TempDir())
if err != nil {
t.Fatalf("newSessionStore: %v", err)
}
session, err := store.createSession(CreateSessionRequest{UserID: "1", Title: "Old Relay", Purpose: "relay"})
if err != nil {
t.Fatalf("createSession: %v", err)
}
if err := os.WriteFile(filepath.Join(store.segmentsDir(session.ID), "000000.webm"), []byte("segment"), 0o644); err != nil {
t.Fatalf("write segment: %v", err)
}
if err := os.MkdirAll(store.publicDir(session.ID), 0o755); err != nil {
t.Fatalf("mkdir public dir: %v", err)
}
if err := os.WriteFile(filepath.Join(store.publicDir(session.ID), "preview.webm"), []byte("preview"), 0o644); err != nil {
t.Fatalf("write preview: %v", err)
}
store.mu.Lock()
store.sessions[session.ID].Purpose = PurposeRelay
store.sessions[session.ID].UpdatedAt = time.Now().UTC().Add(-31 * time.Minute).Format(time.RFC3339)
store.mu.Unlock()
if err := store.pruneExpiredRelaySessions(relayCacheTTL, time.Now().UTC()); err != nil {
t.Fatalf("pruneExpiredRelaySessions: %v", err)
}
if _, err := store.getSession(session.ID); err == nil {
t.Fatalf("expected relay session to be removed from store")
}
if _, err := os.Stat(store.sessionDir(session.ID)); !errors.Is(err, os.ErrNotExist) {
t.Fatalf("expected relay session directory to be removed, got %v", err)
}
if _, err := os.Stat(store.publicDir(session.ID)); !errors.Is(err, os.ErrNotExist) {
t.Fatalf("expected relay public directory to be removed, got %v", err)
}
}