forked from rsocket/rsocket-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRSocketSetup.cpp
More file actions
83 lines (71 loc) · 2.82 KB
/
Copy pathRSocketSetup.cpp
File metadata and controls
83 lines (71 loc) · 2.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright 2004-present Facebook. All Rights Reserved.
#include "src/RSocketSetup.h"
#include "src/RSocketParameters.h"
#include "src/internal/RSocketConnectionManager.h"
#include "src/internal/ScheduledRSocketResponder.h"
#include "src/framing/FrameTransport.h"
#include "src/statemachine/RSocketStateMachine.h"
#include "src/RSocketRequester.h"
#include "src/RSocketErrors.h"
#include "src/RSocketStats.h"
namespace rsocket {
// Captures what is needed to later accept (createRSocket /
// createRSocketRequester) or reject (error) a pending connection.
//
// frameTransport and setupParams are taken by value and moved into the
// corresponding members. eventBase and connectionManager arrive as references
// and are stored in eventBase_ / connectionManager_.
// NOTE(review): those two members are presumably references, in which case
// both objects must outlive this RSocketSetup -- confirm against the header.
RSocketSetup::RSocketSetup(
yarpl::Reference<FrameTransport> frameTransport,
SetupParameters setupParams,
folly::EventBase& eventBase,
RSocketConnectionManager& connectionManager)
: frameTransport_(std::move(frameTransport)),
setupParams_(std::move(setupParams)),
eventBase_(eventBase),
connectionManager_(connectionManager) {}
// Closes the transport when this setup is destroyed while still holding it.
RSocketSetup::~RSocketSetup() {
  // frameTransport_ is moved out in createRSocketStateMachine() and reset in
  // error(); when it is already empty there is nothing left to clean up.
  if (!frameTransport_) {
    return;
  }

  // Nobody created an RSocket instance from this setup, i.e. the connection
  // was ignored, so the unused transport is simply shut down.
  frameTransport_->closeWithError(std::runtime_error("ignored connection"));
}
// Creates the state machine for this connection (see
// createRSocketStateMachine) and returns an RSocketRequester built from it
// and this setup's event base.
//
// All three arguments are forwarded unchanged; the handling of a null
// requestResponder or null stats happens in createRSocketStateMachine.
std::unique_ptr<RSocketRequester> RSocketSetup::createRSocketRequester(
    std::shared_ptr<RSocketResponder> requestResponder,
    std::shared_ptr<RSocketStats> stats,
    std::shared_ptr<RSocketNetworkStats> networkStats) {
  return std::make_unique<RSocketRequester>(
      createRSocketStateMachine(
          std::move(requestResponder),
          std::move(stats),
          std::move(networkStats)),
      eventBase_);
}
// Accepts the connection without handing a requester back to the caller.
//
// Forwards all arguments to createRSocketStateMachine and drops the returned
// handle; that function has already passed the state machine to the
// connection manager by the time it returns.
void RSocketSetup::createRSocket(
    std::shared_ptr<RSocketResponder> requestResponder,
    std::shared_ptr<RSocketStats> stats,
    std::shared_ptr<RSocketNetworkStats> networkStats) {
  // The result is discarded on purpose.
  static_cast<void>(createRSocketStateMachine(
      std::move(requestResponder),
      std::move(stats),
      std::move(networkStats)));
}
std::shared_ptr<RSocketStateMachine> RSocketSetup::createRSocketStateMachine(
std::shared_ptr<RSocketResponder> requestResponder,
std::shared_ptr<RSocketStats> stats,
std::shared_ptr<RSocketNetworkStats> networkStats) {
if(requestResponder) {
requestResponder = std::make_shared<ScheduledRSocketResponder>(
std::move(requestResponder), eventBase_);
} else {
// if the responder was not provided, we will create a default one
requestResponder = std::make_shared<RSocketResponder>();
}
if (!stats) {
stats = RSocketStats::noop();
}
auto rs = std::make_shared<RSocketStateMachine>(
eventBase_,
std::move(requestResponder),
nullptr,
ReactiveSocketMode::SERVER,
std::move(stats),
std::move(networkStats));
connectionManager_.manageConnection(rs, eventBase_);
rs->connectServer(std::move(frameTransport_), setupParams_);
return rs;
}
// Rejects the pending connection: closes the frame transport with a
// std::runtime_error carrying error.what(), then clears frameTransport_ so
// the destructor does not close it a second time.
//
// Calling this when the transport is no longer held (it was already moved out
// by createRSocketStateMachine(), or error() was already called) is a no-op
// instead of dereferencing an empty reference.
void RSocketSetup::error(const RSocketError& error) {
  if (!frameTransport_) {
    // Nothing left to close; the transport was consumed or already closed.
    return;
  }

  // TODO emit ERROR ... but how do I do that here?
  frameTransport_->closeWithError(std::runtime_error(error.what()));
  frameTransport_ = nullptr;
}
}