libhv/test/test_udp_mul_server_main.cc

163 lines
3.5 KiB
C++

#include <detail/logfile.h>
#include <mutex>
#include "libhv_udp_server.h"
#include "test_struct.h"
using namespace hv;
// std::vector<long> times(100000);
std::atomic_long times{0};
std::atomic_long cnts{0};
class Server {
public:
Server(int n = 1, std::function<void(int, void*, size_t)> fun = nullptr)
: fun_(fun) {
static std::string server_port =
MyYAMLConfig::Get()["server_port"].as<std::string>();
int port = stoi(server_port);
srvs_ = std::move(decltype(srvs_)(n));
for (int i = 0; i < n; i++) {
auto& srv = srvs_.at(i);
int bindfd = srv.createsocket(port + i);
if (bindfd < 0) {
std::terminate();
}
gDebug() << gxt::format("server bind port:{}, bindfd={}", port + i,
bindfd);
srv.onMessage = [&, p = port + i](const SocketChannelPtr& channel,
Buffer* buf) {
fun_(p, buf->data(), buf->size());
// printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
// channel->write(buf);
};
srv.start();
}
}
private:
std::vector<UdpServer> srvs_;
std::function<void(int, void*, size_t)> fun_;
};
template <typename T>
class SafeQueue {
public:
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
public:
void push(const T& item, bool bNotify = true);
void notify();
bool empty();
size_t size();
T pop();
bool full();
void reserve(const size_t& qCapacity);
};
template <typename T>
void SafeQueue<T>::push(const T& item, bool bNotify) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(item);
lock.unlock();
if (bNotify) {
m_cv.notify_one();
}
}
template <typename T>
void SafeQueue<T>::notify() {
m_cv.notify_all();
}
template <typename T>
bool SafeQueue<T>::empty() {
return m_queue.empty();
}
template <typename T>
size_t SafeQueue<T>::size() {
return m_queue.size();
}
template <typename T>
T SafeQueue<T>::pop() {
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait(lock, [&]() { return !m_queue.empty(); });
T item = m_queue.front();
m_queue.pop();
return item;
}
template <typename T>
bool SafeQueue<T>::full() {
return !m_queue.empty();
}
template <typename T>
void SafeQueue<T>::reserve(const size_t& qCapacity) {};
// SafeQueue<TestStruct> ques;
#include "concurrentqueue.h"
moodycamel::ConcurrentQueue<TestStruct> ques;
void ServerCallBack(int port, void* ptr, size_t size) {
if (size != sizeof(TestStruct)) {
gDebugWarn() << "error size buf" << size;
std::terminate();
}
TestStruct test;
memcpy(&test, ptr, size);
std::string time_str = test.time;
auto time = stol(time_str);
auto cur_time = gxt::GetTimeUs();
times += (cur_time - time);
cnts++;
ques.enqueue(std::move(test));
// ques.enqueue(std::move(test));
// ques.push(std::move(test));
// printf("[%d]:time:%ld\n", port, cur_time - time);
// gDebugLog(cur_time - time);
}
int main(int argc, char* argv[]) {
static int ports = MyYAMLConfig::Get()["ports"].as<int>();
Server server(ports, ServerCallBack);
std::vector<std::thread> vecs;
for (int i = 0; i < 5; i++) {
vecs.emplace_back([&]() {
while (true) {
// auto val = ques.pop();
TestStruct val;
ques.try_dequeue(val);
gDebugLog() << val.id;
}
});
}
while (true) {
gxt::Sleep(3);
long time = times;
long cnt = cnts;
times = 0;
cnts = 0;
gDebugWarn((double)(time) / cnt);
}
return 0;
}