c++ - RPC failure code 14 in our simple GRPC example program -
we've made progress in getting grpc running under rhel 7.
our application has 1 rather complicated structure 3 levels of nesting outer level implementing "oneof" keyword.
find our other structures run fine, 1 gives rpc failure code=14.
we've simplified part of application as possible can recompiled , run easily.
here's .proto file, updated accommodate uli's question:
syntax = "proto3"; option java_multiple_files = true; option java_package = "io.grpc.examples.debug"; option java_outer_classname = "debugproto"; option objc_class_prefix = "debug"; package debugpackage; service debugservice { rpc debug_val_container_get (input_int32_request) returns (outer_container) {} } message input_int32_request { int32 ival = 1; } message inner_container { repeated uint32 val_array = 1; } message middle_container { inner_container vac = 1; } message other_container { int32 other_val = 1; } message outer_container { oneof reply { middle_container r1 = 1; other_container r2 = 2; } }
(please note java lines in prototype code in there because in grpc website examples. our code entirely c++, no java. don't know if means can without of these "option java..." lines).
here's our client source code:
#include <iostream> #include <memory> #include <string> #include <grpc++/grpc++.h> #include <grpc/support/log.h> #include <thread> #include <unistd.h> #include "debug.grpc.pb.h" using grpc::channel; using grpc::clientasyncresponsereader; using grpc::clientcontext; using grpc::completionqueue; using grpc::status; using debugpackage::input_int32_request; using debugpackage::inner_container; using debugpackage::middle_container; using debugpackage::outer_container; using debugpackage::debugservice; class debugclient { public: explicit debugclient(std::shared_ptr<channel> channel) : stub_(debugservice::newstub(channel)) {} void debug_val_container_get() { std::cout << "in debug_val_container_get" << std::endl; // data sending server input_int32_request val; val.set_ival(0); asyncclientcall* call = new asyncclientcall; call->response_reader = stub_->asyncdebug_val_container_get(&call->context, val, &cq_); call->response_reader->finish(&call->reply_, &call->status, (void*)call); } void asynccompleterpc() { void* got_tag; bool ok = false; while (cq_.next(&got_tag, &ok)) { asyncclientcall* call = static_cast<asyncclientcall*>(got_tag); gpr_assert(ok); if (call->status.ok()) { if (call->reply_.has_r1()) { std::cout << call << " debug received: " << call->reply_.r1().vac().val_array(0) << std::endl; } } else { std::cout << call << " rpc failed" << std::endl; std::cout << " rpc failure code = " << call->status.error_code() << std::endl; std::cout << " rpc failure message = " << call->status.error_message() << std::endl; } delete call; } } private: struct asyncclientcall { outer_container reply_; clientcontext context; status status; std::unique_ptr<clientasyncresponsereader<outer_container>> response_reader; }; std::unique_ptr<debugservice::stub> stub_; completionqueue cq_; }; int main(int argc, char** argv) { debugclient debug0(grpc::createchannel("172.16.17.46:50050", grpc::insecurechannelcredentials())); std::thread thread0_ = std::thread(&debugclient::asynccompleterpc, &debug0); debug0.debug_val_container_get(); sleep(1); std::cout << "press control-c quit" << std::endl << std::endl; thread0_.join(); //blocks forever return 0; }
and, here's our server source code:
#include <memory> #include <iostream> #include <string> #include <thread> #include <grpc++/grpc++.h> #include <grpc/support/log.h> #include "debug.grpc.pb.h" #include <time.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> using grpc::server; using grpc::serverasyncresponsewriter; using grpc::serverbuilder; using grpc::servercontext; using grpc::servercompletionqueue; using grpc::status; using debugpackage::inner_container; using debugpackage::input_int32_request; using debugpackage::middle_container; using debugpackage::outer_container; using debugpackage::debugservice; std::string save_server_address; class serverimpl final { public: ~serverimpl() { server_->shutdown(); cq_->shutdown(); } void run() { std::string server_address("0.0.0.0:50050"); serverbuilder builder; builder.addlisteningport(server_address, grpc::insecureservercredentials()); builder.registerservice(&service_); cq_ = builder.addcompletionqueue(); server_ = builder.buildandstart(); std::cout << "server listening on " << server_address << std::endl; save_server_address = server_address; handlerpcs(); } private: class calldata { public: virtual void proceed() = 0; }; class debuggetcalldata final : public calldata{ public: debuggetcalldata(debugservice::asyncservice* service, servercompletionqueue* cq) : service_(service), cq_(cq), responder_(&ctx_), status_(create) { proceed(); } void proceed() { if (status_ == create) { status_ = process; service_->requestdebug_val_container_get(&ctx_, &request_, &responder_, cq_, cq_, this); } else if (status_ == process) { new debuggetcalldata(service_, cq_); char *portchar; portchar = (char *) save_server_address.c_str(); long cq_addr = (long) cq_; int cq_addr32 = (int) (cq_addr & 0xfffffff); srand(cq_addr32); fprintf(stderr, "%s task started\n", portchar); fflush(stderr); unsigned int return_val = 10; inner_container ic; ic.add_val_array(return_val); middle_container reply_temp; reply_temp.set_allocated_vac(&ic); reply_.set_allocated_r1(&reply_temp); fprintf(stderr, "%s %s task done\n", portchar, "val_container_get"); fflush(stderr); status_ = finish; responder_.finish(reply_, status::ok, this); } else { gpr_assert(status_ == finish); } } private: debugservice::asyncservice* service_; servercompletionqueue* cq_; servercontext ctx_; input_int32_request request_; outer_container reply_; serverasyncresponsewriter<outer_container> responder_; enum callstatus { create, process, finish }; callstatus status_; }; void handlerpcs() { new debuggetcalldata(&service_, cq_.get()); void* tag; bool ok; while (true) { gpr_assert(cq_->next(&tag, &ok)); gpr_assert(ok); static_cast<calldata*>(tag)->proceed(); } } std::unique_ptr<servercompletionqueue> cq_; debugservice::asyncservice service_; std::unique_ptr<server> server_; }; int main() { serverimpl server; server.run(); return 0; }
the output when run looks this:
[fossum@netsres46 debug]$ debug_client2 in debug_val_container_get 0xb73ff0 rpc failed rpc failure code = 14 rpc failure message = endpoint read failed press control-c quit
we ran server under gdb, , found place in generated file "debug.pb.cc" if comment out 1 line, starts working.
here's pertinent piece of generated file "debug.pb.cc":
middle_container::~middle_container() { // @@protoc_insertion_point(destructor:debugpackage.middle_container) shareddtor(); } void middle_container::shareddtor() { if (this != internal_default_instance()) { delete vac_; // comment out 1 line, make problem go away } }
the "delete vac_" line appears attempt delete storage either has been deleted, or deleted somewhere else. please, can this? [the files below still files use generate code, , debug problem point]
i have no idea whether i've uncovered bug in grpc, or whether i've coded wrong.
the issue allocated middle_container reply_tmp
on stack in server. result gets destructed pass out of scope. @ time, have called finish
not yet waited result. since async server, data must remain alive until you've received tag it. why manually editing destructor works in case; you're nullifying destructor (and leaking memory result).
Comments
Post a Comment