文件
2026-03-15 17:30:19 +08:00

970 行
28 KiB
Go

package main
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/pion/webrtc/v4"
)
// SessionStatus is the string-typed lifecycle state stored in Session.Status
// and serialized in session.json.
type SessionStatus string
// Values assigned to Session.Status elsewhere in this file.
const (
// StatusCreated is the state createSession gives a new session.
StatusCreated SessionStatus = "created"
// StatusRecording is set after signaling succeeds and after each segment upload.
StatusRecording SessionStatus = "recording"
// StatusStreaming is set when the WebRTC peer is connected or media is being read.
StatusStreaming SessionStatus = "streaming"
// StatusReconnecting is set when the peer connection reports "disconnected".
StatusReconnecting SessionStatus = "reconnecting"
// StatusFinalizing is set by handleFinalize and while the final archive is built.
StatusFinalizing SessionStatus = "finalizing"
// StatusArchived is set once the final recording artifacts have been produced.
StatusArchived SessionStatus = "archived"
// StatusFailed is set on peer connection failure or a failed final archive.
StatusFailed SessionStatus = "failed"
)
// ArchiveStatus tracks the final-archive job of a session (Session.ArchiveStatus).
type ArchiveStatus string
// Values assigned to Session.ArchiveStatus elsewhere in this file.
const (
// ArchiveIdle is the initial value set by createSession.
ArchiveIdle ArchiveStatus = "idle"
// ArchiveQueued is set by handleFinalize; the worker picks up queued sessions.
ArchiveQueued ArchiveStatus = "queued"
// ArchiveProcessing is set by processFinalArchive before artifacts are built.
ArchiveProcessing ArchiveStatus = "processing"
// ArchiveCompleted is set by buildPlaybackArtifacts after a successful final build.
ArchiveCompleted ArchiveStatus = "completed"
// ArchiveFailed is set when the final build fails or no segments were uploaded.
ArchiveFailed ArchiveStatus = "failed"
)
// PreviewStatus tracks the rolling-preview job of a session (Session.PreviewStatus).
type PreviewStatus string
// Values assigned to Session.PreviewStatus elsewhere in this file.
const (
// PreviewIdle is the initial value set by createSession.
PreviewIdle PreviewStatus = "idle"
// PreviewProcessing is set by processRollingPreview while a preview is being built.
PreviewProcessing PreviewStatus = "processing"
// PreviewReady is set by buildPlaybackArtifacts after a successful build.
PreviewReady PreviewStatus = "ready"
// PreviewFailed is set by markProcessingError when a build step fails.
PreviewFailed PreviewStatus = "failed"
)
// PlaybackInfo describes the playable artifacts of a session. The URL fields
// are filled by buildPlaybackArtifacts with paths under /media/assets/sessions/<id>/.
type PlaybackInfo struct {
// WebMURL is the URL of the final WebM recording; set only on finalize.
WebMURL string `json:"webmUrl,omitempty"`
// MP4URL is the URL of the MP4 transcode; empty when no MP4 file exists.
MP4URL string `json:"mp4Url,omitempty"`
// WebMSize is the size in bytes of the final WebM file.
WebMSize int64 `json:"webmSize,omitempty"`
// MP4Size is the size in bytes of the MP4 file, or 0 when it is absent.
MP4Size int64 `json:"mp4Size,omitempty"`
// Ready is set to true when the final archive has been produced.
Ready bool `json:"ready"`
// PreviewURL points at the most recently built preview (MP4 when available, else WebM).
PreviewURL string `json:"previewUrl,omitempty"`
}
// SegmentMeta is the metadata recorded by handleSegmentUpload for one uploaded segment.
type SegmentMeta struct {
// Sequence is the non-negative ordering index taken from the "sequence" query parameter.
Sequence int `json:"sequence"`
// Filename is the file name inside the session's segments directory ("%06d.<ext>").
Filename string `json:"filename"`
// DurationMS comes from the "durationMs" query parameter (0 when it cannot be parsed).
DurationMS int64 `json:"durationMs"`
// SizeBytes is the number of bytes copied from the request body.
SizeBytes int64 `json:"sizeBytes"`
// UploadedAt is the upload time formatted as RFC 3339 in UTC.
UploadedAt string `json:"uploadedAt"`
// ContentType is the request Content-Type header, or "video/webm" when it is blank.
ContentType string `json:"contentType"`
}
// Marker is a labelled point on a session's timeline, appended by handleMarker.
type Marker struct {
// ID is generated with randomID.
ID string `json:"id"`
// Type defaults to "manual" when the request leaves it blank.
Type string `json:"type"`
// Label is the display text; handleMarker substitutes a default when it is blank.
Label string `json:"label"`
// Timestamp is serialized as "timestampMs"; markers are kept sorted by this value.
Timestamp int64 `json:"timestampMs"`
// Confidence is copied verbatim from the request.
Confidence float64 `json:"confidence,omitempty"`
// CreatedAt is the creation time formatted as RFC 3339 in UTC.
CreatedAt string `json:"createdAt"`
}
// Session is the full persisted state of one recording session. It is stored
// as session.json inside the session directory and returned to API clients.
type Session struct {
// ID is generated by randomID in createSession and names the session directory.
ID string `json:"id"`
UserID string `json:"userId"`
Title string `json:"title"`
// Status, ArchiveStatus and PreviewStatus are independent state machines;
// see the corresponding constant blocks for their values.
Status SessionStatus `json:"status"`
ArchiveStatus ArchiveStatus `json:"archiveStatus"`
PreviewStatus PreviewStatus `json:"previewStatus"`
// Format, MimeType, QualityPreset, FacingMode and DeviceKind are taken from
// the create request, with defaults applied by createSession.
Format string `json:"format"`
MimeType string `json:"mimeType"`
QualityPreset string `json:"qualityPreset"`
FacingMode string `json:"facingMode"`
DeviceKind string `json:"deviceKind"`
// ReconnectCount is incremented each time the peer connection reports "disconnected".
ReconnectCount int `json:"reconnectCount"`
// UploadedSegments and UploadedBytes are derived from Segments by recomputeAggregates.
UploadedSegments int `json:"uploadedSegments"`
UploadedBytes int64 `json:"uploadedBytes"`
// PreviewSegments is the number of segments included in the last successful build.
PreviewSegments int `json:"previewSegments"`
// DurationMS is the sum of segment durations when that sum is positive;
// otherwise it keeps the previously stored value (e.g. from finalize).
DurationMS int64 `json:"durationMs"`
// LastError holds the most recent failure message; cleared on successful steps.
LastError string `json:"lastError,omitempty"`
// Timestamps below are RFC 3339 strings in UTC.
CreatedAt string `json:"createdAt"`
// UpdatedAt is overwritten on every saveSession call.
UpdatedAt string `json:"updatedAt"`
FinalizedAt string `json:"finalizedAt,omitempty"`
PreviewUpdatedAt string `json:"previewUpdatedAt,omitempty"`
// StreamConnected and LastStreamAt are maintained by the WebRTC callbacks in handleSignal.
StreamConnected bool `json:"streamConnected"`
LastStreamAt string `json:"lastStreamAt,omitempty"`
Playback PlaybackInfo `json:"playback"`
// Segments is kept sorted by Sequence; Markers is kept sorted by Timestamp.
Segments []SegmentMeta `json:"segments"`
Markers []Marker `json:"markers"`
}
// recomputeAggregates refreshes the counters derived from s.Segments:
// UploadedSegments, UploadedBytes and, when the summed segment duration is
// positive, DurationMS.
func (s *Session) recomputeAggregates() {
	var bytesSum, durationSum int64
	for i := range s.Segments {
		bytesSum += s.Segments[i].SizeBytes
		durationSum += s.Segments[i].DurationMS
	}
	s.UploadedSegments = len(s.Segments)
	s.UploadedBytes = bytesSum
	// A non-positive sum leaves the stored duration untouched.
	if durationSum > 0 {
		s.DurationMS = durationSum
	}
}
// CreateSessionRequest is the JSON body decoded by handleSessions. createSession
// trims UserID and Title and replaces blank values of the remaining fields
// with defaults.
type CreateSessionRequest struct {
UserID string `json:"userId"`
Title string `json:"title"`
// Format defaults to "webm".
Format string `json:"format"`
// MimeType defaults to "video/webm".
MimeType string `json:"mimeType"`
// QualityPreset defaults to "balanced".
QualityPreset string `json:"qualityPreset"`
// FacingMode defaults to "environment".
FacingMode string `json:"facingMode"`
// DeviceKind defaults to "desktop".
DeviceKind string `json:"deviceKind"`
}
// SignalRequest is the JSON body decoded by handleSignal: the remote session
// description sent by the client.
type SignalRequest struct {
// SDP is the raw session description text.
SDP string `json:"sdp"`
// Type is the description type name, mapped to a webrtc.SDPType by parseSDPType.
Type string `json:"type"`
}
// MarkerRequest is the JSON body decoded by handleMarker.
type MarkerRequest struct {
// Type and Label are replaced by defaults in handleMarker when blank.
Type string `json:"type"`
Label string `json:"label"`
// Timestamp is serialized as "timestampMs" and copied into the new Marker.
Timestamp int64 `json:"timestampMs"`
Confidence float64 `json:"confidence,omitempty"`
}
// FinalizeRequest is the optional JSON body of the finalize endpoint;
// handleFinalize ignores decode errors and applies only non-empty values.
type FinalizeRequest struct {
// Title replaces the session title when non-blank after trimming.
Title string `json:"title"`
// DurationMS replaces the session duration when greater than zero.
DurationMS int64 `json:"durationMs"`
}
// sessionStore keeps sessions in memory, mirrors them to session.json files
// under rootDir/sessions/<id>/, and tracks the active WebRTC peer per session.
type sessionStore struct {
// rootDir is the data directory that contains the "sessions" tree.
rootDir string
// public is rootDir/public, the directory served under /media/assets/.
public string
// mu is held by the store methods that read or modify sessions and peers.
mu sync.RWMutex
// sessions maps session ID to its in-memory state.
sessions map[string]*Session
// peers maps session ID to the current peer connection, if any.
peers map[string]*webrtc.PeerConnection
}
// newSessionStore creates the on-disk layout under rootDir ("sessions" and
// "public"), loads every persisted session, and recomputes their derived
// counters. It returns the first directory or load error encountered.
func newSessionStore(rootDir string) (*sessionStore, error) {
	store := new(sessionStore)
	store.rootDir = rootDir
	store.public = filepath.Join(rootDir, "public")
	store.sessions = make(map[string]*Session)
	store.peers = make(map[string]*webrtc.PeerConnection)

	required := []string{filepath.Join(rootDir, "sessions"), store.public}
	for _, dir := range required {
		if err := os.MkdirAll(dir, 0o755); err != nil {
			return nil, err
		}
	}
	if err := store.refreshFromDisk(); err != nil {
		return nil, err
	}
	for id := range store.sessions {
		store.sessions[id].recomputeAggregates()
	}
	return store, nil
}
// loadSessionsFromDisk reads every rootDir/sessions/*/session.json file and
// returns the decoded sessions keyed by their ID. Files that cannot be read
// or decoded are skipped; only a glob failure is reported as an error.
func (s *sessionStore) loadSessionsFromDisk() (map[string]*Session, error) {
	matches, err := filepath.Glob(filepath.Join(s.rootDir, "sessions", "*", "session.json"))
	if err != nil {
		return nil, err
	}
	loaded := make(map[string]*Session, len(matches))
	for _, path := range matches {
		raw, err := os.ReadFile(path)
		if err != nil {
			continue // unreadable file: skip this session
		}
		decoded := new(Session)
		if err := json.Unmarshal(raw, decoded); err != nil {
			continue // malformed JSON: skip this session
		}
		loaded[decoded.ID] = decoded
	}
	return loaded, nil
}
// refreshFromDisk replaces the in-memory session map with the sessions
// currently persisted on disk.
//
// The write lock is taken before reading the files. Reading first and locking
// afterwards would let an updateSession call commit between the read and the
// map swap; the swap would then install the stale snapshot, and the next
// update would write that stale state back to disk.
func (s *sessionStore) refreshFromDisk() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	sessions, err := s.loadSessionsFromDisk()
	if err != nil {
		return err
	}
	s.sessions = sessions
	return nil
}
// sessionDir returns the directory that holds the private data of session id.
func (s *sessionStore) sessionDir(id string) string {
	parts := []string{s.rootDir, "sessions", id}
	return filepath.Join(parts...)
}
// segmentsDir returns the directory where uploaded segments of session id are stored.
func (s *sessionStore) segmentsDir(id string) string {
	base := s.sessionDir(id)
	return filepath.Join(base, "segments")
}
// publicDir returns the directory under the public root where the playback
// artifacts of session id are written.
func (s *sessionStore) publicDir(id string) string {
	parts := []string{s.public, "sessions", id}
	return filepath.Join(parts...)
}
// saveSession stamps session.UpdatedAt with the current UTC time and persists
// the session as indented JSON in <sessionDir>/session.json.
//
// The file is written to a temporary sibling and renamed into place, so a
// concurrent reader (another process sharing the data directory, or a disk
// refresh) never observes a truncated or half-written session.json. The
// temporary file is removed if any step fails.
func (s *sessionStore) saveSession(session *Session) error {
	session.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
	dir := s.sessionDir(session.ID)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	body, err := json.MarshalIndent(session, "", " ")
	if err != nil {
		return err
	}
	// The temp name does not match the "session.json" glob used when loading.
	tmp, err := os.CreateTemp(dir, "session.json.tmp-*")
	if err != nil {
		return err
	}
	tmpPath := tmp.Name()
	_, err = tmp.Write(body)
	if err == nil {
		// CreateTemp uses mode 0600; keep the 0644 mode the file always had.
		err = tmp.Chmod(0o644)
	}
	if closeErr := tmp.Close(); err == nil {
		err = closeErr
	}
	if err == nil {
		err = os.Rename(tmpPath, filepath.Join(dir, "session.json"))
	}
	if err != nil {
		_ = os.Remove(tmpPath) // best-effort cleanup of the temporary file
		return err
	}
	return nil
}
// cloneSession returns a deep copy of session produced by a JSON round trip,
// so callers can use the result without holding the store lock. Encoding or
// decoding failures are ignored and yield a zero-valued Session.
func cloneSession(session *Session) *Session {
	duplicate := new(Session)
	encoded, _ := json.Marshal(session)
	_ = json.Unmarshal(encoded, duplicate)
	return duplicate
}
// createSession builds a new session from input (trimming strings and
// applying defaults), creates its segments directory, persists it, and
// returns a copy of the stored session.
//
// The session is registered in the in-memory map only after it has been
// persisted, so a failed mkdir or save does not leave behind a session that
// exists in memory but not on disk.
func (s *sessionStore) createSession(input CreateSessionRequest) (*Session, error) {
	now := time.Now().UTC().Format(time.RFC3339)
	session := &Session{
		ID:            randomID(),
		UserID:        strings.TrimSpace(input.UserID),
		Title:         strings.TrimSpace(input.Title),
		Status:        StatusCreated,
		ArchiveStatus: ArchiveIdle,
		PreviewStatus: PreviewIdle,
		Format:        defaultString(input.Format, "webm"),
		MimeType:      defaultString(input.MimeType, "video/webm"),
		QualityPreset: defaultString(input.QualityPreset, "balanced"),
		FacingMode:    defaultString(input.FacingMode, "environment"),
		DeviceKind:    defaultString(input.DeviceKind, "desktop"),
		CreatedAt:     now,
		UpdatedAt:     now,
		// Non-nil slices so the JSON output contains [] rather than null.
		Segments: []SegmentMeta{},
		Markers:  []Marker{},
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := os.MkdirAll(s.segmentsDir(session.ID), 0o755); err != nil {
		return nil, err
	}
	if err := s.saveSession(session); err != nil {
		return nil, err
	}
	s.sessions[session.ID] = session
	return cloneSession(session), nil
}
// getSession returns a copy of the in-memory session with the given id, or a
// "session not found" error when the id is unknown.
func (s *sessionStore) getSession(id string) (*Session, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if found, exists := s.sessions[id]; exists {
		return cloneSession(found), nil
	}
	return nil, errors.New("session not found")
}
// replacePeer registers peer as the connection for session id, closing any
// connection previously registered under that id. The close error is ignored.
func (s *sessionStore) replacePeer(id string, peer *webrtc.PeerConnection) {
	s.mu.Lock()
	defer s.mu.Unlock()
	previous, registered := s.peers[id]
	if registered {
		_ = previous.Close()
	}
	s.peers[id] = peer
}
// closePeer closes and unregisters the peer connection of session id. It does
// nothing when no connection is registered; the close error is ignored.
func (s *sessionStore) closePeer(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	current, registered := s.peers[id]
	if !registered {
		return
	}
	_ = current.Close()
	delete(s.peers, id)
}
// updateSession applies update to the stored session under the write lock,
// recomputes the derived counters, persists the result, and returns a copy.
// It returns "session not found" for an unknown id, and otherwise the first
// error from update or from saving.
func (s *sessionStore) updateSession(id string, update func(*Session) error) (*Session, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	target, exists := s.sessions[id]
	if !exists {
		return nil, errors.New("session not found")
	}
	err := update(target)
	if err == nil {
		target.recomputeAggregates()
		err = s.saveSession(target)
	}
	if err != nil {
		return nil, err
	}
	return cloneSession(target), nil
}
// listProcessableSessions returns copies of the sessions the worker should
// look at: those with at least one segment whose archive is queued or
// processing, or whose preview covers fewer segments than were uploaded and
// is not already being built.
func (s *sessionStore) listProcessableSessions() []*Session {
	s.mu.RLock()
	defer s.mu.RUnlock()
	pending := make([]*Session, 0, len(s.sessions))
	for _, candidate := range s.sessions {
		segmentCount := len(candidate.Segments)
		if segmentCount == 0 {
			continue
		}
		archivePending := candidate.ArchiveStatus == ArchiveQueued ||
			candidate.ArchiveStatus == ArchiveProcessing
		previewStale := candidate.PreviewSegments < segmentCount &&
			candidate.PreviewStatus != PreviewProcessing
		if archivePending || previewStale {
			pending = append(pending, cloneSession(candidate))
		}
	}
	return pending
}
// mediaServer exposes the session store over HTTP (see routes).
type mediaServer struct {
// store holds the sessions and peer connections the handlers operate on.
store *sessionStore
}
// newMediaServer returns a mediaServer backed by store.
func newMediaServer(store *sessionStore) *mediaServer {
	server := new(mediaServer)
	server.store = store
	return server
}
// refreshSessionsForRead reloads the store from disk before a read request is served.
func (m *mediaServer) refreshSessionsForRead() error {
	store := m.store
	return store.refreshFromDisk()
}
// routes builds the HTTP handler: the health endpoint, the session API, and
// a static file server for the public directory under /media/assets/, all
// wrapped with the CORS middleware.
func (m *mediaServer) routes() http.Handler {
	const assetsPrefix = "/media/assets/"
	assets := cacheControl(http.FileServer(http.Dir(m.store.public)))

	mux := http.NewServeMux()
	mux.Handle(assetsPrefix, http.StripPrefix(assetsPrefix, assets))
	mux.HandleFunc("/media/sessions/", m.handleSession)
	mux.HandleFunc("/media/sessions", m.handleSessions)
	mux.HandleFunc("/media/health", m.handleHealth)
	return withCORS(mux)
}
// handleHealth answers with a 200 JSON document containing "ok": true and the
// current UTC time in RFC 3339 format.
func (m *mediaServer) handleHealth(w http.ResponseWriter, r *http.Request) {
	payload := map[string]any{
		"ok":        true,
		"timestamp": time.Now().UTC().Format(time.RFC3339),
	}
	writeJSON(w, http.StatusOK, payload)
}
// handleSessions creates a session from the JSON request body on POST and
// answers 201 with the new session. Any other method yields 404; an
// undecodable body yields 400; a store failure yields 500.
func (m *mediaServer) handleSessions(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.NotFound(w, r)
		return
	}
	var request CreateSessionRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&request); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	created, createErr := m.store.createSession(request)
	if createErr != nil {
		writeError(w, http.StatusInternalServerError, createErr.Error())
		return
	}
	writeJSON(w, http.StatusCreated, map[string]any{"session": created})
}
// handleSession routes requests below /media/sessions/<id>:
//
//	GET  <id>            -> the session
//	GET  <id>/playback   -> playback info plus the session
//	POST <id>/signal     -> handleSignal
//	POST <id>/segments   -> handleSegmentUpload
//	POST <id>/markers    -> handleMarker
//	POST <id>/finalize   -> handleFinalize
//
// Everything else, including a known action with the wrong method, is a 404.
func (m *mediaServer) handleSession(w http.ResponseWriter, r *http.Request) {
	trimmed := strings.Trim(strings.TrimPrefix(r.URL.Path, "/media/sessions/"), "/")
	parts := strings.Split(trimmed, "/")
	if len(parts) == 0 || parts[0] == "" {
		http.NotFound(w, r)
		return
	}
	sessionID := parts[0]

	// loadFresh reloads the store from disk and fetches the session. It
	// writes the error response itself and reports false on failure.
	loadFresh := func() (*Session, bool) {
		if err := m.refreshSessionsForRead(); err != nil {
			writeError(w, http.StatusInternalServerError, err.Error())
			return nil, false
		}
		session, err := m.store.getSession(sessionID)
		if err != nil {
			writeError(w, http.StatusNotFound, err.Error())
			return nil, false
		}
		return session, true
	}

	if len(parts) == 1 {
		if r.Method != http.MethodGet {
			http.NotFound(w, r)
			return
		}
		if session, ok := loadFresh(); ok {
			writeJSON(w, http.StatusOK, map[string]any{"session": session})
		}
		return
	}

	action := parts[1]
	if action == "playback" {
		if r.Method != http.MethodGet {
			http.NotFound(w, r)
			return
		}
		if session, ok := loadFresh(); ok {
			writeJSON(w, http.StatusOK, map[string]any{"playback": session.Playback, "session": session})
		}
		return
	}

	// All remaining actions accept POST only.
	postActions := map[string]func(string, http.ResponseWriter, *http.Request){
		"signal":   m.handleSignal,
		"segments": m.handleSegmentUpload,
		"markers":  m.handleMarker,
		"finalize": m.handleFinalize,
	}
	handler, known := postActions[action]
	if !known || r.Method != http.MethodPost {
		http.NotFound(w, r)
		return
	}
	handler(sessionID, w, r)
}
// handleSignal performs the server side of WebRTC signaling for a session.
// It decodes the client's session description, creates a receive-only peer
// connection (video and audio), registers it as the session's peer, applies
// the remote description, and answers with the local description once ICE
// gathering has completed.
//
// Responses: 400 for an undecodable body or a rejected remote description,
// 404 for an unknown session, 500 for peer/answer failures, 200 with
// {"type", "sdp"} on success.
//
// Differences from a naive implementation, all deliberate:
//   - every failure after the peer has been registered closes and unregisters
//     it, so half-negotiated connections are not leaked;
//   - session writes triggered by incoming media are throttled, because every
//     updateSession call rewrites session.json under the store lock and RTP
//     packets arrive far more often than that state needs refreshing;
//   - waiting for ICE gathering is abandoned when the client goes away.
func (m *mediaServer) handleSignal(sessionID string, w http.ResponseWriter, r *http.Request) {
	// Minimum spacing between session writes caused by received media packets.
	const streamHeartbeat = time.Second

	var input SignalRequest
	if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	session, err := m.store.getSession(sessionID)
	if err != nil {
		writeError(w, http.StatusNotFound, err.Error())
		return
	}
	config := webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}},
	}
	peer, err := webrtc.NewPeerConnection(config)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to create peer connection")
		return
	}
	m.store.replacePeer(sessionID, peer)

	// abort tears down the peer registered above and reports the failure.
	abort := func(status int, message string) {
		m.store.closePeer(sessionID)
		writeError(w, status, message)
	}

	recvOnly := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}
	for _, kind := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
		// Best effort, as before: negotiation continues without the
		// transceiver, but the failure is no longer invisible.
		if _, addErr := peer.AddTransceiverFromKind(kind, recvOnly); addErr != nil {
			log.Printf("[media] failed to add %s transceiver for session %s: %v", kind, sessionID, addErr)
		}
	}

	peer.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
		_, _ = m.store.updateSession(sessionID, func(session *Session) error {
			session.StreamConnected = state == webrtc.PeerConnectionStateConnected
			session.LastStreamAt = time.Now().UTC().Format(time.RFC3339)
			switch state {
			case webrtc.PeerConnectionStateConnected:
				session.Status = StatusStreaming
				session.LastError = ""
			case webrtc.PeerConnectionStateDisconnected:
				session.Status = StatusReconnecting
				session.ReconnectCount++
			case webrtc.PeerConnectionStateFailed:
				session.Status = StatusFailed
				session.LastError = "webrtc peer connection failed"
			case webrtc.PeerConnectionStateClosed:
				if session.Status != StatusArchived && session.Status != StatusFinalizing {
					session.StreamConnected = false
				}
			}
			return nil
		})
	})

	peer.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		// Drain the track until it ends; the payload itself is discarded.
		go func() {
			buffer := make([]byte, 1600)
			var lastHeartbeat time.Time
			for {
				if _, _, readErr := track.Read(buffer); readErr != nil {
					return
				}
				if time.Since(lastHeartbeat) < streamHeartbeat {
					continue
				}
				lastHeartbeat = time.Now()
				_, _ = m.store.updateSession(sessionID, func(session *Session) error {
					session.StreamConnected = true
					session.Status = StatusStreaming
					session.LastStreamAt = time.Now().UTC().Format(time.RFC3339)
					return nil
				})
			}
		}()
	})

	offer := webrtc.SessionDescription{
		Type: parseSDPType(input.Type),
		SDP:  input.SDP,
	}
	if err := peer.SetRemoteDescription(offer); err != nil {
		abort(http.StatusBadRequest, "failed to set remote description")
		return
	}
	answer, err := peer.CreateAnswer(nil)
	if err != nil {
		abort(http.StatusInternalServerError, "failed to create answer")
		return
	}
	// The promise must be created before SetLocalDescription starts gathering.
	gatherComplete := webrtc.GatheringCompletePromise(peer)
	if err := peer.SetLocalDescription(answer); err != nil {
		abort(http.StatusInternalServerError, "failed to set local description")
		return
	}
	select {
	case <-gatherComplete:
	case <-r.Context().Done():
		// Nobody is left to receive the answer; release the peer.
		m.store.closePeer(sessionID)
		return
	}
	local := peer.LocalDescription()
	if local == nil {
		abort(http.StatusInternalServerError, "failed to set local description")
		return
	}

	_, _ = m.store.updateSession(session.ID, func(current *Session) error {
		current.Status = StatusRecording
		current.StreamConnected = true
		current.LastStreamAt = time.Now().UTC().Format(time.RFC3339)
		return nil
	})
	writeJSON(w, http.StatusOK, map[string]any{
		"type": strings.ToLower(local.Type.String()),
		"sdp":  local.SDP,
	})
}
// handleSegmentUpload stores the request body as segment number "sequence" of
// the session and records (or replaces) its metadata.
//
// Query parameters: "sequence" (required, non-negative integer) and
// "durationMs" (optional; missing, unparsable or negative values count as 0).
// The file extension is derived from the Content-Type header.
//
// Responses: 400 for an invalid sequence, 404 for an unknown session, 500 for
// storage failures, 202 with the updated session on success.
//
// The session is looked up before anything is written, so requests for
// unknown ids no longer create directories and orphan files.
func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	sequence, err := strconv.Atoi(query.Get("sequence"))
	if err != nil || sequence < 0 {
		writeError(w, http.StatusBadRequest, "invalid sequence")
		return
	}
	if _, err := m.store.getSession(sessionID); err != nil {
		writeError(w, http.StatusNotFound, err.Error())
		return
	}
	// The parse error is ignored on purpose: the duration is optional.
	durationMS, _ := strconv.ParseInt(query.Get("durationMs"), 10, 64)
	if durationMS < 0 {
		durationMS = 0 // a negative duration would shrink the session total
	}
	contentType := r.Header.Get("Content-Type")
	filename := fmt.Sprintf("%06d.%s", sequence, detectExtension(contentType))
	segmentsDir := m.store.segmentsDir(sessionID)
	if err := os.MkdirAll(segmentsDir, 0o755); err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	size, err := storeSegmentFile(segmentsDir, filename, r.Body)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	session, err := m.store.updateSession(sessionID, func(session *Session) error {
		meta := SegmentMeta{
			Sequence:    sequence,
			Filename:    filename,
			DurationMS:  durationMS,
			SizeBytes:   size,
			UploadedAt:  time.Now().UTC().Format(time.RFC3339),
			ContentType: defaultString(contentType, "video/webm"),
		}
		// A re-upload of the same sequence replaces the earlier entry.
		found := false
		for index := range session.Segments {
			if session.Segments[index].Sequence == sequence {
				session.Segments[index] = meta
				found = true
				break
			}
		}
		if !found {
			session.Segments = append(session.Segments, meta)
		}
		sort.Slice(session.Segments, func(i, j int) bool {
			return session.Segments[i].Sequence < session.Segments[j].Sequence
		})
		session.Status = StatusRecording
		session.LastError = ""
		return nil
	})
	if err != nil {
		writeError(w, http.StatusNotFound, err.Error())
		return
	}
	writeJSON(w, http.StatusAccepted, map[string]any{"session": session})
}

