Skip to content

[ML] Track memory usage in CHierarchicalResultsNormalizer #2831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8f5a3ac
[ML] Enhance CHierarchicalResultsNormalizer with memory usage trackin…
valeriy42 Mar 16, 2025
84eab82
First unit test working
valeriy42 Mar 17, 2025
b93c2f1
Add memory usage
valeriy42 Mar 18, 2025
3c0b574
[ML] Add unit tests for CHierarchicalResultsNormalizer with memory us…
valeriy42 Mar 18, 2025
4c76090
[ML] Add memory usage tests for CConcreteHierarchicalResultsLevelSet
valeriy42 Mar 18, 2025
fbd9be2
Formatting
valeriy42 Mar 18, 2025
f4aa37a
Merge branch 'main' of https://github.com/elastic/ml-cpp into enhance…
valeriy42 Mar 19, 2025
a1384f2
Fix linting issues and add changelog
valeriy42 Mar 19, 2025
9696676
fix formatting
valeriy42 Mar 19, 2025
98e5105
Refactor CHierarchicalResultsNormalizer to improve variable naming an…
valeriy42 Mar 19, 2025
4899211
add comment to ignore sonar issue
valeriy42 Mar 19, 2025
048fab1
copyright formatting
valeriy42 Mar 19, 2025
ea559bd
formatting
valeriy42 Mar 19, 2025
9d63e74
Add limits parameter to CResultNormalizer constructor and remove unus…
valeriy42 Mar 19, 2025
2f3bae3
Remove unused staticSize method and update memory usage calculations …
valeriy42 Mar 19, 2025
f30c962
Refactor memory usage methods in SNormalizer and update Main.cc to us…
valeriy42 Mar 20, 2025
64ff6c4
Add export macro to CHierarchicalResultsLevelSet class
valeriy42 Mar 20, 2025
686264b
Add memory usage methods to CHierarchicalResultsAggregator and update…
valeriy42 Mar 20, 2025
04863d1
Remove debug memory usage methods from CHierarchicalResultsAggregator…
valeriy42 Mar 20, 2025
e58b698
Remove dead code
valeriy42 Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion bin/normalize/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <model/CAnomalyDetectorModelConfig.h>

#include <api/CAnomalyJobConfig.h>
#include <api/CCsvInputParser.h>
#include <api/CCsvOutputWriter.h>
#include <api/CIoManager.h>
Expand Down Expand Up @@ -144,8 +145,12 @@ int main(int argc, char** argv) {
ioMgr.outputStream());
}()};

// Initialize memory limits with default values.
// This is fine as the normalizer doesn't use the memory limit.
ml::model::CLimits limits{false};

// This object will do the work
ml::api::CResultNormalizer normalizer{modelConfig, *outputWriter};
ml::api::CResultNormalizer normalizer{modelConfig, *outputWriter, limits};

