0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

什么是远程过程调用

科技绿洲 来源:Linux开发架构之路 作者:Linux开发架构之路 2023-11-10 10:10 次阅读

开发环境:Ubuntu VS Code

编译器:g++

编程语言:C++

框架源码下载:GitHub

认识RPC

RPC的全称是远程过程调用(Remote Procedure Call)。

什么是远程过程调用呢?

那么对于一个聊天系统有int send_information(int friend_id,string msg)这个方法,我们的一个处理逻辑是不是这样:

  • 调用bool is_exist(int friend_id)判断用户是否在线
  • 根据结果在决议是发送在线消息还是离线消息。

那么对于一个继承了登录和聊天功能的系统,我们在本地调用一个函数,就直接返回值=函数名(参数1,参数2),就直接执行了这个过程并且得到了返回值。

但是如果考虑高并发、高可用以及系统扩展性的话,那我们不得不引入分布式的设计。这意味这,登录和聊天会部署在不同的机器上!那么要完成上面的逻辑,就不得不依靠网络,将要调用的函数名以及参数通过网络打包发送给另一个机器,然后由另一台机器将结果发过来。

而我们要做的RPC框架就是为使这个过程更好用而设计的。

图片

RPC框架的设计

博文的标题是基于 protobuf 和 zookeeper 的RPC框架,那么protobuf 和 zookeeper又在整个框架啊中扮演怎样的角色呢?

protobuf 作用

protobuf 主要是作为整个框架的传输协议。我们可以看一下整个框架对于传输信息的格式定义:

message RpcHeader
{
bytes service_name = 1; //类名
bytes method_name = 2; //方法名
uint32 args_size = 3; //参数大小
}

可以看到,它定义了要调用方法是属于哪个类的哪个方法以及这个方法所需要的的参数大小。

那么参数呢?是怎样定义的?

首先我们来看一下我们框架内传输的数据是什么:4字节标识头部长度+RpcHeader+args

/*
* 16表示整个类名+方法名+参数长度的大小
有个这个长度,我们就可以从整个长度中截取UserServiceLogin15这一段
再根据RpcHeader来反序列话得出类名和方法名以及参数长度三个重要数据
* 15表示后面的参数长度
由于我们找到了类名和方法名,我们就可以在整个框架存储这些信息的地方得到一个对于这个方法的描述。
然后借用protobuf的service对象提供的一个接口GetResponsePrototype,并且根据这个方法的描述来反序列化出参数。这个都是根据我们注册的方法的描述来做的。
*/
18UserServiceLogin15zhang san123456

注:如果还是有点迷糊,可以保留参数解释信息,看到后面就大致懂了

zookeeper

zookeeper 呢在这里其实主要就是起到了一个配置中心的目的。

什么意思呢?

zookeeper上面我们标识了每个类的方法所对应的分布式节点地址,当我们其他服务器想要RPC的时候,就去 zookeeper 上面查询对应要调用的服务在哪个节点上。

图片

注意:这里就相当于,我其他服务器来 zookeeper 查询User::is_exist,然后会得到127.0.0.1:8001 这个值,这个值就是你布置这个功能的一个RPC节点的网络标识符,然后向这个节点去发送参数并且等待这个节点的相应。

从框架的使用来认识

框架的使用一般都是在example目录下的 callee/UserService.cpp 里面

RpcApplication::init(argc, argv);

//框架服务提供provider
RpcProvider provide;
provide.notify_service(new UserService());
provide.run();

可以看到,主要去做了三个事情:

  • 首先 RPC 框架肯定是部署到一台服务器上的,所以我们需要对这个服务器的 ip 和 port 进行初始化
  • 然后创建一个 porvider(也就是server)对象,将当前 UserService 这个对象传递给他,也就是其实这个 RPC 框架和我们执行具体业务的节点是在同一个服务器上的。RPC框架负责解析其他服务器传递过来的请求,然后将这些参数传递给本地的方法。并将返回值返回给其他服务器。
  • 最后是去让这个 provider 去 run 起来。具体我们其实可以看一下源代码:
