1 year ago

#340670

test-img

Artem Polyakov

Send request from a server to a connected client with RSocket C++

could someone kindly help me with the RSocket issue? I'm trying to send a request from the Server to a connected Client, but the Client Responder does not fire callbacks. I'm using the RSocket-cpp library.

Server code:

TcpConnectionAcceptor::Options opts;
  opts.address = folly::SocketAddress("::", FLAGS_port);
  opts.threads = 2;

  // RSocket server accepting on TCP
  auto rs = std::make_unique<TcpConnectionAcceptor>(std::move(opts));

  auto serverThread = std::thread([&rs] {
    // start accepting connections
    rs->start(
        [](std::unique_ptr<DuplexConnection> connection, folly::EventBase& eventBase) {
          LOG(INFO) << "new incoming connected" << std::endl;

          auto client = RSocket::createClientFromConnection(
              std::move(connection), *eventBase.getEventBase(), SetupParameters(), nullptr, std::make_shared<GenericRequestResponseResponder>());

          LOG(INFO) << "send data" << std::endl;
          client->getRequester()->requestResponse(Payload("hello2"))->subscribe([](Payload p) {
          LOG(INFO) << "Received1 >> " << p.moveDataToString() << std::endl;
          });

          LOG(INFO) << "request is sent from server" << std::endl;
        });
  });

Output:

I0327 07:11:33.583813 23622 RequestResponseHelloWorld_Server.cpp:95] new incoming connected
I0327 07:11:33.602982 23622 RequestResponseHelloWorld_Server.cpp:100] send data
I0327 07:11:33.604566 23622 RequestResponseHelloWorld_Server.cpp:105] request is sent from server

Client code:

class GenericRequestResponseResponder : public rsocket::RSocketResponder
{
    public:
        std::shared_ptr<Single<Payload>> handleRequestResponse(
            Payload request,
            StreamId /*streamId*/) override
        {
            LOG(INFO) << "GenericRequestResponseResponder.handleRequestResponse "
                      << request << std::endl;
            
            // string from payload data
            auto requestString = request.moveDataToString();

            return Single<Payload>::create(
                [name = std::move(requestString)](auto subscriber) {
                    std::stringstream ss;
                    ss << "Ack " << name << "!";
                    std::string s = ss.str();
                    subscriber->onSubscribe(SingleSubscriptions::empty());
                    subscriber->onSuccess(Payload(s, "metadata"));
                });
        }
        void handleFireAndForget(
            rsocket::Payload request,
            rsocket::StreamId /*streamId*/) override
        {
            LOG(INFO) << "GenericRequestResponseResponder.handleRequestResponse "
                      << request << std::endl;
        }
};

folly::SocketAddress address{folly::SocketAddress(host, port)};
        std::shared_ptr<TelemetryConnection> connection{nullptr};

        RSocket::createConnectedClient(
            std::make_unique<TcpConnectionFactory>(
                *m_worker->getEventBase(), std::move(address)), SetupParameters(), std::make_shared<GenericRequestResponseResponder>())
                          .thenValue([this, host, port, &connection](auto&& client) {
                              LOG(INFO) << "client is created" << std::endl;
                              m_clientList.append(client);

                          })
                          .thenError(
                              folly::tag_t<std::exception>{},
                              [&](const std::exception&) {
                                  LOG(ERROR) << "connection failed";
                              }).get();

I was expecting GenericRequestResponseResponder::``handleRequestResponse fired when the server sends the request, but client output is empty

c++

rsocket

0 Answers

Your Answer

Accepted video resources