feat relay live viewer frames through media service

这个提交包含在:
cryptocommuniums-afk
2026-03-16 22:43:08 +08:00
父节点 bacd712dbc
当前提交 bb46d26c0e
修改 4 个文件,包含 220 行新增127 行删除

查看文件

@@ -105,6 +105,8 @@ type Session struct {
StreamConnected bool `json:"streamConnected"`
LastStreamAt string `json:"lastStreamAt,omitempty"`
ViewerCount int `json:"viewerCount"`
LiveFrameURL string `json:"liveFrameUrl,omitempty"`
LiveFrameUpdated string `json:"liveFrameUpdatedAt,omitempty"`
Playback PlaybackInfo `json:"playback"`
Segments []SegmentMeta `json:"segments"`
Markers []Marker `json:"markers"`
@@ -229,6 +231,14 @@ func (s *sessionStore) publicDir(id string) string {
return filepath.Join(s.public, "sessions", id)
}
func (s *sessionStore) liveFramePath(id string) string {
return filepath.Join(s.publicDir(id), "live-frame.jpg")
}
func (s *sessionStore) liveFrameURL(id string) string {
return fmt.Sprintf("/media/assets/sessions/%s/live-frame.jpg", id)
}
func (s *sessionStore) saveSession(session *Session) error {
session.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
dir := s.sessionDir(session.ID)
@@ -504,6 +514,12 @@ func (m *mediaServer) handleSession(w http.ResponseWriter, r *http.Request) {
return
}
m.handleSegmentUpload(sessionID, w, r)
case "live-frame":
if r.Method != http.MethodPost {
http.NotFound(w, r)
return
}
m.handleLiveFrameUpload(sessionID, w, r)
case "markers":
if r.Method != http.MethodPost {
http.NotFound(w, r)
@@ -726,6 +742,59 @@ func (m *mediaServer) handleViewerSignal(sessionID string, w http.ResponseWriter
})
}
func (m *mediaServer) handleLiveFrameUpload(sessionID string, w http.ResponseWriter, r *http.Request) {
if _, err := m.store.getSession(sessionID); err != nil {
writeError(w, http.StatusNotFound, err.Error())
return
}
body := http.MaxBytesReader(w, r.Body, 4<<20)
defer body.Close()
frame, err := io.ReadAll(body)
if err != nil || len(frame) == 0 {
writeError(w, http.StatusBadRequest, "invalid live frame payload")
return
}
publicDir := m.store.publicDir(sessionID)
if err := os.MkdirAll(publicDir, 0o755); err != nil {
writeError(w, http.StatusInternalServerError, "failed to create live frame directory")
return
}
tmpFile := filepath.Join(publicDir, fmt.Sprintf("live-frame-%s.tmp", randomID()))
if err := os.WriteFile(tmpFile, frame, 0o644); err != nil {
writeError(w, http.StatusInternalServerError, "failed to write live frame")
return
}
defer os.Remove(tmpFile)
finalFile := m.store.liveFramePath(sessionID)
if err := os.Rename(tmpFile, finalFile); err != nil {
writeError(w, http.StatusInternalServerError, "failed to publish live frame")
return
}
session, err := m.store.updateSession(sessionID, func(session *Session) error {
session.LiveFrameURL = m.store.liveFrameURL(sessionID)
session.LiveFrameUpdated = time.Now().UTC().Format(time.RFC3339)
session.StreamConnected = true
session.LastStreamAt = session.LiveFrameUpdated
if session.Status == StatusCreated || session.Status == StatusReconnecting {
session.Status = StatusStreaming
}
session.LastError = ""
return nil
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to update live frame session state")
return
}
writeJSON(w, http.StatusAccepted, map[string]any{"session": session})
}
func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWriter, r *http.Request) {
sequence, err := strconv.Atoi(r.URL.Query().Get("sequence"))
if err != nil || sequence < 0 {

查看文件

@@ -278,3 +278,45 @@ func TestViewerSignalReturnsConflictBeforePublisherTrackReady(t *testing.T) {
t.Fatalf("expected viewer-signal 409 before video track is ready, got %d", res.Code)
}
}
func TestLiveFrameUploadPublishesRelayFrame(t *testing.T) {
store, err := newSessionStore(t.TempDir())
if err != nil {
t.Fatalf("newSessionStore: %v", err)
}
server := newMediaServer(store)
session, err := store.createSession(CreateSessionRequest{UserID: "1", Title: "Relay Session"})
if err != nil {
t.Fatalf("createSession: %v", err)
}
req := httptest.NewRequest(http.MethodPost, "/media/sessions/"+session.ID+"/live-frame", strings.NewReader("jpeg-frame"))
req.Header.Set("Content-Type", "image/jpeg")
res := httptest.NewRecorder()
server.routes().ServeHTTP(res, req)
if res.Code != http.StatusAccepted {
t.Fatalf("expected live-frame upload 202, got %d", res.Code)
}
current, err := store.getSession(session.ID)
if err != nil {
t.Fatalf("getSession: %v", err)
}
if current.LiveFrameURL == "" || current.LiveFrameUpdated == "" {
t.Fatalf("expected live frame metadata to be recorded, got %#v", current)
}
if !current.StreamConnected {
t.Fatalf("expected session stream connected after frame upload")
}
framePath := store.liveFramePath(session.ID)
body, err := os.ReadFile(framePath)
if err != nil {
t.Fatalf("read live frame: %v", err)
}
if string(body) != "jpeg-frame" {
t.Fatalf("unexpected live frame content: %q", string(body))
}
}