From 1d5310c38c93766b294afdd380e6fb05170aaad5 Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Fri, 20 Sep 2019 00:31:15 -0700 Subject: Initial commit --- gst/gst.c | 110 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ gst/gst.go | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++++ gst/gst.h | 20 +++++++++++ 3 files changed, 226 insertions(+) create mode 100644 gst/gst.c create mode 100644 gst/gst.go create mode 100644 gst/gst.h (limited to 'gst') diff --git a/gst/gst.c b/gst/gst.c new file mode 100644 index 0000000..e84316e --- /dev/null +++ b/gst/gst.c @@ -0,0 +1,110 @@ +#include "gst.h" + +#include + +typedef struct SampleHandlerUserData { + int pipelineId; +} SampleHandlerUserData; + +GMainLoop *gstreamer_send_main_loop = NULL; +void gstreamer_send_start_mainloop(void) { + gstreamer_send_main_loop = g_main_loop_new(NULL, FALSE); + + g_main_loop_run(gstreamer_send_main_loop); +} + +static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) { + GstElement *pipeline = GST_ELEMENT(data); + + switch (GST_MESSAGE_TYPE(msg)) { + case GST_MESSAGE_EOS: + if (!gst_element_seek (pipeline, 1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SKIP, + GST_SEEK_TYPE_SET, 0, + GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE)) { + g_print ("EOS restart failed\n"); + exit(1); + } + break; + + case GST_MESSAGE_ERROR: { + gchar *debug; + GError *error; + + gst_message_parse_error(msg, &error, &debug); + g_free(debug); + + g_printerr("GStreamer Error: %s\n", error->message); + g_error_free(error); + exit(1); + } + default: + break; + } + + return TRUE; +} + +GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) { + GstSample *sample = NULL; + GstBuffer *buffer = NULL; + gpointer copy = NULL; + gsize copy_size = 0; + int *isVideo = (int *) user_data; + + g_signal_emit_by_name (object, "pull-sample", &sample); + if (sample) { + buffer = gst_sample_get_buffer(sample); + if (buffer) { + gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), ©, ©_size); + goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), *isVideo); + } + gst_sample_unref (sample); + } + + return GST_FLOW_OK; +} + +GstElement *gstreamer_send_create_pipeline(char *pipeline) { + gst_init(NULL, NULL); + GError *error = NULL; + return gst_parse_launch(pipeline, &error); +} + +void gstreamer_send_start_pipeline(GstElement *pipeline) { + GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, gstreamer_send_bus_call, pipeline); + gst_object_unref(bus); + + GstElement *audio = gst_bin_get_by_name(GST_BIN(pipeline), "audio"), + *video = gst_bin_get_by_name(GST_BIN(pipeline), "video"); + + int *isAudio = malloc(sizeof(int)), + *isVideo = malloc(sizeof(int)); + + *isVideo = 1; + *isAudio = 0; + + g_object_set(audio, "emit-signals", TRUE, NULL); + g_signal_connect(audio, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), isAudio); + + g_object_set(video, "emit-signals", TRUE, NULL); + g_signal_connect(video, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), isVideo); + + gstreamer_send_play_pipeline(pipeline); +} + +void gstreamer_send_play_pipeline(GstElement *pipeline) { + gst_element_set_state(pipeline, GST_STATE_PLAYING); +} + +void gstreamer_send_pause_pipeline(GstElement *pipeline) { + gst_element_set_state(pipeline, GST_STATE_PAUSED); +} + +void gstreamer_send_seek(GstElement *pipeline, int64_t seek_pos) { + if (!gst_element_seek (pipeline, 1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SKIP, + GST_SEEK_TYPE_SET, seek_pos * GST_SECOND, + GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE)) { + g_print ("Seek failed!\n"); + } +} diff --git a/gst/gst.go b/gst/gst.go new file mode 100644 index 0000000..f8a5385 --- /dev/null +++ b/gst/gst.go @@ -0,0 +1,96 @@ +package gst + +/* +#cgo pkg-config: gstreamer-1.0 gstreamer-app-1.0 + +#include "gst.h" + +*/ +import "C" +import ( + "fmt" + "io" + "sync" + "unsafe" + + "github.com/pion/webrtc/v2" + "github.com/pion/webrtc/v2/pkg/media" +) + +func init() { + go C.gstreamer_send_start_mainloop() +} + +// Pipeline is a wrapper for a GStreamer Pipeline +type Pipeline struct { + Pipeline *C.GstElement + audioTrack *webrtc.Track + videoTrack *webrtc.Track +} + +var pipeline = &Pipeline{} +var pipelinesLock sync.Mutex + +// CreatePipeline creates a GStreamer Pipeline +func CreatePipeline(containerPath string, audioTrack, videoTrack *webrtc.Track) *Pipeline { + pipelineStr := fmt.Sprintf("filesrc location=\"%s\" ! decodebin name=demux ! queue ! x264enc bframes=0 speed-preset=veryfast key-int-max=60 ! video/x-h264,stream-format=byte-stream ! appsink name=video demux. ! queue ! audioconvert ! audioresample ! opusenc ! appsink name=audio", containerPath) + + pipelineStrUnsafe := C.CString(pipelineStr) + defer C.free(unsafe.Pointer(pipelineStrUnsafe)) + + pipelinesLock.Lock() + defer pipelinesLock.Unlock() + pipeline = &Pipeline{ + Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe), + audioTrack: audioTrack, + videoTrack: videoTrack, + } + return pipeline +} + +// Start starts the GStreamer Pipeline +func (p *Pipeline) Start() { + // This will signal to goHandlePipelineBuffer + // and provide a method for cancelling sends. + C.gstreamer_send_start_pipeline(p.Pipeline) +} + +// Play sets the pipeline to PLAYING +func (p *Pipeline) Play() { + C.gstreamer_send_play_pipeline(p.Pipeline) +} + +// Pause sets the pipeline to PAUSED +func (p *Pipeline) Pause() { + C.gstreamer_send_pause_pipeline(p.Pipeline) +} + +// SeekToTime seeks on the pipeline +func (p *Pipeline) SeekToTime(seekPos int64) { + C.gstreamer_send_seek(p.Pipeline, C.int64_t(seekPos)) +} + +const ( + videoClockRate = 90000 + audioClockRate = 48000 +) + +//export goHandlePipelineBuffer +func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.int, isVideo C.int) { + var track *webrtc.Track + var samples uint32 + + if isVideo == 1 { + samples = uint32(videoClockRate * (float32(duration) / 1000000000)) + track = pipeline.videoTrack + } else { + samples = uint32(audioClockRate * (float32(duration) / 1000000000)) + track = pipeline.audioTrack + } + + if err := track.WriteSample(media.Sample{Data: C.GoBytes(buffer, bufferLen), Samples: samples}); err != nil && err != io.ErrClosedPipe { + panic(err) + } + + C.free(buffer) +} diff --git a/gst/gst.h b/gst/gst.h new file mode 100644 index 0000000..1103a6a --- /dev/null +++ b/gst/gst.h @@ -0,0 +1,20 @@ +#ifndef GST_H +#define GST_H + +#include +#include +#include +#include + +extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int isVideo); + +GstElement *gstreamer_send_create_pipeline(char *pipeline); +void gstreamer_send_start_pipeline(GstElement *pipeline); + +void gstreamer_send_play_pipeline(GstElement *pipeline); +void gstreamer_send_pause_pipeline(GstElement *pipeline); +void gstreamer_send_seek(GstElement *pipeline, int64_t seek_pos); + +void gstreamer_send_start_mainloop(void); + +#endif -- cgit v1.2.3