aboutsummaryrefslogtreecommitdiff
path: root/gst
diff options
context:
space:
mode:
authorSean DuBois <seaduboi@amazon.com>2019-09-20 00:31:15 -0700
committerSean DuBois <seaduboi@amazon.com>2019-09-20 00:31:15 -0700
commit1d5310c38c93766b294afdd380e6fb05170aaad5 (patch)
treec86c66cbdee6b3049d5e0d080896ffbc7f97f29b /gst
downloadrtwatch-1d5310c38c93766b294afdd380e6fb05170aaad5.tar.gz
rtwatch-1d5310c38c93766b294afdd380e6fb05170aaad5.tar.bz2
rtwatch-1d5310c38c93766b294afdd380e6fb05170aaad5.zip
Initial commit
Diffstat (limited to 'gst')
-rw-r--r--gst/gst.c110
-rw-r--r--gst/gst.go96
-rw-r--r--gst/gst.h20
3 files changed, 226 insertions, 0 deletions
diff --git a/gst/gst.c b/gst/gst.c
new file mode 100644
index 0000000..e84316e
--- /dev/null
+++ b/gst/gst.c
@@ -0,0 +1,110 @@
+#include "gst.h"
+
+#include <gst/app/gstappsrc.h>
+
+typedef struct SampleHandlerUserData {
+ int pipelineId;
+} SampleHandlerUserData;
+
+GMainLoop *gstreamer_send_main_loop = NULL;
+void gstreamer_send_start_mainloop(void) {
+ gstreamer_send_main_loop = g_main_loop_new(NULL, FALSE);
+
+ g_main_loop_run(gstreamer_send_main_loop);
+}
+
+static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) {
+ GstElement *pipeline = GST_ELEMENT(data);
+
+ switch (GST_MESSAGE_TYPE(msg)) {
+ case GST_MESSAGE_EOS:
+ if (!gst_element_seek (pipeline, 1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SKIP,
+ GST_SEEK_TYPE_SET, 0,
+ GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE)) {
+ g_print ("EOS restart failed\n");
+ exit(1);
+ }
+ break;
+
+ case GST_MESSAGE_ERROR: {
+ gchar *debug;
+ GError *error;
+
+ gst_message_parse_error(msg, &error, &debug);
+ g_free(debug);
+
+ g_printerr("GStreamer Error: %s\n", error->message);
+ g_error_free(error);
+ exit(1);
+ }
+ default:
+ break;
+ }
+
+ return TRUE;
+}
+
+GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) {
+ GstSample *sample = NULL;
+ GstBuffer *buffer = NULL;
+ gpointer copy = NULL;
+ gsize copy_size = 0;
+ int *isVideo = (int *) user_data;
+
+ g_signal_emit_by_name (object, "pull-sample", &sample);
+ if (sample) {
+ buffer = gst_sample_get_buffer(sample);
+ if (buffer) {
+ gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), &copy, &copy_size);
+ goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), *isVideo);
+ }
+ gst_sample_unref (sample);
+ }
+
+ return GST_FLOW_OK;
+}
+
+GstElement *gstreamer_send_create_pipeline(char *pipeline) {
+ gst_init(NULL, NULL);
+ GError *error = NULL;
+ return gst_parse_launch(pipeline, &error);
+}
+
+void gstreamer_send_start_pipeline(GstElement *pipeline) {
+ GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
+ gst_bus_add_watch(bus, gstreamer_send_bus_call, pipeline);
+ gst_object_unref(bus);
+
+ GstElement *audio = gst_bin_get_by_name(GST_BIN(pipeline), "audio"),
+ *video = gst_bin_get_by_name(GST_BIN(pipeline), "video");
+
+ int *isAudio = malloc(sizeof(int)),
+ *isVideo = malloc(sizeof(int));
+
+ *isVideo = 1;
+ *isAudio = 0;
+
+ g_object_set(audio, "emit-signals", TRUE, NULL);
+ g_signal_connect(audio, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), isAudio);
+
+ g_object_set(video, "emit-signals", TRUE, NULL);
+ g_signal_connect(video, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), isVideo);
+
+ gstreamer_send_play_pipeline(pipeline);
+}
+
+void gstreamer_send_play_pipeline(GstElement *pipeline) {
+ gst_element_set_state(pipeline, GST_STATE_PLAYING);
+}
+
+void gstreamer_send_pause_pipeline(GstElement *pipeline) {
+ gst_element_set_state(pipeline, GST_STATE_PAUSED);
+}
+
+void gstreamer_send_seek(GstElement *pipeline, int64_t seek_pos) {
+ if (!gst_element_seek (pipeline, 1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SKIP,
+ GST_SEEK_TYPE_SET, seek_pos * GST_SECOND,
+ GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE)) {
+ g_print ("Seek failed!\n");
+ }
+}
diff --git a/gst/gst.go b/gst/gst.go
new file mode 100644
index 0000000..f8a5385
--- /dev/null
+++ b/gst/gst.go
@@ -0,0 +1,96 @@
+package gst
+
+/*
+#cgo pkg-config: gstreamer-1.0 gstreamer-app-1.0
+
+#include "gst.h"
+
+*/
+import "C"
+import (
+ "fmt"
+ "io"
+ "sync"
+ "unsafe"
+
+ "github.com/pion/webrtc/v2"
+ "github.com/pion/webrtc/v2/pkg/media"
+)
+
+func init() {
+ go C.gstreamer_send_start_mainloop()
+}
+
+// Pipeline is a wrapper for a GStreamer Pipeline
+type Pipeline struct {
+ Pipeline *C.GstElement
+ audioTrack *webrtc.Track
+ videoTrack *webrtc.Track
+}
+
+var pipeline = &Pipeline{}
+var pipelinesLock sync.Mutex
+
+// CreatePipeline creates a GStreamer Pipeline
+func CreatePipeline(containerPath string, audioTrack, videoTrack *webrtc.Track) *Pipeline {
+ pipelineStr := fmt.Sprintf("filesrc location=\"%s\" ! decodebin name=demux ! queue ! x264enc bframes=0 speed-preset=veryfast key-int-max=60 ! video/x-h264,stream-format=byte-stream ! appsink name=video demux. ! queue ! audioconvert ! audioresample ! opusenc ! appsink name=audio", containerPath)
+
+ pipelineStrUnsafe := C.CString(pipelineStr)
+ defer C.free(unsafe.Pointer(pipelineStrUnsafe))
+
+ pipelinesLock.Lock()
+ defer pipelinesLock.Unlock()
+ pipeline = &Pipeline{
+ Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
+ audioTrack: audioTrack,
+ videoTrack: videoTrack,
+ }
+ return pipeline
+}
+
+// Start starts the GStreamer Pipeline
+func (p *Pipeline) Start() {
+ // This will signal to goHandlePipelineBuffer
+ // and provide a method for cancelling sends.
+ C.gstreamer_send_start_pipeline(p.Pipeline)
+}
+
+// Play sets the pipeline to PLAYING
+func (p *Pipeline) Play() {
+ C.gstreamer_send_play_pipeline(p.Pipeline)
+}
+
+// Pause sets the pipeline to PAUSED
+func (p *Pipeline) Pause() {
+ C.gstreamer_send_pause_pipeline(p.Pipeline)
+}
+
+// SeekToTime seeks on the pipeline
+func (p *Pipeline) SeekToTime(seekPos int64) {
+ C.gstreamer_send_seek(p.Pipeline, C.int64_t(seekPos))
+}
+
+const (
+ videoClockRate = 90000
+ audioClockRate = 48000
+)
+
+//export goHandlePipelineBuffer
+func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.int, isVideo C.int) {
+ var track *webrtc.Track
+ var samples uint32
+
+ if isVideo == 1 {
+ samples = uint32(videoClockRate * (float32(duration) / 1000000000))
+ track = pipeline.videoTrack
+ } else {
+ samples = uint32(audioClockRate * (float32(duration) / 1000000000))
+ track = pipeline.audioTrack
+ }
+
+ if err := track.WriteSample(media.Sample{Data: C.GoBytes(buffer, bufferLen), Samples: samples}); err != nil && err != io.ErrClosedPipe {
+ panic(err)
+ }
+
+ C.free(buffer)
+}
diff --git a/gst/gst.h b/gst/gst.h
new file mode 100644
index 0000000..1103a6a
--- /dev/null
+++ b/gst/gst.h
@@ -0,0 +1,20 @@
+#ifndef GST_H
+#define GST_H
+
+#include <glib.h>
+#include <gst/gst.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int isVideo);
+
+GstElement *gstreamer_send_create_pipeline(char *pipeline);
+void gstreamer_send_start_pipeline(GstElement *pipeline);
+
+void gstreamer_send_play_pipeline(GstElement *pipeline);
+void gstreamer_send_pause_pipeline(GstElement *pipeline);
+void gstreamer_send_seek(GstElement *pipeline, int64_t seek_pos);
+
+void gstreamer_send_start_mainloop(void);
+
+#endif