1 year ago
#340670
Artem Polyakov
Send request from a server to a connected client with RSocket C++
Could someone kindly help me with an RSocket issue? I'm trying to send a request from the server to a connected client, but the client's responder callbacks never fire. I'm using the rsocket-cpp library.
Server code:
// Server side: listen for raw TCP connections on the wildcard address "::" at
// FLAGS_port, and for each accepted connection issue a request-response call
// towards the connected peer.
TcpConnectionAcceptor::Options opts;
opts.address = folly::SocketAddress("::", FLAGS_port);
opts.threads = 2;

// RSocket server accepting on TCP
auto rs = std::make_unique<TcpConnectionAcceptor>(std::move(opts));

// NOTE(review): the thread lambda captures `rs` by reference, so `rs` must
// stay alive (and serverThread must be joined) before `rs` leaves scope.
auto serverThread = std::thread([&rs] {
  // start accepting connections
  rs->start(
      [](std::unique_ptr<DuplexConnection> connection,
         folly::EventBase& eventBase) {
        // The logger terminates every record itself, so no std::endl is
        // needed on LOG statements.
        LOG(INFO) << "new incoming connected";

        // Wrap the accepted connection in an RSocket client so that this side
        // can obtain a requester and initiate calls.
        // NOTE(review): `client` is a local and is destroyed as soon as this
        // callback returns, i.e. immediately after the request is issued. If
        // destroying the client closes the connection (verify against the
        // RSocketClient destructor), the peer never gets the chance to
        // respond -- the handle probably has to be stored somewhere that
        // outlives this callback.
        // NOTE(review): also confirm that wrapping an *accepted* connection
        // in a client is compatible with a peer that was itself created via
        // createConnectedClient; presumably both ends would then perform the
        // client-side setup handshake.
        auto client = RSocket::createClientFromConnection(
            std::move(connection),
            *eventBase.getEventBase(),
            SetupParameters(),
            nullptr,
            std::make_shared<GenericRequestResponseResponder>());

        LOG(INFO) << "send data";
        client->getRequester()
            ->requestResponse(Payload("hello2"))
            ->subscribe([](Payload p) {
              LOG(INFO) << "Received1 >> " << p.moveDataToString();
            });
        LOG(INFO) << "request is sent from server";
      });
});
Output:
I0327 07:11:33.583813 23622 RequestResponseHelloWorld_Server.cpp:95] new incoming connected
I0327 07:11:33.602982 23622 RequestResponseHelloWorld_Server.cpp:100] send data
I0327 07:11:33.604566 23622 RequestResponseHelloWorld_Server.cpp:105] request is sent from server
Client code:
/// Responder that acknowledges request-response calls and logs
/// fire-and-forget calls.
class GenericRequestResponseResponder : public rsocket::RSocketResponder
{
public:
    /// Handles an incoming request-response call.
    ///
    /// Logs the request, then returns a Single that, once subscribed,
    /// succeeds with a payload whose data is "Ack <request data>!" and whose
    /// metadata is "metadata".
    ///
    /// @param request  incoming payload; its data is moved out and echoed
    ///                 back inside the acknowledgement.
    /// @return Single producing the acknowledgement payload.
    std::shared_ptr<Single<Payload>> handleRequestResponse(
        Payload request,
        StreamId /*streamId*/) override
    {
        // The logger terminates the record itself; no std::endl required.
        LOG(INFO) << "GenericRequestResponseResponder.handleRequestResponse "
                  << request;

        // string from payload data
        auto requestString = request.moveDataToString();

        return Single<Payload>::create(
            [name = std::move(requestString)](auto subscriber) {
                // Plain concatenation is enough here; no stream needed.
                std::string s = "Ack " + name + "!";
                subscriber->onSubscribe(SingleSubscriptions::empty());
                subscriber->onSuccess(Payload(s, "metadata"));
            });
    }

    /// Handles an incoming fire-and-forget call by logging the request.
    ///
    /// @param request  incoming payload; it is only logged.
    void handleFireAndForget(
        rsocket::Payload request,
        rsocket::StreamId /*streamId*/) override
    {
        // Label corrected: this previously logged "handleRequestResponse",
        // which made the two callbacks indistinguishable in the log.
        LOG(INFO) << "GenericRequestResponseResponder.handleFireAndForget "
                  << request;
    }
};
// Client side: connect to host:port over TCP with
// GenericRequestResponseResponder installed as the responder, block until the
// attempt finishes, and remember the connected client in m_clientList.
folly::SocketAddress address(host, port);

// NOTE(review): `connection` is not used anywhere in the visible snippet;
// it is kept in case code outside this excerpt relies on it.
std::shared_ptr<TelemetryConnection> connection{nullptr};

RSocket::createConnectedClient(
    std::make_unique<TcpConnectionFactory>(
        *m_worker->getEventBase(), std::move(address)),
    SetupParameters(),
    std::make_shared<GenericRequestResponseResponder>())
    // Only `this` is captured: the continuation touches nothing but
    // m_clientList.
    .thenValue([this](auto&& client) {
        LOG(INFO) << "client is created";
        // Forward instead of copying: inside the lambda `client` is an
        // lvalue, so passing it as-is would copy the handle (or fail to
        // compile if the handle is move-only).
        m_clientList.append(std::forward<decltype(client)>(client));
    })
    .thenError(
        folly::tag_t<std::exception>{},
        [](const std::exception& e) {
            // Include the exception text so a failed connect is diagnosable.
            LOG(ERROR) << "connection failed" << ": " << e.what();
        })
    // Blocks the calling thread until one of the continuations above has run.
    .get();
I was expecting `GenericRequestResponseResponder::handleRequestResponse` to be
fired when the server sends the request, but the client output is empty.
c++
rsocket
0 Answers
Your Answer