feat: add live camera multi-device viewer mode
这个提交包含在:
192
media/main.go
192
media/main.go
@@ -104,6 +104,7 @@ type Session struct {
|
||||
PreviewUpdatedAt string `json:"previewUpdatedAt,omitempty"`
|
||||
StreamConnected bool `json:"streamConnected"`
|
||||
LastStreamAt string `json:"lastStreamAt,omitempty"`
|
||||
ViewerCount int `json:"viewerCount"`
|
||||
Playback PlaybackInfo `json:"playback"`
|
||||
Segments []SegmentMeta `json:"segments"`
|
||||
Markers []Marker `json:"markers"`
|
||||
@@ -151,19 +152,23 @@ type FinalizeRequest struct {
|
||||
}
|
||||
|
||||
type sessionStore struct {
|
||||
rootDir string
|
||||
public string
|
||||
mu sync.RWMutex
|
||||
sessions map[string]*Session
|
||||
peers map[string]*webrtc.PeerConnection
|
||||
rootDir string
|
||||
public string
|
||||
mu sync.RWMutex
|
||||
sessions map[string]*Session
|
||||
peers map[string]*webrtc.PeerConnection
|
||||
viewerPeers map[string]map[string]*webrtc.PeerConnection
|
||||
videoTracks map[string]*webrtc.TrackLocalStaticRTP
|
||||
}
|
||||
|
||||
func newSessionStore(rootDir string) (*sessionStore, error) {
|
||||
store := &sessionStore{
|
||||
rootDir: rootDir,
|
||||
public: filepath.Join(rootDir, "public"),
|
||||
sessions: map[string]*Session{},
|
||||
peers: map[string]*webrtc.PeerConnection{},
|
||||
rootDir: rootDir,
|
||||
public: filepath.Join(rootDir, "public"),
|
||||
sessions: map[string]*Session{},
|
||||
peers: map[string]*webrtc.PeerConnection{},
|
||||
viewerPeers: map[string]map[string]*webrtc.PeerConnection{},
|
||||
videoTracks: map[string]*webrtc.TrackLocalStaticRTP{},
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Join(rootDir, "sessions"), 0o755); err != nil {
|
||||
return nil, err
|
||||
@@ -294,6 +299,42 @@ func (s *sessionStore) replacePeer(id string, peer *webrtc.PeerConnection) {
|
||||
s.peers[id] = peer
|
||||
}
|
||||
|
||||
func (s *sessionStore) replaceViewerPeer(sessionID string, viewerID string, peer *webrtc.PeerConnection) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if _, ok := s.viewerPeers[sessionID]; !ok {
|
||||
s.viewerPeers[sessionID] = map[string]*webrtc.PeerConnection{}
|
||||
}
|
||||
if existing, ok := s.viewerPeers[sessionID][viewerID]; ok {
|
||||
_ = existing.Close()
|
||||
}
|
||||
s.viewerPeers[sessionID][viewerID] = peer
|
||||
if session, ok := s.sessions[sessionID]; ok {
|
||||
session.ViewerCount = len(s.viewerPeers[sessionID])
|
||||
_ = s.saveSession(session)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sessionStore) removeViewerPeer(sessionID string, viewerID string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
viewers, ok := s.viewerPeers[sessionID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if existing, ok := viewers[viewerID]; ok {
|
||||
_ = existing.Close()
|
||||
delete(viewers, viewerID)
|
||||
}
|
||||
if len(viewers) == 0 {
|
||||
delete(s.viewerPeers, sessionID)
|
||||
}
|
||||
if session, ok := s.sessions[sessionID]; ok {
|
||||
session.ViewerCount = len(s.viewerPeers[sessionID])
|
||||
_ = s.saveSession(session)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sessionStore) closePeer(id string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
@@ -301,6 +342,38 @@ func (s *sessionStore) closePeer(id string) {
|
||||
_ = existing.Close()
|
||||
delete(s.peers, id)
|
||||
}
|
||||
if viewers, ok := s.viewerPeers[id]; ok {
|
||||
for viewerID, peer := range viewers {
|
||||
_ = peer.Close()
|
||||
delete(viewers, viewerID)
|
||||
}
|
||||
delete(s.viewerPeers, id)
|
||||
}
|
||||
delete(s.videoTracks, id)
|
||||
if session, ok := s.sessions[id]; ok {
|
||||
session.ViewerCount = 0
|
||||
_ = s.saveSession(session)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sessionStore) getVideoTrack(sessionID string) *webrtc.TrackLocalStaticRTP {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.videoTracks[sessionID]
|
||||
}
|
||||
|
||||
func (s *sessionStore) ensureVideoTrack(sessionID string, codec webrtc.RTPCodecCapability) (*webrtc.TrackLocalStaticRTP, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if track, ok := s.videoTracks[sessionID]; ok {
|
||||
return track, nil
|
||||
}
|
||||
track, err := webrtc.NewTrackLocalStaticRTP(codec, "video", fmt.Sprintf("livecam-%s", sessionID))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.videoTracks[sessionID] = track
|
||||
return track, nil
|
||||
}
|
||||
|
||||
func (s *sessionStore) updateSession(id string, update func(*Session) error) (*Session, error) {
|
||||
@@ -419,6 +492,12 @@ func (m *mediaServer) handleSession(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
m.handleSignal(sessionID, w, r)
|
||||
case "viewer-signal":
|
||||
if r.Method != http.MethodPost {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
m.handleViewerSignal(sessionID, w, r)
|
||||
case "segments":
|
||||
if r.Method != http.MethodPost {
|
||||
http.NotFound(w, r)
|
||||
@@ -509,12 +588,23 @@ func (m *mediaServer) handleSignal(sessionID string, w http.ResponseWriter, r *h
|
||||
|
||||
peer.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
||||
_ = receiver
|
||||
if track.Kind() != webrtc.RTPCodecTypeVideo {
|
||||
return
|
||||
}
|
||||
localTrack, trackErr := m.store.ensureVideoTrack(sessionID, track.Codec().RTPCodecCapability)
|
||||
if trackErr != nil {
|
||||
log.Printf("failed to create local viewer track for session %s: %v", sessionID, trackErr)
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
buffer := make([]byte, 1600)
|
||||
for {
|
||||
if _, _, readErr := track.Read(buffer); readErr != nil {
|
||||
packet, _, readErr := track.ReadRTP()
|
||||
if readErr != nil {
|
||||
return
|
||||
}
|
||||
if writeErr := localTrack.WriteRTP(packet); writeErr != nil && !errors.Is(writeErr, io.ErrClosedPipe) {
|
||||
log.Printf("failed to fan out RTP packet for session %s: %v", sessionID, writeErr)
|
||||
}
|
||||
_, _ = m.store.updateSession(sessionID, func(session *Session) error {
|
||||
session.StreamConnected = true
|
||||
session.Status = StatusStreaming
|
||||
@@ -556,6 +646,86 @@ func (m *mediaServer) handleSignal(sessionID string, w http.ResponseWriter, r *h
|
||||
})
|
||||
}
|
||||
|
||||
func (m *mediaServer) handleViewerSignal(sessionID string, w http.ResponseWriter, r *http.Request) {
|
||||
var input SignalRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
if _, err := m.store.getSession(sessionID); err != nil {
|
||||
writeError(w, http.StatusNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
localTrack := m.store.getVideoTrack(sessionID)
|
||||
if localTrack == nil {
|
||||
writeError(w, http.StatusConflict, "viewer stream not ready")
|
||||
return
|
||||
}
|
||||
|
||||
config := webrtc.Configuration{
|
||||
ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}},
|
||||
}
|
||||
peer, err := webrtc.NewPeerConnection(config)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to create viewer peer connection")
|
||||
return
|
||||
}
|
||||
viewerID := randomID()
|
||||
m.store.replaceViewerPeer(sessionID, viewerID, peer)
|
||||
|
||||
sender, err := peer.AddTrack(localTrack)
|
||||
if err != nil {
|
||||
m.store.removeViewerPeer(sessionID, viewerID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to add viewer track")
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
rtcpBuf := make([]byte, 1500)
|
||||
for {
|
||||
if _, _, readErr := sender.Read(rtcpBuf); readErr != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
peer.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||
switch state {
|
||||
case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
|
||||
m.store.removeViewerPeer(sessionID, viewerID)
|
||||
}
|
||||
})
|
||||
|
||||
offer := webrtc.SessionDescription{
|
||||
Type: parseSDPType(input.Type),
|
||||
SDP: input.SDP,
|
||||
}
|
||||
if err := peer.SetRemoteDescription(offer); err != nil {
|
||||
m.store.removeViewerPeer(sessionID, viewerID)
|
||||
writeError(w, http.StatusBadRequest, "failed to set remote description")
|
||||
return
|
||||
}
|
||||
|
||||
answer, err := peer.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
m.store.removeViewerPeer(sessionID, viewerID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to create viewer answer")
|
||||
return
|
||||
}
|
||||
gatherComplete := webrtc.GatheringCompletePromise(peer)
|
||||
if err := peer.SetLocalDescription(answer); err != nil {
|
||||
m.store.removeViewerPeer(sessionID, viewerID)
|
||||
writeError(w, http.StatusInternalServerError, "failed to set viewer local description")
|
||||
return
|
||||
}
|
||||
<-gatherComplete
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"viewerId": viewerID,
|
||||
"type": strings.ToLower(peer.LocalDescription().Type.String()),
|
||||
"sdp": peer.LocalDescription().SDP,
|
||||
})
|
||||
}
|
||||
|
||||
func (m *mediaServer) handleSegmentUpload(sessionID string, w http.ResponseWriter, r *http.Request) {
|
||||
sequence, err := strconv.Atoi(r.URL.Query().Get("sequence"))
|
||||
if err != nil || sequence < 0 {
|
||||
|
||||
@@ -256,3 +256,25 @@ func TestHandleSessionGetRefreshesSessionStateFromDisk(t *testing.T) {
|
||||
t.Fatalf("expected playback ready after refresh")
|
||||
}
|
||||
}
|
||||
|
||||
func TestViewerSignalReturnsConflictBeforePublisherTrackReady(t *testing.T) {
|
||||
store, err := newSessionStore(t.TempDir())
|
||||
if err != nil {
|
||||
t.Fatalf("newSessionStore: %v", err)
|
||||
}
|
||||
|
||||
server := newMediaServer(store)
|
||||
session, err := store.createSession(CreateSessionRequest{UserID: "1", Title: "Viewer Pending"})
|
||||
if err != nil {
|
||||
t.Fatalf("createSession: %v", err)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/media/sessions/"+session.ID+"/viewer-signal", strings.NewReader(`{"type":"offer","sdp":"mock-offer"}`))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
res := httptest.NewRecorder()
|
||||
server.routes().ServeHTTP(res, req)
|
||||
|
||||
if res.Code != http.StatusConflict {
|
||||
t.Fatalf("expected viewer-signal 409 before video track is ready, got %d", res.Code)
|
||||
}
|
||||
}
|
||||
|
||||
在新工单中引用
屏蔽一个用户