package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"strconv"
"github.com/gorilla/websocket"
"github.com/pion/logging"
"github.com/pion/rtwatch/gst"
"github.com/pion/webrtc/v2"
)
var (
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: checkOrigin,
}
peerConnectionConfig = webrtc.Configuration{}
webrtcApi = &webrtc.API{}
audioTrack = &webrtc.Track{}
videoTrack = &webrtc.Track{}
pipeline = &gst.Pipeline{}
)
type websocketMessage struct {
Event string `json:"event"`
Data string `json:"data"`
}
func main() {
containerPath := ""
httpListenAddress := ""
httpPath := ""
flag.StringVar(&containerPath, "container-path", "", "path to the media file you want to playback")
flag.StringVar(&httpListenAddress, "http-listen-address", ":8080", "address for HTTP server to listen on")
flag.StringVar(&httpPath, "http-path", "/rtwatch", "URI path prefix")
flag.Parse()
if containerPath == "" {
panic("-container-path must be specified")
}
log.Println("Initializing WebRTC PeerConnection")
lf := logging.NewDefaultLoggerFactory()
lf.DefaultLogLevel = logging.LogLevelDebug
se := webrtc.SettingEngine{
LoggerFactory: lf,
}
me := webrtc.MediaEngine{}
me.RegisterDefaultCodecs()
webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(me),webrtc.WithSettingEngine(se))
pc, err := webrtcApi.NewPeerConnection(webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
})
if err != nil {
log.Fatal(err)
}
log.Println("Initializing WebRTC tracks")
videoTrack, err = pc.NewTrack(webrtc.DefaultPayloadTypeVP8, 5000, "sync", "sync")
if err != nil {
log.Fatal(err)
}
audioTrack, err = pc.NewTrack(webrtc.DefaultPayloadTypeOpus, 5001, "sync", "sync")
if err != nil {
log.Fatal(err)
}
log.Println("Creating and starting pipeline")
pipeline = gst.CreatePipeline(containerPath, audioTrack, videoTrack)
pipeline.Start()
http.HandleFunc(httpPath + "/ws/", serveWs)
fmt.Println(fmt.Sprintf("Video file '%s' is now available on '%s', have fun!", containerPath, httpListenAddress))
log.Fatal(http.ListenAndServe(httpListenAddress, nil))
}
func handleWebsocketMessage(pc *webrtc.PeerConnection, ws *websocket.Conn, message *websocketMessage) error {
switch message.Event {
case "play":
pipeline.Play()
case "pause":
pipeline.Pause()
case "seek":
i, err := strconv.ParseInt(message.Data, 0, 64)
if err != nil {
log.Print(err)
}
pipeline.SeekToTime(i)
case "answer":
answer := webrtc.SessionDescription{}
if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
return err
}
if err := pc.SetRemoteDescription(answer); err != nil {
return err
}
}
return nil
}
func checkOrigin(r *http.Request) bool {
return true;
}
func serveWs(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Println(err)
}
return
}
defer ws.Close()
peerConnection, err := webrtcApi.NewPeerConnection(peerConnectionConfig)
if err != nil {
log.Print(err)
return
} else if _, err = peerConnection.AddTrack(audioTrack); err != nil {
log.Print(err)
return
} else if _, err = peerConnection.AddTrack(videoTrack); err != nil {
log.Print(err)
return
}
defer func() {
if err := peerConnection.Close(); err != nil {
log.Println(err)
}
}()
sdp, err := peerConnection.CreateOffer(nil)
if err != nil {
log.Println(err)
return
}
if err := peerConnection.SetLocalDescription(sdp); err != nil {
log.Println(err)
return
}
sdpData, err := json.Marshal(sdp)
if err != nil {
log.Print(err)
return
}
offerMsg := &websocketMessage{
Event: "offer",
Data: string(sdpData),
}
if err := ws.WriteJSON(offerMsg); err != nil {
log.Print(err)
return
}
message := &websocketMessage{}
for {
_, msg, err := ws.ReadMessage()
if err != nil {
break
} else if err := json.Unmarshal(msg, &message); err != nil {
log.Print(err)
return
}
if err := handleWebsocketMessage(peerConnection, ws, message); err != nil {
log.Print(err)
}
}
}