// Restore state
if (!quantilesStateFile.empty()) {
Expand Down
4 changes: 4 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@

== {es} version 9.1.0

=== Enhancements

* Track memory used in the hierarchical results normalizer. (See {ml-pull}2831[#2831].)

=== Bug Fixes

== {es} version 9.0.0
Expand Down
2 changes: 2 additions & 0 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! be pruned, i.e. those which are so old as to be effectively dead.
void pruneAllModels(std::size_t buckets = 0) const;

//! Get a const reference to this job's hierarchical results normalizer.
const model::CHierarchicalResultsNormalizer& normalizer() const;

private:
//! The job ID
std::string m_JobId;
Expand Down
3 changes: 2 additions & 1 deletion include/api/CResultNormalizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class API_EXPORT CResultNormalizer {

public:
CResultNormalizer(const model::CAnomalyDetectorModelConfig& modelConfig,
CSimpleOutputWriter& outputWriter);
CSimpleOutputWriter& outputWriter,
model::CLimits& limits);

//! Initialise the system change normalizer
bool initNormalizer(const std::string& stateFileName);
Expand Down
2 changes: 2 additions & 0 deletions include/model/CHierarchicalResultsAggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#ifndef INCLUDED_ml_model_CHierarchicalResultsAggregator_h
#define INCLUDED_ml_model_CHierarchicalResultsAggregator_h

#include <core/CMemoryDef.h>

#include <model/CDetectorEqualizer.h>
#include <model/CHierarchicalResultsLevelSet.h>
#include <model/ImportExport.h>
Expand Down
27 changes: 27 additions & 0 deletions include/model/CHierarchicalResultsLevelSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#ifndef INCLUDED_ml_model_CHierarchicalResultsLevelSet_h
#define INCLUDED_ml_model_CHierarchicalResultsLevelSet_h

#include "model/ImportExport.h"
#include <core/CCompressedDictionary.h>

#include <maths/common/CChecksum.h>
Expand All @@ -21,6 +22,10 @@

#include <cstdint>

namespace CHierarchicalResultsLevelSetTest {
struct testMemoryUsage;
}

namespace ml {
namespace model {

Expand Down Expand Up @@ -240,6 +245,26 @@ class CHierarchicalResultsLevelSet : public CHierarchicalResultsVisitor {
return maths::common::CChecksum::calculate(seed, m_LeafSet);
}

//! Name \p mem and report the dynamic size of every level container
//! (bucket, influencer bucket, influencer, partition, person, leaf) into it.
void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const {
    mem->setName("Hierarchical Results Level Set Memory Usage");

    // Small forwarding helper so each member is reported with one short line.
    const auto report = [&mem](const char* label, const auto& member) {
        core::memory_debug::dynamicSize(label, member, mem);
    };

    report("m_BucketElement", m_BucketElement);
    report("m_InfluencerBucketSet", m_InfluencerBucketSet);
    report("m_InfluencerSet", m_InfluencerSet);
    report("m_PartitionSet", m_PartitionSet);
    report("m_PersonSet", m_PersonSet);
    report("m_LeafSet", m_LeafSet);
}

//! Sum of the dynamic sizes of the bucket element and of every level
//! container held by this set.
std::size_t memoryUsage() const {
    return core::memory::dynamicSize(m_BucketElement) +
           core::memory::dynamicSize(m_InfluencerBucketSet) +
           core::memory::dynamicSize(m_InfluencerSet) +
           core::memory::dynamicSize(m_PartitionSet) +
           core::memory::dynamicSize(m_PersonSet) +
           core::memory::dynamicSize(m_LeafSet);
}

private:
//! Get an element of \p set by name.
static const T* element(const TWordTypePrVec& set, const std::string& name) {
Expand Down Expand Up @@ -299,6 +324,8 @@ class CHierarchicalResultsLevelSet : public CHierarchicalResultsVisitor {
//! The container for leaves comprising distinct named
//! (partition, person) field name pairs.
TWordTypePrVec m_LeafSet;

friend struct CHierarchicalResultsLevelSetTest::testMemoryUsage;
};

template<typename T>
Expand Down
35 changes: 29 additions & 6 deletions include/model/CHierarchicalResultsNormalizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
#ifndef INCLUDED_ml_model_CHierarchicalResultsNormalizer_h
#define INCLUDED_ml_model_CHierarchicalResultsNormalizer_h

#include <core/CMemoryDef.h>
#include <core/CNonCopyable.h>

#include <model/CAnomalyScore.h>
#include <model/CHierarchicalResultsLevelSet.h>
#include <model/CLimits.h>
#include <model/CMonitoredResource.h>
#include <model/ImportExport.h>

#include <memory>
#include <string>
#include <utility>
#include <vector>

namespace ml {
Expand All @@ -44,6 +45,10 @@ struct MODEL_EXPORT SNormalizer {
//! Compute a checksum for this object.
uint64_t checksum() const;

void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const;

std::size_t memoryUsage() const;

std::string s_Description;
TNormalizerPtr s_Normalizer;
};
Expand Down Expand Up @@ -84,6 +89,7 @@ struct MODEL_EXPORT SNormalizer {
//! normalizers is negligible.
class MODEL_EXPORT CHierarchicalResultsNormalizer
: public CHierarchicalResultsLevelSet<hierarchical_results_normalizer_detail::SNormalizer>,
public CMonitoredResource,
private core::CNonCopyable {
public:
using TBase = CHierarchicalResultsLevelSet<hierarchical_results_normalizer_detail::SNormalizer>;
Expand All @@ -106,9 +112,10 @@ class MODEL_EXPORT CHierarchicalResultsNormalizer
enum ERestoreOutcome { E_Ok = 0, E_Corrupt = 1, E_Incomplete = 2 };

public:
CHierarchicalResultsNormalizer(const CAnomalyDetectorModelConfig& modelConfig);
CHierarchicalResultsNormalizer(CLimits& limits,
const CAnomalyDetectorModelConfig& modelConfig);

~CHierarchicalResultsNormalizer() override = default;
~CHierarchicalResultsNormalizer() override;

//! Add a job for the subsequent invocations of the normalizer.
void setJob(EJob job);
Expand Down Expand Up @@ -167,6 +174,19 @@ class MODEL_EXPORT CHierarchicalResultsNormalizer
const std::string& functionName,
const std::string& valueFieldName) const;

//! Get the memory used by this hierarchical results normalizer.
void debugMemoryUsage(const core::CMemoryUsage::TMemoryUsagePtr& mem) const override;

//! Return the total memory usage.
std::size_t memoryUsage() const override;

//! Get the static size of this object.
std::size_t staticSize() const override;

//! Update the overall model size stats with information from the
//! hierarchical results normalizer.
void updateModelSizeStats(CResourceMonitor::SModelSizeStats& modelSizeStats) const override;

private:
//! \brief Creates new normalizer instances.
class CNormalizerFactory {
Expand Down Expand Up @@ -210,15 +230,18 @@ class MODEL_EXPORT CHierarchicalResultsNormalizer
static std::string leafCue(const TWord& word);

private:
//! Configurable limits
CLimits& m_Limits;

//! The jobs that the normalizer will perform when invoked
//! can be: update, normalize or update + normalize.
EJob m_Job;
EJob m_Job{E_NoOp};

//! The model configuration.
const CAnomalyDetectorModelConfig& m_ModelConfig;

//! Whether the last update of the quantiles has caused a big change.
bool m_HasLastUpdateCausedBigChange;
bool m_HasLastUpdateCausedBigChange{false};
};
}
}
Expand Down
6 changes: 3 additions & 3 deletions include/model/CResourceMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ class MODEL_EXPORT CResourceMonitor {
//! by calling this once per bucket processed until the initially requested memory limit is reached.
void decreaseMargin(core_t::TTime elapsedTime);

//! Returns the sum of used memory plus any extra memory
std::size_t totalMemory() const;

private:
using TMonitoredResourcePtrSizeUMap =
boost::unordered_map<CMonitoredResource*, std::size_t>;
Expand Down Expand Up @@ -218,9 +221,6 @@ class MODEL_EXPORT CResourceMonitor {
//! Get the low memory limit with margin applied.
std::size_t lowLimit() const;

//! Returns the sum of used memory plus any extra memory
std::size_t totalMemory() const;

//! Adjusts the amount of memory reported to take into
//! account the current value of the byte limit margin and the effects
//! of background persistence.
Expand Down
5 changes: 4 additions & 1 deletion lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ CAnomalyJob::CAnomalyJob(std::string jobId,
m_MaxDetectors{std::numeric_limits<size_t>::max()},
m_PersistenceManager{persistenceManager}, m_MaxQuantileInterval{maxQuantileInterval},
m_LastNormalizerPersistTime{core::CTimeUtils::now()},
m_Aggregator{modelConfig}, m_Normalizer{modelConfig} {
m_Aggregator{modelConfig}, m_Normalizer{limits, modelConfig} {
m_JsonOutputWriter.limitNumberRecords(maxAnomalyRecords);

m_Limits.resourceMonitor().memoryUsageReporter(
Expand Down Expand Up @@ -1652,6 +1652,9 @@ void CAnomalyJob::pruneAllModels(std::size_t buckets) const {
(buckets == 0) ? detector->pruneModels() : detector->pruneModels(buckets);
}
}
// Read-only accessor for the job's hierarchical results normalizer.
const model::CHierarchicalResultsNormalizer& CAnomalyJob::normalizer() const {
    return this->m_Normalizer;
}

CAnomalyJob::TAnomalyDetectorPtr
CAnomalyJob::makeDetector(const model::CAnomalyDetectorModelConfig& modelConfig,
Expand Down
5 changes: 3 additions & 2 deletions lib/api/CResultNormalizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ const std::string CResultNormalizer::INFLUENCER_LEVEL("infl");
const std::string CResultNormalizer::ZERO("0");

CResultNormalizer::CResultNormalizer(const model::CAnomalyDetectorModelConfig& modelConfig,
CSimpleOutputWriter& outputWriter)
CSimpleOutputWriter& outputWriter,
model::CLimits& limits)
: m_ModelConfig(modelConfig), m_OutputWriter(outputWriter),
m_WriteFieldNames(true),
m_OutputFieldNormalizedScore(m_OutputFields[NORMALIZED_SCORE_NAME]),
m_Normalizer(m_ModelConfig) {
m_Normalizer(limits, m_ModelConfig) {
}

bool CResultNormalizer::initNormalizer(const std::string& stateFileName) {
Expand Down
22 changes: 22 additions & 0 deletions lib/api/unittest/CAnomalyJobTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1183,4 +1183,26 @@ BOOST_AUTO_TEST_CASE(testRestoreFromBadState) {
}
}

// Checks that, after a record has been handled, the job's hierarchical
// results normalizer reports non-zero memory usage, and that the resource
// monitor's total memory drops once the normalizer is unregistered.
BOOST_AUTO_TEST_CASE(testHierarchicalResultsNormalizerShouldIncreaseMemoryUsage) {
model::CLimits limits;
auto jobConfig = CTestAnomalyJob::makeSimpleJobConfig("metric", "value", "", "", "");
auto modelConfig = model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE);
std::stringstream outputStrm;
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);
CTestAnomalyJob::TStrStrUMap const dataRows = {
{"time", "12345678"}, {"value", "1.0"}, {"greenhouse", "rhubarb"}};

// Feed a single record through the job before measuring memory.
BOOST_TEST_REQUIRE(job.handleRecord(dataRows));
// NOTE(review): plain `auto` deduces a non-reference type, so if
// resourceMonitor() returns a reference this is a COPY of the monitor and
// the calls below act on that copy rather than on the one held by `limits`.
// Confirm whether `auto&` was intended.
auto resourceMonitor = limits.resourceMonitor();
resourceMonitor.forceRefreshAll();
BOOST_TEST_REQUIRE(job.mutableNormalizer().memoryUsage() > 0);

// Unregister the normalizer and check that memory usage decreases
auto memoryUsageBeforeUnregister = resourceMonitor.totalMemory();
resourceMonitor.unRegisterComponent(job.mutableNormalizer());
resourceMonitor.forceRefreshAll();
BOOST_TEST_REQUIRE(resourceMonitor.totalMemory() < memoryUsageBeforeUnregister);
}
BOOST_AUTO_TEST_SUITE_END()
4 changes: 3 additions & 1 deletion lib/api/unittest/CJsonOutputWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <model/CAnomalyDetector.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CHierarchicalResultsNormalizer.h>
#include <model/CLimits.h>

#include <api/CGlobalCategoryId.h>
#include <api/CJsonOutputWriter.h>
Expand Down Expand Up @@ -1688,7 +1689,8 @@ BOOST_AUTO_TEST_CASE(testPersistNormalizer) {
ml::core::CJsonOutputStreamWrapper outputStream(sstream);
ml::api::CJsonOutputWriter writer("job", outputStream);

ml::model::CHierarchicalResultsNormalizer normalizer(modelConfig);
ml::model::CLimits limits(false);
ml::model::CHierarchicalResultsNormalizer normalizer(limits, modelConfig);
writer.persistNormalizer(normalizer, persistTime);
writer.finalise();
}
Expand Down
7 changes: 5 additions & 2 deletions lib/api/unittest/CResultNormalizerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <core/CLogger.h>

#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CLimits.h>

#include <api/CCsvInputParser.h>
#include <api/CNdJsonOutputWriter.h>
Expand All @@ -31,7 +32,8 @@ BOOST_AUTO_TEST_CASE(testInitNormalizerPartitioned) {

ml::api::CNdJsonOutputWriter outputWriter;

ml::api::CResultNormalizer normalizer(modelConfig, outputWriter);
ml::model::CLimits limits(false);
ml::api::CResultNormalizer normalizer(modelConfig, outputWriter, limits);

BOOST_TEST_REQUIRE(normalizer.initNormalizer("testfiles/new_quantilesState.json"));
LOG_DEBUG(<< "normalizer initialized");
Expand Down Expand Up @@ -390,7 +392,8 @@ BOOST_AUTO_TEST_CASE(testInitNormalizer) {

ml::api::CNdJsonOutputWriter outputWriter;

ml::api::CResultNormalizer normalizer(modelConfig, outputWriter);
ml::model::CLimits limits(false);
ml::api::CResultNormalizer normalizer(modelConfig, outputWriter, limits);

BOOST_TEST_REQUIRE(normalizer.initNormalizer("testfiles/quantilesState.json"));

Expand Down
4 changes: 4 additions & 0 deletions lib/api/unittest/CTestAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class CTestAnomalyJob : public ml::api::CAnomalyJob {
const std::string& summaryCountFieldName = "");

static ml::api::CAnomalyJobConfig makeJobConfig(const std::string& detectorsConfig);

//! Test-only accessor: returns the job's normalizer as a non-const
//! reference by casting away the constness of normalizer().
ml::model::CHierarchicalResultsNormalizer& mutableNormalizer() const {
    const ml::model::CHierarchicalResultsNormalizer& readOnly = this->normalizer();
    return const_cast<ml::model::CHierarchicalResultsNormalizer&>(readOnly);
}
};

#endif // INCLUDED_CTestAnomalyJob_h
1 change: 1 addition & 0 deletions lib/model/CHierarchicalResultsAggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <model/CHierarchicalResultsAggregator.h>

#include <core/CLogger.h>
#include <core/CMemoryDef.h>
#include <core/CPersistUtils.h>
#include <core/CStatePersistInserter.h>
#include <core/CStateRestoreTraverser.h>
Expand Down
Loading