Checkpoint: v4.0 media service, compose deploy, and verified docs
这个提交包含在:
861
media/main.go
普通文件
861
media/main.go
普通文件
@@ -0,0 +1,861 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
type SessionStatus string
|
||||
|
||||
const (
|
||||
StatusCreated SessionStatus = "created"
|
||||
StatusRecording SessionStatus = "recording"
|
||||
StatusStreaming SessionStatus = "streaming"
|
||||
StatusReconnecting SessionStatus = "reconnecting"
|
||||
StatusFinalizing SessionStatus = "finalizing"
|
||||
StatusArchived SessionStatus = "archived"
|
||||
StatusFailed SessionStatus = "failed"
|
||||
)
|
||||
|
||||
type ArchiveStatus string
|
||||
|
||||
const (
|
||||
ArchiveIdle ArchiveStatus = "idle"
|
||||
ArchiveQueued ArchiveStatus = "queued"
|
||||
ArchiveProcessing ArchiveStatus = "processing"
|
||||
ArchiveCompleted ArchiveStatus = "completed"
|
||||
ArchiveFailed ArchiveStatus = "failed"
|
||||
)
|
||||
|
||||
type PlaybackInfo struct {
|
||||
WebMURL string `json:"webmUrl,omitempty"`
|
||||
MP4URL string `json:"mp4Url,omitempty"`
|
||||
WebMSize int64 `json:"webmSize,omitempty"`
|
||||
MP4Size int64 `json:"mp4Size,omitempty"`
|
||||
Ready bool `json:"ready"`
|
||||
PreviewURL string `json:"previewUrl,omitempty"`
|
||||
}
|
||||
|
||||
type SegmentMeta struct {
|
||||
Sequence int `json:"sequence"`
|
||||
Filename string `json:"filename"`
|
||||
DurationMS int64 `json:"durationMs"`
|
||||
SizeBytes int64 `json:"sizeBytes"`
|
||||
UploadedAt string `json:"uploadedAt"`
|
||||
ContentType string `json:"contentType"`
|
||||
}
|
||||
|
||||
type Marker struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
Label string `json:"label"`
|
||||
Timestamp int64 `json:"timestampMs"`
|
||||
Confidence float64 `json:"confidence,omitempty"`
|
||||
CreatedAt string `json:"createdAt"`
|
||||
}
|
||||
|
||||
type Session struct {
|
||||
ID string `json:"id"`
|
||||
UserID string `json:"userId"`
|
||||
Title string `json:"title"`
|
||||
Status SessionStatus `json:"status"`
|
||||
ArchiveStatus ArchiveStatus `json:"archiveStatus"`
|
||||
Format string `json:"format"`
|
||||
MimeType string `json:"mimeType"`
|
||||
QualityPreset string `json:"qualityPreset"`
|
||||
FacingMode string `json:"facingMode"`
|
||||
DeviceKind string `json:"deviceKind"`
|
||||
ReconnectCount int `json:"reconnectCount"`
|
||||
UploadedSegments int `json:"uploadedSegments"`
|
||||
UploadedBytes int64 `json:"uploadedBytes"`
|
||||
DurationMS int64 `json:"durationMs"`
|
||||
LastError string `json:"lastError,omitempty"`
|
||||
CreatedAt string `json:"createdAt"`
|
||||
UpdatedAt string `json:"updatedAt"`
|
||||
FinalizedAt string `json:"finalizedAt,omitempty"`
|
||||
StreamConnected bool `json:"streamConnected"`
|
||||
LastStreamAt string `json:"lastStreamAt,omitempty"`
|
||||
Playback PlaybackInfo `json:"playback"`
|
||||
Segments []SegmentMeta `json:"segments"`
|
||||
Markers []Marker `json:"markers"`
|
||||
}
|
||||
|
||||
func (s *Session) recomputeAggregates() {
|
||||
s.UploadedSegments = len(s.Segments)
|
||||
var totalBytes int64
|
||||
var totalDuration int64
|
||||
for _, segment := range s.Segments {
|
||||
totalBytes += segment.SizeBytes
|
||||
totalDuration += segment.DurationMS
|
||||
}
|
||||
s.UploadedBytes = totalBytes
|
||||
if totalDuration > 0 {
|
||||
s.DurationMS = totalDuration
|
||||
}
|
||||
}
|
||||
|
||||
type CreateSessionRequest struct {
|
||||
UserID string `json:"userId"`
|
||||
Title string `json:"title"`
|
||||
Format string `json:"format"`
|
||||
MimeType string `json:"mimeType"`
|
||||
QualityPreset string `json:"qualityPreset"`
|
||||
FacingMode string `json:"facingMode"`
|
||||
DeviceKind string `json:"deviceKind"`
|
||||
}
|
||||
|
||||
type SignalRequest struct {
|
||||
SDP string `json:"sdp"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
type MarkerRequest struct {
|
||||
Type string `json:"type"`
|
||||
Label string `json:"label"`
|
||||
Timestamp int64 `json:"timestampMs"`
|
||||
Confidence float64 `json:"confidence,omitempty"`
|
||||
}
|
||||
|
||||
type FinalizeRequest struct {
|
||||
Title string `json:"title"`
|
||||
DurationMS int64 `json:"durationMs"`
|
||||
}
|
||||
|
||||
type sessionStore struct {
|
||||
rootDir string
|
||||
public string
|
||||
mu sync.RWMutex
|
||||
sessions map[string]*Session
|
||||
peers map[string]*webrtc.PeerConnection
|
||||
}
|
||||
|
||||
func newSessionStore(rootDir string) (*sessionStore, error) {
|
||||
store := &sessionStore{
|
||||
rootDir: rootDir,
|
||||
public: filepath.Join(rootDir, "public"),
|
||||
sessions: map[string]*Session{},
|
||||
peers: map[string]*webrtc.PeerConnection{},
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Join(rootDir, "sessions"), 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := os.MkdirAll(store.public, 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := store.load(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, session := range store.sessions {
|
||||
session.recomputeAggregates()
|
||||
}
|
||||
return store, nil
|
||||
}
|
||||
|
||||
func (s *sessionStore) load() error {
|
||||
pattern := filepath.Join(s.rootDir, "sessions", "*", "session.json")
|
||||
files, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, file := range files {
|
||||
body, readErr := os.ReadFile(file)
|
||||
if readErr != nil {
|
||||
continue
|
||||
}
|
||||
var session Session
|
||||
if unmarshalErr := json.Unmarshal(body, &session); unmarshalErr != nil {
|
||||
continue
|
||||
}
|
||||
s.sessions[session.ID] = &session
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sessionStore) sessionDir(id string) string {
|
||||
return filepath.Join(s.rootDir, "sessions", id)
|
||||
}
|
||||
|
||||
func (s *sessionStore) segmentsDir(id string) string {
|
||||
return filepath.Join(s.sessionDir(id), "segments")
|
||||
}
|
||||
|
||||
func (s *sessionStore) publicDir(id string) string {
|
||||
return filepath.Join(s.public, "sessions", id)
|
||||
}
|
||||
|
||||
func (s *sessionStore) saveSession(session *Session) error {
|
||||
session.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
||||
dir := s.sessionDir(session.ID)
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return err
|
||||
}
|
||||
body, err := json.MarshalIndent(session, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(filepath.Join(dir, "session.json"), body, 0o644)
|
||||
}
|
||||
|
||||
func cloneSession(session *Session) *Session {
|
||||
body, _ := json.Marshal(session)
|
||||
var copy Session
|
||||
_ = json.Unmarshal(body, ©)
|
||||
return ©
|
||||
}
|
||||
|
||||
func (s *sessionStore) createSession(input CreateSessionRequest) (*Session, error) {
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
session := &Session{
|
||||
ID: randomID(),
|
||||
UserID: strings.TrimSpace(input.UserID),
|
||||
Title: strings.TrimSpace(input.Title),
|
||||
Status: StatusCreated,
|
||||
ArchiveStatus: ArchiveIdle,
|
||||
Format: defaultString(input.Format, "webm"),
|
||||
MimeType: defaultString(input.MimeType, "video/webm"),
|
||||
QualityPreset: defaultString(input.QualityPreset, "balanced"),
|
||||
FacingMode: defaultString(input.FacingMode, "environment"),
|
||||
DeviceKind: defaultString(input.DeviceKind, "desktop"),
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
Segments: []SegmentMeta{},
|
||||
Markers: []Marker{},
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.sessions[session.ID] = session
|
||||
if err := os.MkdirAll(s.segmentsDir(session.ID), 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.saveSession(session); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cloneSession(session), nil
|
||||
}
|
||||
|
||||
func (s *sessionStore) getSession(id string) (*Session, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
session, ok := s.sessions[id]
|
||||
if !ok {
|
||||
return nil, errors.New("session not found")
|
||||
}
|
||||
return cloneSession(session), nil
|
||||
}
|
||||
|
||||
func (s *sessionStore) replacePeer(id string, peer *webrtc.PeerConnection) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if existing, ok := s.peers[id]; ok {
|
||||
_ = existing.Close()
|
||||
}
|
||||
s.peers[id] = peer
|
||||
}
|
||||
|
||||
func (s *sessionStore) closePeer(id string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if existing, ok := s.peers[id]; ok {
|
||||
_ = existing.Close()
|
||||
delete(s.peers, id)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sessionStore) updateSession(id string, update func(*Session) error) (*Session, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
session, ok := s.sessions[id]
|
||||
if !ok {
|
||||
return nil, errors.New("session not found")
|
||||
}
|
||||
if err := update(session); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
session.recomputeAggregates()
|
||||
if err := s.saveSession(session); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cloneSession(session), nil
|
||||
}
|
||||
|
||||
func (s *sessionStore) listFinalizingSessions() []*Session {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
items := make([]*Session, 0, len(s.sessions))
|
||||
for _, session := range s.sessions {
|
||||
if session.ArchiveStatus == ArchiveQueued || session.ArchiveStatus == ArchiveProcessing {
|
||||
items = append(items, cloneSession(session))
|
||||
}
|
||||
}
|
||||
return items
|
||||
}
|
||||
|
||||
type mediaServer struct {
|
||||
store *sessionStore
|
||||
}
|
||||
|
||||
func newMediaServer(store *sessionStore) *mediaServer {
|
||||
return &mediaServer{store: store}
|
||||
}
|
||||
|
||||
func (m *mediaServer) routes() http.Handler {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/media/health", m.handleHealth)
|
||||
mux.HandleFunc("/media/sessions", m.handleSessions)
|
||||
mux.HandleFunc("/media/sessions/", m.handleSession)
|
||||
fileServer := http.FileServer(http.Dir(m.store.public))
|
||||
mux.Handle("/media/assets/", http.StripPrefix("/media/assets/", cacheControl(fileServer)))
|
||||
return withCORS(mux)
|
||||
}
|
||||
|
||||
func (m *mediaServer) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"ok": true,
|
||||
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
||||
})
|
||||
}
|
||||
|
||||
func (m *mediaServer) handleSessions(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
var input CreateSessionRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
session, err := m.store.createSession(input)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, map[string]any{"session": session})
|
||||
}
|
||||
|
||||
func (m *mediaServer) handleSession(w http.ResponseWriter, r *http.Request) {
|
||||
path := strings.TrimPrefix(r.URL.Path, "/media/sessions/")
|
||||
parts := strings.Split(strings.Trim(path, "/"), "/")
|
||||
if len(parts) == 0 || parts[0] == "" {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
sessionID := parts[0]
|
||||
if len(parts) == 1 && r.Method == http.MethodGet {
|
||||
session, err := m.store.getSession(sessionID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"session": session})
|
||||
return
|
||||
}
|
||||
if len(parts) < 2 {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
switch parts[1] {
|
||||
case "signal":
|
||||
if r.Method != http.MethodPost {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
m.handleSignal(sessionID, w, r)
|
||||
case "segments":
|
||||
if r.Method != http.MethodPost {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
m.handleSegmentUpload(sessionID, w, r)
|
||||
case "markers":
|
||||
if r.Method != http.MethodPost {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
m.handleMarker(sessionID, w, r)
|
||||
case "finalize":
|
||||
if r.Method != http.MethodPost {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
m.handleFinalize(sessionID, w, r)
|
||||
case "playback":
|
||||
if r.Method != http.MethodGet {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
session, err := m.store.getSession(sessionID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"playback": session.Playback, "session": session})
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mediaServer) handleSignal(sessionID string, w http.ResponseWriter, r *http.Request) {
|
||||
var input SignalRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
session, err := m.store.getSession(sessionID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
config := webrtc.Configuration{
|
||||
ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}},
|
||||
}
|
||||
peer, err := webrtc.NewPeerConnection(config)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to create peer connection")
|
||||
return
|
||||
}
|
||||
m.store.replacePeer(sessionID, peer)
|
||||
_, _ = peer.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
|
||||
Direction: webrtc.RTPTransceiverDirectionRecvonly,
|
||||
})
|
||||
_, _ = peer.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
|
||||
Direction: webrtc.RTPTransceiverDirectionRecvonly,
|
||||
})
|
||||
|
||||
peer.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||
_, _ = m.store.updateSession(sessionID, func(session *Session) error {
|
||||
session.StreamConnected = state == webrtc.PeerConnectionStateConnected
|
||||
session.LastStreamAt = time.Now().UTC().Format(time.RFC3339)
|
||||
switch state {
|
||||
case webrtc.PeerConnectionStateConnected:
|
||||
session.Status = StatusStreaming
|
||||
session.LastError = ""
|
||||
case webrtc.PeerConnectionStateDisconnected:
|
||||
session.Status = StatusReconnecting
|
||||
session.ReconnectCount++
|
||||
case webrtc.PeerConnectionStateFailed:
|
||||
session.Status = StatusFailed
|
||||
session.LastError = "webrtc peer connection failed"
|
||||
case webrtc.PeerConnectionStateClosed:
|
||||
if session.Status != StatusArchived && session.Status != StatusFinalizing {
|
||||
session.StreamConnected = false
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
peer.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
||||
_ = receiver
|
||||
go func() {
|
||||
buffer := make([]byte, 1600)
|
||||
for {
|
||||
if _, _, readErr := track.Read(buffer); readErr != nil {
|
||||
return
|
||||
}
|
||||
_, _ = m.store.updateSession(sessionID, func(session *Session) error {
|
||||
session.StreamConnected = true
|
||||
session.Status = StatusStreaming
|
||||
session.LastStreamAt = time.Now().UTC().Format(time.RFC3339)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
offer := webrtc.SessionDescription{
|
||||
Type: parseSDPType(input.Type),
|
||||
SDP: input.SDP,
|
||||
}
|
||||
if err := peer.SetRemoteDescription(offer); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "failed to set remote description")
|
||||
return
|
||||
}
|
||||
answer, err := peer.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to create answer")
|
||||
return
|
||||
}
|
||||
gatherComplete := webrtc.GatheringCompletePromise(peer)
|
||||
if err := peer.SetLocalDescription(answer); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to set local description")
|
||||
return
|
||||
}
|
||||
<-gatherComplete
|
||||
_, _ = m.store.updateSession(session.ID, func(current *Session) error {
|
||||
current.Status = StatusRecording
|
||||
current.StreamConnected = true
|
||||
current.LastStreamAt = time.Now().UTC().Format(time.RFC3339)
|
||||
return nil
|
||||
})
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"type": strings.ToLower(peer.LocalDescription().Type.String()),
|
||||
"sdp": peer.LocalDescription().SDP,
|
||||
})
|
||||
}
|
||||
|
||||
func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWriter, r *http.Request) {
|
||||
sequence, err := strconv.Atoi(r.URL.Query().Get("sequence"))
|
||||
if err != nil || sequence < 0 {
|
||||
writeError(w, http.StatusBadRequest, "invalid sequence")
|
||||
return
|
||||
}
|
||||
durationMS, _ := strconv.ParseInt(r.URL.Query().Get("durationMs"), 10, 64)
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
extension := detectExtension(contentType)
|
||||
filename := fmt.Sprintf("%06d.%s", sequence, extension)
|
||||
segmentPath := filepath.Join(m.store.segmentsDir(sessionID), filename)
|
||||
if err := os.MkdirAll(m.store.segmentsDir(sessionID), 0o755); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
file, err := os.Create(segmentPath)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
size, err := io.Copy(file, r.Body)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
session, err := m.store.updateSession(sessionID, func(session *Session) error {
|
||||
meta := SegmentMeta{
|
||||
Sequence: sequence,
|
||||
Filename: filename,
|
||||
DurationMS: durationMS,
|
||||
SizeBytes: size,
|
||||
UploadedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
ContentType: defaultString(contentType, "video/webm"),
|
||||
}
|
||||
found := false
|
||||
for index := range session.Segments {
|
||||
if session.Segments[index].Sequence == sequence {
|
||||
session.Segments[index] = meta
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
session.Segments = append(session.Segments, meta)
|
||||
}
|
||||
sort.Slice(session.Segments, func(i, j int) bool {
|
||||
return session.Segments[i].Sequence < session.Segments[j].Sequence
|
||||
})
|
||||
session.Status = StatusRecording
|
||||
session.LastError = ""
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusAccepted, map[string]any{"session": session})
|
||||
}
|
||||
|
||||
func (m *mediaServer) handleMarker(sessionID string, w http.ResponseWriter, r *http.Request) {
|
||||
var input MarkerRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
session, err := m.store.updateSession(sessionID, func(session *Session) error {
|
||||
session.Markers = append(session.Markers, Marker{
|
||||
ID: randomID(),
|
||||
Type: defaultString(input.Type, "manual"),
|
||||
Label: defaultString(input.Label, "标记点"),
|
||||
Timestamp: input.Timestamp,
|
||||
Confidence: input.Confidence,
|
||||
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
})
|
||||
sort.Slice(session.Markers, func(i, j int) bool {
|
||||
return session.Markers[i].Timestamp < session.Markers[j].Timestamp
|
||||
})
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusAccepted, map[string]any{"session": session})
|
||||
}
|
||||
|
||||
func (m *mediaServer) handleFinalize(sessionID string, w http.ResponseWriter, r *http.Request) {
|
||||
var input FinalizeRequest
|
||||
_ = json.NewDecoder(r.Body).Decode(&input)
|
||||
m.store.closePeer(sessionID)
|
||||
session, err := m.store.updateSession(sessionID, func(session *Session) error {
|
||||
session.Status = StatusFinalizing
|
||||
session.ArchiveStatus = ArchiveQueued
|
||||
session.FinalizedAt = time.Now().UTC().Format(time.RFC3339)
|
||||
if strings.TrimSpace(input.Title) != "" {
|
||||
session.Title = strings.TrimSpace(input.Title)
|
||||
}
|
||||
if input.DurationMS > 0 {
|
||||
session.DurationMS = input.DurationMS
|
||||
}
|
||||
session.StreamConnected = false
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusAccepted, map[string]any{"session": session})
|
||||
}
|
||||
|
||||
func runWorkerLoop(ctx context.Context, store *sessionStore, interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
sessions := store.listFinalizingSessions()
|
||||
for _, session := range sessions {
|
||||
if err := processSession(store, session.ID); err != nil {
|
||||
log.Printf("[worker] failed to process session %s: %v", session.ID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processSession(store *sessionStore, sessionID string) error {
|
||||
session, err := store.updateSession(sessionID, func(session *Session) error {
|
||||
if session.ArchiveStatus == ArchiveProcessing {
|
||||
return errors.New("already processing")
|
||||
}
|
||||
session.ArchiveStatus = ArchiveProcessing
|
||||
session.Status = StatusFinalizing
|
||||
session.LastError = ""
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "already processing") {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
if len(session.Segments) == 0 {
|
||||
_, _ = store.updateSession(sessionID, func(session *Session) error {
|
||||
session.ArchiveStatus = ArchiveFailed
|
||||
session.Status = StatusFailed
|
||||
session.LastError = "no uploaded segments found"
|
||||
return nil
|
||||
})
|
||||
return errors.New("no uploaded segments found")
|
||||
}
|
||||
|
||||
publicDir := store.publicDir(sessionID)
|
||||
if err := os.MkdirAll(publicDir, 0o755); err != nil {
|
||||
return err
|
||||
}
|
||||
outputWebM := filepath.Join(publicDir, "recording.webm")
|
||||
outputMP4 := filepath.Join(publicDir, "recording.mp4")
|
||||
listFile := filepath.Join(store.sessionDir(sessionID), "concat.txt")
|
||||
|
||||
inputs := make([]string, 0, len(session.Segments))
|
||||
sort.Slice(session.Segments, func(i, j int) bool {
|
||||
return session.Segments[i].Sequence < session.Segments[j].Sequence
|
||||
})
|
||||
for _, segment := range session.Segments {
|
||||
inputs = append(inputs, filepath.Join(store.segmentsDir(sessionID), segment.Filename))
|
||||
}
|
||||
if err := writeConcatList(listFile, inputs); err != nil {
|
||||
return markArchiveError(store, sessionID, err)
|
||||
}
|
||||
|
||||
if len(inputs) == 1 {
|
||||
body, copyErr := os.ReadFile(inputs[0])
|
||||
if copyErr != nil {
|
||||
return markArchiveError(store, sessionID, copyErr)
|
||||
}
|
||||
if writeErr := os.WriteFile(outputWebM, body, 0o644); writeErr != nil {
|
||||
return markArchiveError(store, sessionID, writeErr)
|
||||
}
|
||||
} else {
|
||||
copyErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c", "copy", outputWebM)
|
||||
if copyErr != nil {
|
||||
reencodeErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c:v", "libvpx-vp9", "-b:v", "1800k", "-c:a", "libopus", outputWebM)
|
||||
if reencodeErr != nil {
|
||||
return markArchiveError(store, sessionID, fmt.Errorf("concat failed: %w / %v", copyErr, reencodeErr))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mp4Err := runFFmpeg("-y", "-i", outputWebM, "-c:v", "libx264", "-preset", "veryfast", "-crf", "28", "-c:a", "aac", "-movflags", "+faststart", outputMP4)
|
||||
if mp4Err != nil {
|
||||
log.Printf("[worker] mp4 archive generation failed for %s: %v", sessionID, mp4Err)
|
||||
}
|
||||
|
||||
webmInfo, webmStatErr := os.Stat(outputWebM)
|
||||
if webmStatErr != nil {
|
||||
return markArchiveError(store, sessionID, webmStatErr)
|
||||
}
|
||||
var mp4Size int64
|
||||
var mp4URL string
|
||||
if info, statErr := os.Stat(outputMP4); statErr == nil {
|
||||
mp4Size = info.Size()
|
||||
mp4URL = fmt.Sprintf("/media/assets/sessions/%s/recording.mp4", sessionID)
|
||||
}
|
||||
_, err = store.updateSession(sessionID, func(session *Session) error {
|
||||
session.ArchiveStatus = ArchiveCompleted
|
||||
session.Status = StatusArchived
|
||||
session.Playback = PlaybackInfo{
|
||||
WebMURL: fmt.Sprintf("/media/assets/sessions/%s/recording.webm", sessionID),
|
||||
MP4URL: mp4URL,
|
||||
WebMSize: webmInfo.Size(),
|
||||
MP4Size: mp4Size,
|
||||
Ready: true,
|
||||
PreviewURL: fmt.Sprintf("/media/assets/sessions/%s/recording.webm", sessionID),
|
||||
}
|
||||
session.LastError = ""
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func markArchiveError(store *sessionStore, sessionID string, err error) error {
|
||||
_, _ = store.updateSession(sessionID, func(session *Session) error {
|
||||
session.ArchiveStatus = ArchiveFailed
|
||||
session.Status = StatusFailed
|
||||
session.LastError = err.Error()
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func writeConcatList(path string, inputs []string) error {
|
||||
lines := make([]string, 0, len(inputs))
|
||||
for _, input := range inputs {
|
||||
lines = append(lines, fmt.Sprintf("file '%s'", strings.ReplaceAll(input, "'", "'\\''")))
|
||||
}
|
||||
return os.WriteFile(path, []byte(strings.Join(lines, "\n")), 0o644)
|
||||
}
|
||||
|
||||
func runFFmpeg(args ...string) error {
|
||||
cmd := exec.Command("ffmpeg", args...)
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %s", err, strings.TrimSpace(string(output)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseSDPType(value string) webrtc.SDPType {
|
||||
switch strings.ToLower(value) {
|
||||
case "offer":
|
||||
return webrtc.SDPTypeOffer
|
||||
case "pranswer":
|
||||
return webrtc.SDPTypePranswer
|
||||
case "rollback":
|
||||
return webrtc.SDPTypeRollback
|
||||
default:
|
||||
return webrtc.SDPTypeOffer
|
||||
}
|
||||
}
|
||||
|
||||
func detectExtension(contentType string) string {
|
||||
switch {
|
||||
case strings.Contains(contentType, "mp4"):
|
||||
return "mp4"
|
||||
case strings.Contains(contentType, "ogg"):
|
||||
return "ogg"
|
||||
default:
|
||||
return "webm"
|
||||
}
|
||||
}
|
||||
|
||||
func defaultString(value string, fallback string) string {
|
||||
if strings.TrimSpace(value) == "" {
|
||||
return fallback
|
||||
}
|
||||
return strings.TrimSpace(value)
|
||||
}
|
||||
|
||||
func randomID() string {
|
||||
buffer := make([]byte, 12)
|
||||
if _, err := rand.Read(buffer); err != nil {
|
||||
return strconv.FormatInt(time.Now().UnixNano(), 36)
|
||||
}
|
||||
return hex.EncodeToString(buffer)
|
||||
}
|
||||
|
||||
func withCORS(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization,X-User-Id")
|
||||
if r.Method == http.MethodOptions {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func cacheControl(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Cache-Control", "public, max-age=31536000, immutable")
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func writeJSON(w http.ResponseWriter, status int, body any) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
_ = json.NewEncoder(w).Encode(body)
|
||||
}
|
||||
|
||||
func writeError(w http.ResponseWriter, status int, message string) {
|
||||
writeJSON(w, status, map[string]string{"error": message})
|
||||
}
|
||||
|
||||
func main() {
|
||||
mode := defaultString(os.Getenv("MEDIA_MODE"), "serve")
|
||||
dataDir := defaultString(os.Getenv("MEDIA_DATA_DIR"), "./data/media")
|
||||
addr := defaultString(os.Getenv("MEDIA_ADDR"), ":8081")
|
||||
workerInterval := 3 * time.Second
|
||||
|
||||
store, err := newSessionStore(dataDir)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create store: %v", err)
|
||||
}
|
||||
|
||||
switch mode {
|
||||
case "worker":
|
||||
log.Printf("media worker running with data dir %s", dataDir)
|
||||
runWorkerLoop(context.Background(), store, workerInterval)
|
||||
default:
|
||||
server := newMediaServer(store)
|
||||
if os.Getenv("MEDIA_EMBEDDED_WORKER") != "0" {
|
||||
go runWorkerLoop(context.Background(), store, workerInterval)
|
||||
}
|
||||
log.Printf("media service listening on %s with data dir %s", addr, dataDir)
|
||||
if err := http.ListenAndServe(addr, server.routes()); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
在新工单中引用
屏蔽一个用户