//启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::run()
{
//获取ip和port
string ip = RpcApplication::get_instance().get_configure().find_load("rpcserver_ip");
uint16_t port = atoi(RpcApplication::get_instance().get_configure().find_load("rpcserver_port").c_str());
//cout << ip << ":" << port << endl;
InetAddress address(ip, port);

//创建tcpserver对象
TcpServer server(&eventloop_, address, "RpcProvider");
//绑定链接回调和消息读写回调方法
server.setConnectionCallback(bind(&RpcProvider::on_connection, this, _1));
server.setMessageCallback(bind(&RpcProvider::on_message, this, _1, _2, _3));

//设置muduo库的线程数量
server.setThreadNum(4);

//把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
ZookeeperClient zk_client;
zk_client.start();

//在配置中心中创建节点
for (auto &sp : service_map_)
{
string service_path = "/" + sp.first;
zk_client.create(service_path.c_str(), nullptr, 0);
for (auto &mp : sp.second.method_map_)
{
string method_path = service_path + "/" + mp.first;
char method_path_data[128] = {0};
sprintf(method_path_data, "%s:%d", ip.c_str(), port);
//ZOO_EPHEMERAL 表示znode时候临时性节点
zk_client.create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
}
}

RPC_LOG_INFO("server RpcProvider [ip: %s][port: %d]", ip.c_str(), port);
//启动网络服务
server.start();
eventloop_.loop();
}

可以看到,整个 run 其实就是干了这么几件事情:

  • 因为底层调用的是muduo网络库,所以这里会获取ip地址和端口号,然后初始化网络层
  • 然后去设置一个连接回调以及发生读写事件时候的回调函数(稍后介绍)
  • 然后设置整个 muduo 网络库工作的线程数量
  • 然后创建zookeeper配置中心,将这些方法的信息以及本机的IP地址注册到zookeeper
  • 然后开启本机服务器的事件循环,等待其他服务器的连接

其他服务器是怎样调用的呢

我们看一下 example 目录下的 caller/CallUserService.cpp 里面是怎样调用的。

//初始化ip地址和端口号
RpcApplication::init(argc, argv);

//演示调用远程发布的rpc方法login
ik::UserServiceRpc_Stub stub(new RpcChannel());

//请求参数和响应
ik::LoginRequest request;
request.set_name("zhang san");
request.set_password("123456");

ik::LoginResponse response;
RpcControl control;
//发起rpc调用,等待响应返回
stub.Login(&control, &request, &response, nullptr);

//rpc调用完成,读调用的结果
if (response.errmsg().error() == 0)
{
//没错误
cout << "rpc login response: " << response.success() << endl;
}
else
{
//有错误
cout << "rpc login response errer: " << response.errmsg().error_msg() << endl;
}

同样,也是有以下几个步骤:

  • 初始化 RPC 远程调用要连接的服务器
  • 定义一个 UserSeervice 的 stub 桩类,由这个装类去调用Login方法,这个login方法可以去看一下源码的定义:
//重写基类UserServiceRpc的虚函数,供框架调用
void Login(::google::protobuf::RpcController *controller,
const ::ik::LoginRequest *request,
::ik::LoginResponse *response,
::google::protobuf::Closure *done)
{
//框架给业务上报了请求参数 request,业务获取相应数据做本地业务
string name = request->name();
string password = request->password();

//本地业务
bool login_result = Login(name, password);

//把响应给调用方返回
ik::ErrorMsg *errmsg = response->mutable_errmsg();
errmsg->set_error(0);
errmsg->set_error_msg("");
response->set_success(login_result);

//执行回调操作
done->Run();
}

可以看到,Login的 RPC 重载函数有四个参数:controller(表示函数是否出错)、request(参数)、response(返回值)、done(回调函数)

其主要做的也是去围绕着解析参数,将参数放入本地调用的方法,将结果返回并执行回调函数。至于这个回调函数则是在服务端执行读写事件回调函数绑定的。

绑定的是如下方法:

void RpcProvider::send_rpc_response(const TcpConnectionPtr &conn, google::protobuf::Message *response)
{
string response_str;
//序列化
if (response->SerializeToString(&response_str))
{
//发送序列化的数据
conn->send(response_str);
}
else
{
//序列化失败
RPC_LOG_ERROR("serialize reponse error");
}
//短链接
conn->shutdown();
}

它会将由bind函数绑定的参数:response_str 发送回去,然后调用方的服务器就会收到这个返回值并解析它。

桩类是干嘛的

那么其实现在来说,我们并没有看到调用方是何时发送了要调用方法以及相应参数?

我们还是需要去返回桩类,这个是由protobuf自动去帮你生成的。

