From 1d5310c38c93766b294afdd380e6fb05170aaad5 Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Fri, 20 Sep 2019 00:31:15 -0700 Subject: Initial commit --- go.mod | 8 +++ go.sum | 102 ++++++++++++++++++++++++++ gst/gst.c | 110 ++++++++++++++++++++++++++++ gst/gst.go | 96 +++++++++++++++++++++++++ gst/gst.h | 20 ++++++ main.go | 236 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 572 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 gst/gst.c create mode 100644 gst/gst.go create mode 100644 gst/gst.h create mode 100644 main.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d300919 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module github.com/pion/rtwatch + +go 1.13 + +require ( + github.com/gorilla/websocket v1.4.1 + github.com/pion/webrtc/v2 v2.1.4 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..cb395e8 --- /dev/null +++ b/go.sum @@ -0,0 +1,102 @@ +github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= +github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk= +github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9 h1:tbuodUh2vuhOVZAdW3NEUvosFHUMJwUNl7jk/VSEiwc= +github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9/go.mod h1:PpMmPfPKO9nKJ/psF49ESTAGQSdfXxlg1otPbEB2nOw= +github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA= +github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwmjgmPuiQEcYk= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pion/datachannel v1.4.6 h1:ALIwApJIxXOxA142PwJ7abvHwVdicVMqjkQNZC1WxLU= +github.com/pion/datachannel v1.4.6/go.mod h1:k02QlG7ZByXzavnjWj3gP3W1474H4ifzxSx2Y0Q5kvc= +github.com/pion/dtls v1.5.1 h1:LcCs1l9fzsHC4y+ENjLyuxOAe+k0DV65T2n4tjwM7xw= +github.com/pion/dtls v1.5.1/go.mod h1:CjlPLfQdsTg3G4AEXjJp8FY5bRweBlxHrgoFrN+fQsk= +github.com/pion/ice v0.5.15 h1:wy9C4JICwKZpQm+VX4WAnr2yVO6tNsqCWg87OOZWbmk= +github.com/pion/ice v0.5.15/go.mod h1:8of6pKdMoT9/TNLYdv+UdjSF4DvLUN65mzhhfmQDg5g= +github.com/pion/logging v0.2.1/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/mdns v0.0.3 h1:DxdOYd0pgwLKiDlIIxfU0qdG5iWh1Xn6CsS9vc6cMAY= +github.com/pion/mdns v0.0.3/go.mod h1:VrN3wefVgtfL8QgpEblPUC46ag1reLIfpqekCnKunLE= +github.com/pion/quic v0.1.1 h1:D951FV+TOqI9A0rTF7tHx0Loooqz+nyzjEyj8o3PuMA= +github.com/pion/quic v0.1.1/go.mod h1:zEU51v7ru8Mp4AUBJvj6psrSth5eEFNnVQK5K48oV3k= +github.com/pion/rtcp v1.2.1 h1:S3yG4KpYAiSmBVqKAfgRa5JdwBNj4zK3RLUa8JYdhak= +github.com/pion/rtcp v1.2.1/go.mod h1:a5dj2d6BKIKHl43EnAOIrCczcjESrtPuMgfmL6/K6QM= +github.com/pion/rtp v1.1.3 h1:GTYSTsSLF5vH+UqShGYQEBdoYasWjTTC9UeYglnUO+o= +github.com/pion/rtp v1.1.3/go.mod h1:/l4cvcKd0D3u9JLs2xSVI95YkfXW87a3br3nqmVtSlE= +github.com/pion/sctp v1.6.10 h1:7odhpD08TTc84a/OXwMAFEtOKka+HkD//Ze+sD8qcxU= +github.com/pion/sctp v1.6.10/go.mod h1:cCqpLdYvgEUdl715+qbWtgT439CuQrAgy8BZTp0aEfA= +github.com/pion/sdp/v2 v2.3.0 h1:5EhwPh1xKWYYjjvMuubHoMLy6M0B9U26Hh7q3f7vEGk= +github.com/pion/sdp/v2 v2.3.0/go.mod h1:idSlWxhfWQDtTy9J05cgxpHBu/POwXN2VDRGYxT/EjU= +github.com/pion/srtp v1.2.6 h1:mHQuAMh0P67R7/j1F260u3O+fbRWLyjKLRPZYYvODFM= +github.com/pion/srtp v1.2.6/go.mod h1:rd8imc5htjfs99XiEoOjLMEOcVjME63UHx9Ek9IGst0= +github.com/pion/stun v0.3.2 h1:Vsy6C+bTbJKEC2TH4vHYOnRKmozNPi5FpeKv9/bX16k= +github.com/pion/stun v0.3.2/go.mod h1:xrCld6XM+6GWDZdvjPlLMsTU21rNxnO6UO8XsAvHr/M= +github.com/pion/transport v0.6.0/go.mod h1:iWZ07doqOosSLMhZ+FXUTq+TamDoXSllxpbGcfkCmbE= +github.com/pion/transport v0.7.0/go.mod h1:iWZ07doqOosSLMhZ+FXUTq+TamDoXSllxpbGcfkCmbE= +github.com/pion/transport v0.8.6/go.mod h1:nAmRRnn+ArVtsoNuwktvAD+jrjSD7pA+H3iRmZwdUno= +github.com/pion/transport v0.8.7 h1:t7uYhWOoljd82rnkLH+H2Lw7/IGA5kV9Bl5sWrmcYSc= +github.com/pion/transport v0.8.7/go.mod h1:lpeSM6KJFejVtZf8k0fgeN7zE73APQpTF83WvA1FVP8= +github.com/pion/turn v1.3.6 h1:N49o5g3SRI5g5Sg8WVft1RDAFxIHOi5roHHdbC39g7g= +github.com/pion/turn v1.3.6/go.mod h1:D8XaX/CVKLkRozV9baRvhZmwOyKAJCTHrj23DdwMI4g= +github.com/pion/webrtc/v2 v2.1.4 h1:QTJwVMTqv8HO9jRIzWbwlZWwiBgPy7MIsQrJ3Lxbwjc= +github.com/pion/webrtc/v2 v2.1.4/go.mod h1:u+jSZHnB0YH48VDm121vA2zDt7yancBpfoDmzor7qdY= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190912160710-24e19bdeb0f2 h1:4dVFTC832rPn4pomLSz1vA+are2+dU19w1H8OngV7nc= +golang.org/x/net v0.0.0-20190912160710-24e19bdeb0f2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190912141932-bc967efca4b8 h1:41hwlulw1prEMBxLQSlMSux1zxJf07B3WPsdjJlKZxE= +golang.org/x/sys v0.0.0-20190912141932-bc967efca4b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/gst/gst.c b/gst/gst.c new file mode 100644 index 0000000..e84316e --- /dev/null +++ b/gst/gst.c @@ -0,0 +1,110 @@ +#include "gst.h" + +#include + +typedef struct SampleHandlerUserData { + int pipelineId; +} SampleHandlerUserData; + +GMainLoop *gstreamer_send_main_loop = NULL; +void gstreamer_send_start_mainloop(void) { + gstreamer_send_main_loop = g_main_loop_new(NULL, FALSE); + + g_main_loop_run(gstreamer_send_main_loop); +} + +static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) { + GstElement *pipeline = GST_ELEMENT(data); + + switch (GST_MESSAGE_TYPE(msg)) { + case GST_MESSAGE_EOS: + if (!gst_element_seek (pipeline, 1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SKIP, + GST_SEEK_TYPE_SET, 0, + GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE)) { + g_print ("EOS restart failed\n"); + exit(1); + } + break; + + case GST_MESSAGE_ERROR: { + gchar *debug; + GError *error; + + gst_message_parse_error(msg, &error, &debug); + g_free(debug); + + g_printerr("GStreamer Error: %s\n", error->message); + g_error_free(error); + exit(1); + } + default: + break; + } + + return TRUE; +} + +GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) { + GstSample *sample = NULL; + GstBuffer *buffer = NULL; + gpointer copy = NULL; + gsize copy_size = 0; + int *isVideo = (int *) user_data; + + g_signal_emit_by_name (object, "pull-sample", &sample); + if (sample) { + buffer = gst_sample_get_buffer(sample); + if (buffer) { + gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), ©, ©_size); + goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), *isVideo); + } + gst_sample_unref (sample); + } + + return GST_FLOW_OK; +} + +GstElement *gstreamer_send_create_pipeline(char *pipeline) { + gst_init(NULL, NULL); + GError *error = NULL; + return gst_parse_launch(pipeline, &error); +} + +void gstreamer_send_start_pipeline(GstElement *pipeline) { + GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, gstreamer_send_bus_call, pipeline); + gst_object_unref(bus); + + GstElement *audio = gst_bin_get_by_name(GST_BIN(pipeline), "audio"), + *video = gst_bin_get_by_name(GST_BIN(pipeline), "video"); + + int *isAudio = malloc(sizeof(int)), + *isVideo = malloc(sizeof(int)); + + *isVideo = 1; + *isAudio = 0; + + g_object_set(audio, "emit-signals", TRUE, NULL); + g_signal_connect(audio, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), isAudio); + + g_object_set(video, "emit-signals", TRUE, NULL); + g_signal_connect(video, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), isVideo); + + gstreamer_send_play_pipeline(pipeline); +} + +void gstreamer_send_play_pipeline(GstElement *pipeline) { + gst_element_set_state(pipeline, GST_STATE_PLAYING); +} + +void gstreamer_send_pause_pipeline(GstElement *pipeline) { + gst_element_set_state(pipeline, GST_STATE_PAUSED); +} + +void gstreamer_send_seek(GstElement *pipeline, int64_t seek_pos) { + if (!gst_element_seek (pipeline, 1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SKIP, + GST_SEEK_TYPE_SET, seek_pos * GST_SECOND, + GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE)) { + g_print ("Seek failed!\n"); + } +} diff --git a/gst/gst.go b/gst/gst.go new file mode 100644 index 0000000..f8a5385 --- /dev/null +++ b/gst/gst.go @@ -0,0 +1,96 @@ +package gst + +/* +#cgo pkg-config: gstreamer-1.0 gstreamer-app-1.0 + +#include "gst.h" + +*/ +import "C" +import ( + "fmt" + "io" + "sync" + "unsafe" + + "github.com/pion/webrtc/v2" + "github.com/pion/webrtc/v2/pkg/media" +) + +func init() { + go C.gstreamer_send_start_mainloop() +} + +// Pipeline is a wrapper for a GStreamer Pipeline +type Pipeline struct { + Pipeline *C.GstElement + audioTrack *webrtc.Track + videoTrack *webrtc.Track +} + +var pipeline = &Pipeline{} +var pipelinesLock sync.Mutex + +// CreatePipeline creates a GStreamer Pipeline +func CreatePipeline(containerPath string, audioTrack, videoTrack *webrtc.Track) *Pipeline { + pipelineStr := fmt.Sprintf("filesrc location=\"%s\" ! decodebin name=demux ! queue ! x264enc bframes=0 speed-preset=veryfast key-int-max=60 ! video/x-h264,stream-format=byte-stream ! appsink name=video demux. ! queue ! audioconvert ! audioresample ! opusenc ! appsink name=audio", containerPath) + + pipelineStrUnsafe := C.CString(pipelineStr) + defer C.free(unsafe.Pointer(pipelineStrUnsafe)) + + pipelinesLock.Lock() + defer pipelinesLock.Unlock() + pipeline = &Pipeline{ + Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe), + audioTrack: audioTrack, + videoTrack: videoTrack, + } + return pipeline +} + +// Start starts the GStreamer Pipeline +func (p *Pipeline) Start() { + // This will signal to goHandlePipelineBuffer + // and provide a method for cancelling sends. + C.gstreamer_send_start_pipeline(p.Pipeline) +} + +// Play sets the pipeline to PLAYING +func (p *Pipeline) Play() { + C.gstreamer_send_play_pipeline(p.Pipeline) +} + +// Pause sets the pipeline to PAUSED +func (p *Pipeline) Pause() { + C.gstreamer_send_pause_pipeline(p.Pipeline) +} + +// SeekToTime seeks on the pipeline +func (p *Pipeline) SeekToTime(seekPos int64) { + C.gstreamer_send_seek(p.Pipeline, C.int64_t(seekPos)) +} + +const ( + videoClockRate = 90000 + audioClockRate = 48000 +) + +//export goHandlePipelineBuffer +func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.int, isVideo C.int) { + var track *webrtc.Track + var samples uint32 + + if isVideo == 1 { + samples = uint32(videoClockRate * (float32(duration) / 1000000000)) + track = pipeline.videoTrack + } else { + samples = uint32(audioClockRate * (float32(duration) / 1000000000)) + track = pipeline.audioTrack + } + + if err := track.WriteSample(media.Sample{Data: C.GoBytes(buffer, bufferLen), Samples: samples}); err != nil && err != io.ErrClosedPipe { + panic(err) + } + + C.free(buffer) +} diff --git a/gst/gst.h b/gst/gst.h new file mode 100644 index 0000000..1103a6a --- /dev/null +++ b/gst/gst.h @@ -0,0 +1,20 @@ +#ifndef GST_H +#define GST_H + +#include +#include +#include +#include + +extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int isVideo); + +GstElement *gstreamer_send_create_pipeline(char *pipeline); +void gstreamer_send_start_pipeline(GstElement *pipeline); + +void gstreamer_send_play_pipeline(GstElement *pipeline); +void gstreamer_send_pause_pipeline(GstElement *pipeline); +void gstreamer_send_seek(GstElement *pipeline, int64_t seek_pos); + +void gstreamer_send_start_mainloop(void); + +#endif diff --git a/main.go b/main.go new file mode 100644 index 0000000..8e6befc --- /dev/null +++ b/main.go @@ -0,0 +1,236 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "net/http" + "strconv" + + "github.com/gorilla/websocket" + "github.com/pion/rtwatch/gst" + "github.com/pion/webrtc/v2" +) + +const homeHTML = ` + + + synced-playback + + + + +
+ + + + +
+ + + + +` + +var ( + upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + + peerConnectionConfig = webrtc.Configuration{} + + audioTrack = &webrtc.Track{} + videoTrack = &webrtc.Track{} + pipeline = &gst.Pipeline{} +) + +type websocketMessage struct { + Event string `json:"event"` + Data string `json:"data"` +} + +func main() { + containerPath := "" + httpListenAddress := "" + flag.StringVar(&containerPath, "container-path", "", "path to the media file you want to playback") + flag.StringVar(&httpListenAddress, "http-listen-address", ":8080", "address for HTTP server to listen on") + flag.Parse() + + if containerPath == "" { + panic("-container-path must be specified") + } + + pc, err := webrtc.NewPeerConnection(webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + }) + if err != nil { + log.Fatal(err) + } + + videoTrack, err = pc.NewTrack(webrtc.DefaultPayloadTypeH264, 5000, "synced-video", "synced-video") + if err != nil { + log.Fatal(err) + } + + audioTrack, err = pc.NewTrack(webrtc.DefaultPayloadTypeOpus, 5001, "synced-video", "synced-video") + if err != nil { + log.Fatal(err) + } + + pipeline = gst.CreatePipeline(containerPath, audioTrack, videoTrack) + pipeline.Start() + + http.HandleFunc("/", serveHome) + http.HandleFunc("/ws", serveWs) + + fmt.Println(fmt.Sprintf("Video file '%s' is now available on '%s', have fun!", containerPath, httpListenAddress)) + log.Fatal(http.ListenAndServe(httpListenAddress, nil)) +} + +func handleWebsocketMessage(pc *webrtc.PeerConnection, ws *websocket.Conn, message *websocketMessage) error { + switch message.Event { + case "play": + pipeline.Play() + case "pause": + pipeline.Pause() + case "seek": + i, err := strconv.ParseInt(message.Data, 0, 64) + if err != nil { + log.Print(err) + } + pipeline.SeekToTime(i) + case "offer": + offer := webrtc.SessionDescription{} + if err := json.Unmarshal([]byte(message.Data), &offer); err != nil { + return err + } + + if err := pc.SetRemoteDescription(offer); err != nil { + return err + } + + answer, err := pc.CreateAnswer(nil) + if err != nil { + return err + } + if err := pc.SetLocalDescription(answer); err != nil { + return err + } + + answerString, err := json.Marshal(answer) + if err != nil { + return err + } + + if err = ws.WriteJSON(&websocketMessage{ + Event: "answer", + Data: string(answerString), + }); err != nil { + return err + } + } + return nil +} + +func serveWs(w http.ResponseWriter, r *http.Request) { + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + if _, ok := err.(websocket.HandshakeError); !ok { + log.Println(err) + } + return + } + + peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig) + if err != nil { + log.Print(err) + return + } else if _, err = peerConnection.AddTrack(audioTrack); err != nil { + log.Print(err) + return + } else if _, err = peerConnection.AddTrack(videoTrack); err != nil { + log.Print(err) + return + } + + defer func() { + if err := peerConnection.Close(); err != nil { + log.Println(err) + } + }() + + message := &websocketMessage{} + for { + _, msg, err := ws.ReadMessage() + if err != nil { + break + } else if err := json.Unmarshal(msg, &message); err != nil { + log.Print(err) + return + } + + if err := handleWebsocketMessage(peerConnection, ws, message); err != nil { + log.Print(err) + } + } +} + +func serveHome(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + fmt.Fprintf(w, homeHTML) +} -- cgit v1.2.3