// storeSegmentFile streams body into dir/filename and returns the number of
// bytes written. The data goes to a temporary file that is renamed into place
// only after a complete, successfully closed write, so a failed or aborted
// upload never truncates a segment that was stored earlier.
func storeSegmentFile(dir, filename string, body io.Reader) (int64, error) {
	tmp, err := os.CreateTemp(dir, filename+".upload-*")
	if err != nil {
		return 0, err
	}
	tmpPath := tmp.Name()
	size, err := io.Copy(tmp, body)
	if closeErr := tmp.Close(); err == nil {
		err = closeErr
	}
	if err == nil {
		// CreateTemp uses mode 0600; make the segment world-readable (0644).
		err = os.Chmod(tmpPath, 0o644)
	}
	if err == nil {
		err = os.Rename(tmpPath, filepath.Join(dir, filename))
	}
	if err != nil {
		_ = os.Remove(tmpPath) // best-effort cleanup of the partial upload
		return 0, err
	}
	return size, nil
}
// handleMarker appends a marker built from the JSON request body to the
// session and keeps the marker list ordered by timestamp. It answers 400 for
// an undecodable body, 404 when the update fails, and 202 with the updated
// session otherwise.
func (m *mediaServer) handleMarker(sessionID string, w http.ResponseWriter, r *http.Request) {
	var request MarkerRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&request); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	appendMarker := func(target *Session) error {
		marker := Marker{
			ID:         randomID(),
			Type:       defaultString(request.Type, "manual"),
			Label:      defaultString(request.Label, "标记点"),
			Timestamp:  request.Timestamp,
			Confidence: request.Confidence,
			CreatedAt:  time.Now().UTC().Format(time.RFC3339),
		}
		target.Markers = append(target.Markers, marker)
		sort.Slice(target.Markers, func(a, b int) bool {
			return target.Markers[a].Timestamp < target.Markers[b].Timestamp
		})
		return nil
	}
	updated, updateErr := m.store.updateSession(sessionID, appendMarker)
	if updateErr != nil {
		writeError(w, http.StatusNotFound, updateErr.Error())
		return
	}
	writeJSON(w, http.StatusAccepted, map[string]any{"session": updated})
}
// handleFinalize closes the session's peer connection and queues the session
// for final archiving. The JSON body is optional: decode errors are ignored,
// and a non-blank title or positive duration overrides the stored value. It
// answers 404 when the update fails and 202 with the updated session otherwise.
func (m *mediaServer) handleFinalize(sessionID string, w http.ResponseWriter, r *http.Request) {
	var request FinalizeRequest
	_ = json.NewDecoder(r.Body).Decode(&request)
	m.store.closePeer(sessionID)

	title := strings.TrimSpace(request.Title)
	queueArchive := func(target *Session) error {
		target.Status = StatusFinalizing
		target.ArchiveStatus = ArchiveQueued
		target.FinalizedAt = time.Now().UTC().Format(time.RFC3339)
		target.StreamConnected = false
		if title != "" {
			target.Title = title
		}
		if request.DurationMS > 0 {
			target.DurationMS = request.DurationMS
		}
		return nil
	}
	updated, updateErr := m.store.updateSession(sessionID, queueArchive)
	if updateErr != nil {
		writeError(w, http.StatusNotFound, updateErr.Error())
		return
	}
	writeJSON(w, http.StatusAccepted, map[string]any{"session": updated})
}
// runWorkerLoop processes pending sessions once per interval until ctx is
// cancelled. Each tick reloads the store from disk and then handles every
// processable session in turn; failures are logged and do not stop the loop.
func runWorkerLoop(ctx context.Context, store *sessionStore, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		if refreshErr := store.refreshFromDisk(); refreshErr != nil {
			log.Printf("[worker] failed to refresh session store: %v", refreshErr)
			continue
		}
		for _, pending := range store.listProcessableSessions() {
			if processErr := processSession(store, pending.ID); processErr != nil {
				log.Printf("[worker] failed to process session %s: %v", pending.ID, processErr)
			}
		}
	}
}
// processSession decides what work a session needs. A queued or in-progress
// archive takes priority; otherwise a preview is rebuilt when it covers fewer
// segments than were uploaded. It returns nil when there is nothing to do.
func processSession(store *sessionStore, sessionID string) error {
	snapshot, err := store.getSession(sessionID)
	if err != nil {
		return err
	}
	switch {
	case snapshot.ArchiveStatus == ArchiveQueued, snapshot.ArchiveStatus == ArchiveProcessing:
		return processFinalArchive(store, sessionID)
	case snapshot.PreviewSegments < len(snapshot.Segments):
		return processRollingPreview(store, sessionID)
	default:
		return nil
	}
}
// processRollingPreview marks the session's preview as processing and builds
// the preview artifacts. It returns nil without doing anything when a preview
// build is already in progress.
//
// The "already in progress" case is detected with a sentinel error and
// errors.Is rather than by matching text inside the error message.
func processRollingPreview(store *sessionStore, sessionID string) error {
	errBusy := errors.New("preview already processing")
	session, err := store.updateSession(sessionID, func(session *Session) error {
		if session.PreviewStatus == PreviewProcessing {
			return errBusy
		}
		session.PreviewStatus = PreviewProcessing
		session.LastError = ""
		return nil
	})
	if errors.Is(err, errBusy) {
		return nil
	}
	if err != nil {
		return err
	}
	return buildPlaybackArtifacts(store, session, false)
}
// processFinalArchive moves the session's archive job to "processing" and
// builds the final recording. It returns nil without doing anything when the
// archive is already being processed, and marks the session as failed when
// no segments were uploaded.
//
// The "already processing" case is detected with a sentinel error and
// errors.Is rather than by matching text inside the error message.
func processFinalArchive(store *sessionStore, sessionID string) error {
	errBusy := errors.New("already processing")
	session, err := store.updateSession(sessionID, func(session *Session) error {
		if session.ArchiveStatus == ArchiveProcessing {
			return errBusy
		}
		session.ArchiveStatus = ArchiveProcessing
		session.Status = StatusFinalizing
		session.LastError = ""
		return nil
	})
	if errors.Is(err, errBusy) {
		return nil
	}
	if err != nil {
		return err
	}
	if len(session.Segments) == 0 {
		// Best effort: the error returned below is what the caller sees.
		_, _ = store.updateSession(sessionID, func(session *Session) error {
			session.ArchiveStatus = ArchiveFailed
			session.Status = StatusFailed
			session.LastError = "no uploaded segments found"
			return nil
		})
		return errors.New("no uploaded segments found")
	}
	return buildPlaybackArtifacts(store, session, true)
}
// buildPlaybackArtifacts joins the session's segments into
// <publicDir>/<base>.webm and tries to transcode that file to <base>.mp4,
// where <base> is "recording" when finalize is true and "preview" otherwise.
// On success it records the preview URL and segment count and, when
// finalizing, marks the session archived with full playback info.
//
// A single segment is copied as-is; several segments are joined with ffmpeg's
// concat demuxer, first with stream copy and then, if that fails, with a
// VP9/Opus re-encode. A failed MP4 transcode is only logged.
//
// Any other failure is recorded on the session via markProcessingError and
// returned.
//
// Fixes over the previous version:
//   - the MP4 URL is derived from the file that was actually written; before,
//     rolling previews advertised recording.mp4 although they produce
//     preview.mp4;
//   - an MP4 left over from an earlier run is not advertised (and is removed)
//     when the current transcode fails.
func buildPlaybackArtifacts(store *sessionStore, session *Session, finalize bool) error {
	sessionID := session.ID
	publicDir := store.publicDir(sessionID)
	if err := os.MkdirAll(publicDir, 0o755); err != nil {
		return err
	}
	baseName := "preview"
	if finalize {
		baseName = "recording"
	}
	outputWebM := filepath.Join(publicDir, baseName+".webm")
	outputMP4 := filepath.Join(publicDir, baseName+".mp4")
	listFile := filepath.Join(store.sessionDir(sessionID), "concat.txt")

	// session is a private copy, so sorting it does not touch the store.
	sort.Slice(session.Segments, func(i, j int) bool {
		return session.Segments[i].Sequence < session.Segments[j].Sequence
	})
	inputs := make([]string, 0, len(session.Segments))
	for _, segment := range session.Segments {
		inputs = append(inputs, filepath.Join(store.segmentsDir(sessionID), segment.Filename))
	}
	if err := writeConcatList(listFile, inputs); err != nil {
		return markProcessingError(store, sessionID, err, finalize)
	}

	if len(inputs) == 1 {
		body, copyErr := os.ReadFile(inputs[0])
		if copyErr != nil {
			return markProcessingError(store, sessionID, copyErr, finalize)
		}
		if writeErr := os.WriteFile(outputWebM, body, 0o644); writeErr != nil {
			return markProcessingError(store, sessionID, writeErr, finalize)
		}
	} else {
		copyErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c", "copy", outputWebM)
		if copyErr != nil {
			reencodeErr := runFFmpeg("-y", "-f", "concat", "-safe", "0", "-i", listFile, "-c:v", "libvpx-vp9", "-b:v", "1800k", "-c:a", "libopus", outputWebM)
			if reencodeErr != nil {
				return markProcessingError(store, sessionID, fmt.Errorf("concat failed: %w / %v", copyErr, reencodeErr), finalize)
			}
		}
	}

	var mp4Size int64
	var mp4URL string
	mp4Err := runFFmpeg("-y", "-i", outputWebM, "-c:v", "libx264", "-preset", "veryfast", "-crf", "28", "-c:a", "aac", "-movflags", "+faststart", outputMP4)
	if mp4Err != nil {
		log.Printf("[worker] mp4 archive generation failed for %s: %v", sessionID, mp4Err)
		// Whatever is at outputMP4 now is stale or partial; do not serve it.
		_ = os.Remove(outputMP4)
	} else if info, statErr := os.Stat(outputMP4); statErr == nil {
		mp4Size = info.Size()
		mp4URL = fmt.Sprintf("/media/assets/sessions/%s/%s.mp4", sessionID, baseName)
	}

	webmInfo, webmStatErr := os.Stat(outputWebM)
	if webmStatErr != nil {
		return markProcessingError(store, sessionID, webmStatErr, finalize)
	}

	// Prefer the MP4 for previewing when it exists.
	previewURL := fmt.Sprintf("/media/assets/sessions/%s/%s.webm", sessionID, baseName)
	if mp4URL != "" {
		previewURL = mp4URL
	}
	_, updateErr := store.updateSession(sessionID, func(session *Session) error {
		session.Playback.PreviewURL = previewURL
		session.PreviewSegments = len(inputs)
		session.PreviewUpdatedAt = time.Now().UTC().Format(time.RFC3339)
		session.PreviewStatus = PreviewReady
		session.LastError = ""
		if finalize {
			session.ArchiveStatus = ArchiveCompleted
			session.Status = StatusArchived
			session.Playback = PlaybackInfo{
				WebMURL:    fmt.Sprintf("/media/assets/sessions/%s/recording.webm", sessionID),
				MP4URL:     mp4URL,
				WebMSize:   webmInfo.Size(),
				MP4Size:    mp4Size,
				Ready:      true,
				PreviewURL: previewURL,
			}
		}
		return nil
	})
	return updateErr
}
// markProcessingError records err on the session: the preview is marked
// failed and, when finalize is true, so are the archive and the session
// itself. The store update is best effort; err is always returned unchanged
// so callers can write "return markProcessingError(...)".
func markProcessingError(store *sessionStore, sessionID string, err error, finalize bool) error {
	recordFailure := func(target *Session) error {
		target.PreviewStatus = PreviewFailed
		if finalize {
			target.ArchiveStatus, target.Status = ArchiveFailed, StatusFailed
		}
		target.LastError = err.Error()
		return nil
	}
	_, _ = store.updateSession(sessionID, recordFailure)
	return err
}
// writeConcatList writes an ffmpeg concat-demuxer list to path, one
// "file '<input>'" line per input, with single quotes in the file name
// escaped as '\''.
//
// Each input is converted to an absolute path first. ffmpeg resolves relative
// entries against the directory of the list file, not the process working
// directory, so inputs given relative to the working directory (the case with
// the default relative data directory) would otherwise point at files that do
// not exist.
func writeConcatList(path string, inputs []string) error {
	lines := make([]string, 0, len(inputs))
	for _, input := range inputs {
		absolute, err := filepath.Abs(input)
		if err != nil {
			return err
		}
		lines = append(lines, fmt.Sprintf("file '%s'", strings.ReplaceAll(absolute, "'", "'\\''")))
	}
	return os.WriteFile(path, []byte(strings.Join(lines, "\n")), 0o644)
}
// runFFmpeg runs the "ffmpeg" executable with args and waits for it to exit.
// On failure the returned error wraps the execution error and appends the
// trimmed combined stdout/stderr output.
func runFFmpeg(args ...string) error {
	combined, runErr := exec.Command("ffmpeg", args...).CombinedOutput()
	if runErr == nil {
		return nil
	}
	return fmt.Errorf("%w: %s", runErr, strings.TrimSpace(string(combined)))
}
// parseSDPType maps a session description type name (case-insensitive) to the
// corresponding webrtc.SDPType. Unknown or empty names are treated as an
// offer, since that is what the signaling endpoint expects from clients.
//
// "answer" is now mapped to webrtc.SDPTypeAnswer; previously it fell through
// to the default and was silently treated as an offer.
func parseSDPType(value string) webrtc.SDPType {
	switch strings.ToLower(value) {
	case "answer":
		return webrtc.SDPTypeAnswer
	case "pranswer":
		return webrtc.SDPTypePranswer
	case "rollback":
		return webrtc.SDPTypeRollback
	default: // includes "offer"
		return webrtc.SDPTypeOffer
	}
}
// detectExtension picks the segment file extension for a Content-Type value:
// "mp4" or "ogg" when the value mentions them, "webm" otherwise (including
// for an empty value). The comparison ignores case because media type names
// are case-insensitive.
func detectExtension(contentType string) string {
	lowered := strings.ToLower(contentType)
	switch {
	case strings.Contains(lowered, "mp4"):
		return "mp4"
	case strings.Contains(lowered, "ogg"):
		return "ogg"
	default:
		return "webm"
	}
}
// defaultString returns value with surrounding whitespace removed, or
// fallback (unmodified) when nothing remains after trimming.
func defaultString(value string, fallback string) string {
	if trimmed := strings.TrimSpace(value); trimmed != "" {
		return trimmed
	}
	return fallback
}
// randomID returns 12 bytes from crypto/rand encoded as 24 hexadecimal
// characters. If the random source fails it falls back to the current Unix
// time in nanoseconds, formatted in base 36.
func randomID() string {
	var raw [12]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return strconv.FormatInt(time.Now().UnixNano(), 36)
	}
	return hex.EncodeToString(raw[:])
}
// withCORS wraps next so that every response carries permissive CORS headers
// (any origin; GET, POST and OPTIONS). OPTIONS requests are answered directly
// with 204 and are not passed on to next.
func withCORS(next http.Handler) http.Handler {
	corsHeaders := [][2]string{
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Methods", "GET,POST,OPTIONS"},
		{"Access-Control-Allow-Headers", "Content-Type,Authorization,X-User-Id"},
	}
	handler := func(w http.ResponseWriter, r *http.Request) {
		header := w.Header()
		for _, pair := range corsHeaders {
			header.Set(pair[0], pair[1])
		}
		if r.Method != http.MethodOptions {
			next.ServeHTTP(w, r)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	}
	return http.HandlerFunc(handler)
}
// cacheControl wraps next and sets the Cache-Control header for served assets.
//
// Files whose name starts with "preview." get "no-cache": rolling previews are
// rebuilt at the same URL while a session is recording, so clients must
// revalidate them instead of reusing a stale copy. All other assets keep the
// long-lived "immutable" policy.
func cacheControl(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Last path element; LastIndex returns -1 when there is no slash,
		// which makes the slice start at 0.
		name := r.URL.Path[strings.LastIndex(r.URL.Path, "/")+1:]
		policy := "public, max-age=31536000, immutable"
		if strings.HasPrefix(name, "preview.") {
			policy = "no-cache"
		}
		w.Header().Set("Cache-Control", policy)
		next.ServeHTTP(w, r)
	})
}
// writeJSON sends body as a JSON response with the given status code and the
// "application/json" content type. Encoding errors are ignored because the
// status line has already been written by then.
func writeJSON(w http.ResponseWriter, status int, body any) {
	headers := w.Header()
	headers.Set("Content-Type", "application/json")
	w.WriteHeader(status)
	encoder := json.NewEncoder(w)
	_ = encoder.Encode(body)
}
// writeError sends a JSON error response of the form {"error": message} with
// the given status code.
func writeError(w http.ResponseWriter, status int, message string) {
	payload := map[string]string{"error": message}
	writeJSON(w, status, payload)
}
// main starts the media service. Configuration comes from the environment:
//
//	MEDIA_MODE             "worker" runs only the processing loop; any other
//	                       value (default "serve") runs the HTTP server
//	MEDIA_DATA_DIR         data directory (default "./data/media")
//	MEDIA_ADDR             listen address (default ":8081")
//	MEDIA_EMBEDDED_WORKER  "0" disables the worker loop inside the server
//
// The HTTP server is built explicitly so that a header read timeout can be
// set; without one, clients that never finish sending their request headers
// hold connections open indefinitely. No body read/write timeout is set
// because segment uploads and asset downloads can legitimately take long.
func main() {
	mode := defaultString(os.Getenv("MEDIA_MODE"), "serve")
	dataDir := defaultString(os.Getenv("MEDIA_DATA_DIR"), "./data/media")
	addr := defaultString(os.Getenv("MEDIA_ADDR"), ":8081")
	workerInterval := 3 * time.Second
	store, err := newSessionStore(dataDir)
	if err != nil {
		log.Fatalf("failed to create store: %v", err)
	}
	switch mode {
	case "worker":
		log.Printf("media worker running with data dir %s", dataDir)
		runWorkerLoop(context.Background(), store, workerInterval)
	default:
		if os.Getenv("MEDIA_EMBEDDED_WORKER") != "0" {
			go runWorkerLoop(context.Background(), store, workerInterval)
		}
		server := &http.Server{
			Addr:              addr,
			Handler:           newMediaServer(store).routes(),
			ReadHeaderTimeout: 10 * time.Second,
		}
		log.Printf("media service listening on %s with data dir %s", addr, dataDir)
		if err := server.ListenAndServe(); err != nil {
			log.Fatal(err)
		}
	}
}