add udp time delay test and multi port test

This commit is contained in:
gxt_kt 2024-11-27 21:48:04 +08:00
parent 2999f9ae96
commit 835853e9fc
11 changed files with 4295 additions and 0 deletions

View File

@ -43,3 +43,6 @@ add_executable(udp_client src/udp_client_main.cc)
target_link_libraries(udp_client ${LIB})
add_executable(udp_server src/udp_server_main.cc)
target_link_libraries(udp_server ${LIB})
enable_testing()
add_subdirectory(test)

3858
include/concurrentqueue.h Normal file

File diff suppressed because it is too large Load Diff

13
test/CMakeLists.txt Normal file
View File

@ -0,0 +1,13 @@
find_package(yaml-cpp REQUIRED)
include_directories(yaml_config)
add_executable(test_udp_client test_udp_client_main.cc yaml_config/yaml_config.cc)
target_link_libraries(test_udp_client ${LIB} yaml-cpp)
add_executable(test_udp_server test_udp_server_main.cc yaml_config/yaml_config.cc)
target_link_libraries(test_udp_server ${LIB} yaml-cpp)
add_executable(test_udp_mul_client test_udp_mul_client_main.cc yaml_config/yaml_config.cc)
target_link_libraries(test_udp_mul_client ${LIB} yaml-cpp)
add_executable(test_udp_mul_server test_udp_mul_server_main.cc yaml_config/yaml_config.cc)
target_link_libraries(test_udp_mul_server ${LIB} yaml-cpp)

15
test/test_struct.h Normal file
View File

@ -0,0 +1,15 @@
#pragma once
#include <debugstream/debugstream.h>
#include "yaml_config.h"
struct TestStruct {
char id[100];
char time[100];
char reserve1[50];
char reserve2[50];
int int_[10];
long long_[10];
double double_[10];
};

View File

@ -0,0 +1,43 @@
#include "libhv_udp_client.h"
#include "test_struct.h"
using namespace hv;
int main(int argc, char* argv[]) {
UdpClient cli;
static std::string client_ip =
MyYAMLConfig::Get()["client_ip"].as<std::string>();
static std::string client_port =
MyYAMLConfig::Get()["client_port"].as<std::string>();
int sockfd = cli.createsocket(stoi(client_port), client_ip.c_str());
if (sockfd < 0) {
return -20;
}
gDebug() << gxt::format("begin listen {}:{}", client_ip, client_port);
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
gDebug(buf->size());
};
cli.start();
// sendto(time) every 3s
// cli.loop()->setInterval(3000, [&cli](TimerID timerID) {
// std::string str = gxt::format("{}", gxt::GetTimeUs());
// cli.sendto(str);
// });
TestStruct test;
int i = 10000;
TIME_BEGIN_US();
while (i--) {
auto time = gxt::GetTimeUs();
std::string time_str = std::to_string(time);
memcpy(test.time, time_str.c_str(), time_str.size());
test.time[time_str.size()] = '\0';
cli.sendto(&test, sizeof(test));
// gxt::SleepUs();
}
TIME_END();
return 0;
}

View File

@ -0,0 +1,72 @@
#include "libhv_udp_client.h"
#include "test_struct.h"
using namespace hv;
template <typename T>
class Client {
public:
Client(int n = 1, std::function<void(const T&)> fun = nullptr)
: all_ports_n_(n), fun_(fun) {
static std::string client_ip =
MyYAMLConfig::Get()["client_ip"].as<std::string>();
static std::string client_port =
MyYAMLConfig::Get()["client_port"].as<std::string>();
int port = stoi(client_port);
clis_ = std::move(decltype(clis_)(n));
// sendto(time) every 3s
// cli.loop()->setInterval(3000, [&cli](TimerID timerID) {
// std::string str = gxt::format("{}", gxt::GetTimeUs());
// cli.sendto(str);
// });
for (int i = 0; i < n; i++) {
auto& cli = clis_.at(i);
int sockfd = cli.createsocket(port + i, client_ip.c_str());
if (sockfd < 0) {
std::terminate();
}
gDebug() << gxt::format("begin listen {}:{}", client_ip, port + i);
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
gDebug(buf->size());
};
cli.start();
}
}
void Send(const T& val) {
clis_.at(cnt_port_).sendto(&val, sizeof(val));
cnt_port_++;
cnt_port_ = cnt_port_ % all_ports_n_;
}
private:
std::vector<UdpClient> clis_;
std::function<void(const T&)> fun_;
int all_ports_n_ = 0;
int cnt_port_ = 0;
};
int main(int argc, char* argv[]) {
static int ports = MyYAMLConfig::Get()["ports"].as<int>();
Client<TestStruct> clients(ports);
// while (true) {
// gxt::Sleep(3);
TestStruct test;
int datas_n = MyYAMLConfig::Get()["datas_n"].as<int>();
TIME_BEGIN_US();
while (datas_n--) {
auto time = gxt::GetTimeUs();
std::string time_str = std::to_string(time);
memcpy(test.time, time_str.c_str(), time_str.size());
test.time[time_str.size()] = '\0';
clients.Send(test);
// cli.sendto(&test, sizeof(test));
// gxt::SleepUs();
}
TIME_END();
// }
return 0;
}

