package main import ( "context" "crypto/rand" "encoding/hex" "encoding/json" "errors" "fmt" "io" "log" "net/http" "os" "os/exec" "path/filepath" "sort" "strconv" "strings" "sync" "time" "github.com/pion/webrtc/v4" ) type SessionStatus string const ( StatusCreated SessionStatus = "created" StatusRecording SessionStatus = "recording" StatusStreaming SessionStatus = "streaming" StatusReconnecting SessionStatus = "reconnecting" StatusFinalizing SessionStatus = "finalizing" StatusArchived SessionStatus = "archived" StatusFailed SessionStatus = "failed" ) type ArchiveStatus string const ( ArchiveIdle ArchiveStatus = "idle" ArchiveQueued ArchiveStatus = "queued" ArchiveProcessing ArchiveStatus = "processing" ArchiveCompleted ArchiveStatus = "completed" ArchiveFailed ArchiveStatus = "failed" ) type PreviewStatus string const ( PreviewIdle PreviewStatus = "idle" PreviewProcessing PreviewStatus = "processing" PreviewReady PreviewStatus = "ready" PreviewFailed PreviewStatus = "failed" ) type PlaybackInfo struct { WebMURL string `json:"webmUrl,omitempty"` MP4URL string `json:"mp4Url,omitempty"` WebMSize int64 `json:"webmSize,omitempty"` MP4Size int64 `json:"mp4Size,omitempty"` Ready bool `json:"ready"` PreviewURL string `json:"previewUrl,omitempty"` } type SegmentMeta struct { Sequence int `json:"sequence"` Filename string `json:"filename"` DurationMS int64 `json:"durationMs"` SizeBytes int64 `json:"sizeBytes"` UploadedAt string `json:"uploadedAt"` ContentType string `json:"contentType"` } type Marker struct { ID string `json:"id"` Type string `json:"type"` Label string `json:"label"` Timestamp int64 `json:"timestampMs"` Confidence float64 `json:"confidence,omitempty"` CreatedAt string `json:"createdAt"` } type Session struct { ID string `json:"id"` UserID string `json:"userId"` Title string `json:"title"` Status SessionStatus `json:"status"` ArchiveStatus ArchiveStatus `json:"archiveStatus"` PreviewStatus PreviewStatus `json:"previewStatus"` Format string `json:"format"` MimeType string `json:"mimeType"` QualityPreset string `json:"qualityPreset"` FacingMode string `json:"facingMode"` DeviceKind string `json:"deviceKind"` ReconnectCount int `json:"reconnectCount"` UploadedSegments int `json:"uploadedSegments"` UploadedBytes int64 `json:"uploadedBytes"` PreviewSegments int `json:"previewSegments"` DurationMS int64 `json:"durationMs"` LastError string `json:"lastError,omitempty"` CreatedAt string `json:"createdAt"` UpdatedAt string `json:"updatedAt"` FinalizedAt string `json:"finalizedAt,omitempty"` PreviewUpdatedAt string `json:"previewUpdatedAt,omitempty"` StreamConnected bool `json:"streamConnected"` LastStreamAt string `json:"lastStreamAt,omitempty"` Playback PlaybackInfo `json:"playback"` Segments []SegmentMeta `json:"segments"` Markers []Marker `json:"markers"` } func (s *Session) recomputeAggregates() { s.UploadedSegments = len(s.Segments) var totalBytes int64 var totalDuration int64 for _, segment := range s.Segments { totalBytes += segment.SizeBytes totalDuration += segment.DurationMS } s.UploadedBytes = totalBytes if totalDuration > 0 { s.DurationMS = totalDuration } } type CreateSessionRequest struct { UserID string `json:"userId"` Title string `json:"title"` Format string `json:"format"` MimeType string `json:"mimeType"` QualityPreset string `json:"qualityPreset"` FacingMode string `json:"facingMode"` DeviceKind string `json:"deviceKind"` } type SignalRequest struct { SDP string `json:"sdp"` Type string `json:"type"` } type MarkerRequest struct { Type string `json:"type"` Label string `json:"label"` Timestamp int64 `json:"timestampMs"` Confidence float64 `json:"confidence,omitempty"` } type FinalizeRequest struct { Title string `json:"title"` DurationMS int64 `json:"durationMs"` } type sessionStore struct { rootDir string public string mu sync.RWMutex sessions map[string]*Session peers map[string]*webrtc.PeerConnection } func newSessionStore(rootDir string) (*sessionStore, error) { store := &sessionStore{ rootDir: rootDir, public: filepath.Join(rootDir, "public"), sessions: map[string]*Session{}, peers: map[string]*webrtc.PeerConnection{}, } if err := os.MkdirAll(filepath.Join(rootDir, "sessions"), 0o755); err != nil { return nil, err } if err := os.MkdirAll(store.public, 0o755); err != nil { return nil, err } if err := store.refreshFromDisk(); err != nil { return nil, err } for _, session := range store.sessions { session.recomputeAggregates() } return store, nil } func (s *sessionStore) loadSessionsFromDisk() (map[string]*Session, error) { pattern := filepath.Join(s.rootDir, "sessions", "*", "session.json") files, err := filepath.Glob(pattern) if err != nil { return nil, err } sessions := make(map[string]*Session, len(files)) for _, file := range files { body, readErr := os.ReadFile(file) if readErr != nil { continue } var session Session if unmarshalErr := json.Unmarshal(body, &session); unmarshalErr != nil { continue } sessions[session.ID] = &session } return sessions, nil } func (s *sessionStore) refreshFromDisk() error { sessions, err := s.loadSessionsFromDisk() if err != nil { return err } s.mu.Lock() defer s.mu.Unlock() s.sessions = sessions return nil } func (s *sessionStore) sessionDir(id string) string { return filepath.Join(s.rootDir, "sessions", id) } func (s *sessionStore) segmentsDir(id string) string { return filepath.Join(s.sessionDir(id), "segments") } func (s *sessionStore) publicDir(id string) string { return filepath.Join(s.public, "sessions", id) } func (s *sessionStore) saveSession(session *Session) error { session.UpdatedAt = time.Now().UTC().Format(time.RFC3339) dir := s.sessionDir(session.ID) if err := os.MkdirAll(dir, 0o755); err != nil { return err } body, err := json.MarshalIndent(session, "", " ") if err != nil { return err } return os.WriteFile(filepath.Join(dir, "session.json"), body, 0o644) } func cloneSession(session *Session) *Session { body, _ := json.Marshal(session) var copy Session _ = json.Unmarshal(body, ©) return © } func (s *sessionStore) createSession(input CreateSessionRequest) (*Session, error) { now := time.Now().UTC().Format(time.RFC3339) session := &Session{ ID: randomID(), UserID: strings.TrimSpace(input.UserID), Title: strings.TrimSpace(input.Title), Status: StatusCreated, ArchiveStatus: ArchiveIdle, PreviewStatus: PreviewIdle, Format: defaultString(input.Format, "webm"), MimeType: defaultString(input.MimeType, "video/webm"), QualityPreset: defaultString(input.QualityPreset, "balanced"), FacingMode: defaultString(input.FacingMode, "environment"), DeviceKind: defaultString(input.DeviceKind, "desktop"), CreatedAt: now, UpdatedAt: now, Segments: []SegmentMeta{}, Markers: []Marker{}, } s.mu.Lock() defer s.mu.Unlock() s.sessions[session.ID] = session if err := os.MkdirAll(s.segmentsDir(session.ID), 0o755); err != nil { return nil, err } if err := s.saveSession(session); err != nil { return nil, err } return cloneSession(session), nil } func (s *sessionStore) getSession(id string) (*Session, error) { s.mu.RLock() defer s.mu.RUnlock() session, ok := s.sessions[id] if !ok { return nil, errors.New("session not found") } return cloneSession(session), nil } func (s *sessionStore) replacePeer(id string, peer *webrtc.PeerConnection) { s.mu.Lock() defer s.mu.Unlock() if existing, ok := s.peers[id]; ok { _ = existing.Close() } s.peers[id] = peer } func (s *sessionStore) closePeer(id string) { s.mu.Lock() defer s.mu.Unlock() if existing, ok := s.peers[id]; ok { _ = existing.Close() delete(s.peers, id) } } func (s *sessionStore) updateSession(id string, update func(*Session) error) (*Session, error) { s.mu.Lock() defer s.mu.Unlock() session, ok := s.sessions[id] if !ok { return nil, errors.New("session not found") } if err := update(session); err != nil { return nil, err } session.recomputeAggregates() if err := s.saveSession(session); err != nil { return nil, err } return cloneSession(session), nil } func (s *sessionStore) listProcessableSessions() []*Session { s.mu.RLock() defer s.mu.RUnlock() items := make([]*Session, 0, len(s.sessions)) for _, session := range s.sessions { if len(session.Segments) == 0 { continue } if session.ArchiveStatus == ArchiveQueued || session.ArchiveStatus == ArchiveProcessing { items = append(items, cloneSession(session)) continue } if session.PreviewSegments < len(session.Segments) && session.PreviewStatus != PreviewProcessing { items = append(items, cloneSession(session)) } } return items } type mediaServer struct { store *sessionStore } func newMediaServer(store *sessionStore) *mediaServer { return &mediaServer{store: store} } func (m *mediaServer) refreshSessionsForRead() error { return m.store.refreshFromDisk() } func (m *mediaServer) routes() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/media/health", m.handleHealth) mux.HandleFunc("/media/sessions", m.handleSessions) mux.HandleFunc("/media/sessions/", m.handleSession) fileServer := http.FileServer(http.Dir(m.store.public)) mux.Handle("/media/assets/", http.StripPrefix("/media/assets/", cacheControl(fileServer))) return withCORS(mux) } func (m *mediaServer) handleHealth(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{ "ok": true, "timestamp": time.Now().UTC().Format(time.RFC3339), }) } func (m *mediaServer) handleSessions(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.NotFound(w, r) return } var input CreateSessionRequest if err := json.NewDecoder(r.Body).Decode(&input); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } session, err := m.store.createSession(input) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, http.StatusCreated, map[string]any{"session": session}) } func (m *mediaServer) handleSession(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/media/sessions/") parts := strings.Split(strings.Trim(path, "/"), "/") if len(parts) == 0 || parts[0] == "" { http.NotFound(w, r) return } sessionID := parts[0] if len(parts) == 1 && r.Method == http.MethodGet { if err := m.refreshSessionsForRead(); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } session, err := m.store.getSession(sessionID) if err != nil { writeError(w, http.StatusNotFound, err.Error()) return } writeJSON(w, http.StatusOK, map[string]any{"session": session}) return } if len(parts) < 2 { http.NotFound(w, r) return } switch parts[1] { case "signal": if r.Method != http.MethodPost { http.NotFound(w, r) return } m.handleSignal(sessionID, w, r) case "segments": if r.Method != http.MethodPost { http.NotFound(w, r) return } m.handleSegmentUpload(sessionID, w, r) case "markers": if r.Method != http.MethodPost { http.NotFound(w, r) return } m.handleMarker(sessionID, w, r) case "finalize": if r.Method != http.MethodPost { http.NotFound(w, r) return } m.handleFinalize(sessionID, w, r) case "playback": if r.Method != http.MethodGet { http.NotFound(w, r) return } if err := m.refreshSessionsForRead(); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } session, err := m.store.getSession(sessionID) if err != nil { writeError(w, http.StatusNotFound, err.Error()) return } writeJSON(w, http.StatusOK, map[string]any{"playback": session.Playback, "session": session}) default: http.NotFound(w, r) } } func (m *mediaServer) handleSignal(sessionID string, w http.ResponseWriter, r *http.Request) { var input SignalRequest if err := json.NewDecoder(r.Body).Decode(&input); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } session, err := m.store.getSession(sessionID) if err != nil { writeError(w, http.StatusNotFound, err.Error()) return } config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}, } peer, err := webrtc.NewPeerConnection(config) if err != nil { writeError(w, http.StatusInternalServerError, "failed to create peer connection") return } m.store.replacePeer(sessionID, peer) _, _ = peer.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionRecvonly, }) _, _ = peer.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionRecvonly, }) peer.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { _, _ = m.store.updateSession(sessionID, func(session *Session) error { session.StreamConnected = state == webrtc.PeerConnectionStateConnected session.LastStreamAt = time.Now().UTC().Format(time.RFC3339) switch state { case webrtc.PeerConnectionStateConnected: session.Status = StatusStreaming session.LastError = "" case webrtc.PeerConnectionStateDisconnected: session.Status = StatusReconnecting session.ReconnectCount++ case webrtc.PeerConnectionStateFailed: session.Status = StatusFailed session.LastError = "webrtc peer connection failed" case webrtc.PeerConnectionStateClosed: if session.Status != StatusArchived && session.Status != StatusFinalizing { session.StreamConnected = false } } return nil }) }) peer.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { _ = receiver go func() { buffer := make([]byte, 1600) for { if _, _, readErr := track.Read(buffer); readErr != nil { return } _, _ = m.store.updateSession(sessionID, func(session *Session) error { session.StreamConnected = true session.Status = StatusStreaming session.LastStreamAt = time.Now().UTC().Format(time.RFC3339) return nil }) } }() }) offer := webrtc.SessionDescription{ Type: parseSDPType(input.Type), SDP: input.SDP, } if err := peer.SetRemoteDescription(offer); err != nil { writeError(w, http.StatusBadRequest, "failed to set remote description") return } answer, err := peer.CreateAnswer(nil) if err != nil { writeError(w, http.StatusInternalServerError, "failed to create answer") return } gatherComplete := webrtc.GatheringCompletePromise(peer) if err := peer.SetLocalDescription(answer); err != nil { writeError(w, http.StatusInternalServerError, "failed to set local description") return } <-gatherComplete _, _ = m.store.updateSession(session.ID, func(current *Session) error { current.Status = StatusRecording current.StreamConnected = true current.LastStreamAt = time.Now().UTC().Format(time.RFC3339) return nil }) writeJSON(w, http.StatusOK, map[string]any{ "type": strings.ToLower(peer.LocalDescription().Type.String()), "sdp": peer.LocalDescription().SDP, }) } func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWriter, r *http.Request) { sequence, err := strconv.Atoi(r.URL.Query().Get("sequence")) if err != nil || sequence < 0 { writeError(w, http.StatusBadRequest, "invalid sequence") return } durationMS, _ := strconv.ParseInt(r.URL.Query().Get("durationMs"), 10, 64) contentType := r.Header.Get("Content-Type") extension := detectExtension(contentType) filename := fmt.Sprintf("%06d.%s", sequence, extension) segmentPath := filepath.Join(m.store.segmentsDir(sessionID), filename) if err := os.MkdirAll(m.store.segmentsDir(sessionID), 0o755); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } file, err := os.Create(segmentPath) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } defer file.Close() size, err := io.Copy(file, r.Body) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } session, err := m.store.updateSession(sessionID, func(session *Session) error { meta := SegmentMeta{ Sequence: sequence, Filename: filename, DurationMS: durationMS, SizeBytes: size, UploadedAt: time.Now().UTC().Format(time.RFC3339), ContentType: defaultString(contentType, "video/webm"), } found := false for index := range session.Segments { if session.Segments[index].Sequence == sequence { session.Segments[index] = meta found = true break } } if !found { session.Segments = append(session.Segments, meta) } sort.Slice(session.Segments, func(i, j int) bool { return session.Segments[i].Sequence < session.Segments[j].Sequence }) session.Status = StatusRecording session.LastError = "" return nil }) if err != nil { writeError(w, http.StatusNotFound, err.Error()) return } writeJSON(w, http.StatusAccepted, map[string]any{"session": session}) } func (m *mediaServer) handleMarker(sessionID string, w http.ResponseWriter, r *http.Request) { var input MarkerRequest if err := json.NewDecoder(r.Body).Decode(&input); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } session, err := m.store.updateSession(sessionID, func(session *Session) error { session.Markers = append(session.Markers, Marker{ ID: randomID(), Type: defaultString(input.Type, "manual"), Label: defaultString(input.Label, "标记点"), Timestamp: input.Timestamp, Confidence: input.Confidence, CreatedAt: time.Now().UTC().Format(time.RFC3339), }) sort.Slice(session.Markers, func(i, j int) bool { return session.Markers[i].Timestamp < session.Markers[j].Timestamp }) return nil }) if err != nil { writeError(w, http.StatusNotFound, err.Error()) return } writeJSON(w, http.StatusAccepted, map[string]any{"session": session}) } func (m *mediaServer) handleFinalize(sessionID string, w http.ResponseWriter, r *http.Request) { var input FinalizeRequest _ = json.NewDecoder(r.Body).Decode(&input) m.store.closePeer(sessionID) session, err := m.store.updateSession(sessionID, func(session *Session) error { session.Status = StatusFinalizing session.ArchiveStatus = ArchiveQueued session.FinalizedAt = time.Now().UTC().Format(time.RFC3339) if strings.TrimSpace(input.Title) != "" { session.Title = strings.TrimSpace(input.Title) } if input.DurationMS > 0 { session.DurationMS = input.DurationMS } session.StreamConnected = false return nil }) if err != nil { writeError(w, http.StatusNotFound, err.Error()) return } writeJSON(w, http.StatusAccepted, map[string]any{"session": session}) } func runWorkerLoop(ctx context.Context, store *sessionStore, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if err := store.refreshFromDisk(); err != nil { log.Printf("[worker] failed to refresh session store: %v", err) continue } sessions := store.listProcessableSessions() for _, session := range sessions { if err := processSession(store, session.ID); err != nil { log.Printf("[worker] failed to process session %s: %v", session.ID, err) } } } } } func processSession(store *sessionStore, sessionID string) error { current, err := store.getSession(sessionID) if err != nil { return err } if current.ArchiveStatus == ArchiveQueued || current.ArchiveStatus == ArchiveProcessing { return processFinalArchive(store, sessionID) } if current.PreviewSegments < len(current.Segments) { return processRollingPreview(store, sessionID) } return nil } func processRollingPreview(store *sessionStore, sessionID string) error { session, err := store.updateSession(sessionID, func(session *Session) error { if session.PreviewStatus == PreviewProcessing { return errors.New("preview already processing") } session.PreviewStatus = PreviewProcessing session.LastError = "" return nil }) if err != nil { if strings.Contains(err.Error(), "preview already processing") { return nil } return err } return buildPlaybackArtifacts(store, session, false) } func processFinalArchive(store *sessionStore, sessionID string) error { session, err := store.updateSession(sessionID, func(session *Session) error { if session.ArchiveStatus == ArchiveProcessing { return errors.New("already processing") } session.ArchiveStatus = ArchiveProcessing session.Status = StatusFinalizing session.LastError = "" return nil }) if err != nil { if strings.Contains(err.Error(), "already processing") { return nil } return err } if len(session.Segments) == 0 { _, _ = store.updateSession(sessionID, func(session *Session) error { session.ArchiveStatus = ArchiveFailed session.Status = StatusFailed session.LastError = "no uploaded segments found" return nil }) return errors.New("no uploaded segments found") } return buildPlaybackArtifacts(store, session, true) } func buildPlaybackArtifacts(store *sessionStore, session *Session, finalize bool) error { sessionID := session.ID publicDir := store.publicDir(sessionID) if err := os.MkdirAll(publicDir, 0o755); err != nil { return err } baseName := "preview" if finalize { baseName = "recording" } outputWebM := filepath.Join(publicDir, baseName+".webm") outputMP4 := filepath.Join(publicDir, baseName+".mp4") listFile := filepath.Join(store.sessionDir(sessionID), "concat.txt") inputs := make([]string, 0, len(session.Segments)) sort.Slice(session.Segments, func(i, j int) bool { return session.Segments[i].Sequence < session.Segments[j].Sequence }) for _, segment := range session.Segments { inputs = append(inputs, filepath.Join(store.segmentsDir(sessionID), segment.Filename)) } if err := writeConcatList(listFile, inputs); err != nil { return markProcessingError(store, sessionID, err, finalize) } if len(inputs) == 1 { body, copyErr := os.ReadFile(inputs[0]) if copyErr != nil { return markProcessingError(store, sessionID, copyErr, finalize) } if writeErr := os.WriteFile(outputWebM, body, 0o644); writeErr != nil { return markProcessingError(store, sessionID, writeErr, finalize) } } else { copyErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c", "copy", outputWebM) if copyErr != nil { reencodeErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c:v", "libvpx-vp9", "-b:v", "1800k", "-c:a", "libopus", outputWebM) if reencodeErr != nil { return markProcessingError(store, sessionID, fmt.Errorf("concat failed: %w / %v", copyErr, reencodeErr), finalize) } } } mp4Err := runFFmpeg("-y", "-i", outputWebM, "-c:v", "libx264", "-preset", "veryfast", "-crf", "28", "-c:a", "aac", "-movflags", "+faststart", outputMP4) if mp4Err != nil { log.Printf("[worker] mp4 archive generation failed for %s: %v", sessionID, mp4Err) } webmInfo, webmStatErr := os.Stat(outputWebM) if webmStatErr != nil { return markProcessingError(store, sessionID, webmStatErr, finalize) } var mp4Size int64 var mp4URL string if info, statErr := os.Stat(outputMP4); statErr == nil { mp4Size = info.Size() mp4URL = fmt.Sprintf("/media/assets/sessions/%s/recording.mp4", sessionID) } previewURL := fmt.Sprintf("/media/assets/sessions/%s/%s.webm", sessionID, baseName) if mp4URL != "" { previewURL = mp4URL } _, updateErr := store.updateSession(sessionID, func(session *Session) error { session.Playback.PreviewURL = previewURL session.PreviewSegments = len(inputs) session.PreviewUpdatedAt = time.Now().UTC().Format(time.RFC3339) session.PreviewStatus = PreviewReady session.LastError = "" if finalize { session.ArchiveStatus = ArchiveCompleted session.Status = StatusArchived session.Playback = PlaybackInfo{ WebMURL: fmt.Sprintf("/media/assets/sessions/%s/recording.webm", sessionID), MP4URL: mp4URL, WebMSize: webmInfo.Size(), MP4Size: mp4Size, Ready: true, PreviewURL: previewURL, } } return nil }) return updateErr } func markProcessingError(store *sessionStore, sessionID string, err error, finalize bool) error { _, _ = store.updateSession(sessionID, func(session *Session) error { session.PreviewStatus = PreviewFailed if finalize { session.ArchiveStatus = ArchiveFailed session.Status = StatusFailed } session.LastError = err.Error() return nil }) return err } func writeConcatList(path string, inputs []string) error { lines := make([]string, 0, len(inputs)) for _, input := range inputs { lines = append(lines, fmt.Sprintf("file '%s'", strings.ReplaceAll(input, "'", "'\\''"))) } return os.WriteFile(path, []byte(strings.Join(lines, "\n")), 0o644) } func runFFmpeg(args ...string) error { cmd := exec.Command("ffmpeg", args...) output, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("%w: %s", err, strings.TrimSpace(string(output))) } return nil } func parseSDPType(value string) webrtc.SDPType { switch strings.ToLower(value) { case "offer": return webrtc.SDPTypeOffer case "pranswer": return webrtc.SDPTypePranswer case "rollback": return webrtc.SDPTypeRollback default: return webrtc.SDPTypeOffer } } func detectExtension(contentType string) string { switch { case strings.Contains(contentType, "mp4"): return "mp4" case strings.Contains(contentType, "ogg"): return "ogg" default: return "webm" } } func defaultString(value string, fallback string) string { if strings.TrimSpace(value) == "" { return fallback } return strings.TrimSpace(value) } func randomID() string { buffer := make([]byte, 12) if _, err := rand.Read(buffer); err != nil { return strconv.FormatInt(time.Now().UnixNano(), 36) } return hex.EncodeToString(buffer) } func withCORS(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET,POST,OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization,X-User-Id") if r.Method == http.MethodOptions { w.WriteHeader(http.StatusNoContent) return } next.ServeHTTP(w, r) }) } func cacheControl(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "public, max-age=31536000, immutable") next.ServeHTTP(w, r) }) } func writeJSON(w http.ResponseWriter, status int, body any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(body) } func writeError(w http.ResponseWriter, status int, message string) { writeJSON(w, status, map[string]string{"error": message}) } func main() { mode := defaultString(os.Getenv("MEDIA_MODE"), "serve") dataDir := defaultString(os.Getenv("MEDIA_DATA_DIR"), "./data/media") addr := defaultString(os.Getenv("MEDIA_ADDR"), ":8081") workerInterval := 3 * time.Second store, err := newSessionStore(dataDir) if err != nil { log.Fatalf("failed to create store: %v", err) } switch mode { case "worker": log.Printf("media worker running with data dir %s", dataDir) runWorkerLoop(context.Background(), store, workerInterval) default: server := newMediaServer(store) if os.Getenv("MEDIA_EMBEDDED_WORKER") != "0" { go runWorkerLoop(context.Background(), store, workerInterval) } log.Printf("media service listening on %s with data dir %s", addr, dataDir) if err := http.ListenAndServe(addr, server.routes()); err != nil { log.Fatal(err) } } }