complete multiple udp send

This commit is contained in:
gxt_kt 2024-11-29 17:10:54 +08:00
parent c9b643b703
commit 2c2098b1b1
4 changed files with 78 additions and 76 deletions

View File

@ -25,7 +25,7 @@ def extract_forward_times(log_data, pattern_str):
return None, None
# 示例用法
log_file_path = "./log.log"
log_file_path = "./bin/log/20241128_log.log"
with open(log_file_path, 'r') as f:
log_data = f.read()

View File

@ -17,7 +17,9 @@
#include "Afx_Struct.h"
#include "SafeQue.h"
#include "ThostFtdcUserApiStruct.h"
#include "concurrentqueue.h"
#include "libhv_udp_server.h"
#include "yaml_config.h"
extern std::mutex id_times_mtx;
extern std::unordered_map<std::string, long> id_times;
@ -29,66 +31,66 @@ typedef struct SignalMsg {
using namespace hv;
#define SERVER_MD_QUE_SIZE 4096
CircularQueue<SignalMsg> server_signal_que(SERVER_MD_QUE_SIZE);
void DebugGetSignal(const SignalMsg& data) {
// gDebug(data.InstrumentID);
// tc.start(std::string(__PRETTY_FUNCTION__)+"push");
// server_signal_que.push(std::move(data));
// tc.stop(std::string(__PRETTY_FUNCTION__)+"push");
void DebugGetSignal(const SignalMsg &data) {
auto cur_time = gxt::GetTimeMs();
auto find = id_times.begin();
// gDebugWarn() << __PRETTY_FUNCTION__;
{
// gDebugLog(id_times.size());
// gDebugLog(data->signals().size());
std::lock_guard<std::mutex> _(id_times_mtx);
find = id_times.find(std::string(data.InstrumentID));
if (find != id_times.end()) {
// gDebugWarn() << gxt::format("id:{} take time {}ms",
// data->instrumentid(), find->second - cur_time);
gDebugLog() << gxt::format("id:{} take time {}ms", data.InstrumentID,
find->second - cur_time);
// gDebugWarn() << gxt::format("id:{} take time {}ms", data.InstrumentID,
// gDebugLog() << gxt::format("id:{} take time {}ms", data.InstrumentID,
// find->second - cur_time);
LOG_INFO(gxt::format("id:{} take time {}ms", data.InstrumentID,
find->second - cur_time));
} else {
// gDebugWarn() << gxt::format("cannot find");
gDebugLog() << gxt::format("cannot find {}", data.InstrumentID);
// gDebugLog() << gxt::format("cannot find {}", data.InstrumentID);
LOG_INFO(gxt::format("cannot find {}", data.InstrumentID));
}
}
}
class GetSignal {
public:
UdpServer srv;
GetSignal() {
int port = 1236;
class Server {
public:
Server(int n = 1, std::function<void(int, void *, size_t)> fun = nullptr)
: fun_(fun) {
static std::string server_port =
MyYAMLConfig::Get()["server_port"].as<std::string>();
int port = stoi(server_port);
int bindfd = srv.createsocket(port);
if (bindfd < 0) {
std::terminate();
}
printf("server bind on port %d, bindfd=%d ...\n", port, bindfd);
srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
gDebug(sizeof(SignalMsg));
if (buf->size() == sizeof(SignalMsg)) {
// gDebug() << "get !";
SignalMsg data;
memcpy(&data, buf->data(), buf->size());
gDebug(data.InstrumentID);
DebugGetSignal(std::move(data));
} else {
gDebugLog() << "not get !";
srvs_ = std::move(decltype(srvs_)(n));
for (int i = 0; i < n; i++) {
auto &srv = srvs_.at(i);
int bindfd = srv.createsocket(port + i);
if (bindfd < 0) {
std::terminate();
}
// echo
// printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
// channel->write(buf);
};
srv.start();
gDebugWarn() << gxt::format("wait read market data port:{}, bindfd={}",
port + i, bindfd);
srv.onMessage = [&, p = port + i](const SocketChannelPtr &channel,
Buffer *buf) {
fun_(p, buf->data(), buf->size());
// printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
// channel->write(buf);
};
srv.start();
}
}
private:
std::vector<UdpServer> srvs_;
std::function<void(int, void *, size_t)> fun_;
};
GetSignal get_market_data;
// int main(int argc, char* argv[]) { return 0; }
void ServerCallBack(int port, void *ptr, size_t size) {
if (size != sizeof(SignalMsg)) {
gDebugWarn() << "error size buf" << size;
std::terminate();
}
SignalMsg test;
memcpy(&test, ptr, size);
DebugGetSignal(std::move(test));
}
Server server(10, ServerCallBack);

View File

@ -11,10 +11,9 @@ moodycamel::ConcurrentQueue<CThostFtdcDepthMarketDataField> ques;
using namespace hv;
template <typename T>
class Client {
public:
Client(int n = 1, std::function<void(const T&)> fun = nullptr)
template <typename T> class Client {
public:
Client(int n = 1, std::function<void(const T &)> fun = nullptr)
: all_ports_n_(n), fun_(fun) {
static std::string client_ip =
MyYAMLConfig::Get()["client_ip"].as<std::string>();
@ -29,35 +28,35 @@ class Client {
// cli.sendto(str);
// });
for (int i = 0; i < n; i++) {
auto& cli = clis_.at(i);
auto &cli = clis_.at(i);
int sockfd = cli.createsocket(port + i, client_ip.c_str());
if (sockfd < 0) {
std::terminate();
}
gDebug() << gxt::format("begin listen {}:{}", client_ip, port + i);
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
gDebugWarn() << gxt::format("wait send market data to {}:{}", client_ip,
port + i);
cli.onMessage = [](const SocketChannelPtr &channel, Buffer *buf) {
gDebug(buf->size());
};
cli.start();
}
}
void Send(const T& val) {
void Send(const T &val) {
clis_.at(cnt_port_).sendto(&val, sizeof(val));
cnt_port_++;
cnt_port_ = cnt_port_ % all_ports_n_;
}
private:
private:
std::vector<UdpClient> clis_;
std::function<void(const T&)> fun_;
std::function<void(const T &)> fun_;
int all_ports_n_ = 0;
int cnt_port_ = 0;
};
template <typename T>
class MarketDataSend {
public:
template <typename T> class MarketDataSend {
public:
MarketDataSend(int threads_n = 1, int port_n = 1) : client(port_n) {
for (int i = 0; i < threads_n; i++) {
vecs.emplace_back(&MarketDataSend<T>::SendGrpcThreads, this);
@ -65,11 +64,11 @@ class MarketDataSend {
}
}
private:
private:
Client<T> client;
std::vector<std::thread> vecs;
private:
private:
void SendGrpcThreads() {
gDebug() << G_SPLIT_LINE;
while (true) {
@ -79,17 +78,12 @@ class MarketDataSend {
gxt::SleepUs(1);
continue;
}
auto time = std::to_string(gxt::GetTimeUs());
memcpy(val_one.reserve1, time.c_str(), time.size());
// std::vector<decltype(val_one)> val;
// val.emplace_back(std::move(val_one));
// TIME_BEGIN_US();
// auto time = std::to_string(gxt::GetTimeUs());
// memcpy(val_one.reserve1, time.c_str(), time.size());
client.Send(val_one);
static size_t send_delay_time_us =
MyYAMLConfig::Get()["send_delay_time_us"].as<size_t>();
gxt::SleepUs(send_delay_time_us);
// TIME_END();
// auto val = queues.pop_batch();
}
};
};

View File

@ -1,12 +1,18 @@
print_send : true
print_send : false
print_recv : false
login_file_path : "./info_files/登陆信息.ini"
# login_file_path : "./info_files/登陆信息_模拟盘.ini"
# client_ip : "192.168.1.153"
client_ip : "127.0.0.1"
client_port : "7890"
test_client_ip : "127.0.0.1"
test_client_port : "1236"
server_ip : "0.0.0.0"
server_port : "1236"
# 发送market data到目标主机 的ip和端口
client_ip : "112.13.94.58"
client_port : "7000"
# 发送一次是否进行延迟
send_delay_time_us : 0
# test_client_ip : "127.0.0.1"
# test_client_port : "7000"
#
# 本机接收signal的端口
server_port : "7000"