aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSean DuBois <seaduboi@amazon.com>2019-09-20 00:31:15 -0700
committerSean DuBois <seaduboi@amazon.com>2019-09-20 00:31:15 -0700
commit1d5310c38c93766b294afdd380e6fb05170aaad5 (patch)
treec86c66cbdee6b3049d5e0d080896ffbc7f97f29b
downloadrtwatch-1d5310c38c93766b294afdd380e6fb05170aaad5.tar.gz
rtwatch-1d5310c38c93766b294afdd380e6fb05170aaad5.tar.bz2
rtwatch-1d5310c38c93766b294afdd380e6fb05170aaad5.zip
Initial commit
-rw-r--r--go.mod8
-rw-r--r--go.sum102
-rw-r--r--gst/gst.c110
-rw-r--r--gst/gst.go96
-rw-r--r--gst/gst.h20
-rw-r--r--main.go236
6 files changed, 572 insertions, 0 deletions
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..d300919
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,8 @@
+module github.com/pion/rtwatch
+
+go 1.13
+
+require (
+ github.com/gorilla/websocket v1.4.1
+ github.com/pion/webrtc/v2 v2.1.4
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..cb395e8
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,102 @@
+github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
+github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk=
+github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
+github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9 h1:tbuodUh2vuhOVZAdW3NEUvosFHUMJwUNl7jk/VSEiwc=
+github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9/go.mod h1:PpMmPfPKO9nKJ/psF49ESTAGQSdfXxlg1otPbEB2nOw=
+github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA=
+github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwmjgmPuiQEcYk=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
+github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
+github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/pion/datachannel v1.4.6 h1:ALIwApJIxXOxA142PwJ7abvHwVdicVMqjkQNZC1WxLU=
+github.com/pion/datachannel v1.4.6/go.mod h1:k02QlG7ZByXzavnjWj3gP3W1474H4ifzxSx2Y0Q5kvc=
+github.com/pion/dtls v1.5.1 h1:LcCs1l9fzsHC4y+ENjLyuxOAe+k0DV65T2n4tjwM7xw=
+github.com/pion/dtls v1.5.1/go.mod h1:CjlPLfQdsTg3G4AEXjJp8FY5bRweBlxHrgoFrN+fQsk=
+github.com/pion/ice v0.5.15 h1:wy9C4JICwKZpQm+VX4WAnr2yVO6tNsqCWg87OOZWbmk=
+github.com/pion/ice v0.5.15/go.mod h1:8of6pKdMoT9/TNLYdv+UdjSF4DvLUN65mzhhfmQDg5g=
+github.com/pion/logging v0.2.1/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
+github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
+github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
+github.com/pion/mdns v0.0.3 h1:DxdOYd0pgwLKiDlIIxfU0qdG5iWh1Xn6CsS9vc6cMAY=
+github.com/pion/mdns v0.0.3/go.mod h1:VrN3wefVgtfL8QgpEblPUC46ag1reLIfpqekCnKunLE=
+github.com/pion/quic v0.1.1 h1:D951FV+TOqI9A0rTF7tHx0Loooqz+nyzjEyj8o3PuMA=
+github.com/pion/quic v0.1.1/go.mod h1:zEU51v7ru8Mp4AUBJvj6psrSth5eEFNnVQK5K48oV3k=
+github.com/pion/rtcp v1.2.1 h1:S3yG4KpYAiSmBVqKAfgRa5JdwBNj4zK3RLUa8JYdhak=
+github.com/pion/rtcp v1.2.1/go.mod h1:a5dj2d6BKIKHl43EnAOIrCczcjESrtPuMgfmL6/K6QM=
+github.com/pion/rtp v1.1.3 h1:GTYSTsSLF5vH+UqShGYQEBdoYasWjTTC9UeYglnUO+o=
+github.com/pion/rtp v1.1.3/go.mod h1:/l4cvcKd0D3u9JLs2xSVI95YkfXW87a3br3nqmVtSlE=
+github.com/pion/sctp v1.6.10 h1:7odhpD08TTc84a/OXwMAFEtOKka+HkD//Ze+sD8qcxU=
+github.com/pion/sctp v1.6.10/go.mod h1:cCqpLdYvgEUdl715+qbWtgT439CuQrAgy8BZTp0aEfA=
+github.com/pion/sdp/v2 v2.3.0 h1:5EhwPh1xKWYYjjvMuubHoMLy6M0B9U26Hh7q3f7vEGk=
+github.com/pion/sdp/v2 v2.3.0/go.mod h1:idSlWxhfWQDtTy9J05cgxpHBu/POwXN2VDRGYxT/EjU=
+github.com/pion/srtp v1.2.6 h1:mHQuAMh0P67R7/j1F260u3O+fbRWLyjKLRPZYYvODFM=
+github.com/pion/srtp v1.2.6/go.mod h1:rd8imc5htjfs99XiEoOjLMEOcVjME63UHx9Ek9IGst0=
+github.com/pion/stun v0.3.2 h1:Vsy6C+bTbJKEC2TH4vHYOnRKmozNPi5FpeKv9/bX16k=
+github.com/pion/stun v0.3.2/go.mod h1:xrCld6XM+6GWDZdvjPlLMsTU21rNxnO6UO8XsAvHr/M=
+github.com/pion/transport v0.6.0/go.mod h1:iWZ07doqOosSLMhZ+FXUTq+TamDoXSllxpbGcfkCmbE=
+github.com/pion/transport v0.7.0/go.mod h1:iWZ07doqOosSLMhZ+FXUTq+TamDoXSllxpbGcfkCmbE=
+github.com/pion/transport v0.8.6/go.mod h1:nAmRRnn+ArVtsoNuwktvAD+jrjSD7pA+H3iRmZwdUno=
+github.com/pion/transport v0.8.7 h1:t7uYhWOoljd82rnkLH+H2Lw7/IGA5kV9Bl5sWrmcYSc=
+github.com/pion/transport v0.8.7/go.mod h1:lpeSM6KJFejVtZf8k0fgeN7zE73APQpTF83WvA1FVP8=
+github.com/pion/turn v1.3.6 h1:N49o5g3SRI5g5Sg8WVft1RDAFxIHOi5roHHdbC39g7g=
+github.com/pion/turn v1.3.6/go.mod h1:D8XaX/CVKLkRozV9baRvhZmwOyKAJCTHrj23DdwMI4g=
+github.com/pion/webrtc/v2 v2.1.4 h1:QTJwVMTqv8HO9jRIzWbwlZWwiBgPy7MIsQrJ3Lxbwjc=
+github.com/pion/webrtc/v2 v2.1.4/go.mod h1:u+jSZHnB0YH48VDm121vA2zDt7yancBpfoDmzor7qdY=
+github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE=
+golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190912160710-24e19bdeb0f2 h1:4dVFTC832rPn4pomLSz1vA+are2+dU19w1H8OngV7nc=
+golang.org/x/net v0.0.0-20190912160710-24e19bdeb0f2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190912141932-bc967efca4b8 h1:41hwlulw1prEMBxLQSlMSux1zxJf07B3WPsdjJlKZxE=
+golang.org/x/sys v0.0.0-20190912141932-bc967efca4b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/gst/gst.c b/gst/gst.c
new file mode 100644
index 0000000..e84316e
--- /dev/null
+++ b/gst/gst.c
@@ -0,0 +1,110 @@
+#include "gst.h"
+
+#include <gst/app/gstappsrc.h>
+
+typedef struct SampleHandlerUserData {
+ int pipelineId;
+} SampleHandlerUserData;
+
+GMainLoop *gstreamer_send_main_loop = NULL;
+void gstreamer_send_start_mainloop(void) {
+ gstreamer_send_main_loop = g_main_loop_new(NULL, FALSE);
+
+ g_main_loop_run(gstreamer_send_main_loop);
+}
+
+static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) {
+ GstElement *pipeline = GST_ELEMENT(data);
+
+ switch (GST_MESSAGE_TYPE(msg)) {
+ case GST_MESSAGE_EOS:
+ if (!gst_element_seek (pipeline, 1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SKIP,
+ GST_SEEK_TYPE_SET, 0,
+ GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE)) {
+ g_print ("EOS restart failed\n");
+ exit(1);
+ }
+ break;
+
+ case GST_MESSAGE_ERROR: {
+ gchar *debug;
+ GError *error;
+
+ gst_message_parse_error(msg, &error, &debug);
+ g_free(debug);
+
+ g_printerr("GStreamer Error: %s\n", error->message);
+ g_error_free(error);
+ exit(1);
+ }
+ default:
+ break;
+ }
+
+ return TRUE;
+}
+
+GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) {
+ GstSample *sample = NULL;
+ GstBuffer *buffer = NULL;
+ gpointer copy = NULL;
+ gsize copy_size = 0;
+ int *isVideo = (int *) user_data;
+
+ g_signal_emit_by_name (object, "pull-sample", &sample);
+ if (sample) {
+ buffer = gst_sample_get_buffer(sample);
+ if (buffer) {
+ gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), &copy, &copy_size);
+ goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), *isVideo);
+ }
+ gst_sample_unref (sample);
+ }
+
+ return GST_FLOW_OK;
+}
+
+GstElement *gstreamer_send_create_pipeline(char *pipeline) {
+ gst_init(NULL, NULL);
+ GError *error = NULL;
+ return gst_parse_launch(pipeline, &error);
+}
+
+void gstreamer_send_start_pipeline(GstElement *pipeline) {
+ GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
+ gst_bus_add_watch(bus, gstreamer_send_bus_call, pipeline);
+ gst_object_unref(bus);
+
+ GstElement *audio = gst_bin_get_by_name(GST_BIN(pipeline), "audio"),
+ *video = gst_bin_get_by_name(GST_BIN(pipeline), "video");
+
+ int *isAudio = malloc(sizeof(int)),
+ *isVideo = malloc(sizeof(int));
+
+ *isVideo = 1;
+ *isAudio = 0;
+
+ g_object_set(audio, "emit-signals", TRUE, NULL);
+ g_signal_connect(audio, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), isAudio);
+
+ g_object_set(video, "emit-signals", TRUE, NULL);
+ g_signal_connect(video, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), isVideo);
+
+ gstreamer_send_play_pipeline(pipeline);
+}
+
+void gstreamer_send_play_pipeline(GstElement *pipeline) {
+ gst_element_set_state(pipeline, GST_STATE_PLAYING);
+}
+
+void gstreamer_send_pause_pipeline(GstElement *pipeline) {
+ gst_element_set_state(pipeline, GST_STATE_PAUSED);
+}
+
+void gstreamer_send_seek(GstElement *pipeline, int64_t seek_pos) {
+ if (!gst_element_seek (pipeline, 1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SKIP,
+ GST_SEEK_TYPE_SET, seek_pos * GST_SECOND,
+ GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE)) {
+ g_print ("Seek failed!\n");
+ }
+}
diff --git a/gst/gst.go b/gst/gst.go
new file mode 100644
index 0000000..f8a5385
--- /dev/null
+++ b/gst/gst.go
@@ -0,0 +1,96 @@
+package gst
+
+/*
+#cgo pkg-config: gstreamer-1.0 gstreamer-app-1.0
+
+#include "gst.h"
+
+*/
+import "C"
+import (
+ "fmt"
+ "io"
+ "sync"
+ "unsafe"
+
+ "github.com/pion/webrtc/v2"
+ "github.com/pion/webrtc/v2/pkg/media"
+)
+
+func init() {
+ go C.gstreamer_send_start_mainloop()
+}
+
+// Pipeline is a wrapper for a GStreamer Pipeline
+type Pipeline struct {
+ Pipeline *C.GstElement
+ audioTrack *webrtc.Track
+ videoTrack *webrtc.Track
+}
+
+var pipeline = &Pipeline{}
+var pipelinesLock sync.Mutex
+
+// CreatePipeline creates a GStreamer Pipeline
+func CreatePipeline(containerPath string, audioTrack, videoTrack *webrtc.Track) *Pipeline {
+ pipelineStr := fmt.Sprintf("filesrc location=\"%s\" ! decodebin name=demux ! queue ! x264enc bframes=0 speed-preset=veryfast key-int-max=60 ! video/x-h264,stream-format=byte-stream ! appsink name=video demux. ! queue ! audioconvert ! audioresample ! opusenc ! appsink name=audio", containerPath)
+
+ pipelineStrUnsafe := C.CString(pipelineStr)
+ defer C.free(unsafe.Pointer(pipelineStrUnsafe))
+
+ pipelinesLock.Lock()
+ defer pipelinesLock.Unlock()
+ pipeline = &Pipeline{
+ Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
+ audioTrack: audioTrack,
+ videoTrack: videoTrack,
+ }
+ return pipeline
+}
+
+// Start starts the GStreamer Pipeline
+func (p *Pipeline) Start() {
+ // This will signal to goHandlePipelineBuffer
+ // and provide a method for cancelling sends.
+ C.gstreamer_send_start_pipeline(p.Pipeline)
+}
+
+// Play sets the pipeline to PLAYING
+func (p *Pipeline) Play() {
+ C.gstreamer_send_play_pipeline(p.Pipeline)
+}
+
+// Pause sets the pipeline to PAUSED
+func (p *Pipeline) Pause() {
+ C.gstreamer_send_pause_pipeline(p.Pipeline)
+}
+
+// SeekToTime seeks on the pipeline
+func (p *Pipeline) SeekToTime(seekPos int64) {
+ C.gstreamer_send_seek(p.Pipeline, C.int64_t(seekPos))
+}
+
+const (
+ videoClockRate = 90000
+ audioClockRate = 48000
+)
+
+//export goHandlePipelineBuffer
+func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.int, isVideo C.int) {
+ var track *webrtc.Track
+ var samples uint32
+
+ if isVideo == 1 {
+ samples = uint32(videoClockRate * (float32(duration) / 1000000000))
+ track = pipeline.videoTrack
+ } else {
+ samples = uint32(audioClockRate * (float32(duration) / 1000000000))
+ track = pipeline.audioTrack
+ }
+
+ if err := track.WriteSample(media.Sample{Data: C.GoBytes(buffer, bufferLen), Samples: samples}); err != nil && err != io.ErrClosedPipe {
+ panic(err)
+ }
+
+ C.free(buffer)
+}
diff --git a/gst/gst.h b/gst/gst.h
new file mode 100644
index 0000000..1103a6a
--- /dev/null
+++ b/gst/gst.h
@@ -0,0 +1,20 @@
+#ifndef GST_H
+#define GST_H
+
+#include <glib.h>
+#include <gst/gst.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int isVideo);
+
+GstElement *gstreamer_send_create_pipeline(char *pipeline);
+void gstreamer_send_start_pipeline(GstElement *pipeline);
+
+void gstreamer_send_play_pipeline(GstElement *pipeline);
+void gstreamer_send_pause_pipeline(GstElement *pipeline);
+void gstreamer_send_seek(GstElement *pipeline, int64_t seek_pos);
+
+void gstreamer_send_start_mainloop(void);
+
+#endif
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..8e6befc
--- /dev/null
+++ b/main.go
@@ -0,0 +1,236 @@
+package main
+
+import (
+ "encoding/json"
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "strconv"
+
+ "github.com/gorilla/websocket"
+ "github.com/pion/rtwatch/gst"
+ "github.com/pion/webrtc/v2"
+)
+
+const homeHTML = `<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>synced-playback</title>
+ </head>
+ <body id="body">
+ <video id="video1" autoplay playsinline></video>
+
+ <div>
+ <input type="number" id="seekTime" value="30">
+ <button type="button" onClick="seekClick()">Seek</button>
+ <button type="button" onClick="playClick()">Play</button>
+ <button type="button" onClick="pauseClick()">Pause</button>
+ </div>
+
+ <script>
+ let conn = new WebSocket('ws://' + window.location.host + '/ws')
+ let pc = new RTCPeerConnection()
+
+ window.seekClick = () => {
+ conn.send(JSON.stringify({event: 'seek', data: document.getElementById('seekTime').value}))
+ }
+ window.playClick = () => {
+ conn.send(JSON.stringify({event: 'play', data: ''}))
+ }
+ window.pauseClick = () => {
+ conn.send(JSON.stringify({event: 'pause', data: ''}))
+ }
+
+ pc.ontrack = function (event) {
+ if (event.track.kind === 'audio') {
+ return
+ }
+ var el = document.getElementById('video1')
+ el.srcObject = event.streams[0]
+ el.autoplay = true
+ el.controls = true
+ }
+
+ conn.onopen = () => {
+ pc.createOffer({offerToReceiveVideo: true, offerToReceiveAudio: true}).then(offer => {
+ pc.setLocalDescription(offer)
+ conn.send(JSON.stringify({event: 'offer', data: JSON.stringify(offer)}))
+ })
+ }
+ conn.onclose = evt => {
+ console.log('Connection closed')
+ }
+ conn.onmessage = evt => {
+ let msg = JSON.parse(evt.data)
+ if (!msg) {
+ return console.log('failed to parse msg')
+ }
+
+ switch (msg.event) {
+ case 'answer':
+ answer = JSON.parse(msg.data)
+ if (!answer) {
+ return console.log('failed to parse answer')
+ }
+ pc.setRemoteDescription(answer)
+ }
+ }
+ window.conn = conn
+ </script>
+ </body>
+</html>
+`
+
+var (
+ upgrader = websocket.Upgrader{
+ ReadBufferSize: 1024,
+ WriteBufferSize: 1024,
+ }
+
+ peerConnectionConfig = webrtc.Configuration{}
+
+ audioTrack = &webrtc.Track{}
+ videoTrack = &webrtc.Track{}
+ pipeline = &gst.Pipeline{}
+)
+
+type websocketMessage struct {
+ Event string `json:"event"`
+ Data string `json:"data"`
+}
+
+func main() {
+ containerPath := ""
+ httpListenAddress := ""
+ flag.StringVar(&containerPath, "container-path", "", "path to the media file you want to playback")
+ flag.StringVar(&httpListenAddress, "http-listen-address", ":8080", "address for HTTP server to listen on")
+ flag.Parse()
+
+ if containerPath == "" {
+ panic("-container-path must be specified")
+ }
+
+ pc, err := webrtc.NewPeerConnection(webrtc.Configuration{
+ ICEServers: []webrtc.ICEServer{
+ {
+ URLs: []string{"stun:stun.l.google.com:19302"},
+ },
+ },
+ })
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ videoTrack, err = pc.NewTrack(webrtc.DefaultPayloadTypeH264, 5000, "synced-video", "synced-video")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ audioTrack, err = pc.NewTrack(webrtc.DefaultPayloadTypeOpus, 5001, "synced-video", "synced-video")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ pipeline = gst.CreatePipeline(containerPath, audioTrack, videoTrack)
+ pipeline.Start()
+
+ http.HandleFunc("/", serveHome)
+ http.HandleFunc("/ws", serveWs)
+
+ fmt.Println(fmt.Sprintf("Video file '%s' is now available on '%s', have fun!", containerPath, httpListenAddress))
+ log.Fatal(http.ListenAndServe(httpListenAddress, nil))
+}
+
+func handleWebsocketMessage(pc *webrtc.PeerConnection, ws *websocket.Conn, message *websocketMessage) error {
+ switch message.Event {
+ case "play":
+ pipeline.Play()
+ case "pause":
+ pipeline.Pause()
+ case "seek":
+ i, err := strconv.ParseInt(message.Data, 0, 64)
+ if err != nil {
+ log.Print(err)
+ }
+ pipeline.SeekToTime(i)
+ case "offer":
+ offer := webrtc.SessionDescription{}
+ if err := json.Unmarshal([]byte(message.Data), &offer); err != nil {
+ return err
+ }
+
+ if err := pc.SetRemoteDescription(offer); err != nil {
+ return err
+ }
+
+ answer, err := pc.CreateAnswer(nil)
+ if err != nil {
+ return err
+ }
+ if err := pc.SetLocalDescription(answer); err != nil {
+ return err
+ }
+
+ answerString, err := json.Marshal(answer)
+ if err != nil {
+ return err
+ }
+
+ if err = ws.WriteJSON(&websocketMessage{
+ Event: "answer",
+ Data: string(answerString),
+ }); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func serveWs(w http.ResponseWriter, r *http.Request) {
+ ws, err := upgrader.Upgrade(w, r, nil)
+ if err != nil {
+ if _, ok := err.(websocket.HandshakeError); !ok {
+ log.Println(err)
+ }
+ return
+ }
+
+ peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig)
+ if err != nil {
+ log.Print(err)
+ return
+ } else if _, err = peerConnection.AddTrack(audioTrack); err != nil {
+ log.Print(err)
+ return
+ } else if _, err = peerConnection.AddTrack(videoTrack); err != nil {
+ log.Print(err)
+ return
+ }
+
+ defer func() {
+ if err := peerConnection.Close(); err != nil {
+ log.Println(err)
+ }
+ }()
+
+ message := &websocketMessage{}
+ for {
+ _, msg, err := ws.ReadMessage()
+ if err != nil {
+ break
+ } else if err := json.Unmarshal(msg, &message); err != nil {
+ log.Print(err)
+ return
+ }
+
+ if err := handleWebsocketMessage(peerConnection, ws, message); err != nil {
+ log.Print(err)
+ }
+ }
+}
+
+func serveHome(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ fmt.Fprintf(w, homeHTML)
+}