163 lines
3.5 KiB
C++
163 lines
3.5 KiB
C++
#include <mutex>
#include <thread>
#include <utility>

#include <detail/logfile.h>

#include "libhv_udp_server.h"
#include "test_struct.h"
|
|
|
|
using namespace hv;
|
|
|
|
// std::vector<long> times(100000);
|
|
// Running sum of (receive time - send time) over the packets handled by
// ServerCallBack; read and cleared by the reporting loop in main().
std::atomic<long> times{0};

// Number of packets folded into `times`; read and cleared together with it.
std::atomic<long> cnts{0};
|
|
|
|
class Server {
|
|
public:
|
|
Server(int n = 1, std::function<void(int, void*, size_t)> fun = nullptr)
|
|
: fun_(fun) {
|
|
static std::string server_port =
|
|
MyYAMLConfig::Get()["server_port"].as<std::string>();
|
|
int port = stoi(server_port);
|
|
|
|
srvs_ = std::move(decltype(srvs_)(n));
|
|
for (int i = 0; i < n; i++) {
|
|
auto& srv = srvs_.at(i);
|
|
int bindfd = srv.createsocket(port + i);
|
|
if (bindfd < 0) {
|
|
std::terminate();
|
|
}
|
|
gDebug() << gxt::format("server bind port:{}, bindfd={}", port + i,
|
|
bindfd);
|
|
|
|
srv.onMessage = [&, p = port + i](const SocketChannelPtr& channel,
|
|
Buffer* buf) {
|
|
fun_(p, buf->data(), buf->size());
|
|
// printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
|
|
// channel->write(buf);
|
|
};
|
|
srv.start();
|
|
}
|
|
}
|
|
|
|
private:
|
|
std::vector<UdpServer> srvs_;
|
|
std::function<void(int, void*, size_t)> fun_;
|
|
};
|
|
|
|
/// Mutex-protected FIFO queue with a blocking pop().
///
/// Every member function that touches `m_queue` takes `m_mutex` first, so the
/// accessors may be called while other threads push or pop.
template <typename T>
class SafeQueue {
 public:
  std::queue<T> m_queue;         // underlying storage; guarded by m_mutex
  std::mutex m_mutex;            // protects m_queue
  std::condition_variable m_cv;  // signalled by push() and notify()

 public:
  /// Appends a copy of `item`; wakes one waiting pop() when `bNotify` is true.
  void push(const T& item, bool bNotify = true);

  /// Wakes every thread currently blocked in pop().
  void notify();

  /// Returns true when the queue holds no elements.
  bool empty();

  /// Returns the number of queued elements.
  size_t size();

  /// Blocks until an element is available, then removes and returns it.
  T pop();

  /// Returns true when the queue holds at least one element.
  bool full();

  /// No-op: the queue is unbounded and keeps no capacity.
  void reserve(const size_t& qCapacity);
};

template <typename T>
void SafeQueue<T>::push(const T& item, bool bNotify) {
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_queue.push(item);
  }
  // Notify after releasing the lock so the woken thread can take it at once.
  if (bNotify) {
    m_cv.notify_one();
  }
}

template <typename T>
void SafeQueue<T>::notify() {
  m_cv.notify_all();
}

template <typename T>
bool SafeQueue<T>::empty() {
  // Reading the queue without the lock races with push()/pop().
  std::lock_guard<std::mutex> lock(m_mutex);
  return m_queue.empty();
}

template <typename T>
size_t SafeQueue<T>::size() {
  std::lock_guard<std::mutex> lock(m_mutex);
  return m_queue.size();
}

template <typename T>
T SafeQueue<T>::pop() {
  std::unique_lock<std::mutex> lock(m_mutex);
  // The predicate form re-checks after spurious wake-ups and after notify().
  m_cv.wait(lock, [&]() { return !m_queue.empty(); });
  // The element is removed on the next line, so move it out instead of copying.
  T item = std::move(m_queue.front());
  m_queue.pop();
  return item;
}

template <typename T>
bool SafeQueue<T>::full() {
  // NOTE(review): this reports "not empty", not "at capacity"; the queue has
  // no capacity limit. Behaviour kept as-is in case callers rely on it.
  std::lock_guard<std::mutex> lock(m_mutex);
  return !m_queue.empty();
}

template <typename T>
void SafeQueue<T>::reserve(const size_t& /*qCapacity*/) {}
|
|
|
|
// SafeQueue<TestStruct> ques;
|
|
#include "concurrentqueue.h"
|
|
// Hand-off queue for received packets: ServerCallBack enqueues into it and the
// worker threads started in main() dequeue from it.
moodycamel::ConcurrentQueue<TestStruct> ques;
|
|
|
|
|
|
void ServerCallBack(int port, void* ptr, size_t size) {
|
|
if (size != sizeof(TestStruct)) {
|
|
gDebugWarn() << "error size buf" << size;
|
|
std::terminate();
|
|
}
|
|
TestStruct test;
|
|
memcpy(&test, ptr, size);
|
|
std::string time_str = test.time;
|
|
auto time = stol(time_str);
|
|
auto cur_time = gxt::GetTimeUs();
|
|
times += (cur_time - time);
|
|
cnts++;
|
|
|
|
ques.enqueue(std::move(test));
|
|
// ques.enqueue(std::move(test));
|
|
// ques.push(std::move(test));
|
|
// printf("[%d]:time:%ld\n", port, cur_time - time);
|
|
// gDebugLog(cur_time - time);
|
|
}
|
|
|
|
int main(int argc, char* argv[]) {
|
|
static int ports = MyYAMLConfig::Get()["ports"].as<int>();
|
|
Server server(ports, ServerCallBack);
|
|
|
|
std::vector<std::thread> vecs;
|
|
for (int i = 0; i < 5; i++) {
|
|
vecs.emplace_back([&]() {
|
|
while (true) {
|
|
// auto val = ques.pop();
|
|
TestStruct val;
|
|
ques.try_dequeue(val);
|
|
gDebugLog() << val.id;
|
|
}
|
|
});
|
|
}
|
|
while (true) {
|
|
gxt::Sleep(3);
|
|
long time = times;
|
|
long cnt = cnts;
|
|
times = 0;
|
|
cnts = 0;
|
|
gDebugWarn((double)(time) / cnt);
|
|
}
|
|
|
|
return 0;
|
|
}
|