View File

@ -0,0 +1,162 @@
#include <detail/logfile.h>
#include <mutex>
#include "libhv_udp_server.h"
#include "test_struct.h"
using namespace hv;
// std::vector<long> times(100000);
std::atomic_long times{0};
std::atomic_long cnts{0};
class Server {
public:
Server(int n = 1, std::function<void(int, void*, size_t)> fun = nullptr)
: fun_(fun) {
static std::string server_port =
MyYAMLConfig::Get()["server_port"].as<std::string>();
int port = stoi(server_port);
srvs_ = std::move(decltype(srvs_)(n));
for (int i = 0; i < n; i++) {
auto& srv = srvs_.at(i);
int bindfd = srv.createsocket(port + i);
if (bindfd < 0) {
std::terminate();
}
gDebug() << gxt::format("server bind port:{}, bindfd={}", port + i,
bindfd);
srv.onMessage = [&, p = port + i](const SocketChannelPtr& channel,
Buffer* buf) {
fun_(p, buf->data(), buf->size());
// printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
// channel->write(buf);
};
srv.start();
}
}
private:
std::vector<UdpServer> srvs_;
std::function<void(int, void*, size_t)> fun_;
};
template <typename T>
class SafeQueue {
public:
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
public:
void push(const T& item, bool bNotify = true);
void notify();
bool empty();
size_t size();
T pop();
bool full();
void reserve(const size_t& qCapacity);
};
template <typename T>
void SafeQueue<T>::push(const T& item, bool bNotify) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(item);
lock.unlock();
if (bNotify) {
m_cv.notify_one();
}
}
template <typename T>
void SafeQueue<T>::notify() {
m_cv.notify_all();
}
template <typename T>
bool SafeQueue<T>::empty() {
return m_queue.empty();
}
template <typename T>
size_t SafeQueue<T>::size() {
return m_queue.size();
}
template <typename T>
T SafeQueue<T>::pop() {
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait(lock, [&]() { return !m_queue.empty(); });
T item = m_queue.front();
m_queue.pop();
return item;
}
template <typename T>
bool SafeQueue<T>::full() {
return !m_queue.empty();
}
template <typename T>
void SafeQueue<T>::reserve(const size_t& qCapacity) {};
// SafeQueue<TestStruct> ques;
#include "concurrentqueue.h"
moodycamel::ConcurrentQueue<TestStruct> ques;
void ServerCallBack(int port, void* ptr, size_t size) {
if (size != sizeof(TestStruct)) {
gDebugWarn() << "error size buf" << size;
std::terminate();
}
TestStruct test;
memcpy(&test, ptr, size);
std::string time_str = test.time;
auto time = stol(time_str);
auto cur_time = gxt::GetTimeUs();
times += (cur_time - time);
cnts++;
ques.enqueue(std::move(test));
// ques.enqueue(std::move(test));
// ques.push(std::move(test));
// printf("[%d]:time:%ld\n", port, cur_time - time);
// gDebugLog(cur_time - time);
}
int main(int argc, char* argv[]) {
static int ports = MyYAMLConfig::Get()["ports"].as<int>();
Server server(ports, ServerCallBack);
std::vector<std::thread> vecs;
for (int i = 0; i < 5; i++) {
vecs.emplace_back([&]() {
while (true) {
// auto val = ques.pop();
TestStruct val;
ques.try_dequeue(val);
gDebugLog() << val.id;
}
});
}
while (true) {
gxt::Sleep(3);
long time = times;
long cnt = cnts;
times = 0;
cnts = 0;
gDebugWarn((double)(time) / cnt);
}
return 0;
}

