Skip to content

Commit f0410b8

Browse files
author
Alexandre jublot
committed
fix: changed receivedCallbacks return type to int to handle exclusion of it
1 parent 8cb7002 commit f0410b8

20 files changed

Lines changed: 146 additions & 74 deletions

include/polymorph/network/tcp/IPacketHandler.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ namespace polymorph::network::tcp {
2828
* @param handler The receive handler function
2929
*/
3030
template<typename T>
31-
void registerReceiveHandler(polymorph::network::OpId opId, std::function<bool(const PacketHeader &, const T &)> handler)
31+
void registerReceiveHandler(polymorph::network::OpId opId, std::function<int(const PacketHeader &, const T &)> handler)
3232
{
3333
_registerReceiveHandler(opId, [handler](const PacketHeader &header, const std::vector<std::byte> &bytes) {
3434
Packet<T> packet = SerializerTrait<Packet<T>>::deserialize(bytes);
@@ -45,6 +45,6 @@ namespace polymorph::network::tcp {
4545

4646

4747
private:
48-
virtual void _registerReceiveHandler(polymorph::network::OpId opId, std::function<bool(const PacketHeader &, const std::vector<std::byte> &)> handler) = 0;
48+
virtual void _registerReceiveHandler(polymorph::network::OpId opId, std::function<int(const PacketHeader &, const std::vector<std::byte> &)> handler) = 0;
4949
};
5050
}

include/polymorph/network/udp/IPacketHandler.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ namespace polymorph::network::udp {
2828
* @param handler The receive handler function
2929
*/
3030
template<typename T>
31-
void registerReceiveHandler(polymorph::network::OpId opId, std::function<void(const PacketHeader &, const T &)> handler)
31+
void registerReceiveHandler(polymorph::network::OpId opId, std::function<int(const PacketHeader &, const T &)> handler)
3232
{
3333
_registerReceiveHandler(opId, [handler](const PacketHeader &header, const std::vector<std::byte> &bytes) {
3434
Packet<T> packet = SerializerTrait<Packet<T>>::deserialize(bytes);
@@ -45,6 +45,6 @@ namespace polymorph::network::udp {
4545

4646

4747
private:
48-
virtual void _registerReceiveHandler(polymorph::network::OpId opId, std::function<void(const PacketHeader &, const std::vector<std::byte> &)> handler) = 0;
48+
virtual void _registerReceiveHandler(polymorph::network::OpId opId, std::function<int(const PacketHeader &, const std::vector<std::byte> &)> handler) = 0;
4949
};
5050
}

src/include/tcp/PacketHandler.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ namespace polymorph::network::tcp
5252
* @brief The packet handlers are functions that will be called when a packet with the corresponding operation code is received
5353
*/
5454
std::map<polymorph::network::OpId,
55-
std::vector<std::function<bool(const PacketHeader &header, const std::vector<std::byte> &bytes)>>> _receiveCallbacks;
55+
std::vector<std::shared_ptr<std::function<int(const PacketHeader &header, const std::vector<std::byte> &bytes)>>>> _receiveCallbacks;
5656
/**
5757
* @property Thread that will handle the network operations (asio context)
5858
*/
@@ -83,7 +83,7 @@ namespace polymorph::network::tcp
8383

8484

8585
private:
86-
void _registerReceiveHandler(polymorph::network::OpId opId, std::function<bool(const PacketHeader &, const std::vector<std::byte> &)> handler) override;
86+
void _registerReceiveHandler(polymorph::network::OpId opId, std::function<int(const PacketHeader &, const std::vector<std::byte> &)> handler) override;
8787

8888

8989

src/include/udp/AConnector.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ namespace polymorph::network::udp {
6464

6565
std::atomic<bool> _receiveInProgress;
6666

67+
std::atomic<bool> _stopping = false;
68+
6769
std::mutex _sendQueueMutex;
6870

6971
asio::ip::udp::endpoint _endpoint;
@@ -91,6 +93,8 @@ namespace polymorph::network::udp {
9193

9294
bool isReceiveInProgress() const;
9395

96+
void stop();
97+
9498
private:
9599
/**
96100
* @brief Start a receive operation

src/include/udp/APacketHandler.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ namespace polymorph::network::udp
5858
/**
5959
* @property The callbacks to call when a packet is received
6060
*/
61-
std::map<OpId, std::vector<std::function<void(const PacketHeader &, const std::vector<std::byte> &)>>> _receiveCallbacks;
61+
std::map<OpId, std::vector<std::shared_ptr<std::function<int(const PacketHeader &, const std::vector<std::byte> &)>>>> _receiveCallbacks;
6262

6363
/**
6464
* @property The callbacks to call when a packet is sent
@@ -110,7 +110,7 @@ namespace polymorph::network::udp
110110
*/
111111
void _broadcastReceivedPacket(const PacketHeader &header, const std::vector<std::byte> &bytes);
112112

113-
void _registerReceiveHandler(polymorph::network::OpId opId, std::function<void(const PacketHeader &, const std::vector<std::byte> &)> handler) override;
113+
void _registerReceiveHandler(polymorph::network::OpId opId, std::function<int(const PacketHeader &, const std::vector<std::byte> &)> handler) override;
114114

115115

116116

src/include/udp/PacketStore.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ namespace polymorph::network::udp
2626
////////////////////// CONSTRUCTORS/DESTRUCTORS /////////////////////////
2727

2828
public:
29-
PacketStore(asio::io_context &context, std::map<OpId, bool> safeties, std::function<void(std::shared_ptr<SafePacketManager>)> resendCallback);
29+
PacketStore(asio::io_context &context, std::map<OpId, bool> safeties);
3030

3131

3232
~PacketStore() = default;
@@ -113,6 +113,8 @@ namespace polymorph::network::udp
113113
*/
114114
void confirmSent(const asio::ip::udp::endpoint &endpoint, const PacketId &id);
115115

116+
void setResendCallback(std::function<void(std::shared_ptr<SafePacketManager>)> callback);
117+
116118
private:
117119

118120
/**

src/src/tcp/ClientImpl.cpp

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,16 @@ void polymorph::network::tcp::ClientImpl::connect(std::function<void(bool, Sessi
3737

3838
_doReceive();
3939

40-
ConnectionDto dto{ 0, 0};
41-
send<ConnectionDto>(ConnectionDto::opId, dto);
42-
_isConnecting = false;
43-
4440
registerReceiveHandler<ConnectionResponseDto>(ConnectionResponseDto::opId, [this, callback](const PacketHeader &header, const ConnectionResponseDto &payload) {
4541
_isConnected = payload.authorized;
4642
_currentSession = header.sId;
4743
callback(payload.authorized, header.sId);
4844
return true;
4945
});
46+
47+
ConnectionDto dto{ 0, 0};
48+
send<ConnectionDto>(ConnectionDto::opId, dto);
49+
_isConnecting = false;
5050
});
5151
_run();
5252
}
@@ -66,15 +66,16 @@ void polymorph::network::tcp::ClientImpl::connectWithSession(polymorph::network:
6666

6767
_doReceive();
6868

69-
ConnectionDto dto{ .sessionId = session, .authKey = authKey };
70-
send<ConnectionDto>(ConnectionDto::opId, dto);
71-
_isConnecting = false;
72-
7369
registerReceiveHandler<ConnectionResponseDto>(ConnectionResponseDto::opId, [this, callback](const PacketHeader &header, const ConnectionResponseDto &payload) {
7470
_isConnected = payload.authorized;
7571
callback(payload.authorized, header.sId);
7672
return true;
7773
});
74+
75+
76+
ConnectionDto dto{ .sessionId = session, .authKey = authKey };
77+
send<ConnectionDto>(ConnectionDto::opId, dto);
78+
_isConnecting = false;
7879
});
7980
_run();
8081
}

src/src/tcp/PacketHandler.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,28 @@ bool
2222
polymorph::network::tcp::APacketHandler::packetReceived(const PacketHeader&, const std::vector<std::byte> &bytes)
2323
{
2424
PacketHeader header {0};
25+
int res;
26+
std::vector<std::shared_ptr<std::function<int(const PacketHeader &header, const std::vector<std::byte> &bytes)>>> callbacksToPop;
2527

2628
try {
2729
header = SerializerTrait<PacketHeader>::deserialize(bytes);
2830
if (_receiveCallbacks.contains(header.opId)) {
2931
for (auto &callback : _receiveCallbacks[header.opId]) {
30-
if (!callback(header, bytes))
32+
res = (*callback)(header, bytes);
33+
if (res == -1)
3134
return false;
35+
if (res == 0)
36+
callbacksToPop.push_back(callback);
3237
}
3338
}
3439
} catch (const exceptions::DeserializingException &e) {
3540
std::cerr << "Error while deserializing packet header: " << e.what() << std::endl;
3641
return false;
3742
}
43+
for (auto &toPop: callbacksToPop) {
44+
auto it = std::find(_receiveCallbacks[header.opId].begin(), _receiveCallbacks[header.opId].end(), toPop);
45+
_receiveCallbacks[header.opId].erase(it);
46+
}
3847
return true;
3948
}
4049

@@ -50,7 +59,7 @@ void polymorph::network::tcp::APacketHandler::_run()
5059
});
5160
}
5261

53-
void polymorph::network::tcp::APacketHandler::_registerReceiveHandler(polymorph::network::OpId opId, std::function<bool(const PacketHeader &, const std::vector<std::byte> &)> handler)
62+
void polymorph::network::tcp::APacketHandler::_registerReceiveHandler(polymorph::network::OpId opId, std::function<int(const PacketHeader &, const std::vector<std::byte> &)> handler)
5463
{
55-
_receiveCallbacks[opId].emplace_back(std::move(handler));
64+
_receiveCallbacks[opId].emplace_back(std::make_shared<std::function<int(const PacketHeader &, const std::vector<std::byte> &)>>(std::move(handler)));
5665
}

src/src/udp/AConnector.cpp

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,38 +36,44 @@ void polymorph::network::udp::AConnector::_doReceive()
3636
{
3737
_receiveInProgress = false;
3838
_socket.async_receive_from(asio::buffer(_receiveBuffer), _endpoint,
39-
[this](const asio::error_code &error, std::size_t bytesReceived) {
40-
if (error) {
41-
std::cerr << "Error while receiving data: " << error.message() << std::endl;
42-
return;
43-
}
44-
_receiveInProgress = true;
45-
std::vector<std::byte> data(_receiveBuffer.begin(), _receiveBuffer.begin() + bytesReceived);
46-
_determinePacket(data);
47-
_doReceive();
48-
});
39+
[this](const asio::error_code &error, std::size_t bytesReceived) {
40+
if (error == asio::error::operation_aborted || _stopping)
41+
return;
42+
if (error) {
43+
std::cerr << "Error while receiving data: " << error.message() << std::endl;
44+
return;
45+
}
46+
_receiveInProgress = true;
47+
std::vector<std::byte> data(_receiveBuffer.begin(), _receiveBuffer.begin() + bytesReceived);
48+
_determinePacket(data);
49+
_doReceive();
50+
}
51+
);
4952
}
5053

5154
void polymorph::network::udp::AConnector::_doSend()
5255
{
5356
std::lock_guard<std::mutex> lock(_sendQueueMutex);
5457
_socket.async_send_to(asio::buffer(_sendQueue.front().second), _sendQueue.front().first,
55-
[this](const asio::error_code &error, std::size_t) {
56-
if (error) {
57-
std::cerr << "Error while sending packet: " << error.message() << std::endl;
58-
return;
59-
}
60-
std::unique_lock<std::mutex> lock(_sendQueueMutex);
61-
auto header = SerializerTrait<PacketHeader>::deserialize(_sendQueue.front().second);
62-
_packetSent(_sendQueue.front().first, header, _sendQueue.front().second);
63-
_sendQueue.pop();
64-
if (!_sendQueue.empty()) {
65-
lock.unlock();
66-
_doSend();
67-
} else {
68-
_writeInProgress = false;
69-
}
70-
});
58+
[this](const asio::error_code &error, std::size_t) {
59+
if (error == asio::error::operation_aborted || _stopping)
60+
return;
61+
if (error) {
62+
std::cerr << "Error while sending packet: " << error.message() << std::endl;
63+
return;
64+
}
65+
std::unique_lock<std::mutex> lock(_sendQueueMutex);
66+
auto header = SerializerTrait<PacketHeader>::deserialize(_sendQueue.front().second);
67+
_packetSent(_sendQueue.front().first, header, _sendQueue.front().second);
68+
_sendQueue.pop();
69+
if (!_sendQueue.empty()) {
70+
lock.unlock();
71+
_doSend();
72+
} else {
73+
_writeInProgress = false;
74+
}
75+
}
76+
);
7177
}
7278

7379
void polymorph::network::udp::AConnector::_determinePacket(const std::vector<std::byte> &data)
@@ -97,3 +103,8 @@ bool polymorph::network::udp::AConnector::isReceiveInProgress() const
97103
{
98104
return _receiveInProgress;
99105
}
106+
107+
void polymorph::network::udp::AConnector::stop()
108+
{
109+
_stopping = true;
110+
}

src/src/udp/APacketHandler.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,21 @@ void polymorph::network::udp::APacketHandler::_broadcastReceivedPacket(const pol
7272
const std::vector<std::byte> &bytes)
7373
{
7474
if (_receiveCallbacks.contains(header.opId)) {
75+
std::vector<std::shared_ptr<std::function<int(const PacketHeader &, const std::vector<std::byte> &)>>> callbacksToPop;
76+
7577
for (auto &callback : _receiveCallbacks[header.opId]) {
76-
callback(header, bytes);
78+
auto res = (*callback)(header, bytes);
79+
if (res == 0)
80+
callbacksToPop.push_back(callback);
81+
}
82+
for (auto &callback : callbacksToPop) {
83+
auto it = std::find(_receiveCallbacks[header.opId].begin(), _receiveCallbacks[header.opId].end(), callback);
84+
_receiveCallbacks[header.opId].erase(it);
7785
}
7886
}
7987
}
8088

81-
void polymorph::network::udp::APacketHandler::_registerReceiveHandler(polymorph::network::OpId opId, std::function<void(const PacketHeader &, const std::vector<std::byte> &)> handler)
89+
void polymorph::network::udp::APacketHandler::_registerReceiveHandler(polymorph::network::OpId opId, std::function<int(const PacketHeader &, const std::vector<std::byte> &)> handler)
8290
{
83-
_receiveCallbacks[opId].emplace_back(std::move(handler));
91+
_receiveCallbacks[opId].emplace_back(std::make_shared<std::function<int(const PacketHeader &, const std::vector<std::byte> &)>>(std::move(handler)));
8492
}

0 commit comments

Comments
 (0)