Fix analyzer and cleanup old pipeline code

- Move HandoffCallback to audio queue
- Add new callback for detecting source format
- Remove old decodebin stuff
This commit is contained in:
Jonas Kvinge
2019-10-20 02:56:47 +02:00
parent 1a28dd0311
commit 156eb874db
2 changed files with 61 additions and 101 deletions

View File

@@ -65,8 +65,6 @@ GstEnginePipeline::GstEnginePipeline(GstEngine *engine)
engine_(engine),
id_(sId++),
valid_(false),
output_(QString()),
device_(QVariant()),
volume_control_(true),
eq_enabled_(false),
eq_preamp_(0),
@@ -185,48 +183,9 @@ void GstEnginePipeline::set_buffer_min_fill(int percent) {
buffer_min_fill_ = percent;
}
bool GstEnginePipeline::InitDecodeBin(GstElement *decode_bin) {
if (!decode_bin) return false;
pipeline_ = gst_pipeline_new("pipeline");
if (!gst_bin_add(GST_BIN(pipeline_), decode_bin)) return false;
if (!InitAudioBin()) return false;
if (!gst_bin_add(GST_BIN(pipeline_), audiobin_)) return false;
if (!gst_element_link(decode_bin, audiobin_)) return false;
return true;
}
GstElement *GstEnginePipeline::CreateDecodeBinFromString(const char *pipeline) {
GError *error = nullptr;
GstElement *bin = gst_parse_bin_from_description(pipeline, TRUE, &error);
if (error) {
QString message = QString::fromLocal8Bit(error->message);
int domain = error->domain;
int code = error->code;
g_error_free(error);
qLog(Warning) << message;
emit Error(id(), message, domain, code);
return nullptr;
}
else {
return bin;
}
}
bool GstEnginePipeline::InitAudioBin() {
gst_segment_init(&last_decodebin_segment_, GST_FORMAT_TIME);
gst_segment_init(&last_playbin_segment_, GST_FORMAT_TIME);
// Audio bin
audiobin_ = gst_bin_new("audiobin");
@@ -315,8 +274,8 @@ bool GstEnginePipeline::InitAudioBin() {
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, &EventHandoffCallback, this, nullptr);
gst_object_unref(pad);
pad = gst_element_get_static_pad(queue_, "sink");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, HandoffCallback, this, nullptr);
pad = gst_element_get_static_pad(queue_, "src");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, SourceHandoffCallback, this, nullptr);
gst_object_unref(pad);
// Setting the equalizer bands:
@@ -391,6 +350,11 @@ bool GstEnginePipeline::InitAudioBin() {
gst_element_link_filtered(convert, audiosink_, caps);
gst_caps_unref(caps);
// Add probes and handlers.
pad = gst_element_get_static_pad(audio_queue, "src");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, HandoffCallback, this, nullptr);
gst_object_unref(pad);
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_));
gst_bus_set_sync_handler(bus, BusCallbackSync, this, nullptr);
bus_cb_id_ = gst_bus_add_watch(bus, BusCallback, this);
@@ -407,19 +371,12 @@ bool GstEnginePipeline::InitAudioBin() {
}
bool GstEnginePipeline::InitFromString(const QString &pipeline) {
GstElement *new_bin = CreateDecodeBinFromString(pipeline.toUtf8().constData());
if (!new_bin) return false;
return InitDecodeBin(new_bin);
}
bool GstEnginePipeline::InitFromUrl(const QByteArray &stream_url, const QUrl original_url, const qint64 end_nanosec) {
stream_url_ = stream_url;
original_url_ = original_url;
end_offset_nanosec_ = end_nanosec;
format_.clear();
pipeline_ = engine_->CreateElement("playbin");
if (!pipeline_) return false;
@@ -552,10 +509,12 @@ void GstEnginePipeline::StreamStartMessageReceived() {
stream_url_ = next_stream_url_;
original_url_ = next_original_url_;
end_offset_nanosec_ = next_end_offset_nanosec_;
next_stream_url_ = QByteArray();
next_original_url_ = QUrl();
format_ = next_format_;
next_stream_url_.clear();
next_original_url_.clear();
next_beginning_offset_nanosec_ = 0;
next_end_offset_nanosec_ = 0;
next_format_.clear();
emit EndOfStreamReached(id(), true);
}
@@ -711,7 +670,7 @@ void GstEnginePipeline::StateChangedMessageReceived(GstMessage *msg) {
void GstEnginePipeline::BufferingMessageReceived(GstMessage *msg) {
// Only handle buffering messages from the queue2 element in audiobin - not the one that's created automatically by uridecodebin.
// Only handle buffering messages from the queue2 element in audiobin - not the one that's created automatically by playbin.
if (GST_ELEMENT(GST_MESSAGE_SRC(msg)) != queue_) {
return;
}
@@ -746,7 +705,7 @@ void GstEnginePipeline::NewPadCallback(GstElement*, GstPad *pad, gpointer self)
GstPad *const audiopad = gst_element_get_static_pad(instance->audiobin_, "sink");
// Link decodebin's sink pad to audiobin's src pad.
// Link playbin's sink pad to audiobin's src pad.
if (GST_PAD_IS_LINKED(audiopad)) {
qLog(Warning) << instance->id() << "audiopad is already linked, unlinking old pad";
gst_pad_unlink(audiopad, GST_PAD_PEER(audiopad));
@@ -755,13 +714,13 @@ void GstEnginePipeline::NewPadCallback(GstElement*, GstPad *pad, gpointer self)
gst_pad_link(pad, audiopad);
gst_object_unref(audiopad);
// Offset the timestamps on all the buffers coming out of the decodebin so they line up exactly with the end of the last buffer from the old decodebin.
// Offset the timestamps on all the buffers coming out of the playbin so they line up exactly with the end of the last buffer from the old playbin.
// "Running time" is the time since the last flushing seek.
GstClockTime running_time = gst_segment_to_running_time(&instance->last_decodebin_segment_, GST_FORMAT_TIME, instance->last_decodebin_segment_.position);
GstClockTime running_time = gst_segment_to_running_time(&instance->last_playbin_segment_, GST_FORMAT_TIME, instance->last_playbin_segment_.position);
gst_pad_set_offset(pad, running_time);
// Add a probe to the pad so we can update last_decodebin_segment_.
gst_pad_add_probe(pad, static_cast<GstPadProbeType>(GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH), DecodebinProbe, instance, nullptr);
// Add a probe to the pad so we can update last_playbin_segment_.
gst_pad_add_probe(pad, static_cast<GstPadProbeType>(GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH), PlaybinProbe, instance, nullptr);
instance->pipeline_is_connected_ = true;
if (instance->pending_seek_nanosec_ != -1 && instance->pipeline_is_initialised_) {
@@ -770,28 +729,27 @@ void GstEnginePipeline::NewPadCallback(GstElement*, GstPad *pad, gpointer self)
}
GstPadProbeReturn GstEnginePipeline::DecodebinProbe(GstPad *pad, GstPadProbeInfo *info, gpointer data) {
GstPadProbeReturn GstEnginePipeline::PlaybinProbe(GstPad *pad, GstPadProbeInfo *info, gpointer data) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(data);
if (!instance) return GST_PAD_PROBE_OK;
const GstPadProbeType info_type = GST_PAD_PROBE_INFO_TYPE(info);
if (info_type & GST_PAD_PROBE_TYPE_BUFFER) {
// The decodebin produced a buffer. Record its end time, so we can offset the buffers produced by the next decodebin when transitioning to the next song.
// The playbin produced a buffer. Record its end time, so we can offset the buffers produced by the next playbin when transitioning to the next song.
GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER(info);
GstClockTime timestamp = GST_BUFFER_TIMESTAMP(buffer);
GstClockTime duration = GST_BUFFER_DURATION(buffer);
if (timestamp == GST_CLOCK_TIME_NONE) {
timestamp = instance->last_decodebin_segment_.position;
timestamp = instance->last_playbin_segment_.position;
}
if (duration != GST_CLOCK_TIME_NONE) {
timestamp += duration;
}
instance->last_decodebin_segment_.position = timestamp;
instance->last_playbin_segment_.position = timestamp;
}
else if (info_type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
GstEvent *event = GST_PAD_PROBE_INFO_EVENT(info);
@@ -799,7 +757,7 @@ GstPadProbeReturn GstEnginePipeline::DecodebinProbe(GstPad *pad, GstPadProbeInfo
if (event_type == GST_EVENT_SEGMENT) {
// A new segment started, we need to save this to calculate running time offsets later.
gst_event_copy_segment(event, &instance->last_decodebin_segment_);
gst_event_copy_segment(event, &instance->last_playbin_segment_);
}
else if (event_type == GST_EVENT_FLUSH_START) {
// A flushing seek resets the running time to 0, so remove any offset we set on this pad before.
@@ -811,14 +769,9 @@ GstPadProbeReturn GstEnginePipeline::DecodebinProbe(GstPad *pad, GstPadProbeInfo
}
GstPadProbeReturn GstEnginePipeline::HandoffCallback(GstPad *pad, GstPadProbeInfo *info, gpointer self) {
GstPadProbeReturn GstEnginePipeline::HandoffCallback(GstPad*, GstPadProbeInfo *info, gpointer self) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
if (!instance) return GST_PAD_PROBE_OK;
GstCaps *caps = gst_pad_get_current_caps(pad);
GstStructure *structure = gst_caps_get_structure(caps, 0);
const gchar *format = gst_structure_get_string(structure, "format");
GstBuffer *buf = gst_pad_probe_info_get_buffer(info);
@@ -830,7 +783,7 @@ GstPadProbeReturn GstEnginePipeline::HandoffCallback(GstPad *pad, GstPadProbeInf
for (GstBufferConsumer *consumer : consumers) {
gst_buffer_ref(buf);
consumer->ConsumeBuffer(buf, instance->id(), QString(format));
consumer->ConsumeBuffer(buf, instance->id(), instance->format_);
}
// Calculate the end time of this buffer so we can stop playback if it's after the end time of this song.
@@ -843,10 +796,11 @@ GstPadProbeReturn GstEnginePipeline::HandoffCallback(GstPad *pad, GstPadProbeInf
if (instance->has_next_valid_url() && instance->next_stream_url_ == instance->stream_url_ && instance->next_beginning_offset_nanosec_ == instance->end_offset_nanosec_) {
// The "next" song is actually the next segment of this file - so cheat and keep on playing, but just tell the Engine we've moved on.
instance->end_offset_nanosec_ = instance->next_end_offset_nanosec_;
instance->next_stream_url_ = QByteArray();
instance->next_original_url_ = QUrl();
instance->next_stream_url_.clear();
instance->next_original_url_.clear();
instance->next_beginning_offset_nanosec_ = 0;
instance->next_end_offset_nanosec_ = 0;
instance->next_format_.clear();
// GstEngine will try to seek to the start of the new section, but we're already there so ignore it.
instance->ignore_next_seek_ = true;
@@ -863,10 +817,28 @@ GstPadProbeReturn GstEnginePipeline::HandoffCallback(GstPad *pad, GstPadProbeInf
}
GstPadProbeReturn GstEnginePipeline::SourceHandoffCallback(GstPad *pad, GstPadProbeInfo *, gpointer self) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
GstCaps *caps = gst_pad_get_current_caps(pad);
GstStructure *structure = gst_caps_get_structure(caps, 0);
const gchar *format = gst_structure_get_string(structure, "format");
if (instance->next_uri_set_) {
instance->next_format_ = QString(format);
}
else {
instance->format_ = QString(format);
}
return GST_PAD_PROBE_OK;
}
GstPadProbeReturn GstEnginePipeline::EventHandoffCallback(GstPad*, GstPadProbeInfo *info, gpointer self) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
if (!instance) return GST_PAD_PROBE_OK;
GstEvent *e = gst_pad_probe_info_get_event(info);
@@ -891,12 +863,9 @@ GstPadProbeReturn GstEnginePipeline::EventHandoffCallback(GstPad*, GstPadProbeIn
}
void GstEnginePipeline::AboutToFinishCallback(GstPlayBin *bin, gpointer self) {
Q_UNUSED(bin);
void GstEnginePipeline::AboutToFinishCallback(GstPlayBin*, gpointer self) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
if (!instance) return;
if (instance->has_next_valid_url() && !instance->next_uri_set_) {
// Set the next uri. When the current song ends it will be played automatically and a STREAM_START message is send to the bus.
@@ -907,12 +876,9 @@ void GstEnginePipeline::AboutToFinishCallback(GstPlayBin *bin, gpointer self) {
}
void GstEnginePipeline::SourceSetupCallback(GstPlayBin *bin, GParamSpec *pspec, gpointer self) {
Q_UNUSED(pspec);
void GstEnginePipeline::SourceSetupCallback(GstPlayBin *bin, GParamSpec *, gpointer self) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
if (!instance) return;
GstElement *element;
g_object_get(bin, "source", &element, nullptr);
@@ -1156,6 +1122,7 @@ void GstEnginePipeline::SetNextUrl(const QByteArray &stream_url, const QUrl &ori
next_original_url_ = original_url;
next_beginning_offset_nanosec_ = beginning_nanosec;
next_end_offset_nanosec_ = end_nanosec;
next_format_.clear();
// Add request to discover the stream
if (discoverer_) {
@@ -1166,10 +1133,7 @@ void GstEnginePipeline::SetNextUrl(const QByteArray &stream_url, const QUrl &ori
}
void GstEnginePipeline::StreamDiscovered(GstDiscoverer *discoverer, GstDiscovererInfo *info, GError *err, gpointer self) {
Q_UNUSED(discoverer);
Q_UNUSED(err);
void GstEnginePipeline::StreamDiscovered(GstDiscoverer *, GstDiscovererInfo *info, GError *, gpointer self) {
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
if (!instance) return;
@@ -1220,12 +1184,7 @@ void GstEnginePipeline::StreamDiscovered(GstDiscoverer *discoverer, GstDiscovere
}
void GstEnginePipeline::StreamDiscoveryFinished(GstDiscoverer *discoverer, gpointer self) {
Q_UNUSED(discoverer);
Q_UNUSED(self);
}
void GstEnginePipeline::StreamDiscoveryFinished(GstDiscoverer *, gpointer) {}
QString GstEnginePipeline::GSTdiscovererErrorMessage(GstDiscovererResult result) {