class UserServiceRpc_Stub : public UserServiceRpc {
public:
UserServiceRpc_Stub(::google::protobuf::RpcChannel* channel);
UserServiceRpc_Stub(::google::protobuf::RpcChannel* channel,
::google::protobuf::Service::ChannelOwnership ownership);
~UserServiceRpc_Stub();

inline ::google::protobuf::RpcChannel* channel() { return channel_; }

// implements UserServiceRpc ------------------------------------------

void Login(::google::protobuf::RpcController* controller,
const ::ik::LoginRequest* request,
::ik::LoginResponse* response,
::google::protobuf::Closure* done);
void Register(::google::protobuf::RpcController* controller,
const ::ik::RegisterRequest* request,
::ik::RegisterResponse* response,
::google::protobuf::Closure* done);
private:
::google::protobuf::RpcChannel* channel_;
bool owns_channel_;
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(UserServiceRpc_Stub);
};

我们在定义桩类的时候,会传入一个RpcCannel的指针,这个绑定到这个桩类的channel_指针。

当我们去调用这个桩类的Login方法的时候,会去调用传递进来的channel的CallMethod方法:

void UserServiceRpc_Stub::Login(::google::protobuf::RpcController* controller,
const ::ik::LoginRequest* request,
::ik::LoginResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}

所以,显而易见,我去发送这些方法啊,参数啊,都是在CallMethod这个方法中执行的。

那么CallMethod里面执行的内容对我们理解RPC调用体系至关重要(代码比较长,可以直接跳过听结论):

void RpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
google::protobuf::RpcController *controller,
const google::protobuf::Message *request,
google::protobuf::Message *response,
google::protobuf::Closure *done)
{
const google::protobuf::ServiceDescriptor *service_des = method->service();
string service_name = service_des->name();
string method_name = method->name();

//获取参数的序列化字符串长度 args_size
int args_size = 0;
string args_str;
if (request->SerializeToString(&args_str))
{
args_size = args_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}

//定义rpc的请求header
ikrpc::RpcHeader rpc_header;
rpc_header.set_service_name(service_name);
rpc_header.set_method_name(method_name);
rpc_header.set_args_size(args_size);

uint32_t header_size = 0;
string rpc_header_str;
if (rpc_header.SerializeToString(&rpc_header_str))
{
//序列化成功
header_size = rpc_header_str.size();
}
else
{
//序列化失败
controller->SetFailed("serialize rpc_header error!");
return;
}

//组织待发送的rpc请求的字符串
string send_rpc_str;
send_rpc_str.insert(0, string((char *)&header_size, 4)); //header_size
send_rpc_str += rpc_header_str; //rpc_header
send_rpc_str += args_str; //args_str

//使用tcp编程,完成rpc方法的远程调用
int client_fd = socket(AF_INET, SOCK_STREAM, 0);
if (client_fd == -1)
{
close(client_fd);
RPC_LOG_FATAL("create socket error! errno:%d", errno);
}

//获取ip和port
ZookeeperClient zk_client;
zk_client.start();
string method_path = "/" + service_name + "/" + method_name;
string host_data = zk_client.get_data(method_path.c_str());
if(host_data == "")
{
controller->SetFailed(method_path+" is not exist");
return;
}
int host_index = host_data.find(":");
if(host_index == -1)
{
controller->SetFailed(method_path+" address is invalid!");
return;
}
string ip = host_data.substr(0,host_index);
uint16_t port = atoi(host_data.substr(host_index+1,host_data.size()-host_index).c_str());

struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());

