Harden relay preview mp4 handling
这个提交包含在:
174
media/main.go
174
media/main.go
@@ -118,6 +118,7 @@ type Session struct {
|
||||
UpdatedAt string `json:"updatedAt"`
|
||||
FinalizedAt string `json:"finalizedAt,omitempty"`
|
||||
PreviewUpdatedAt string `json:"previewUpdatedAt,omitempty"`
|
||||
RelayInitFilename string `json:"relayInitFilename,omitempty"`
|
||||
StreamConnected bool `json:"streamConnected"`
|
||||
LastStreamAt string `json:"lastStreamAt,omitempty"`
|
||||
ViewerCount int `json:"viewerCount"`
|
||||
@@ -258,6 +259,10 @@ func (s *sessionStore) publicDir(id string) string {
|
||||
return filepath.Join(s.public, "sessions", id)
|
||||
}
|
||||
|
||||
func (s *sessionStore) relayInitPath(id string) string {
|
||||
return filepath.Join(s.sessionDir(id), "relay-init.mp4")
|
||||
}
|
||||
|
||||
func (s *sessionStore) liveFramePath(id string) string {
|
||||
return filepath.Join(s.publicDir(id), "live-frame.jpg")
|
||||
}
|
||||
@@ -974,6 +979,7 @@ func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWrite
|
||||
}
|
||||
|
||||
removedSegments := []SegmentMeta{}
|
||||
persistRelayInit := false
|
||||
session, err := m.store.updateSession(sessionID, func(session *Session) error {
|
||||
meta := SegmentMeta{
|
||||
Sequence: sequence,
|
||||
@@ -994,6 +1000,10 @@ func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWrite
|
||||
if !found {
|
||||
session.Segments = append(session.Segments, meta)
|
||||
}
|
||||
if session.Purpose == PurposeRelay && extension == "mp4" && session.RelayInitFilename == "" && sequence <= 1 {
|
||||
session.RelayInitFilename = filename
|
||||
persistRelayInit = true
|
||||
}
|
||||
sortSegmentsBySequence(session.Segments)
|
||||
if session.Purpose == PurposeRelay {
|
||||
var kept []SegmentMeta
|
||||
@@ -1008,6 +1018,11 @@ func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWrite
|
||||
writeError(w, http.StatusNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
if persistRelayInit {
|
||||
if copyErr := copyFile(segmentPath, m.store.relayInitPath(sessionID)); copyErr != nil {
|
||||
log.Printf("failed to persist relay init segment for %s: %v", sessionID, copyErr)
|
||||
}
|
||||
}
|
||||
for _, segment := range removedSegments {
|
||||
segmentPath := filepath.Join(m.store.segmentsDir(sessionID), segment.Filename)
|
||||
if removeErr := os.Remove(segmentPath); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) {
|
||||
@@ -1173,33 +1188,81 @@ func buildPlaybackArtifacts(store *sessionStore, session *Session, finalize bool
|
||||
outputMP4 := filepath.Join(publicDir, baseName+".mp4")
|
||||
listFile := filepath.Join(store.sessionDir(sessionID), "concat.txt")
|
||||
|
||||
validSegments := make([]SegmentMeta, 0, len(session.Segments))
|
||||
inputs := make([]string, 0, len(session.Segments))
|
||||
sortSegmentsBySequence(session.Segments)
|
||||
for _, segment := range session.Segments {
|
||||
inputs = append(inputs, filepath.Join(store.segmentsDir(sessionID), segment.Filename))
|
||||
}
|
||||
if err := writeConcatList(listFile, inputs); err != nil {
|
||||
return markProcessingError(store, sessionID, err, finalize)
|
||||
}
|
||||
|
||||
if len(inputs) == 1 {
|
||||
body, copyErr := os.ReadFile(inputs[0])
|
||||
if copyErr != nil {
|
||||
return markProcessingError(store, sessionID, copyErr, finalize)
|
||||
inputPath := filepath.Join(store.segmentsDir(sessionID), segment.Filename)
|
||||
info, statErr := os.Stat(inputPath)
|
||||
if statErr != nil {
|
||||
continue
|
||||
}
|
||||
if writeErr := os.WriteFile(outputWebM, body, 0o644); writeErr != nil {
|
||||
return markProcessingError(store, sessionID, writeErr, finalize)
|
||||
if shouldSkipSegment(segment, info.Size()) {
|
||||
continue
|
||||
}
|
||||
validSegments = append(validSegments, segment)
|
||||
inputs = append(inputs, inputPath)
|
||||
}
|
||||
if len(inputs) == 0 {
|
||||
return markProcessingError(store, sessionID, errors.New("no valid uploaded segments found"), finalize)
|
||||
}
|
||||
if !finalize && session.Purpose == PurposeRelay && usesMP4Segments(validSegments) {
|
||||
mergedInput, cleanup, mergeErr := buildRelayMP4Source(store, session, validSegments, inputs)
|
||||
if cleanup != nil {
|
||||
defer cleanup()
|
||||
}
|
||||
if mergeErr == nil {
|
||||
transcodeErr := runFFmpeg(
|
||||
"-y",
|
||||
"-i",
|
||||
mergedInput,
|
||||
"-c:v",
|
||||
"libvpx-vp9",
|
||||
"-b:v",
|
||||
"1800k",
|
||||
"-c:a",
|
||||
"libopus",
|
||||
outputWebM,
|
||||
)
|
||||
if transcodeErr == nil {
|
||||
goto finalizePlayback
|
||||
}
|
||||
mergeErr = transcodeErr
|
||||
}
|
||||
if err := writeConcatList(listFile, inputs); err != nil {
|
||||
return markProcessingError(store, sessionID, err, finalize)
|
||||
}
|
||||
} else {
|
||||
copyErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c", "copy", outputWebM)
|
||||
if copyErr != nil {
|
||||
reencodeErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c:v", "libvpx-vp9", "-b:v", "1800k", "-c:a", "libopus", outputWebM)
|
||||
if reencodeErr != nil {
|
||||
return markProcessingError(store, sessionID, fmt.Errorf("concat failed: %w / %v", copyErr, reencodeErr), finalize)
|
||||
return markProcessingError(store, sessionID, fmt.Errorf("relay mp4 preview failed: %w / %v / %v", mergeErr, copyErr, reencodeErr), finalize)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err := writeConcatList(listFile, inputs); err != nil {
|
||||
return markProcessingError(store, sessionID, err, finalize)
|
||||
}
|
||||
if len(inputs) == 1 {
|
||||
body, copyErr := os.ReadFile(inputs[0])
|
||||
if copyErr != nil {
|
||||
return markProcessingError(store, sessionID, copyErr, finalize)
|
||||
}
|
||||
if writeErr := os.WriteFile(outputWebM, body, 0o644); writeErr != nil {
|
||||
return markProcessingError(store, sessionID, writeErr, finalize)
|
||||
}
|
||||
} else {
|
||||
copyErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c", "copy", outputWebM)
|
||||
if copyErr != nil {
|
||||
reencodeErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c:v", "libvpx-vp9", "-b:v", "1800k", "-c:a", "libopus", outputWebM)
|
||||
if reencodeErr != nil {
|
||||
return markProcessingError(store, sessionID, fmt.Errorf("concat failed: %w / %v", copyErr, reencodeErr), finalize)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
finalizePlayback:
|
||||
if finalize {
|
||||
mp4Err := runFFmpeg("-y", "-i", outputWebM, "-c:v", "libx264", "-preset", "veryfast", "-crf", "28", "-c:a", "aac", "-movflags", "+faststart", outputMP4)
|
||||
if mp4Err != nil {
|
||||
@@ -1226,7 +1289,7 @@ func buildPlaybackArtifacts(store *sessionStore, session *Session, finalize bool
|
||||
|
||||
_, updateErr := store.updateSession(sessionID, func(session *Session) error {
|
||||
session.Playback.PreviewURL = previewURL
|
||||
session.PreviewSegments = len(inputs)
|
||||
session.PreviewSegments = len(validSegments)
|
||||
session.PreviewUpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
||||
session.PreviewStatus = PreviewReady
|
||||
session.LastError = ""
|
||||
@@ -1249,6 +1312,15 @@ func buildPlaybackArtifacts(store *sessionStore, session *Session, finalize bool
|
||||
|
||||
func markProcessingError(store *sessionStore, sessionID string, err error, finalize bool) error {
|
||||
_, _ = store.updateSession(sessionID, func(session *Session) error {
|
||||
if !finalize {
|
||||
previewPath := filepath.Join(store.publicDir(sessionID), "preview.webm")
|
||||
if info, statErr := os.Stat(previewPath); statErr == nil && info.Size() > 0 {
|
||||
session.PreviewStatus = PreviewReady
|
||||
session.Playback.PreviewURL = fmt.Sprintf("/media/assets/sessions/%s/preview.webm", sessionID)
|
||||
session.LastError = err.Error()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
session.PreviewStatus = PreviewFailed
|
||||
if finalize {
|
||||
session.ArchiveStatus = ArchiveFailed
|
||||
@@ -1268,6 +1340,78 @@ func writeConcatList(path string, inputs []string) error {
|
||||
return os.WriteFile(path, []byte(strings.Join(lines, "\n")), 0o644)
|
||||
}
|
||||
|
||||
func usesMP4Segments(segments []SegmentMeta) bool {
|
||||
for _, segment := range segments {
|
||||
if strings.HasSuffix(strings.ToLower(segment.Filename), ".mp4") || strings.Contains(strings.ToLower(segment.ContentType), "mp4") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func shouldSkipSegment(segment SegmentMeta, sizeBytes int64) bool {
|
||||
if sizeBytes <= 0 {
|
||||
return true
|
||||
}
|
||||
if strings.HasSuffix(strings.ToLower(segment.Filename), ".mp4") && sizeBytes < 4096 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func buildRelayMP4Source(store *sessionStore, session *Session, segments []SegmentMeta, inputs []string) (string, func(), error) {
|
||||
sourceFiles := make([]string, 0, len(inputs)+1)
|
||||
initPath := store.relayInitPath(session.ID)
|
||||
if session.RelayInitFilename != "" && len(segments) > 0 && segments[0].Filename != session.RelayInitFilename {
|
||||
if info, err := os.Stat(initPath); err == nil && info.Size() > 0 {
|
||||
sourceFiles = append(sourceFiles, initPath)
|
||||
}
|
||||
}
|
||||
sourceFiles = append(sourceFiles, inputs...)
|
||||
if len(sourceFiles) == 0 {
|
||||
return "", nil, errors.New("no relay mp4 source segments found")
|
||||
}
|
||||
mergedPath := filepath.Join(store.sessionDir(session.ID), "relay-preview-source.mp4")
|
||||
output, err := os.Create(mergedPath)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
defer output.Close()
|
||||
for _, source := range sourceFiles {
|
||||
input, openErr := os.Open(source)
|
||||
if openErr != nil {
|
||||
return "", nil, openErr
|
||||
}
|
||||
if _, copyErr := io.Copy(output, input); copyErr != nil {
|
||||
input.Close()
|
||||
return "", nil, copyErr
|
||||
}
|
||||
if closeErr := input.Close(); closeErr != nil {
|
||||
return "", nil, closeErr
|
||||
}
|
||||
}
|
||||
return mergedPath, func() {
|
||||
_ = os.Remove(mergedPath)
|
||||
}, nil
|
||||
}
|
||||
|
||||
func copyFile(source string, target string) error {
|
||||
input, err := os.Open(source)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer input.Close()
|
||||
output, err := os.Create(target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer output.Close()
|
||||
if _, err := io.Copy(output, input); err != nil {
|
||||
return err
|
||||
}
|
||||
return output.Close()
|
||||
}
|
||||
|
||||
func runFFmpeg(args ...string) error {
|
||||
cmd := exec.Command("ffmpeg", args...)
|
||||
output, err := cmd.CombinedOutput()
|
||||
|
||||
@@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
@@ -410,6 +411,186 @@ func TestProcessRelayPreviewPublishesBufferedWebM(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSegmentUploadPersistsRelayMP4InitSegment(t *testing.T) {
|
||||
store, err := newSessionStore(t.TempDir())
|
||||
if err != nil {
|
||||
t.Fatalf("newSessionStore: %v", err)
|
||||
}
|
||||
|
||||
server := newMediaServer(store)
|
||||
session, err := store.createSession(CreateSessionRequest{UserID: "1", Title: "Relay MP4", Purpose: "relay", RelayBufferSeconds: 120})
|
||||
if err != nil {
|
||||
t.Fatalf("createSession: %v", err)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/media/sessions/"+session.ID+"/segments?sequence=1&durationMs=10000", strings.NewReader("mp4-init"))
|
||||
req.Header.Set("Content-Type", "video/mp4;codecs=avc1")
|
||||
res := httptest.NewRecorder()
|
||||
server.routes().ServeHTTP(res, req)
|
||||
if res.Code != http.StatusAccepted {
|
||||
t.Fatalf("expected segment upload 202, got %d", res.Code)
|
||||
}
|
||||
|
||||
current, err := store.getSession(session.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("getSession: %v", err)
|
||||
}
|
||||
if current.RelayInitFilename != "000001.mp4" {
|
||||
t.Fatalf("expected relay init filename to be recorded, got %q", current.RelayInitFilename)
|
||||
}
|
||||
body, err := os.ReadFile(store.relayInitPath(session.ID))
|
||||
if err != nil {
|
||||
t.Fatalf("read relay init: %v", err)
|
||||
}
|
||||
if string(body) != "mp4-init" {
|
||||
t.Fatalf("unexpected relay init contents: %q", string(body))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessRelayPreviewUsesPersistedInitForMP4Fragments(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
store, err := newSessionStore(tempDir)
|
||||
if err != nil {
|
||||
t.Fatalf("newSessionStore: %v", err)
|
||||
}
|
||||
|
||||
session, err := store.createSession(CreateSessionRequest{UserID: "1", Title: "Relay MP4 Preview", Purpose: "relay", RelayBufferSeconds: 120})
|
||||
if err != nil {
|
||||
t.Fatalf("createSession: %v", err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(store.relayInitPath(session.ID), []byte(strings.Repeat("i", 6000)), 0o644); err != nil {
|
||||
t.Fatalf("write relay init: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(store.segmentsDir(session.ID), "000082.mp4"), []byte(strings.Repeat("a", 6000)), 0o644); err != nil {
|
||||
t.Fatalf("write segment 82: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(store.segmentsDir(session.ID), "000083.mp4"), []byte(strings.Repeat("b", 6000)), 0o644); err != nil {
|
||||
t.Fatalf("write segment 83: %v", err)
|
||||
}
|
||||
|
||||
if _, err := store.updateSession(session.ID, func(current *Session) error {
|
||||
current.Purpose = PurposeRelay
|
||||
current.RelayInitFilename = "000001.mp4"
|
||||
current.Segments = []SegmentMeta{
|
||||
{
|
||||
Sequence: 82,
|
||||
Filename: "000082.mp4",
|
||||
DurationMS: 10000,
|
||||
SizeBytes: 6000,
|
||||
ContentType: "video/mp4;codecs=avc1",
|
||||
},
|
||||
{
|
||||
Sequence: 83,
|
||||
Filename: "000083.mp4",
|
||||
DurationMS: 10000,
|
||||
SizeBytes: 6000,
|
||||
ContentType: "video/mp4;codecs=avc1",
|
||||
},
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("updateSession: %v", err)
|
||||
}
|
||||
|
||||
fakeFFmpeg := filepath.Join(tempDir, "ffmpeg")
|
||||
script := "#!/bin/sh\ninput=''\noutput=''\nprev=''\nfor arg in \"$@\"; do\n if [ \"$prev\" = '-i' ]; then input=\"$arg\"; fi\n prev=\"$arg\"\n output=\"$arg\"\ndone\nif [ -n \"$input\" ] && [ -f \"$input\" ]; then cp \"$input\" \"$output\"; else : > \"$output\"; fi\n"
|
||||
if err := os.WriteFile(fakeFFmpeg, []byte(script), 0o755); err != nil {
|
||||
t.Fatalf("write fake ffmpeg: %v", err)
|
||||
}
|
||||
t.Setenv("PATH", tempDir+string(os.PathListSeparator)+os.Getenv("PATH"))
|
||||
|
||||
if err := processRollingPreview(store, session.ID); err != nil {
|
||||
t.Fatalf("processRollingPreview: %v", err)
|
||||
}
|
||||
|
||||
current, err := store.getSession(session.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("getSession: %v", err)
|
||||
}
|
||||
if current.PreviewStatus != PreviewReady {
|
||||
t.Fatalf("expected preview ready, got %s", current.PreviewStatus)
|
||||
}
|
||||
if current.Playback.PreviewURL == "" {
|
||||
t.Fatalf("expected preview url to be populated")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessRelayPreviewKeepsPreviousPreviewOnFailure(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
store, err := newSessionStore(tempDir)
|
||||
if err != nil {
|
||||
t.Fatalf("newSessionStore: %v", err)
|
||||
}
|
||||
|
||||
session, err := store.createSession(CreateSessionRequest{UserID: "1", Title: "Relay Existing Preview", Purpose: "relay", RelayBufferSeconds: 120})
|
||||
if err != nil {
|
||||
t.Fatalf("createSession: %v", err)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(store.publicDir(session.ID), 0o755); err != nil {
|
||||
t.Fatalf("mkdir public dir: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(store.publicDir(session.ID), "preview.webm"), []byte("existing-preview"), 0o644); err != nil {
|
||||
t.Fatalf("write preview: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(store.segmentsDir(session.ID), "000001.webm"), []byte("segment-one"), 0o644); err != nil {
|
||||
t.Fatalf("write segment 1: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(store.segmentsDir(session.ID), "000002.webm"), []byte("segment-two"), 0o644); err != nil {
|
||||
t.Fatalf("write segment 2: %v", err)
|
||||
}
|
||||
if _, err := store.updateSession(session.ID, func(current *Session) error {
|
||||
current.Purpose = PurposeRelay
|
||||
current.PreviewStatus = PreviewReady
|
||||
current.Playback.PreviewURL = fmt.Sprintf("/media/assets/sessions/%s/preview.webm", session.ID)
|
||||
current.Segments = []SegmentMeta{
|
||||
{
|
||||
Sequence: 1,
|
||||
Filename: "000001.webm",
|
||||
DurationMS: 10000,
|
||||
SizeBytes: int64(len("segment-one")),
|
||||
ContentType: "video/webm",
|
||||
},
|
||||
{
|
||||
Sequence: 2,
|
||||
Filename: "000002.webm",
|
||||
DurationMS: 10000,
|
||||
SizeBytes: int64(len("segment-two")),
|
||||
ContentType: "video/webm",
|
||||
},
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("updateSession: %v", err)
|
||||
}
|
||||
|
||||
fakeFFmpeg := filepath.Join(tempDir, "ffmpeg")
|
||||
script := "#!/bin/sh\nexit 1\n"
|
||||
if err := os.WriteFile(fakeFFmpeg, []byte(script), 0o755); err != nil {
|
||||
t.Fatalf("write fake ffmpeg: %v", err)
|
||||
}
|
||||
t.Setenv("PATH", tempDir+string(os.PathListSeparator)+os.Getenv("PATH"))
|
||||
|
||||
if err := processRollingPreview(store, session.ID); err == nil {
|
||||
t.Fatalf("expected processRollingPreview to surface failure")
|
||||
}
|
||||
|
||||
current, err := store.getSession(session.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("getSession: %v", err)
|
||||
}
|
||||
if current.PreviewStatus != PreviewReady {
|
||||
t.Fatalf("expected previous preview to remain ready, got %s", current.PreviewStatus)
|
||||
}
|
||||
if current.Playback.PreviewURL == "" {
|
||||
t.Fatalf("expected preview url to remain available")
|
||||
}
|
||||
if current.LastError == "" {
|
||||
t.Fatalf("expected last error to be recorded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPruneExpiredRelaySessionsRemovesOldCache(t *testing.T) {
|
||||
store, err := newSessionStore(t.TempDir())
|
||||
if err != nil {
|
||||
|
||||
在新工单中引用
屏蔽一个用户