Grant Limberg 8e6e4ede6d
Add prometheus metrics for Central controllers (#1969)
* add header-only prometheus lib to ext

* rename folder

* Undo rename directory

* prometheus simpleapi included on mac & linux

* wip

* wire up some controller stats

* Get windows building with prometheus

* bsd build flags for prometheus

* Fix multiple network join from environment entrypoint.sh.release (#1961)

* _bond_m guards _bond, not _paths_m (#1965)

* Fix: warning: mutex '_aqm_m' is not held on every path through here [-Wthread-safety-analysis] (#1964)

* Serve prom metrics from /metrics endpoint

* Add prom metrics for Central controller specific things

* reorganize metric initialization

* testing out a labled gauge on Networks

* increment error counter on throw

* Consolidate metrics definitions

Put all metric definitions into node/Metrics.hpp.  Accessed as needed
from there.

* Revert "testing out a labled gauge on Networks"

This reverts commit 499ed6d95e11452019cdf48e32ed4cd878c2705b.

* still blows up but adding to the record for completeness right now

* Fix runtime issues with metrics

* Add metrics files to visual studio project

* Missed an "extern"

* add copyright headers to new files

* Add metrics for sent/received bytes (total)

* put /metrics endpoint behind auth

* sendto returns int on Win32

---------

Co-authored-by: Leonardo Amaral <leleobhz@users.noreply.github.com>
Co-authored-by: Brenton Bostick <bostick@gmail.com>
2023-04-21 12:12:43 -07:00

206 lines
5.3 KiB
C++

#pragma once
#include "prometheus/collectable.h"
#include "prometheus/text_serializer.h"
#include "prometheus/metric_family.h"
#include <jdl/httpclientlite.h>
#include <memory>
#include <mutex>
#include <string>
#include <sstream>
#include <vector>
#include <map>
#include <future>
#include <algorithm>
#include <utility>
namespace prometheus {
class Gateway {
using CollectableEntry = std::pair<std::weak_ptr<Collectable>, std::string>;
std::string job_uri_;
std::string labels_;
std::mutex mutex_;
std::vector<CollectableEntry> collectables_;
enum class HttpMethod : uint8_t{
Post,
Put,
Delete,
};
public:
using Labels = std::map<std::string, std::string>;
Gateway(const std::string host, const std::string port,
const std::string jobname, const Labels& labels = {})
: job_uri_(host + ':' + port + std::string("/metrics/job/") + jobname)
, labels_{}
{
std::stringstream label_strm;
for (const auto& label : labels) {
label_strm << "/" << label.first << "/" << label.second;
}
labels_ = label_strm.str();
}
void RegisterCollectable(const std::weak_ptr<Collectable>& collectable,
const Labels* labels = nullptr) {
std::stringstream label_strm;
if (labels != nullptr) {
for (const auto& label : *labels) {
label_strm << "/" << label.first << "/" << label.second;
}
}
CleanupStalePointers(collectables_);
collectables_.emplace_back(std::make_pair(collectable, label_strm.str()));
}
static const Labels GetInstanceLabel(const std::string& hostname) {
if (hostname.empty()) {
return Gateway::Labels{};
}
return Gateway::Labels{{"instance", hostname}};
}
// Push metrics to the given pushgateway.
int Push() {
return push(HttpMethod::Post);
}
std::future<int> AsyncPush() {
return async_push(HttpMethod::Post);
}
// PushAdd metrics to the given pushgateway.
int PushAdd() {
return push(HttpMethod::Put);
}
std::future<int> AsyncPushAdd() {
return async_push(HttpMethod::Put);
}
// Delete metrics from the given pushgateway.
int Delete() {
return performHttpRequest(HttpMethod::Delete, job_uri_, {});
}
// Delete metrics from the given pushgateway.
std::future<int> AsyncDelete() {
return std::async(std::launch::async, [&] { return Delete(); });
}
private:
std::string getUri(const CollectableEntry& collectable) const {
return (job_uri_ + labels_ + collectable.second);
}
int performHttpRequest(HttpMethod /*method*/, const std::string& uri_str, const std::string& body) {
std::lock_guard<std::mutex> l(mutex_);
/* Stub function. The implementation will be later, after connecting the
* additional library of HTTP requests. */
jdl::URI uri(uri_str);
jdl::HTTPResponse response = jdl::HTTPClient::request(jdl::HTTPClient::m_post, uri, body);
return std::stoi(response.response);
}
int push(HttpMethod method) {
const auto serializer = TextSerializer{};
for (const auto& wcollectable : collectables_) {
auto collectable = wcollectable.first.lock();
if (!collectable) {
continue;
}
auto metrics = collectable->Collect();
auto uri = getUri(wcollectable);
std::stringstream body;
serializer.Serialize(body, metrics);
std::string body_str = body.str();
auto status_code = performHttpRequest(method, uri, body_str);
if (status_code < 100 || status_code >= 400) {
return status_code;
}
}
return 200;
}
std::future<int> async_push(HttpMethod method) {
const auto serializer = TextSerializer{};
std::vector<std::future<int>> futures;
for (const auto& wcollectable : collectables_) {
auto collectable = wcollectable.first.lock();
if (!collectable) {
continue;
}
auto metrics = collectable->Collect();
auto uri = getUri(wcollectable);
std::stringstream body;
serializer.Serialize(body, metrics);
auto body_ptr = std::make_shared<std::string>(body.str());
futures.emplace_back(std::async(std::launch::async, [method, &uri, &body_ptr, this] {
return performHttpRequest(method, uri, *body_ptr);
}));
}
const auto reduceFutures = [](std::vector<std::future<int>> lfutures) {
auto final_status_code = 200;
for (auto& future : lfutures) {
auto status_code = future.get();
if (status_code < 100 || status_code >= 400) {
final_status_code = status_code;
}
}
return final_status_code;
};
return std::async(std::launch::async, reduceFutures, std::move(futures));
}
static void CleanupStalePointers(std::vector<CollectableEntry>& collectables) {
collectables.erase(std::remove_if(std::begin(collectables), std::end(collectables),
[](const CollectableEntry& candidate) {
return candidate.first.expired();
}),
std::end(collectables));
}
};
} // namespace prometheus