if (connect(client_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
{
close(client_fd);
RPC_LOG_FATAL("connet error! errno: %d", errno);
}

// 发送rpc请求
if (send(client_fd, send_rpc_str.c_str(), send_rpc_str.size(), 0) == -1)
{
controller->SetFailed("send error! errno: " + errno);
close(client_fd);
return;
}

//接受rpc请求
char recv_buffer[BUFF_SIZE] = {0};
ssize_t recv_size = recv(client_fd, recv_buffer, BUFF_SIZE, 0);
if (recv_size == -1)
{
controller->SetFailed("recv error! errno: " + errno);
close(client_fd);
return;
}

//反序列化响应数据
//String 因为遇到�会认为是字符串结束,所以用Array
if (!response->ParseFromArray(recv_buffer, recv_size))
{
string recv = recv_buffer;
controller->SetFailed("parse error! response_str: " + recv);
close(client_fd);
return;
}
close(client_fd);
}
  • 组织要发送的 request_str 字符串
  • 从zookeeper中拿到服务端的 ip 和 port,连接服务端
  • 发送 request_str
  • 接受服务端返回过来的 response 字符串并反序列化出结果

总结一下RPC流程

图片

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • RPC
    RPC
    +关注

    关注

    0

    文章

    102

    浏览量

    11424
  • 源码
    +关注

    关注

    8

    文章

    574

    浏览量

    28595
  • 函数
    +关注

    关注

    3

    文章

    3911

    浏览量

    61313
  • 远程过程调用

    关注

    0

    文章

    2

    浏览量

    683
收藏 人收藏

    评论

    相关推荐

    嵌入式远程过程调用组件--eRPC

    概述 RPC(Remote Procedure call)远程过程调用。其分为两部分:远程过程过程调用
    的头像 发表于 04-06 14:15 1676次阅读

    Multism12安装过程出现“远程过程调用失败且未运行”

    开始安装就弹出这个对话框。安装环境是WINXP64位。请问高手是怎么回事呢?
    发表于 03-11 16:32

    SQL server服务,远程过程调用失败的解决方法

    成功解决SQL server服务,远程过程调用失败
    发表于 12-27 10:11

    Linux rpc编程过程

    通过rpcgen的man手册看到此工具的作用是把RPC源程序编译成C语言源程序,从而轻松实现远程过程调用
    发表于 07-24 07:25

    RPC的结构原理是什么?

    远程过程调用(RPC)是一个协议,程序可以使用这个协议请求网络中另一台计算机上某程序的服务而不需知道网络细节。(过程调用有时也称作函数调用,或子例行程序
    发表于 10-12 10:43

    一种开源的NET系统推荐

    C# 源码 AForge.NETRPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议
    发表于 07-01 06:20

    嵌入式RPC的设计与实现

    在研究远程过程调用的原理和嵌入式系统特点的基础上,提出一种远程过程调用的设计以及在VxWorks操作系统上服务器端和在Windows操作系统上客户端的实现。经在项目中的应用,本设计
    发表于 03-21 15:30 23次下载

    多核下基于RDMA的高效RPC研究

    本文针对目前基于远程直接内存访问(remote direct memory access,RDMA)的远程过程调用(remote procedure call,RPC)设计在多核背景下存在的性能和扩展性问题,提出了一种新的基于RDMA Write的高效
    发表于 09-26 15:27 0次下载
    多核下基于RDMA的高效RPC研究

    初步了解Linux的NFS服务

    RPC(远程过程调用,定义在RFC1057),见名知意,就是客户端调用远程服务器上函数的一种方法。RPC的目标就是要以下2~8这些步骤都封装起来,让用户对这些细节透明。
    发表于 05-06 15:43 697次阅读
    初步了解Linux的NFS服务

    RPC如何在远程过程调用

    RPC(Remote Procedure Call Protocol)即远程过程调用,也就是调用的函数是在其它的控制板上运行的,不需要理会底层的通讯协议。
    的头像 发表于 02-07 09:52 640次阅读
    RPC如何在<b class='flag-5'>远程</b><b class='flag-5'>过程</b>中<b class='flag-5'>调用</b>?

    OpenDaylight中的RPC &amp; Notification是什么

    我们将介绍RPC和Notification,并从进程内外的通信开始,着重介绍远程过程调用和发布-订阅机制,然后分析MD-SAL的通信交互过程
    的头像 发表于 02-14 15:15 628次阅读
    OpenDaylight中的RPC &amp; Notification是什么

    gRPC内存马研究与查杀

    远程过程调用(Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。
    的头像 发表于 06-01 11:24 1107次阅读
    gRPC内存马研究与查杀

    RPC接口与HTTP接口哪一个更好?

    HTTP接口和RPC接口都是生产上常用的接口,顾名思义,HTTP接口使用基于HTTP协议的URL传参调用,而RPC接口则基于远程过程调用
    发表于 06-13 09:18 663次阅读
    RPC接口与HTTP接口哪一个更好?

    基于Client/Server架构的HTTP接口和RPC接口

    HTTP接口和RPC接口都是生产上常用的接口,顾名思义,HTTP接口使用基于HTTP协议的URL传参调用,而RPC接口则基于远程过程调用
    发表于 06-13 09:18 422次阅读
    基于Client/Server架构的HTTP接口和RPC接口