最近在改造舊系統,遇到了需要使用rabbitMq的場景.在以前使用的過程中需要在發送端和消費端各種配置,感覺比較麻煩,然後突然想到了dubbo中@Reference註解的形式,可不可以做一個類似的架子,這樣調用MQ的時候就像調用同步接口一樣方便簡單呢?於是查了相關資料和看了dubbo的源碼,之後就有了思路.
總的來說,要實現的目標就是像dubbo一樣,消費端暴露接口(甚至可以復用dubbo服務定義的接口,這樣寫一個dubbo服務即可同步也可MQ異步),發送端通過自定義的註解注入對象調用方法,透過框架內部處理之後轉換成非同步mq形式傳送到消費端.
例如服務端有介面:
public interface MqDemoService { void dealById(Long id); }
有實作:
@Slf4j @Component("mqDemoServiceImpl") @Service(version = "1.0.0") public class MqDemoServiceImpl implements MqDemoService { @Override public void dealById(Long id) { log.info("执行findById方法"); } }
其中:
@Slf4j是lombok注解 @Service是dubbo服务端注解
有興趣的同學自行查閱
然後是發送端
有自訂註解:
@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface AsyncInvoker { }
於是在呼叫的controller中:
@Slf4j @Controller public class MqDemoController { @AsyncInvoker private MqDemoService mqDemoService; @RequestMapping(value = "/deal", method = RequestMethod.POST) public void deal() { mqDemoService.dealById(1L); } }
注意Controller中@AsyncInvoker註解的屬性mqDemoService,透過這個註解注入的物件呼叫方法的時候會透過mq發送變為非同步呼叫.
好了,要實現的目標很清晰了.那麼要解決的問題就是以下幾個方面了:
1,如何确定发送消息的格式,使消费端可以确定调用的方法 2,发送端中如何为注解@AsyncInvoker注释的对象注入实例 3,接收端中如何在接收到消息后调用对应接口的实现方法 4,多个消费服务如何区分mq队列.
1,如何確定發送訊息的格式,使接收端可以確定調用的方法
這裡我先按照java反射調用需要的參數簡單定義了一個傳輸物件:
@Data public class MqMethodMeta { //调用的接口名称(包括包名,用于反射) private String interfaceName; //调用的方法名 private String methodName; //调用的方法的参数 private Object[] args; //调用的方法的参数类型 private String[] paramTypeNames; }
2,發送端中如何為註解@AsyncInvoker註解的物件注入實例
在這個場景中,發送端是只會引入消費端的接口,不會引入實現的.那麼@AsyncInvoker如何注入對象呢?
答案就是動態代理.
那麼還有如何讓Spring知道@AsyncInvoker註解的對像要注入動態代理呢?
答案就是spring的BeanPostProcessor接口了!這個接口允許spring在處理對象創建的前後插入用戶自己定義的邏輯,在這裡就不細細展開了,有需要的同學自行google/百度了哈.
那麼思路出來了,程式碼如下:
@Slf4j @Component public class AsyncInvokerBeanProcessor implements BeanPostProcessor { //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用. private final ConcurrentMap<string> proxyMap = new ConcurrentHashMap(); //注入spring amqp处理mq的对象 @Autowired private RabbitTemplate rabbitTemplate; //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理. @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { //获取该实例中的有@AsyncInvoker注解的field Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { try { if (!field.isAccessible()) { field.setAccessible(true); } AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class); if (asyncInvoker != null) { //创建代理对象,赋值给该feild Object value = createProxy(field.getType()); if (value != null) { field.set(bean, value); } } } catch (Throwable e) { log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e); } } return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } private Object createProxy(Class clz) { String interfaceName; if (clz.isInterface()) { interfaceName = clz.getName(); } else { throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface."); } Object proxy = proxyMap.get(interfaceName); if (proxy == null) { Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.debug("执行动态代理! method:{} ,args: {}", method, args); if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) { throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法"); } //动态代理中创建mq传输对象并发送. MqMethodMeta mqMethodMeta = new MqMethodMeta(); mqMethodMeta.setInterfaceName(clz.getName()); mqMethodMeta.setMethodName(method.getName()); mqMethodMeta.setArgs(args); String[] paramTypeNames = new String[args.length]; for (int i = 0; i <h2 id="接收端中如何在接收到訊息後呼叫對應介面的實作方法">3,接收端中如何在接收到訊息後呼叫對應介面的實作方法</h2> <p>接收端呼叫對應接口就很簡單了,只需要拿到MqMethodMeta物件進行反射呼叫就好了,直接上程式碼:</p> <pre class="brush:php;toolbar:false">@Slf4j public class AsyncMethodListener implements ApplicationContextAware { private ApplicationContext applicationContext; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" )) public void messageHandle(@Payload MqMethodMeta message) { try { log.info("收到message: {}", message); Class clz = Class.forName(message.getInterfaceName()); String methodName = message.getMethodName(); Object[] args = message.getArgs(); Class[] paramTypes = new Class[message.getParamTypeNames().length]; for (int i = 0; i <h2 id="多個消費服務如何區分mq佇列">4,多個消費服務如何區分mq佇列.</h2><p>這裡就使用到了rabbit的topic類型exchange.<br>首先對消費端listener中的queue和routekey進行可設定話管理:</p><pre class="brush:php;toolbar:false">@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" ))
注意這裡的
${demo.mq.method.queue} ${demo.mq.method.routekey}
是從設定檔讀取出來的:
例如係統1中是如下配置:
demo.mq.method.queue=com.demo.service.project1.# demo.mq.method.routekey=com.demo.service.project1.#
系統2中是如下配置:
demo.mq.method.queue=com.demo.service.project2.# demo.mq.method.routekey=com.demo.service.project2.#
再看發送端中那段程式碼:
//关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
這裡面的clz.getName(). 由於我們系統是有良好的分包策略,所以系統1的clz.getName()一定是以com.demo.service.project1為開頭的.一定會發送到project1中的listener.例如clz.getName()值為com.demo.service.project1.MqDemoService (".#"匹配後面多個標示符,此為rabbitMQ中topic類型exchange的特性).
至此,一開始想要達成的目標已經達成.今後需要用mq做異步調用的時候可以像同步方法一樣使用了.
對於mq在spring中的使用在此就不詳細列舉了,可以參考文件:
http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/
稍後會提供一套demo程式碼出來供記錄和參考
總結
#目前這套方法中還是存在一些問題的.例如:
1,因为目前业务场景,没有考虑异步回调的问题. 需要的话可以考虑和rabbitmq本身的异步回调方式结合. 目前还没有思考. 2,因为对消费端版本更新问题的考虑,目前仅仅支持单参数(整型)方法的调用.
第一個問題等需要用到對應業務後再做考慮吧.或者有思路的通知可以探討一下.
第二個問題主要考慮的是如果消費端更改了參數類型或者其他之類的情況下,重新發布後,對於可能殘留在mq中的老消息的兼容.這個目前確實沒有什麼好思路,拋出也是為了集思廣益了.
以上是利用rabbit mq.模擬dubbo,使MQ非同步調用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本文討論了使用Maven和Gradle進行Java項目管理,構建自動化和依賴性解決方案,以比較其方法和優化策略。

本文使用Maven和Gradle之類的工具討論了具有適當的版本控制和依賴關係管理的自定義Java庫(JAR文件)的創建和使用。

本文討論了使用咖啡因和Guava緩存在Java中實施多層緩存以提高應用程序性能。它涵蓋設置,集成和績效優勢,以及配置和驅逐政策管理最佳PRA

本文討論了使用JPA進行對象相關映射,並具有高級功能,例如緩存和懶惰加載。它涵蓋了設置,實體映射和優化性能的最佳實踐,同時突出潛在的陷阱。[159個字符]

Java的類上載涉及使用帶有引導,擴展程序和應用程序類負載器的分層系統加載,鏈接和初始化類。父代授權模型確保首先加載核心類別,從而影響自定義類LOA


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

ZendStudio 13.5.1 Mac
強大的PHP整合開發環境

Atom編輯器mac版下載
最受歡迎的的開源編輯器

Dreamweaver CS6
視覺化網頁開發工具

MinGW - Minimalist GNU for Windows
這個專案正在遷移到osdn.net/projects/mingw的過程中,你可以繼續在那裡關注我們。 MinGW:GNU編譯器集合(GCC)的本機Windows移植版本,可自由分發的導入函式庫和用於建置本機Windows應用程式的頭檔;包括對MSVC執行時間的擴展,以支援C99功能。 MinGW的所有軟體都可以在64位元Windows平台上運作。

MantisBT
Mantis是一個易於部署的基於Web的缺陷追蹤工具,用於幫助產品缺陷追蹤。它需要PHP、MySQL和一個Web伺服器。請查看我們的演示和託管服務。