diff --git a/src/tangara/database/database.cpp b/src/tangara/database/database.cpp index 2d72fe95..491ad8b7 100644 --- a/src/tangara/database/database.cpp +++ b/src/tangara/database/database.cpp @@ -6,9 +6,6 @@ #include "database/database.hpp" -#include -#include - #include #include #include @@ -20,12 +17,8 @@ #include #include -#include "collation.hpp" #include "cppbor.h" #include "cppbor_parse.h" -#include "database/index.hpp" -#include "database/track_finder.hpp" -#include "debug.hpp" #include "esp_log.h" #include "esp_timer.h" #include "ff.h" @@ -39,12 +32,14 @@ #include "leveldb/status.h" #include "leveldb/write_batch.h" +#include "collation.hpp" #include "database/db_events.hpp" #include "database/env_esp.hpp" +#include "database/index.hpp" #include "database/records.hpp" #include "database/tag_parser.hpp" #include "database/track.hpp" -#include "drivers/spi.hpp" +#include "database/track_finder.hpp" #include "events/event_queue.hpp" #include "memory_resource.hpp" #include "result.hpp" @@ -58,12 +53,16 @@ static SingletonEnv sEnv; static const char kDbPath[] = "/.tangara-db"; static const char kKeyDbVersion[] = "schema_version"; - static const char kKeyCustom[] = "U\0"; static const char kKeyCollator[] = "collator"; +static constexpr size_t kMaxParallelism = 2; + static std::atomic sIsDbOpen(false); +using std::placeholders::_1; +using std::placeholders::_2; + static auto CreateNewDatabase(leveldb::Options& options, locale::ICollator& col) -> leveldb::DB* { Database::Destroy(); @@ -167,7 +166,8 @@ auto Database::Open(ITagParser& parser, } ESP_LOGI(kTag, "Database opened successfully"); - return new Database(db, cache.release(), parser, collator); + return new Database(db, cache.release(), bg_worker, parser, + collator); }) .get(); } @@ -180,15 +180,20 @@ auto Database::Destroy() -> void { Database::Database(leveldb::DB* db, leveldb::Cache* cache, + tasks::WorkerPool& pool, ITagParser& tag_parser, locale::ICollator& collator) : db_(db), cache_(cache), + 
track_finder_( + pool, + kMaxParallelism, + std::bind(&Database::processCandidateCallback, this, _1, _2), + std::bind(&Database::indexingCompleteCallback, this)), tag_parser_(tag_parser), collator_(collator), is_updating_(false) { dbCalculateNextTrackId(); - ESP_LOGI(kTag, "next track id is %lu", next_track_id_.load()); } Database::~Database() { @@ -243,7 +248,7 @@ auto Database::get(const std::string& key) -> std::optional { } auto Database::getTrackPath(TrackId id) -> std::optional { - auto track_data = dbGetTrackData(id); + auto track_data = dbGetTrackData(leveldb::ReadOptions(), id); if (!track_data) { return {}; } @@ -251,7 +256,7 @@ auto Database::getTrackPath(TrackId id) -> std::optional { } auto Database::getTrack(TrackId id) -> std::shared_ptr { - std::shared_ptr data = dbGetTrackData(id); + std::shared_ptr data = dbGetTrackData(leveldb::ReadOptions(), id); if (!data || data->is_tombstoned) { return {}; } @@ -274,34 +279,61 @@ auto Database::getIndexes() -> std::vector { }; } -class UpdateNotifier { - public: - UpdateNotifier(std::atomic& is_updating) : is_updating_(is_updating) { - events::Ui().Dispatch(event::UpdateStarted{}); - events::System().Dispatch(event::UpdateStarted{}); +Database::UpdateTracker::UpdateTracker() + : num_old_tracks_(0), + num_new_tracks_(0), + start_time_(esp_timer_get_time()) { + events::Ui().Dispatch(event::UpdateStarted{}); + events::System().Dispatch(event::UpdateStarted{}); +} + +Database::UpdateTracker::~UpdateTracker() { + uint64_t end_time = esp_timer_get_time(); + + uint64_t time_per_old = 0; + if (num_old_tracks_) { + time_per_old = (verification_finish_time_ - start_time_) / num_old_tracks_; } - ~UpdateNotifier() { - is_updating_ = false; - events::Ui().Dispatch(event::UpdateFinished{}); - events::System().Dispatch(event::UpdateFinished{}); + uint64_t time_per_new = 0; + if (num_new_tracks_) { + time_per_new = (end_time - verification_finish_time_) / num_new_tracks_; } - private: - std::atomic& is_updating_; -}; + 
ESP_LOGI( + kTag, + "processed %lu old tracks and %lu new tracks in %llu seconds (%llums " + "per old, %llums per new)", + num_old_tracks_, num_new_tracks_, (end_time - start_time_) / 1000000, + time_per_old / 1000, time_per_new / 1000); + + events::Ui().Dispatch(event::UpdateFinished{}); + events::System().Dispatch(event::UpdateFinished{}); +} + +auto Database::UpdateTracker::onTrackVerified() -> void { + events::Ui().Dispatch(event::UpdateProgress{ + .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks, + .val = ++num_old_tracks_, + }); +} + +auto Database::UpdateTracker::onVerificationFinished() -> void { + verification_finish_time_ = esp_timer_get_time(); +} + +auto Database::UpdateTracker::onTrackAdded() -> void { + num_new_tracks_++; +} auto Database::updateIndexes() -> void { if (is_updating_.exchange(true)) { return; } - UpdateNotifier notifier{is_updating_}; - - uint32_t num_old_tracks = 0; - uint32_t num_new_tracks = 0; - uint64_t start_time = esp_timer_get_time(); + update_tracker_ = std::make_unique(); leveldb::ReadOptions read_options; - read_options.fill_cache = true; + read_options.fill_cache = false; + read_options.verify_checksums = true; // Stage 1: verify all existing tracks are still valid. 
ESP_LOGI(kTag, "verifying existing tracks"); @@ -310,11 +342,7 @@ auto Database::updateIndexes() -> void { std::string prefix = EncodeDataPrefix(); for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix); it->Next()) { - num_old_tracks++; - events::Ui().Dispatch(event::UpdateProgress{ - .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks, - .val = num_old_tracks, - }); + update_tracker_->onTrackVerified(); std::shared_ptr track = ParseDataValue(it->value()); if (!track) { @@ -325,7 +353,6 @@ auto Database::updateIndexes() -> void { } if (track->is_tombstoned) { - ESP_LOGW(kTag, "skipping tombstoned %lx", track->id); continue; } @@ -392,103 +419,86 @@ auto Database::updateIndexes() -> void { } } - uint64_t verify_end_time = esp_timer_get_time(); + update_tracker_->onVerificationFinished(); // Stage 2: search for newly added files. ESP_LOGI(kTag, "scanning for new tracks"); - uint64_t num_files = 0; - - auto track_finder = std::make_shared(""); + track_finder_.launch(""); +}; - FILINFO info; - while (auto path = track_finder->next(info)) { - num_files++; - events::Ui().Dispatch(event::UpdateProgress{ - .stage = event::UpdateProgress::Stage::kScanningForNewTracks, - .val = num_files, - }); +auto Database::processCandidateCallback(FILINFO& info, std::string_view path) + -> void { + leveldb::ReadOptions read_options; + read_options.fill_cache = true; + read_options.verify_checksums = false; - std::string unused; - if (db_->Get(read_options, EncodePathKey(*path), &unused).ok()) { - // This file is already in the database; skip it. - continue; - } + std::string unused; + if (db_->Get(read_options, EncodePathKey(path), &unused).ok()) { + // This file is already in the database; skip it. + return; + } - std::shared_ptr tags = tag_parser_.ReadAndParseTags(*path); - if (!tags || tags->encoding() == Container::kUnsupported) { - // No parseable tags; skip this fiile. 
- continue; - } + std::shared_ptr tags = tag_parser_.ReadAndParseTags(path); + if (!tags || tags->encoding() == Container::kUnsupported) { + // No parseable tags; skip this file. + return; + } - // Check for any existing track with the same hash. - uint64_t hash = tags->Hash(); - std::optional existing_id; - std::string raw_entry; - if (db_->Get(leveldb::ReadOptions(), EncodeHashKey(hash), &raw_entry) - .ok()) { - existing_id = ParseHashValue(raw_entry); - } + // Check for any existing track with the same hash. + uint64_t hash = tags->Hash(); + std::optional existing_id; + std::string raw_entry; + if (db_->Get(read_options, EncodeHashKey(hash), &raw_entry).ok()) { + existing_id = ParseHashValue(raw_entry); + } - std::shared_ptr data; - if (existing_id) { - // Do we have any existing data for this track? This could be the case if - // this is a tombstoned entry. In such as case, we want to reuse the - // previous TrackData so that any extra metadata is preserved. - data = dbGetTrackData(*existing_id); - if (!data) { - data = std::make_shared(); - data->id = *existing_id; - } else if (std::string_view{data->filepath} != *path) { - ESP_LOGW(kTag, "hash collision: %s, %s, %s", - tags->title().value_or("no title").c_str(), - tags->artist().value_or("no artist").c_str(), - tags->album().value_or("no album").c_str()); - // Don't commit anything if there's a hash collision, since we're - // likely to make a big mess. - continue; - } - } else { - num_new_tracks++; + std::shared_ptr data; + if (existing_id) { + // Do we have any existing data for this track? This could be the case if + // this is a tombstoned entry. In such a case, we want to reuse the + // previous TrackData so that any extra metadata is preserved. 
+ data = dbGetTrackData(read_options, *existing_id); + if (!data) { data = std::make_shared(); - data->id = dbMintNewTrackId(); + data->id = *existing_id; + } else if (data->filepath != path && !data->is_tombstoned) { + ESP_LOGW(kTag, "hash collision: %s, %s, %s", + tags->title().value_or("no title").c_str(), + tags->artist().value_or("no artist").c_str(), + tags->album().value_or("no album").c_str()); + // Don't commit anything if there's a hash collision, since we're + // likely to make a big mess. + return; } + } else { + update_tracker_->onTrackAdded(); + data = std::make_shared(); + data->id = dbMintNewTrackId(); + } - // Make sure the file-based metadata on the TrackData is up to date. - data->filepath = *path; - data->tags_hash = hash; - data->modified_at = {info.fdate, info.ftime}; - - // Apply all the actual database changes as one atomic batch. This makes - // the whole 'new track' operation atomic, and also reduces the amount of - // lock contention when adding many tracks at once. - leveldb::WriteBatch batch; - dbIngestTagHashes(*tags, data->individual_tag_hashes, batch); - - dbCreateIndexesForTrack(*data, *tags, batch); - batch.Put(EncodeDataKey(data->id), EncodeDataValue(*data)); - batch.Put(EncodeHashKey(data->tags_hash), EncodeHashValue(data->id)); - batch.Put(EncodePathKey(*path), TrackIdToBytes(data->id)); + // Make sure the file-based metadata on the TrackData is up to date. + data->filepath = path; + data->tags_hash = hash; + data->modified_at = {info.fdate, info.ftime}; + data->is_tombstoned = false; - db_->Write(leveldb::WriteOptions(), &batch); - }; + // Apply all the actual database changes as one atomic batch. This makes + // the whole 'new track' operation atomic, and also reduces the amount of + // lock contention when adding many tracks at once. 
+ leveldb::WriteBatch batch; + dbIngestTagHashes(*tags, data->individual_tag_hashes, batch); - uint64_t end_time = esp_timer_get_time(); + dbCreateIndexesForTrack(*data, *tags, batch); + batch.Put(EncodeDataKey(data->id), EncodeDataValue(*data)); + batch.Put(EncodeHashKey(data->tags_hash), EncodeHashValue(data->id)); + batch.Put(EncodePathKey(path), TrackIdToBytes(data->id)); - uint64_t time_per_old = 0; - if (num_old_tracks) { - time_per_old = (verify_end_time - start_time) / num_old_tracks; - } - uint64_t time_per_new = 0; - if (num_new_tracks) { - time_per_new = (end_time - verify_end_time) / num_new_tracks; - } + db_->Write(leveldb::WriteOptions(), &batch); +} - ESP_LOGI( - kTag, - "processed %lu old tracks and %lu new tracks in %llu seconds (%llums " - "per old, %llums per new)", - num_old_tracks, num_new_tracks, (end_time - start_time) / 1000000, - time_per_old / 1000, time_per_new / 1000); +auto Database::indexingCompleteCallback() -> void { + update_tracker_.reset(); + is_updating_ = false; } auto Database::isUpdating() -> bool { @@ -536,10 +546,11 @@ auto Database::dbMintNewTrackId() -> TrackId { return next_track_id_++; } -auto Database::dbGetTrackData(TrackId id) -> std::shared_ptr { +auto Database::dbGetTrackData(leveldb::ReadOptions options, TrackId id) + -> std::shared_ptr { std::string key = EncodeDataKey(id); std::string raw_val; - if (!db_->Get(leveldb::ReadOptions(), key, &raw_val).ok()) { + if (!db_->Get(options, key, &raw_val).ok()) { ESP_LOGW(kTag, "no key found for #%lx", id); return {}; } diff --git a/src/tangara/database/database.hpp b/src/tangara/database/database.hpp index 6994d0b8..6daffd23 100644 --- a/src/tangara/database/database.hpp +++ b/src/tangara/database/database.hpp @@ -23,6 +23,8 @@ #include "database/records.hpp" #include "database/tag_parser.hpp" #include "database/track.hpp" +#include "database/track_finder.hpp" +#include "ff.h" #include "leveldb/cache.h" #include "leveldb/db.h" #include "leveldb/iterator.h" @@ -93,22 +95,48 
@@ class Database { leveldb::DB* db_; leveldb::Cache* cache_; + TrackFinder track_finder_; + // Not owned. ITagParser& tag_parser_; locale::ICollator& collator_; + /* Internal utility for tracking a currently in-progress index update. */ + class UpdateTracker { + public: + UpdateTracker(); + ~UpdateTracker(); + + auto onTrackVerified() -> void; + auto onVerificationFinished() -> void; + auto onTrackAdded() -> void; + + private: + uint32_t num_old_tracks_; + uint32_t num_new_tracks_; + uint64_t start_time_; + uint64_t verification_finish_time_; + }; + std::atomic is_updating_; + std::unique_ptr update_tracker_; + std::atomic next_track_id_; Database(leveldb::DB* db, leveldb::Cache* cache, + tasks::WorkerPool& pool, ITagParser& tag_parser, locale::ICollator& collator); + auto processCandidateCallback(FILINFO&, std::string_view) -> void; + auto indexingCompleteCallback() -> void; + auto dbCalculateNextTrackId() -> void; auto dbMintNewTrackId() -> TrackId; - auto dbGetTrackData(TrackId id) -> std::shared_ptr; + auto dbGetTrackData(leveldb::ReadOptions, TrackId id) + -> std::shared_ptr; auto dbCreateIndexesForTrack(const Track&, leveldb::WriteBatch&) -> void; auto dbCreateIndexesForTrack(const TrackData&, diff --git a/src/tangara/database/track_finder.cpp b/src/tangara/database/track_finder.cpp index 86948e70..21a44339 100644 --- a/src/tangara/database/track_finder.cpp +++ b/src/tangara/database/track_finder.cpp @@ -24,12 +24,12 @@ namespace database { static_assert(sizeof(TCHAR) == sizeof(char), "TCHAR must be CHAR"); -TrackFinder::TrackFinder(std::string_view root) +CandidateIterator::CandidateIterator(std::string_view root) : to_explore_(&memory::kSpiRamResource) { to_explore_.push_back({root.data(), root.size()}); } -auto TrackFinder::next(FILINFO& out_info) -> std::optional { +auto CandidateIterator::next(FILINFO& info) -> std::optional { std::scoped_lock lock{mut_}; while (!to_explore_.empty() || current_) { if (!current_) { @@ -49,7 +49,6 @@ auto 
TrackFinder::next(FILINFO& out_info) -> std::optional { } } - FILINFO info; FRESULT res = f_readdir(&current_->second, &info); if (res != FR_OK || info.fname[0] == 0) { // No more files in the directory. @@ -71,14 +70,49 @@ auto TrackFinder::next(FILINFO& out_info) -> std::optional { to_explore_.push_back(full_path); } else { // This is a file! We can return now. - out_info = info; return {{full_path.data(), full_path.size()}}; } } } - // Out of things to explore. + // Out of paths to explore. return {}; } +TrackFinder::TrackFinder( + tasks::WorkerPool& pool, + size_t parallelism, + std::function processor, + std::function complete_cb) + : pool_{pool}, + parallelism_(parallelism), + processor_(processor), + complete_cb_(complete_cb) {} + +auto TrackFinder::launch(std::string_view root) -> void { + iterator_ = std::make_unique(root); + num_workers_ = parallelism_; + for (size_t i = 0; i < parallelism_; i++) { + schedule(); + } +} + +auto TrackFinder::schedule() -> void { + pool_.Dispatch([&]() { + FILINFO info; + auto next = iterator_->next(info); + if (next) { + std::invoke(processor_, info, *next); + schedule(); + } else { + std::scoped_lock lock{workers_mutex_}; + num_workers_ -= 1; + if (num_workers_ == 0) { + iterator_.reset(); + std::invoke(complete_cb_); + } + } + }); +} + } // namespace database diff --git a/src/tangara/database/track_finder.hpp b/src/tangara/database/track_finder.hpp index aba208e9..daaaa2f2 100644 --- a/src/tangara/database/track_finder.hpp +++ b/src/tangara/database/track_finder.hpp @@ -16,13 +16,27 @@ #include "ff.h" +#include "tasks.hpp" + namespace database { -class TrackFinder { +/* + * Iterator that recursively stats every file within the given directory root. + */ +class CandidateIterator { public: - TrackFinder(std::string_view root); + CandidateIterator(std::string_view root); + + /* + * Returns the next file. The stat result is placed within `out`. If the + * iterator has finished, returns absent. 
This method always modifies the + * contents of `out`, even if no file is returned. + */ + auto next(FILINFO& out) -> std::optional; - auto next(FILINFO&) -> std::optional; + // Cannot be copied or moved. + CandidateIterator(const CandidateIterator&) = delete; + CandidateIterator& operator=(const CandidateIterator&) = delete; private: std::mutex mut_; @@ -30,4 +44,34 @@ class TrackFinder { std::optional> current_; }; +/* + * Utility for iterating through each file within a directory root. Iteration + * can be sharded across several tasks. + */ +class TrackFinder { + public: + TrackFinder(tasks::WorkerPool&, + size_t parallelism, + std::function processor, + std::function complete_cb); + + auto launch(std::string_view root) -> void; + + // Cannot be copied or moved. + TrackFinder(const TrackFinder&) = delete; + TrackFinder& operator=(const TrackFinder&) = delete; + + private: + tasks::WorkerPool& pool_; + const size_t parallelism_; + const std::function processor_; + const std::function complete_cb_; + + std::mutex workers_mutex_; + std::unique_ptr iterator_; + size_t num_workers_; + + auto schedule() -> void; +}; + } // namespace database