夫天地者,万物之逆旅;光阴者,百代之过客。而浮生若梦,为欢几何?
Java版封装消息队列并实现调用

前言

封装Java版的消息队列服务(jar包),供第三方项目引入调用,类似封装.net版的dll。涉及点:spring boot、maven、rabbitmq

项目概览

这里创建的是spring web的项目,目录结构如下

核心代码

Sender.java

发送消息时,时通过调Api接口来实现的,这里不涉及对RabbitMQ的操作,部分代码如下:

public class Sender {
    private String Url;
    public Sender(String url){
        this.Url = url;
    }
    /**
     * 调发送消息的接口
     * @param appKey
     * @param appSecret
     * @param body
     * @param serviceMark
     * @return
     */
    public JSONObject Send(String appKey,String appSecret,String body,String serviceMark) {
        RestTemplate restTemplate = new RestTemplate();
        String result = "";
        String url = Url + "/api/mq/sendMsg";
        JSONObject postData = new JSONObject();
        postData.put("AppKey", appKey);
        postData.put("AppSecret", appSecret);
        postData.put("ServiceMark", serviceMark);
        postData.put("Content", body);
        JSONObject json = restTemplate.postForEntity(url, postData, JSONObject.class).getBody();
        return json;
    }
}

Receiver.java

public class Receiver {
    public Receiver(){}
    public Receiver(String appKey,String appSecret,String url){
        this.Url = url;
        this.AppSecret = appSecret;
        this.AppKey = appKey;
    }
    private String exchange = "ExchangeTest";
    private String Url;
    private String AppSecret;
    private String AppKey;
    public void Receive(String queue,String routeKey)  {
        try {
            Connection connection = ConnectUtil.getConnection(AppKey,AppSecret,Url);
            if(ConnectUtil.exchange != null && ConnectUtil.exchange.length()>0){
                exchange = ConnectUtil.exchange;
            }
            Channel channel = connection.createChannel();
            channel.queueDeclare(queue,true,false,false,null);
            //队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])
            channel.queueBind(queue, exchange, routeKey);
            //监听并接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Received '" + message + "'");
                //TODO 自己的业务逻辑
                SaveRecord(message,queue);
            };
            channel.basicConsume(queue, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            System.out.println(e);
            e.printStackTrace();
        } finally {
        }
    }
    /**
     * 将接收到的消息保存到数据库(调Api)
     * @param message
     * @param appKey
     */
    private void SaveRecord(String message,String appKey) {
        try {
            JSONObject jsonObject = JSONObject.parseObject(message);
            String serviceMark = jsonObject.getString("ServiceMark");
            String msgID = jsonObject.getString("MsgID");
            RestTemplate restTemplate = new RestTemplate();
            String url = Url + "/api/mq/saveConsumeRecord";
            JSONObject postData = new JSONObject();
            postData.put("AppKey", appKey);
            postData.put("Content", message.replaceAll("\\\\",""));
            postData.put("TransferDate", new Date());
            postData.put("ServiceMark", serviceMark);
            postData.put("RecordStatus", 1);
            postData.put("MsgID", msgID);
            postData.put("Result", "消息接收成功-java");
            JSONObject json = restTemplate.postForEntity(url, postData, JSONObject.class).getBody();
            System.out.println(JSON.toJSON(json));
        } catch (Exception e){
            System.out.println(e);
            e.printStackTrace();
            System.out.println(e.fillInStackTrace().toString());
        } finally {
        }
    }
}

打包生成jar文件

1、选择 File -> Project Structure

2、依次选择以下项目

3、根据modules创建jar。如图所示,选择项目,入口类等

4、查看生成的信息,该步可以修改jar输出的目录

5、生成jar,点击idea菜单栏中的build

6、查看生成的jar

第三方项目调用

创建一个新的空的测试项目,然后开始导入前面打包的jar文件

1、File -> Project Strucure,然后如下设置

2、选择前面生成的core.jar文件

确定后。在External Libraries下就会看到已导入的core.jar

这时候,就可以在当前测试项目下,调用jar包里的方法了,主要代码如下:

package com.example.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import mq.anes.utils.*;
@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
//        SpringApplication.run(DemoApplication.class, args);
        String url = "http://localhost:44348/";
        String appKey = "test";
        String appSecret = "MDk4ZjZiY2Q0NjIxZDM3M2NhZGU0ZTgzMjYyN2I0ZjYxNjE3NzcyOTQ5MDM2";
        String content = "{\"PATINET_ID\":\"0000\",\"VISIT_ID\":1,\"OPER_ID\":1,\"OPERATION_NAME\":\"TEST-java\"}";
        Sender sender = new Sender(url);
        JSONObject jsonObject = sender.Send(appKey,appSecret,content,"service02");
        System.out.println(JSON.toJSON(jsonObject));
    }
}

运行结果

扩展

到这一步还没完,虽然可以接收到消息并且保存到数据库,但保存的逻辑是在jar包里封装了,外部调用时没法修改。所以就需要把接收到的消息返回给调用方。在C#里这个利用委托比较容易解决,首先看下C#里的实现方式

Java中的实现方式

首先创建一个监听事件的类:Observer.java

package mq.anes.utils;
import java.util.ArrayList;
import java.util.List;
public class Observer {
    private List<MessageArriveListener> listeners = new ArrayList<MessageArriveListener>();
    public interface MessageArriveListener {
        String MessageArrived(String message);
    }
    public void addOnMessageArriveListener(MessageArriveListener listener) {
        listeners.add(listener);
    }
    public String start(String message)
    {
        while (true) {
            for (MessageArriveListener listener : listeners) {
               return listener.MessageArrived(message);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

修改Receiver.Receive的方法,增加传入Observer类型的参数

重新打包后,在调用的项目中,需要这样调用:

首先需要定义一个空的Observer类,并集成jar包里的Observer类

最后调用方式如下

执行结果如下

作者:一蓑烟雨

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

0

支持

0

反对

posted @2021-5-12  拜读(697)

评论列表

评论内容:



喜欢请打赏

支付宝 微信

请放心支付