View File

@ -0,0 +1,50 @@
#include "libhv_udp_server.h"
#include "test_struct.h"
using namespace hv;
int main(int argc, char* argv[]) {
static std::string server_port =
MyYAMLConfig::Get()["server_port"].as<std::string>();
UdpServer srv;
int bindfd = srv.createsocket(std::stol(server_port));
if (bindfd < 0) {
return -20;
}
gDebug() << gxt::format("server bind port:{}, bindfd={}", server_port,
bindfd);
TestStruct test;
srv.onMessage = [&](const SocketChannelPtr& channel, Buffer* buf) {
if (buf->size() != sizeof(TestStruct)) {
gDebugWarn() << "error size buf" << buf->size();
std::terminate();
}
memcpy(&test, buf->data(), buf->size());
std::string time_str = test.time;
auto time = stol(time_str);
auto cur_time = gxt::GetTimeUs();
gDebugLog(cur_time - time);
// printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
// channel->write(buf);
};
srv.start();
while (true) {
gxt::Sleep(1);
}
// std::string str;
// while (std::getline(std::cin, str)) {
// if (str == "close") {
// srv.closesocket();
// } else if (str == "start") {
// srv.start();
// } else if (str == "stop") {
// srv.stop();
// break;
// } else {
// srv.sendto(str);
// }
// }
return 0;
}

View File

@ -0,0 +1,56 @@
#include "yaml_config.h"
#include <filesystem>
#include <iostream>
YAML::Node ReadConfigYamlFile(const std::string& yaml_config_path);
YAML::Node& MyYAMLConfig::Get() {
static std::string yaml_config_path = []() -> std::string {
std::filesystem::path current_dir =
std::filesystem::path(__FILE__).parent_path();
std::filesystem::path yaml_path = current_dir / "yaml_config.yaml";
std::cout << "yaml file path: " << yaml_path << std::endl;
return yaml_path;
}();
// gDebugWarn(yaml_config_path);
static YAML::Node yaml_config = ReadConfigYamlFile(yaml_config_path);
return yaml_config;
// std::once_flag flag; // once_flag 对象
// yaml_config = ReadConfigYamlFile();
// auto fun=[this](){
// yaml_config = ReadConfigYamlFile();
// };
// std::call_once(flag, ]); // 确保 initialize 只被调用一次
}
YAML::Node ReadConfigYamlFile(const std::string& yaml_config_path) {
YAML::Node res;
std::cout << __PRETTY_FUNCTION__ << ": " << std::endl;
std::cout << "BEGIN READ FILE: " << yaml_config_path << std::endl;
bool read_successful_flag = false;
try {
// Load the YAML file
res = YAML::LoadFile(yaml_config_path);
read_successful_flag = true;
} catch (const YAML::Exception& e) {
std::cerr << "Error while reading the YAML file: " << yaml_config_path
<< e.what() << std::endl;
}
if (!read_successful_flag) {
// std::cerr << "backtrace:" << __PRETTY_FUNCTION__ << std::endl;
// std::cerr << "backtrace:" << __PRETTY_FUNCTION__ << std::endl;
// std::cerr << "backtrace:" << __PRETTY_FUNCTION__ << std::endl;
std::cerr << "Error while reading the YAML file!" << yaml_config_path
<< std::endl;
std::cerr << "Error while reading the YAML file!" << yaml_config_path
<< std::endl;
std::cerr << "Error while reading the YAML file!" << yaml_config_path
<< std::endl;
std::terminate();
}
std::cout << "Read yaml config file successfully! " << yaml_config_path
<< std::endl;
return res;
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <yaml-cpp/yaml.h>
#include <string>
class MyYAMLConfig {
public:
static YAML::Node& Get();
};

View File

@ -0,0 +1,13 @@
print_send : true
print_recv : false
client_ip : "127.0.0.1"
client_port : "7890"
# test_client_ip : "127.0.0.1"
# test_client_port : "1236"
server_ip : "0.0.0.0"
server_port : "7890"
# send_delay_time_us : 0
#
ports : 5
datas_n : 100000