Add mutexes

This commit is contained in:
Jonas Kvinge
2024-09-02 22:27:45 +02:00
parent 2a9ccd7480
commit 552440f50e
6 changed files with 489 additions and 308 deletions

View File

@@ -45,6 +45,7 @@
#include <QFuture>
#include <QFutureWatcher>
#include <QMutex>
#include <QMutexLocker>
#include <QMetaType>
#include <QByteArray>
#include <QList>
@@ -87,7 +88,6 @@ int GstEnginePipeline::sId = 1;
GstEnginePipeline::GstEnginePipeline(QObject *parent)
: QObject(parent),
id_(sId++),
valid_(false),
exclusive_mode_(false),
volume_enabled_(true),
fading_enabled_(false),
@@ -119,6 +119,7 @@ GstEnginePipeline::GstEnginePipeline(QObject *parent)
ignore_tags_(false),
pipeline_connected_(false),
pipeline_active_(false),
buffering_(false),
pending_state_(GST_STATE_NULL),
pending_seek_nanosec_(-1),
last_known_position_ns_(0),
@@ -127,7 +128,7 @@ GstEnginePipeline::GstEnginePipeline(QObject *parent)
volume_set_(false),
volume_internal_(-1.0),
volume_percent_(100),
buffering_(false),
fader_active_(false),
use_fudge_timer_(false),
pipeline_(nullptr),
audiobin_(nullptr),
@@ -171,13 +172,13 @@ GstEnginePipeline::~GstEnginePipeline() {
}
gst_object_unref(GST_OBJECT(pipeline_));
pipeline_ = nullptr;
if (audiobin_ && !pipeline_connected_) {
if (audiobin_ && !pipeline_connected_.value()) {
gst_object_unref(GST_OBJECT(audiobin_));
}
audiobin_ = nullptr;
}
qLog(Debug) << "Pipeline" << id_ << "deleted";
qLog(Debug) << "Pipeline" << id() << "deleted";
}
@@ -238,10 +239,13 @@ void GstEnginePipeline::set_buffer_high_watermark(const double value) {
}
void GstEnginePipeline::set_proxy_settings(const QString &address, const bool authentication, const QString &user, const QString &pass) {
QMutexLocker l(&mutex_proxy_);
proxy_address_ = address;
proxy_authentication_ = authentication;
proxy_user_ = user;
proxy_pass_ = pass;
}
void GstEnginePipeline::set_channels(const bool enabled, const int channels) {
@@ -268,6 +272,7 @@ void GstEnginePipeline::set_spotify_login(const QString &spotify_username, const
spotify_password_ = spotify_password;
}
#endif // HAVE_SPOTIFY
QString GstEnginePipeline::GstStateText(const GstState state) {
@@ -291,7 +296,7 @@ QString GstEnginePipeline::GstStateText(const GstState state) {
GstElement *GstEnginePipeline::CreateElement(const QString &factory_name, const QString &name, GstElement *bin, QString &error) const {
QString unique_name = QLatin1String("pipeline") + QLatin1Char('-') + QString::number(id_) + QLatin1Char('-') + (name.isEmpty() ? factory_name : name);
QString unique_name = QLatin1String("pipeline") + QLatin1Char('-') + QString::number(id()) + QLatin1Char('-') + (name.isEmpty() ? factory_name : name);
GstElement *element = gst_element_factory_make(factory_name.toUtf8().constData(), unique_name.toUtf8().constData());
if (!element) {
@@ -310,6 +315,7 @@ void GstEnginePipeline::Disconnect() {
if (pipeline_) {
if (fader_) {
fader_active_ = false;
if (fader_->state() != QTimeLine::NotRunning) {
fader_->stop();
}
@@ -379,7 +385,7 @@ void GstEnginePipeline::Disconnect() {
bool GstEnginePipeline::Finish() {
qLog(Debug) << "Finishing pipeline" << id_;
qLog(Debug) << "Finishing pipeline" << id();
finish_requested_ = true;
@@ -392,15 +398,19 @@ bool GstEnginePipeline::Finish() {
SetStateAsync(GST_STATE_NULL);
}
return finished_;
return finished_.value();
}
bool GstEnginePipeline::InitFromUrl(const QUrl &media_url, const QUrl &stream_url, const QByteArray &gst_url, const qint64 end_nanosec, const double ebur128_loudness_normalizing_gain_db, QString &error) {
media_url_ = media_url;
stream_url_ = stream_url;
gst_url_ = gst_url;
{
QMutexLocker l(&mutex_url_);
media_url_ = media_url;
stream_url_ = stream_url;
gst_url_ = gst_url;
}
end_offset_nanosec_ = end_nanosec;
ebur128_loudness_normalizing_gain_db_ = ebur128_loudness_normalizing_gain_db;
@@ -448,7 +458,10 @@ bool GstEnginePipeline::InitFromUrl(const QUrl &media_url, const QUrl &stream_ur
flags &= ~GST_PLAY_FLAG_SOFT_VOLUME;
g_object_set(G_OBJECT(pipeline_), "flags", flags, nullptr);
g_object_set(G_OBJECT(pipeline_), "uri", gst_url.constData(), nullptr);
{
QMutexLocker l(&mutex_url_);
g_object_set(G_OBJECT(pipeline_), "uri", gst_url.constData(), nullptr);
}
pipeline_connected_ = true;
@@ -968,7 +981,7 @@ GstPadProbeReturn GstEnginePipeline::UpstreamEventsProbeCallback(GstPad *pad, Gs
switch (GST_EVENT_TYPE(e)) {
case GST_EVENT_SEGMENT:
if (!instance->segment_start_received_) {
if (!instance->segment_start_received_.value()) {
// The segment start time is used to calculate the proper offset of data buffers from the start of the stream
const GstSegment *segment = nullptr;
gst_event_parse_segment(e, &segment);
@@ -1008,7 +1021,7 @@ void GstEnginePipeline::ElementAddedCallback(GstBin *bin, GstBin *sub_bin, GstEl
}
instance->SetupVolume(volume);
instance->SetVolume(instance->volume_percent_);
instance->SetVolume(instance->volume_percent_.value());
}
@@ -1036,11 +1049,14 @@ void GstEnginePipeline::SourceSetupCallback(GstElement *playbin, GstElement *sou
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
if (g_object_class_find_property(G_OBJECT_GET_CLASS(source), "device") && !instance->source_device().isEmpty()) {
// Gstreamer is not able to handle device in URL (referring to Gstreamer documentation, this might be added in the future).
// Despite that, for now we include device inside URL: we decompose it during Init and set device here, when this callback is called.
qLog(Debug) << "Setting device";
g_object_set(source, "device", instance->source_device().toLocal8Bit().constData(), nullptr);
{
QMutexLocker l(&instance->mutex_source_device_);
if (g_object_class_find_property(G_OBJECT_GET_CLASS(source), "device") && !instance->source_device().isEmpty()) {
// Gstreamer is not able to handle device in URL (referring to Gstreamer documentation, this might be added in the future).
// Despite that, for now we include device inside URL: we decompose it during Init and set device here, when this callback is called.
qLog(Debug) << "Setting device";
g_object_set(source, "device", instance->source_device().toLocal8Bit().constData(), nullptr);
}
}
if (g_object_class_find_property(G_OBJECT_GET_CLASS(source), "user-agent")) {
@@ -1050,40 +1066,46 @@ void GstEnginePipeline::SourceSetupCallback(GstElement *playbin, GstElement *sou
}
if (g_object_class_find_property(G_OBJECT_GET_CLASS(source), "ssl-strict")) {
qLog(Debug) << "Turning" << (instance->strict_ssl_enabled_ ? "on" : "off") << "strict SSL";
g_object_set(source, "ssl-strict", instance->strict_ssl_enabled_ ? TRUE : FALSE, nullptr);
qLog(Debug) << "Turning" << (instance->strict_ssl_enabled_.value() ? "on" : "off") << "strict SSL";
g_object_set(source, "ssl-strict", instance->strict_ssl_enabled_.value() ? TRUE : FALSE, nullptr);
}
if (!instance->proxy_address_.isEmpty() && g_object_class_find_property(G_OBJECT_GET_CLASS(source), "proxy")) {
qLog(Debug) << "Setting proxy to" << instance->proxy_address_;
g_object_set(source, "proxy", instance->proxy_address_.toUtf8().constData(), nullptr);
if (instance->proxy_authentication_ &&
g_object_class_find_property(G_OBJECT_GET_CLASS(source), "proxy-id") &&
g_object_class_find_property(G_OBJECT_GET_CLASS(source), "proxy-pw") &&
!instance->proxy_user_.isEmpty() &&
!instance->proxy_pass_.isEmpty())
{
g_object_set(source, "proxy-id", instance->proxy_user_.toUtf8().constData(), "proxy-pw", instance->proxy_pass_.toUtf8().constData(), nullptr);
{
QMutexLocker l(&instance->mutex_proxy_);
if (!instance->proxy_address_.isEmpty() && g_object_class_find_property(G_OBJECT_GET_CLASS(source), "proxy")) {
qLog(Debug) << "Setting proxy to" << instance->proxy_address_;
g_object_set(source, "proxy", instance->proxy_address_.toUtf8().constData(), nullptr);
if (instance->proxy_authentication_ &&
g_object_class_find_property(G_OBJECT_GET_CLASS(source), "proxy-id") &&
g_object_class_find_property(G_OBJECT_GET_CLASS(source), "proxy-pw") &&
!instance->proxy_user_.isEmpty() &&
!instance->proxy_pass_.isEmpty())
{
g_object_set(source, "proxy-id", instance->proxy_user_.toUtf8().constData(), "proxy-pw", instance->proxy_pass_.toUtf8().constData(), nullptr);
}
}
}
#ifdef HAVE_SPOTIFY
if (instance->media_url_.scheme() == QStringLiteral("spotify")) {
if (g_object_class_find_property(G_OBJECT_GET_CLASS(source), "bitrate")) {
g_object_set(source, "bitrate", 2, nullptr);
}
if (!instance->spotify_username_.isEmpty() &&
!instance->spotify_password_.isEmpty() &&
g_object_class_find_property(G_OBJECT_GET_CLASS(source), "username") &&
g_object_class_find_property(G_OBJECT_GET_CLASS(source), "password")) {
g_object_set(source, "username", instance->spotify_username_.toUtf8().constData(), nullptr);
g_object_set(source, "password", instance->spotify_password_.toUtf8().constData(), nullptr);
{
QMutexLocker l(&instance->mutex_url_);
if (instance->media_url_.scheme() == QStringLiteral("spotify")) {
if (g_object_class_find_property(G_OBJECT_GET_CLASS(source), "bitrate")) {
g_object_set(source, "bitrate", 2, nullptr);
}
if (!instance->spotify_username_.isEmpty() &&
!instance->spotify_password_.isEmpty() &&
g_object_class_find_property(G_OBJECT_GET_CLASS(source), "username") &&
g_object_class_find_property(G_OBJECT_GET_CLASS(source), "password")) {
g_object_set(source, "username", instance->spotify_username_.toUtf8().constData(), nullptr);
g_object_set(source, "password", instance->spotify_password_.toUtf8().constData(), nullptr);
}
}
}
#endif
// If the pipeline was buffering we stop that now.
if (instance->buffering_) {
if (instance->buffering_.value()) {
qLog(Debug) << "Buffering finished";
instance->buffering_ = false;
Q_EMIT instance->BufferingFinished();
@@ -1099,12 +1121,13 @@ void GstEnginePipeline::NotifyVolumeCallback(GstElement *element, GParamSpec *pa
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
if (!instance->volume_set_) return;
if (!instance->volume_set_.value()) return;
g_object_get(G_OBJECT(instance->volume_), "volume", &instance->volume_internal_, nullptr);
const double volume_internal = instance->volume_internal_.value();
g_object_get(G_OBJECT(instance->volume_), "volume", &volume_internal, nullptr);
const uint volume_percent = static_cast<uint>(qBound(0L, lround(instance->volume_internal_ / 0.01), 100L));
if (volume_percent != instance->volume_percent_) {
const uint volume_percent = static_cast<uint>(qBound(0L, lround(instance->volume_internal_.value() / 0.01), 100L));
if (volume_percent != instance->volume_percent_.value()) {
instance->volume_percent_ = volume_percent;
Q_EMIT instance->VolumeChanged(volume_percent);
}
@@ -1137,8 +1160,8 @@ void GstEnginePipeline::PadAddedCallback(GstElement *element, GstPad *pad, gpoin
instance->pad_probe_cb_id_ = gst_pad_add_probe(pad, static_cast<GstPadProbeType>(GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH), PadProbeCallback, instance, nullptr);
instance->pipeline_connected_ = true;
if (instance->pending_seek_nanosec_ != -1 && instance->pipeline_active_) {
QMetaObject::invokeMethod(instance, "Seek", Qt::QueuedConnection, Q_ARG(qint64, instance->pending_seek_nanosec_));
if (instance->pending_seek_nanosec_.value() != -1 && instance->pipeline_active_.value()) {
QMetaObject::invokeMethod(instance, "Seek", Qt::QueuedConnection, Q_ARG(qint64, instance->pending_seek_nanosec_.value()));
instance->pending_seek_nanosec_ = -1;
}
@@ -1206,7 +1229,7 @@ GstPadProbeReturn GstEnginePipeline::BufferProbeCallback(GstPad *pad, GstPadProb
GstBuffer *buf = gst_pad_probe_info_get_buffer(info);
GstBuffer *buf16 = nullptr;
quint64 start_time = GST_BUFFER_TIMESTAMP(buf) - instance->segment_start_;
quint64 start_time = GST_BUFFER_TIMESTAMP(buf) - instance->segment_start_.value();
quint64 duration = GST_BUFFER_DURATION(buf);
qint64 end_time = static_cast<qint64>(start_time + duration);
@@ -1324,19 +1347,23 @@ GstPadProbeReturn GstEnginePipeline::BufferProbeCallback(GstPad *pad, GstPadProb
}
// Calculate the end time of this buffer so we can stop playback if it's after the end time of this song.
if (instance->end_offset_nanosec_ > 0 && end_time > instance->end_offset_nanosec_) {
if (instance->has_next_valid_url() && instance->next_stream_url_ == instance->stream_url_ && instance->next_beginning_offset_nanosec_ == instance->end_offset_nanosec_) {
// The "next" song is actually the next segment of this file - so cheat and keep on playing, but just tell the Engine we've moved on.
instance->end_offset_nanosec_ = instance->next_end_offset_nanosec_;
instance->next_media_url_.clear();
instance->next_stream_url_.clear();
instance->next_gst_url_.clear();
instance->next_beginning_offset_nanosec_ = 0;
instance->next_end_offset_nanosec_ = 0;
if (instance->end_offset_nanosec_.value() > 0 && end_time > instance->end_offset_nanosec_.value()) {
if (instance->HasNextUrl()) {
QMutexLocker mutex_locker_url(&instance->mutex_url_);
QMutexLocker mutex_locker_next_url(&instance->mutex_next_url_);
if (instance->next_stream_url_ == instance->stream_url_ && instance->next_beginning_offset_nanosec_ == instance->end_offset_nanosec_) {
// The "next" song is actually the next segment of this file - so cheat and keep on playing, but just tell the Engine we've moved on.
instance->end_offset_nanosec_ = instance->next_end_offset_nanosec_;
instance->next_media_url_.clear();
instance->next_stream_url_.clear();
instance->next_gst_url_.clear();
instance->next_beginning_offset_nanosec_ = 0;
instance->next_end_offset_nanosec_ = 0;
// GstEngine will try to seek to the start of the new section, but we're already there so ignore it.
instance->ignore_next_seek_ = true;
Q_EMIT instance->EndOfStreamReached(instance->id(), true);
// GstEngine will try to seek to the start of the new section, but we're already there so ignore it.
instance->ignore_next_seek_ = true;
Q_EMIT instance->EndOfStreamReached(instance->id(), true);
}
}
else {
// There's no next song
@@ -1354,11 +1381,14 @@ void GstEnginePipeline::AboutToFinishCallback(GstPlayBin *playbin, gpointer self
GstEnginePipeline *instance = reinterpret_cast<GstEnginePipeline*>(self);
qLog(Debug) << "Stream from URL" << instance->gst_url_ << "about to finish.";
{
QMutexLocker l(&instance->mutex_url_);
qLog(Debug) << "Stream from URL" << instance->gst_url_ << "about to finish.";
}
instance->about_to_finish_ = true;
if (instance->has_next_valid_url() && !instance->next_uri_set_) {
if (instance->HasNextUrl() && !instance->next_uri_set_.value()) {
instance->SetNextUrl();
}
@@ -1458,18 +1488,22 @@ void GstEnginePipeline::StreamStatusMessageReceived(GstMessage *msg) {
void GstEnginePipeline::StreamStartMessageReceived() {
if (next_uri_set_) {
qLog(Debug) << "Stream changed from URL" << gst_url_ << "to" << next_gst_url_;
if (next_uri_set_.value()) {
next_uri_set_ = false;
next_uri_reset_ = false;
about_to_finish_ = false;
media_url_ = next_media_url_;
stream_url_ = next_stream_url_;
gst_url_ = next_gst_url_;
{
QMutexLocker lock_url(&mutex_url_);
QMutexLocker lock_next_url(&mutex_next_url_);
qLog(Debug) << "Stream changed from URL" << gst_url_ << "to" << next_gst_url_;
media_url_ = next_media_url_;
stream_url_ = next_stream_url_;
gst_url_ = next_gst_url_;
next_stream_url_.clear();
next_media_url_.clear();
next_gst_url_.clear();
}
end_offset_nanosec_ = next_end_offset_nanosec_;
next_stream_url_.clear();
next_media_url_.clear();
next_gst_url_.clear();
next_beginning_offset_nanosec_ = 0;
next_end_offset_nanosec_ = 0;
@@ -1505,6 +1539,7 @@ void GstEnginePipeline::ElementMessageReceived(GstMessage *msg) {
const char *uri = gst_structure_get_string(structure, "new-location");
// Set the redirect URL. In mmssrc redirect messages come during the initial state change to PLAYING, so callers can pick up this URL after the state change has failed.
QMutexLocker l(&mutex_redirect_url_);
redirect_url_ = uri;
}
@@ -1523,7 +1558,7 @@ void GstEnginePipeline::ErrorMessageReceived(GstMessage *msg) {
g_error_free(error);
g_free(debugs);
if (pipeline_active_ && next_uri_set_ && (domain == GST_CORE_ERROR || domain == GST_RESOURCE_ERROR || domain == GST_STREAM_ERROR)) {
if (pipeline_active_.value() && next_uri_set_.value() && (domain == GST_CORE_ERROR || domain == GST_RESOURCE_ERROR || domain == GST_STREAM_ERROR)) {
// A track is still playing and the next uri is not playable. We ignore the error here so it can play until the end.
// But there is no message send to the bus when the current track finishes, we have to add an EOS ourself.
qLog(Info) << "Ignoring error" << domain << code << message << debugstr << "when loading next track";
@@ -1536,10 +1571,13 @@ void GstEnginePipeline::ErrorMessageReceived(GstMessage *msg) {
qLog(Error) << __FUNCTION__ << "ID:" << id() << "Domain:" << domain << "Code:" << code << "Error:" << message;
qLog(Error) << __FUNCTION__ << "ID:" << id() << "Domain:" << domain << "Code:" << code << "Debug:" << debugstr;
if (!redirect_url_.isEmpty() && debugstr.contains(QLatin1String("A redirect message was posted on the bus and should have been handled by the application."))) {
// mmssrc posts a message on the bus *and* makes an error message when it wants to do a redirect.
// We handle the message, but now we have to ignore the error too.
return;
{
QMutexLocker l(&mutex_redirect_url_);
if (!redirect_url_.isEmpty() && debugstr.contains(QLatin1String("A redirect message was posted on the bus and should have been handled by the application."))) {
// mmssrc posts a message on the bus *and* makes an error message when it wants to do a redirect.
// We handle the message, but now we have to ignore the error too.
return;
}
}
#ifdef Q_OS_WIN
@@ -1555,15 +1593,18 @@ void GstEnginePipeline::ErrorMessageReceived(GstMessage *msg) {
void GstEnginePipeline::TagMessageReceived(GstMessage *msg) {
if (ignore_tags_) return;
if (ignore_tags_.value()) return;
GstTagList *taglist = nullptr;
gst_message_parse_tag(msg, &taglist);
EngineMetadata engine_metadata;
engine_metadata.type = EngineMetadata::Type::Current;
engine_metadata.media_url = media_url_;
engine_metadata.stream_url = stream_url_;
{
QMutexLocker l(&mutex_url_);
engine_metadata.media_url = media_url_;
engine_metadata.stream_url = stream_url_;
}
engine_metadata.title = ParseStrTag(taglist, GST_TAG_TITLE);
engine_metadata.artist = ParseStrTag(taglist, GST_TAG_ARTIST);
engine_metadata.comment = ParseStrTag(taglist, GST_TAG_COMMENT);
@@ -1643,35 +1684,38 @@ void GstEnginePipeline::StateChangedMessageReceived(GstMessage *msg) {
qLog(Debug) << "Pipeline state changed from" << GstStateText(old_state) << "to" << GstStateText(new_state);
if (!pipeline_active_ && (new_state == GST_STATE_PAUSED || new_state == GST_STATE_PLAYING)) {
if (!pipeline_active_.value() && (new_state == GST_STATE_PAUSED || new_state == GST_STATE_PLAYING)) {
qLog(Debug) << "Pipeline is active";
pipeline_active_ = true;
if (pipeline_connected_) {
if (!volume_set_) {
SetVolume(volume_percent_);
if (pipeline_connected_.value()) {
if (!volume_set_.value()) {
SetVolume(volume_percent_.value());
}
if (pending_seek_nanosec_ != -1) {
if (next_uri_reset_ && new_state == GST_STATE_PAUSED) {
if (pending_seek_nanosec_.value() != -1) {
if (next_uri_reset_.value() && new_state == GST_STATE_PAUSED) {
qLog(Debug) << "Reverting next uri and going to playing state.";
next_uri_reset_ = false;
pending_state_ = GST_STATE_PLAYING;
SeekDelayed(pending_seek_nanosec_);
SeekDelayed(pending_seek_nanosec_.value());
pending_seek_nanosec_ = -1;
}
else {
SeekAsync(pending_seek_nanosec_);
SeekAsync(pending_seek_nanosec_.value());
pending_seek_nanosec_ = -1;
}
}
}
}
else if (pipeline_active_ && new_state != GST_STATE_PAUSED && new_state != GST_STATE_PLAYING) {
else if (pipeline_active_.value() && new_state != GST_STATE_PAUSED && new_state != GST_STATE_PLAYING) {
qLog(Debug) << "Pipeline is inactive";
pipeline_active_ = false;
if (next_uri_set_ && new_state == GST_STATE_READY) {
if (next_uri_set_.value() && new_state == GST_STATE_READY) {
next_uri_set_ = false;
g_object_set(G_OBJECT(pipeline_), "uri", gst_url_.constData(), nullptr);
{
QMutexLocker l(&mutex_url_);
g_object_set(G_OBJECT(pipeline_), "uri", gst_url_.constData(), nullptr);
}
if (pending_seek_nanosec_ == -1) {
qLog(Debug) << "Reverting next uri and going to playing state.";
SetStateAsync(GST_STATE_PLAYING);
@@ -1684,13 +1728,13 @@ void GstEnginePipeline::StateChangedMessageReceived(GstMessage *msg) {
}
}
if (pipeline_active_ && !buffering_) {
if (pending_seek_nanosec_ != -1 && new_state == GST_STATE_PAUSED) {
SeekAsync(pending_seek_nanosec_);
if (pipeline_active_.value() && !buffering_.value()) {
if (pending_seek_nanosec_.value() != -1 && new_state == GST_STATE_PAUSED) {
SeekAsync(pending_seek_nanosec_.value());
pending_seek_nanosec_ = -1;
}
else if (pending_state_ != GST_STATE_NULL) {
SetStateAsync(pending_state_);
else if (pending_state_.value() != GST_STATE_NULL) {
SetStateAsync(pending_state_.value());
pending_state_ = GST_STATE_NULL;
}
if (fader_ && fader_->state() != QTimeLine::State::Running && new_state == GST_STATE_PLAYING) {
@@ -1699,6 +1743,11 @@ void GstEnginePipeline::StateChangedMessageReceived(GstMessage *msg) {
}
}
if (new_state == GST_STATE_NULL && !finished_.value() && finish_requested_.value()) {
finished_ = true;
Q_EMIT Finished();
}
}
void GstEnginePipeline::BufferingMessageReceived(GstMessage *msg) {
@@ -1713,58 +1762,36 @@ void GstEnginePipeline::BufferingMessageReceived(GstMessage *msg) {
const GstState current_state = state();
if (percent < 100 && !buffering_) {
if (percent < 100 && !buffering_.value()) {
qLog(Debug) << "Buffering started";
buffering_ = true;
Q_EMIT BufferingStarted();
if (current_state == GST_STATE_PLAYING) {
SetStateAsync(GST_STATE_PAUSED);
if (pending_state_ == GST_STATE_NULL) {
if (pending_state_.value() == GST_STATE_NULL) {
pending_state_ = current_state;
}
}
}
else if (percent == 100 && buffering_) {
else if (percent == 100 && buffering_.value()) {
qLog(Debug) << "Buffering finished";
buffering_ = false;
Q_EMIT BufferingFinished();
if (pending_seek_nanosec_ != -1) {
SeekAsync(pending_seek_nanosec_);
if (pending_seek_nanosec_.value() != -1) {
SeekAsync(pending_seek_nanosec_.value());
pending_seek_nanosec_ = -1;
}
else if (pending_state_ != GST_STATE_NULL) {
SetStateAsync(pending_state_);
else if (pending_state_.value() != GST_STATE_NULL) {
SetStateAsync(pending_state_.value());
pending_state_ = GST_STATE_NULL;
}
}
else if (buffering_) {
else if (buffering_.value()) {
Q_EMIT BufferingProgress(percent);
}
}
qint64 GstEnginePipeline::position() const {
if (pipeline_active_) {
gint64 current_position = 0;
if (gst_element_query_position(pipeline_, GST_FORMAT_TIME, &current_position)) {
last_known_position_ns_ = current_position;
}
}
return last_known_position_ns_;
}
qint64 GstEnginePipeline::length() const {
gint64 value = 0;
if (pipeline_) gst_element_query_duration(pipeline_, GST_FORMAT_TIME, &value);
return value;
}
GstState GstEnginePipeline::state() const {
GstState s = GST_STATE_NULL, sp = GST_STATE_NULL;
@@ -1776,15 +1803,37 @@ GstState GstEnginePipeline::state() const {
}
qint64 GstEnginePipeline::length() const {
gint64 value = 0;
if (pipeline_) gst_element_query_duration(pipeline_, GST_FORMAT_TIME, &value);
return value;
}
qint64 GstEnginePipeline::position() const {
if (pipeline_active_.value()) {
gint64 current_position = 0;
if (gst_element_query_position(pipeline_, GST_FORMAT_TIME, &current_position)) {
last_known_position_ns_ = current_position;
}
}
return last_known_position_ns_;
}
QFuture<GstStateChangeReturn> GstEnginePipeline::SetStateAsync(const GstState state) {
qLog(Debug) << "Setting pipeline" << id_ << "state to" << GstStateText(state);
qLog(Debug) << "Setting pipeline" << id() << "state to" << GstStateText(state);
QFutureWatcher<GstStateChangeReturn> *watcher = new QFutureWatcher<GstStateChangeReturn>();
QObject::connect(watcher, &QFutureWatcher<GstStateChangeReturn>::finished, this, [this, watcher, state]() {
const GstStateChangeReturn state_change = watcher->result();
const GstStateChangeReturn state_change_return = watcher->result();
watcher->deleteLater();
SetStateAsyncFinished(state, state_change);
SetStateAsyncFinished(state, state_change_return);
});
QFuture<GstStateChangeReturn> future = QtConcurrent::run(&set_state_threadpool_, &gst_element_set_state, pipeline_, state);
watcher->setFuture(future);
@@ -1793,15 +1842,15 @@ QFuture<GstStateChangeReturn> GstEnginePipeline::SetStateAsync(const GstState st
}
void GstEnginePipeline::SetStateAsyncFinished(const GstState state, const GstStateChangeReturn state_change) {
void GstEnginePipeline::SetStateAsyncFinished(const GstState state, const GstStateChangeReturn state_change_return) {
switch (state_change) {
switch (state_change_return) {
case GST_STATE_CHANGE_SUCCESS:
case GST_STATE_CHANGE_ASYNC:
case GST_STATE_CHANGE_NO_PREROLL:
qLog(Debug) << "Pipeline" << id_ << "state successfully set to" << GstStateText(state);
Q_EMIT SetStateFinished(state_change);
if (!finished_ && finish_requested_) {
qLog(Debug) << "Pipeline" << id() << "state successfully set to" << GstStateText(state);
Q_EMIT SetStateFinished(state_change_return);
if (!finished_.value() && finish_requested_.value()) {
finished_ = true;
Q_EMIT Finished();
}
@@ -1829,17 +1878,17 @@ QFuture<GstStateChangeReturn> GstEnginePipeline::Play(const bool pause, const qu
bool GstEnginePipeline::Seek(const qint64 nanosec) {
if (ignore_next_seek_) {
if (ignore_next_seek_.value()) {
ignore_next_seek_ = false;
return true;
}
if (!pipeline_connected_ || !pipeline_active_) {
if (!pipeline_connected_.value() || !pipeline_active_.value()) {
pending_seek_nanosec_ = nanosec;
return true;
}
if (next_uri_set_) {
if (next_uri_set_.value()) {
pending_seek_nanosec_ = nanosec;
SetStateAsync(GST_STATE_READY);
return true;
@@ -1854,9 +1903,9 @@ bool GstEnginePipeline::Seek(const qint64 nanosec) {
if (success) {
qLog(Debug) << "Seek succeeded";
if (pending_state_ != GST_STATE_NULL) {
qLog(Debug) << "Setting state from pending state" << GstStateText(pending_state_);
SetStateAsync(pending_state_);
if (pending_state_.value() != GST_STATE_NULL) {
qLog(Debug) << "Setting state from pending state" << GstStateText(pending_state_.value());
SetStateAsync(pending_state_.value());
pending_state_ = GST_STATE_NULL;
}
}
@@ -1879,31 +1928,14 @@ void GstEnginePipeline::SeekDelayed(const qint64 nanosec) {
}
void GstEnginePipeline::SetEBUR128LoudnessNormalizingGain_dB(const double ebur128_loudness_normalizing_gain_db) {
ebur128_loudness_normalizing_gain_db_ = ebur128_loudness_normalizing_gain_db;
UpdateEBUR128LoudnessNormalizingGaindB();
}
void GstEnginePipeline::UpdateEBUR128LoudnessNormalizingGaindB() {
if (volume_ebur128_) {
auto dB_to_mult = [](const double gain_dB) { return std::pow(10., gain_dB / 20.); };
g_object_set(G_OBJECT(volume_ebur128_), "volume", dB_to_mult(ebur128_loudness_normalizing_gain_db_), nullptr);
}
}
void GstEnginePipeline::SetVolume(const uint volume_percent) {
if (volume_) {
const double volume_internal = static_cast<double>(volume_percent) * 0.01;
if (!volume_set_ || volume_internal != volume_internal_) {
if (!volume_set_.value() || volume_internal != volume_internal_.value()) {
volume_internal_ = volume_internal;
g_object_set(G_OBJECT(volume_), "volume", volume_internal, nullptr);
if (pipeline_active_) {
if (pipeline_active_.value()) {
volume_set_ = true;
}
}
@@ -1913,14 +1945,6 @@ void GstEnginePipeline::SetVolume(const uint volume_percent) {
}
void GstEnginePipeline::SetFaderVolume(const qreal volume) {
if (volume_fading_) {
g_object_set(G_OBJECT(volume_fading_), "volume", volume, nullptr);
}
}
void GstEnginePipeline::SetStereoBalance(const float value) {
stereo_balance_ = value;
@@ -1973,8 +1997,27 @@ void GstEnginePipeline::UpdateEqualizer() {
}
void GstEnginePipeline::SetEBUR128LoudnessNormalizingGain_dB(const double ebur128_loudness_normalizing_gain_db) {
ebur128_loudness_normalizing_gain_db_ = ebur128_loudness_normalizing_gain_db;
UpdateEBUR128LoudnessNormalizingGaindB();
}
void GstEnginePipeline::UpdateEBUR128LoudnessNormalizingGaindB() {
if (volume_ebur128_) {
auto dB_to_mult = [](const double gain_dB) { return std::pow(10., gain_dB / 20.); };
g_object_set(G_OBJECT(volume_ebur128_), "volume", dB_to_mult(ebur128_loudness_normalizing_gain_db_), nullptr);
}
}
void GstEnginePipeline::StartFader(const qint64 duration_nanosec, const QTimeLine::Direction direction, const QEasingCurve::Type shape, const bool use_fudge_timer) {
fader_active_ = true;
const qint64 duration_msec = duration_nanosec / kNsecPerMsec;
// If there's already another fader running then start from the same time that one was already at.
@@ -2007,17 +2050,25 @@ void GstEnginePipeline::StartFader(const qint64 duration_nanosec, const QTimeLin
SetFaderVolume(fader_->currentValue());
qLog(Debug) << "Pipeline" << id_ << "with state" << GstStateText(state()) << "set to fade from" << start_time;
qLog(Debug) << "Pipeline" << id() << "with state" << GstStateText(state()) << "set to fade from" << start_time;
if (pipeline_active_) {
if (pipeline_active_.value()) {
fader_->resume();
}
}
void GstEnginePipeline::SetFaderVolume(const qreal volume) {
if (volume_fading_) {
g_object_set(G_OBJECT(volume_fading_), "volume", volume, nullptr);
}
}
void GstEnginePipeline::ResumeFaderAsync() {
if (fader_) {
if (fader_active_.value()) {
QMetaObject::invokeMethod(&*fader_, "resume", Qt::QueuedConnection);
}
@@ -2025,8 +2076,9 @@ void GstEnginePipeline::ResumeFaderAsync() {
void GstEnginePipeline::FaderTimelineFinished() {
qLog(Debug) << "Pipeline" << id_ << "finished fading";
qLog(Debug) << "Pipeline" << id() << "finished fading";
fader_active_ = false;
fader_.reset();
// Wait a little while longer before emitting the finished signal (and probably destroying the pipeline) to account for delays in the audio server/driver.
@@ -2046,7 +2098,7 @@ void GstEnginePipeline::timerEvent(QTimerEvent *e) {
if (e->timerId() == fader_fudge_timer_.timerId()) {
fader_fudge_timer_.stop();
Q_EMIT FaderFinished(id_);
Q_EMIT FaderFinished(id());
return;
}
@@ -2054,6 +2106,54 @@ void GstEnginePipeline::timerEvent(QTimerEvent *e) {
}
bool GstEnginePipeline::HasNextUrl() const {
QMutexLocker l(&mutex_next_url_);
return next_stream_url_.isValid();
}
void GstEnginePipeline::PrepareNextUrl(const QUrl &media_url, const QUrl &stream_url, const QByteArray &gst_url, const qint64 beginning_nanosec, const qint64 end_nanosec) {
{
QMutexLocker l(&mutex_next_url_);
next_media_url_ = media_url;
next_stream_url_ = stream_url;
next_gst_url_ = gst_url;
}
next_beginning_offset_nanosec_ = beginning_nanosec;
next_end_offset_nanosec_ = end_nanosec;
if (about_to_finish_.value()) {
SetNextUrl();
}
}
void GstEnginePipeline::SetNextUrl() {
if (about_to_finish_.value() && HasNextUrl() && !next_uri_set_.value()) {
// Set the next uri. When the current song ends it will be played automatically and a STREAM_START message is send to the bus.
// When the next uri is not playable an error message is send when the pipeline goes to PLAY (or PAUSE) state or immediately if it is currently in PLAY state.
next_uri_set_ = true;
{
QMutexLocker l(&mutex_next_url_);
qLog(Debug) << "Setting next URL to" << next_gst_url_;
g_object_set(G_OBJECT(pipeline_), "uri", next_gst_url_.constData(), nullptr);
}
about_to_finish_ = false;
}
}
void GstEnginePipeline::SetSourceDevice(const QString &device) {
QMutexLocker l(&mutex_source_device_);
source_device_ = device;
}
void GstEnginePipeline::AddBufferConsumer(GstBufferConsumer *consumer) {
QMutexLocker l(&mutex_buffer_consumers_);
buffer_consumers_ << consumer;
@@ -2069,29 +2169,3 @@ void GstEnginePipeline::RemoveAllBufferConsumers() {
buffer_consumers_.clear();
}
void GstEnginePipeline::PrepareNextUrl(const QUrl &media_url, const QUrl &stream_url, const QByteArray &gst_url, const qint64 beginning_nanosec, const qint64 end_nanosec) {
next_media_url_ = media_url;
next_stream_url_ = stream_url;
next_gst_url_ = gst_url;
next_beginning_offset_nanosec_ = beginning_nanosec;
next_end_offset_nanosec_ = end_nanosec;
if (about_to_finish_) {
SetNextUrl();
}
}
void GstEnginePipeline::SetNextUrl() {
if (about_to_finish_ && has_next_valid_url() && !next_uri_set_) {
// Set the next uri. When the current song ends it will be played automatically and a STREAM_START message is send to the bus.
// When the next uri is not playable an error message is send when the pipeline goes to PLAY (or PAUSE) state or immediately if it is currently in PLAY state.
next_uri_set_ = true;
qLog(Debug) << "Setting next URL to" << next_gst_url_;
g_object_set(G_OBJECT(pipeline_), "uri", next_gst_url_.constData(), nullptr);
about_to_finish_ = false;
}
}