夫天地者,万物之逆旅;光阴者,百代之过客。而浮生若梦,为欢几何?
RabbitMQ 入门篇之—RPC基础篇

什么是RPC?

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RabbitMQ下RPC的实现

官方文档:http://www.rabbitmq.com/tutorials/tutorial-six-python.html

RabbitMQ 官方已经提供了RPC的封装,我们只需写代码实现接口,写自己的业务代码即可。并且官网API 文档中已经有C#的案例供我们参考。

接下来我们通过写一个简单的客户端和服务器demo来熟悉RabbitMQ RPC的使用。

服务端代码:

static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.UserName = "guest";
            factory.Password = "guest";
            factory.VirtualHost = "/";
            factory.HostName = "127.0.0.1";
            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            channel.QueueDeclare("rpc_test", false, false, false, null);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume("rpc_test", false, consumer);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("服务端接收到:"+message);
                var sendMsg = "server to client msg";
                Console.WriteLine("服务器发送消息:"+sendMsg);
                var responseBytes = Encoding.UTF8.GetBytes(sendMsg);
                channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                  basicProperties: replyProps, body: responseBytes);
                channel.BasicAck(deliveryTag: ea.DeliveryTag,
                  multiple: false);
            };
        }

客户端代码:

static void Main(string[] args)
        {
            BlockingCollection<string> respQueue = new BlockingCollection<string>();
            ConnectionFactory factory = new ConnectionFactory();
            factory.UserName = "guest";
            factory.Password = "guest";
            factory.VirtualHost = "/";
            factory.HostName = "127.0.0.1";
            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            var replyQueueName = channel.QueueDeclare().QueueName;
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            var props = channel.CreateBasicProperties();
            var correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId;
            props.ReplyTo = replyQueueName;
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var response = Encoding.UTF8.GetString(body);
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    respQueue.Add(response);
                }
            };
            var sendMsg = "hello rabbitmq";
            var messageBytes = Encoding.UTF8.GetBytes(sendMsg);
            channel.BasicPublish(
                exchange: "",
                routingKey: "rpc_test",
                basicProperties: props,
                body: messageBytes);
            Console.WriteLine("客户端发送消息:"+ sendMsg);
            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                noAck: false);
            var replyMsg = respQueue.Take();
            connection.Close();
            Console.WriteLine("接收到的服务器消息:"+replyMsg);
        }

运行结果:

RabbitMQ RPC都做了哪些?

官网给出了一张图,直观的描述了RPC的整个过程,这张图已经能够很好的帮助我们理解RPC了。直接上图:

RabbitMQ RPC的整个过程如下:

客户端将消息内容发给指定队列(rpc_queue),消息属性中携带消息被处理后服务端给客户端回复消息的队列名称(名称可以自动生成,也可以指定一个具体的队列)和为防止消息串行的Correlation id,同时客户端订阅该名称的队列,用于接收服务端返回值;服务端订阅队列(rpc_queue),接收到客户端的消息并处理后将回复的消息发给客户端定义的回复队列;客户端通过订阅回复队列从中获取到服务端的回复消息。整个过程其实就是请求队列和回复队列之间进行消息转发。

运行中监控如下图:


RabbitMQ RPC高度封装与压测

示例中的代码只可以做为演示,要放到线上产品中使用,我们需要对客户端和服务端进行高度封装并且进行压力测试,以免多线程下数据出现串行等问题。Rabbit 提供了SimpleRpcServer 和 SimpleRpcClient 这两个类简化我们的代码,基于这两个类笔者重新写了个demo。

客户端代码段:

static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.UserName = "guest";
            factory.Password = "guest";
            factory.VirtualHost = "/";
            factory.HostName = "127.0.0.1";
            var connection = factory.CreateConnection();
           
            for (int i = 0; i < 10; i++)
            {
                Task.Factory.StartNew(() => {
                    var channel = connection.CreateModel();
                    for (int j = 0; j < 50; j++)
                    {
                        SimpleRpcClient client = new SimpleRpcClient(channel, "rpc_test");
                        var sendMsg = "线程:" + System.Threading.Thread.CurrentThread.ManagedThreadId + " Num:" + j;
                        var result = client.Call(Encoding.UTF8.GetBytes(sendMsg));
                        Console.WriteLine("客户端线程:" + System.Threading.Thread.CurrentThread.ManagedThreadId + " Num:" + j + "接收数据:" + Encoding.UTF8.GetString(result));
                        System.Threading.Thread.SpinWait(600);
                    }
                });
            }
        }

服务端代码段:

 public class MyRpcServer : SimpleRpcServer
    {
        public MyRpcServer(Subscription subscription) : base(subscription)
        {

        }
        public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
        {
            replyProperties = null;
            return body;
        }
        public override void ProcessRequest(BasicDeliverEventArgs evt)
        {
            var result = evt.Body;
            Console.WriteLine("服务端接收到数据:"+ Encoding.UTF8.GetString(result) +" ==开始处理业务逻辑");
            base.ProcessRequest(evt);
        }
    }
    
    static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.UserName = "guest";
            factory.Password = "guest";
            factory.VirtualHost = "/";
            factory.HostName = "127.0.0.1";
            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            MyRpcServer server = new MyRpcServer(new Subscription(channel, "rpc_test"));
            server.MainLoop();
        }

从上图可以看出,在较低的并发下,官方封装的客户端和服务端在多线程并发下没有出现数据串行问题,基本满足了我们的使用要求,可以放心使用。

作者:暗夜余晖

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

0

支持

0

反对

posted @2018-7-7  拜读(2645)

评论列表

评论内容:



喜欢请打赏

支付宝 微信

请放心支付