Wait for set state to finish before deleting pipeline

Setting state to GST_STATE_NULL sometimes blocks, to fix this use the threadpool to set the state to NULL and wait with deleting the pipeline until the state is changed.
This fixes blocking the main thread when switching Spotify songs.
This commit is contained in:
Jonas Kvinge
2024-08-03 00:05:06 +02:00
parent 8ddd309d5d
commit 548fa3f6ee
11 changed files with 520 additions and 214 deletions

View File

@@ -49,6 +49,7 @@
#include <QVariant>
#include <QString>
#include <QUrl>
#include <QTimer>
#include <QTimeLine>
#include <QEasingCurve>
#include <QMetaObject>
@@ -64,6 +65,12 @@
namespace {
constexpr int GST_PLAY_FLAG_VIDEO = 0x00000001;
constexpr int GST_PLAY_FLAG_AUDIO = 0x00000002;
constexpr int GST_PLAY_FLAG_DOWNLOAD = 0x00000080;
constexpr int GST_PLAY_FLAG_BUFFERING = 0x00000100;
constexpr int GST_PLAY_FLAG_SOFT_VOLUME = 0x00000010;
constexpr int kGstStateTimeoutNanosecs = 10000000;
constexpr int kFaderFudgeMsec = 2000;
@@ -109,6 +116,7 @@ GstEnginePipeline::GstEnginePipeline(QObject *parent)
ignore_tags_(false),
pipeline_connected_(false),
pipeline_active_(false),
pending_state_(GST_STATE_NULL),
pending_seek_nanosec_(-1),
last_known_position_ns_(0),
next_uri_set_(false),
@@ -141,7 +149,9 @@ GstEnginePipeline::GstEnginePipeline(QObject *parent)
about_to_finish_cb_id_(-1),
notify_volume_cb_id_(-1),
logged_unsupported_analyzer_format_(false),
about_to_finish_(false) {
about_to_finish_(false),
finish_requested_(false),
finished_(false) {
eq_band_gains_.reserve(kEqBandCount);
for (int i = 0; i < kEqBandCount; ++i) eq_band_gains_ << 0;
@@ -150,78 +160,22 @@ GstEnginePipeline::GstEnginePipeline(QObject *parent)
GstEnginePipeline::~GstEnginePipeline() {
Disconnect();
if (pipeline_) {
if (fader_) {
if (fader_->state() != QTimeLine::NotRunning) {
fader_->stop();
}
fader_.reset();
if (state() != GST_STATE_NULL) {
gst_element_set_state(pipeline_, GST_STATE_NULL);
}
if (element_added_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(audiobin_), element_added_cb_id_);
}
if (element_removed_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(audiobin_), element_removed_cb_id_);
}
if (pad_added_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(pipeline_), pad_added_cb_id_);
}
if (notify_source_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(pipeline_), notify_source_cb_id_);
}
if (about_to_finish_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(pipeline_), about_to_finish_cb_id_);
}
if (notify_volume_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(volume_), notify_volume_cb_id_);
}
if (upstream_events_probe_cb_id_ != 0) {
GstPad *pad = gst_element_get_static_pad(eventprobe_, "src");
if (pad) {
gst_pad_remove_probe(pad, upstream_events_probe_cb_id_);
gst_object_unref(pad);
}
}
if (buffer_probe_cb_id_ != 0) {
GstPad *pad = gst_element_get_static_pad(audioqueueconverter_, "src");
if (pad) {
gst_pad_remove_probe(pad, buffer_probe_cb_id_);
gst_object_unref(pad);
}
}
{
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_));
if (bus) {
gst_bus_remove_watch(bus);
gst_bus_set_sync_handler(bus, nullptr, nullptr, nullptr);
gst_object_unref(bus);
}
}
gst_element_set_state(pipeline_, GST_STATE_NULL);
gst_object_unref(GST_OBJECT(pipeline_));
pipeline_ = nullptr;
if (audiobin_ && !pipeline_connected_) {
gst_object_unref(GST_OBJECT(audiobin_));
}
audiobin_ = nullptr;
}
qLog(Debug) << "Pipeline" << id_ << "deleted";
}
void GstEnginePipeline::set_output_device(const QString &output, const QVariant &device) {
@@ -348,6 +302,97 @@ GstElement *GstEnginePipeline::CreateElement(const QString &factory_name, const
}
void GstEnginePipeline::Disconnect() {
if (pipeline_) {
if (fader_) {
if (fader_->state() != QTimeLine::NotRunning) {
fader_->stop();
}
fader_.reset();
}
if (element_added_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(audiobin_), element_added_cb_id_);
element_added_cb_id_ = -1;
}
if (element_removed_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(audiobin_), element_removed_cb_id_);
element_removed_cb_id_ = -1;
}
if (pad_added_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(pipeline_), pad_added_cb_id_);
pad_added_cb_id_ = -1;
}
if (notify_source_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(pipeline_), notify_source_cb_id_);
notify_source_cb_id_ = -1;
}
if (about_to_finish_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(pipeline_), about_to_finish_cb_id_);
about_to_finish_cb_id_ = -1;
}
if (notify_volume_cb_id_ != -1) {
g_signal_handler_disconnect(G_OBJECT(volume_), notify_volume_cb_id_);
notify_volume_cb_id_ = -1;
}
if (upstream_events_probe_cb_id_ != 0) {
GstPad *pad = gst_element_get_static_pad(eventprobe_, "src");
if (pad) {
gst_pad_remove_probe(pad, upstream_events_probe_cb_id_);
gst_object_unref(pad);
}
upstream_events_probe_cb_id_ = 0;
}
if (buffer_probe_cb_id_ != 0) {
GstPad *pad = gst_element_get_static_pad(audioqueueconverter_, "src");
if (pad) {
gst_pad_remove_probe(pad, buffer_probe_cb_id_);
gst_object_unref(pad);
}
buffer_probe_cb_id_ = 0;
}
{
GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_));
if (bus) {
gst_bus_remove_watch(bus);
gst_bus_set_sync_handler(bus, nullptr, nullptr, nullptr);
gst_object_unref(bus);
}
}
}
}
bool GstEnginePipeline::Finish() {
qLog(Debug) << "Finishing pipeline" << id_;
finish_requested_ = true;
Disconnect();
if (state() == GST_STATE_NULL) {
finished_ = true;
}
else {
SetStateAsync(GST_STATE_NULL);
}
return finished_;
}
bool GstEnginePipeline::InitFromUrl(const QUrl &media_url, const QUrl &stream_url, const QByteArray &gst_url, const qint64 end_nanosec, const double ebur128_loudness_normalizing_gain_db, QString &error) {
media_url_ = media_url;
@@ -395,9 +440,9 @@ bool GstEnginePipeline::InitFromUrl(const QUrl &media_url, const QUrl &stream_ur
gint flags = 0;
g_object_get(G_OBJECT(pipeline_), "flags", &flags, nullptr);
flags |= 0x00000002;
flags &= ~0x00000001;
flags &= ~0x00000010;
flags |= GST_PLAY_FLAG_AUDIO;
flags &= ~GST_PLAY_FLAG_VIDEO;
flags &= ~GST_PLAY_FLAG_SOFT_VOLUME;
g_object_set(G_OBJECT(pipeline_), "flags", flags, nullptr);
g_object_set(G_OBJECT(pipeline_), "uri", gst_url.constData(), nullptr);
@@ -434,10 +479,13 @@ bool GstEnginePipeline::InitAudioBin(QString &error) {
#else
case QVariant::String:{
#endif
QString device = device_.toString();
const QString device = device_.toString();
if (!device.isEmpty()) {
qLog(Debug) << "Setting device" << device << "for" << output_;
g_object_set(G_OBJECT(audiosink_), "device", device.toUtf8().constData(), nullptr);
if (output_ == QLatin1String(GstEngine::kALSASink) && (device.startsWith(QLatin1String("hw:")) || device.startsWith(QLatin1String("plughw:")))) {
exclusive_mode_ = true;
}
}
break;
}
@@ -607,6 +655,9 @@ bool GstEnginePipeline::InitAudioBin(QString &error) {
if (!volume_fading_) {
return false;
}
if (fader_) {
SetFaderVolume(fader_->currentValue());
}
}
// Create the stereo balancer elements if it's enabled.
@@ -1026,9 +1077,10 @@ void GstEnginePipeline::SourceSetupCallback(GstElement *playbin, GstElement *sou
// If the pipeline was buffering we stop that now.
if (instance->buffering_) {
qLog(Debug) << "Buffering finished";
instance->buffering_ = false;
emit instance->BufferingFinished();
instance->SetState(GST_STATE_PLAYING);
instance->SetStateAsync(GST_STATE_PLAYING);
}
}
@@ -1254,7 +1306,7 @@ GstPadProbeReturn GstEnginePipeline::BufferProbeCallback(GstPad *pad, GstPadProb
consumers = instance->buffer_consumers_;
}
for (GstBufferConsumer *consumer : consumers) {
for (GstBufferConsumer *consumer : std::as_const(consumers)) {
gst_buffer_ref(buf);
consumer->ConsumeBuffer(buf, instance->id(), format);
}
@@ -1521,7 +1573,7 @@ void GstEnginePipeline::TagMessageReceived(GstMessage *msg) {
}
if (!title_splitted.isEmpty() && title_splitted.count() >= 2) {
int i = 0;
for (const QString &title_part : title_splitted) {
for (const QString &title_part : std::as_const(title_splitted)) {
++i;
switch (i) {
case 1:
@@ -1594,11 +1646,13 @@ void GstEnginePipeline::StateChangedMessageReceived(GstMessage *msg) {
if (next_uri_reset_ && new_state == GST_STATE_PAUSED) {
qLog(Debug) << "Reverting next uri and going to playing state.";
next_uri_reset_ = false;
pending_state_ = GST_STATE_PLAYING;
SeekDelayed(pending_seek_nanosec_);
SetStateDelayed(GST_STATE_PLAYING);
pending_seek_nanosec_ = -1;
}
else {
SeekQueued(pending_seek_nanosec_);
SeekAsync(pending_seek_nanosec_);
pending_seek_nanosec_ = -1;
}
}
}
@@ -1612,16 +1666,30 @@ void GstEnginePipeline::StateChangedMessageReceived(GstMessage *msg) {
g_object_set(G_OBJECT(pipeline_), "uri", gst_url_.constData(), nullptr);
if (pending_seek_nanosec_ == -1) {
qLog(Debug) << "Reverting next uri and going to playing state.";
SetState(GST_STATE_PLAYING);
SetStateAsync(GST_STATE_PLAYING);
}
else {
qLog(Debug) << "Reverting next uri and going to paused state.";
next_uri_reset_ = true;
SetState(GST_STATE_PAUSED);
SetStateAsync(GST_STATE_PAUSED);
}
}
}
if (pipeline_active_) {
if (pending_seek_nanosec_ != -1 && new_state == GST_STATE_PAUSED) {
SeekAsync(pending_seek_nanosec_);
}
else if (pending_state_ != GST_STATE_NULL) {
SetStateAsync(pending_state_);
pending_state_ = GST_STATE_NULL;
}
if (fader_ && fader_->state() != QTimeLine::State::Running && new_state == GST_STATE_PLAYING) {
qLog(Debug) << "Resuming fader";
ResumeFaderAsync();
}
}
}
void GstEnginePipeline::BufferingMessageReceived(GstMessage *msg) {
@@ -1637,16 +1705,18 @@ void GstEnginePipeline::BufferingMessageReceived(GstMessage *msg) {
const GstState current_state = state();
if (percent == 0 && current_state == GST_STATE_PLAYING && !buffering_) {
qLog(Debug) << "Buffering started";
buffering_ = true;
emit BufferingStarted();
SetState(GST_STATE_PAUSED);
SetStateAsync(GST_STATE_PAUSED);
}
else if (percent == 100 && buffering_) {
qLog(Debug) << "Buffering finished";
buffering_ = false;
emit BufferingFinished();
SetState(GST_STATE_PLAYING);
SetStateAsync(GST_STATE_PLAYING);
}
else if (buffering_) {
emit BufferingProgress(percent);
@@ -1687,18 +1757,54 @@ GstState GstEnginePipeline::state() const {
}
QFuture<GstStateChangeReturn> GstEnginePipeline::SetState(const GstState state) {
QFuture<GstStateChangeReturn> GstEnginePipeline::SetStateAsync(const GstState state) {
qLog(Debug) << "Setting pipeline state to" << GstStateText(state);
return QtConcurrent::run(&set_state_threadpool_, &gst_element_set_state, pipeline_, state);
qLog(Debug) << "Setting pipeline" << id_ << "state to" << GstStateText(state);
QFutureWatcher<GstStateChangeReturn> *watcher = new QFutureWatcher<GstStateChangeReturn>();
QObject::connect(watcher, &QFutureWatcher<GstStateChangeReturn>::finished, this, [this, watcher, state]() {
const GstStateChangeReturn state_change = watcher->result();
watcher->deleteLater();
SetStateAsyncFinished(state, state_change);
});
QFuture<GstStateChangeReturn> future = QtConcurrent::run(&set_state_threadpool_, &gst_element_set_state, pipeline_, state);
watcher->setFuture(future);
return future;
}
void GstEnginePipeline::SetStateDelayed(const GstState state) {
void GstEnginePipeline::SetStateAsyncFinished(const GstState state, const GstStateChangeReturn state_change) {
QMetaObject::invokeMethod(this, [this, state]() {
QTimer::singleShot(300, this, [this, state]() { SetState(state); });
}, Qt::QueuedConnection);
switch (state_change) {
case GST_STATE_CHANGE_SUCCESS:
case GST_STATE_CHANGE_ASYNC:
case GST_STATE_CHANGE_NO_PREROLL:
qLog(Debug) << "Pipeline" << id_ << "state successfully set to" << GstStateText(state);
emit SetStateFinished(state_change);
if (!finished_ && finish_requested_) {
finished_ = true;
emit Finished();
}
break;
case GST_STATE_CHANGE_FAILURE:
qLog(Error) << "Failed to set pipeline to state" << GstStateText(state);
break;
}
}
QFuture<GstStateChangeReturn> GstEnginePipeline::Play(const bool pause, const quint64 offset_nanosec) {
if (offset_nanosec != 0) {
pending_seek_nanosec_ = static_cast<qint64>(offset_nanosec);
}
if (!pause) {
pending_state_ = GST_STATE_PLAYING;
}
return SetStateAsync(GST_STATE_PAUSED);
}
@@ -1716,7 +1822,7 @@ bool GstEnginePipeline::Seek(const qint64 nanosec) {
if (next_uri_set_) {
pending_seek_nanosec_ = nanosec;
SetState(GST_STATE_READY);
SetStateAsync(GST_STATE_READY);
return true;
}
@@ -1725,11 +1831,22 @@ bool GstEnginePipeline::Seek(const qint64 nanosec) {
qLog(Debug) << "Seeking to" << nanosec;
return gst_element_seek_simple(pipeline_, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, nanosec);
const bool success = gst_element_seek_simple(pipeline_, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, nanosec);
if (success) {
qLog(Debug) << "Seek succeeded";
if (pending_state_ != GST_STATE_NULL) {
qLog(Debug) << "Setting state from pending state" << GstStateText(pending_state_);
SetStateAsync(pending_state_);
pending_state_ = GST_STATE_NULL;
}
}
return success;
}
void GstEnginePipeline::SeekQueued(const qint64 nanosec) {
void GstEnginePipeline::SeekAsync(const qint64 nanosec) {
QMetaObject::invokeMethod(this, "Seek", Qt::QueuedConnection, Q_ARG(qint64, nanosec));
@@ -1865,17 +1982,32 @@ void GstEnginePipeline::StartFader(const qint64 duration_nanosec, const QTimeLin
fader_->setDirection(direction);
fader_->setEasingCurve(shape);
fader_->setCurrentTime(static_cast<int>(start_time));
fader_->resume();
fader_fudge_timer_.stop();
use_fudge_timer_ = use_fudge_timer;
SetFaderVolume(fader_->currentValue());
qLog(Debug) << "Pipeline" << id_ << "with state" << GstStateText(state()) << "set to fade from" << start_time;
if (pipeline_active_) {
fader_->resume();
}
}
void GstEnginePipeline::ResumeFaderAsync() {
if (fader_) {
QMetaObject::invokeMethod(&*fader_, "resume", Qt::QueuedConnection);
}
}
void GstEnginePipeline::FaderTimelineFinished() {
qLog(Debug) << "Pipeline" << id_ << "finished fading";
fader_.reset();
// Wait a little while longer before emitting the finished signal (and probably destroying the pipeline) to account for delays in the audio server/driver.
@@ -1895,7 +2027,7 @@ void GstEnginePipeline::timerEvent(QTimerEvent *e) {
if (e->timerId() == fader_fudge_timer_.timerId()) {
fader_fudge_timer_.stop();
emit FaderFinished();
emit FaderFinished(id_);
return;
}