ホームページ >Java >&#&チュートリアル >ダボをシミュレートし、MQ 非同期呼び出しを行うには、rabbit mq を使用します。
最近、古いシステムを改修していて、その前のプロセスで、送信側と消費側でさまざまな設定が必要になるシナリオに遭遇しました。そこで、ふと @Reference アノテーションの形式を思い出しました。 dubbo は可能でしょうか? MQ を呼び出すのが同期インターフェースを呼び出すのと同じくらい便利で簡単になるように、どうすればよいでしょうか。一般に、達成すべき目標は、dubbo と同様に、コンシューマ側がインターフェイスを公開し (dubbo サービスによって定義されたインターフェイスを再利用して、dubbo サービスの作成を同期または MQ 非同期にすることもできます)、送信側がインジェクトします。オブジェクトはカスタム アノテーションを通じてメソッドを呼び出し、フレームワーク内で内部的に処理されます。その後、非同期 MQ フォームに変換され、コンシューマーに送信されます。
たとえば、サーバーには
public interface MqDemoService { void dealById(Long id); }という実装があります。
その中に:
@Slf4j @Component("mqDemoServiceImpl") @Service(version = "1.0.0") public class MqDemoServiceImpl implements MqDemoService { @Override public void dealById(Long id) { log.info("执行findById方法"); } }
興味のある学生は自分でチェックしてみてください
その後、送信側
がカスタマイズされています 注:
@Slf4j是lombok注解 @Service是dubbo服务端注解
したがって、呼び出し元のコントローラーでは:
@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface AsyncInvoker { }
@AsyncInvoker によって注釈が付けられた属性 mqDemoService に注意してくださいこのアノテーションを介して挿入されたオブジェクトがメソッドを呼び出すと、それは mq を介して送信され、非同期呼び出しになります。
達成すべき目標は非常に明確です。すると、解決すべき問題は次のとおりです。 :
@Slf4j @Controller public class MqDemoController { @AsyncInvoker private MqDemoService mqDemoService; @RequestMapping(value = "/deal", method = RequestMethod.POST) public void deal() { mqDemoService.dealById(1L); } }
1. 受信側が呼び出しメソッドを決定できるように、送信メッセージの形式を決定する方法
ここでは、まず Java リフレクション呼び出しの要件に従います。パラメーターは単に送信オブジェクトを定義します:
1,如何确定发送消息的格式,使消费端可以确定调用的方法 2,发送端中如何为注解@AsyncInvoker注释的对象注入实例 3,接收端中如何在接收到消息后调用对应接口的实现方法 4,多个消费服务如何区分mq队列.
このシナリオでは、送信側は実装ではなく、コンシューマー側のインターフェイスのみを導入します。 では、@AsyncInvoker はどのようにオブジェクトを挿入するのでしょうか?
答えは、Spring の BeanPostProcessor インターフェースです。このインターフェースにより、Spring は前後にユーザー定義のロジックを挿入します。オブジェクトの作成についてはここでは説明しません。必要な場合は、Baidu で検索してください。コードは次のとおりです。
@Data public class MqMethodMeta { //调用的接口名称(包括包名,用于反射) private String interfaceName; //调用的方法名 private String methodName; //调用的方法的参数 private Object[] args; //调用的方法的参数类型 private String[] paramTypeNames; }
受信側で対応するインターフェイスを呼び出すのは非常に簡単で、コードに直接アクセスするために MqMethodMeta オブジェクトを取得するだけです:
@Slf4j @Component public class AsyncInvokerBeanProcessor implements BeanPostProcessor { //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用. private final ConcurrentMap<String, Object> proxyMap = new ConcurrentHashMap<>(); //注入spring amqp处理mq的对象 @Autowired private RabbitTemplate rabbitTemplate; //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理. @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { //获取该实例中的有@AsyncInvoker注解的field Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { try { if (!field.isAccessible()) { field.setAccessible(true); } AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class); if (asyncInvoker != null) { //创建代理对象,赋值给该feild Object value = createProxy(field.getType()); if (value != null) { field.set(bean, value); } } } catch (Throwable e) { log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e); } } return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } private Object createProxy(Class clz) { String interfaceName; if (clz.isInterface()) { interfaceName = clz.getName(); } else { throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface."); } Object proxy = proxyMap.get(interfaceName); if (proxy == null) { Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.debug("执行动态代理! method:{} ,args: {}", method, args); if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) { throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法"); } //动态代理中创建mq传输对象并发送. MqMethodMeta mqMethodMeta = new MqMethodMeta(); mqMethodMeta.setInterfaceName(clz.getName()); mqMethodMeta.setMethodName(method.getName()); mqMethodMeta.setArgs(args); String[] paramTypeNames = new String[args.length]; for (int i = 0; i < args.length; i++) { paramTypeNames[i] = args[i].getClass().getName(); } mqMethodMeta.setParamTypeNames(paramTypeNames); RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory()); Exchange exchange = new TopicExchange("exchange.demo.web.adaptor"); admin.declareExchange(exchange); //关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta); return null; } }); proxyMap.putIfAbsent(interfaceName, newProxy); proxy = proxyMap.get(interfaceName); } return proxy; } }
4。複数のコンシューマー サービスを区別するにはどうすればよいですか?
@Slf4j public class AsyncMethodListener implements ApplicationContextAware { private ApplicationContext applicationContext; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" )) public void messageHandle(@Payload MqMethodMeta message) { try { log.info("收到message: {}", message); Class clz = Class.forName(message.getInterfaceName()); String methodName = message.getMethodName(); Object[] args = message.getArgs(); Class[] paramTypes = new Class[message.getParamTypeNames().length]; for (int i = 0; i < message.getParamTypeNames().length; i++) { paramTypes[i] = Class.forName(message.getParamTypeNames()[i]); } //由于使用Object[]数组传送参数,所以Jackson2JsonMessageConverter会将id转换为Integer,反射调用时会报错,此处强转一下 for (int i = 0; i < args.length; i++) { Class c = paramTypes[i]; if (args[i] instanceof Integer && c.equals(Long.class)) { args[i] = ((Integer) args[i]).longValue(); } } //拿到spring管理的对应接口的实现 Object invoker = applicationContext.getBean(clz); Method method = clz.getMethod(methodName, paramTypes); method.invoke(invoker, args); } catch (Exception e) { e.printStackTrace(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }は、設定ファイル:たとえば、システム 1 の設定は次のとおりです:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" ))
${demo.mq.method.queue} ${demo.mq.method.routekey}
送信側のコードを見てください:
demo.mq.method.queue=com.demo.service.project1.# demo.mq.method.routekey=com.demo.service.project1.#
ここで clz.getName()私たちのシステムには優れた下請け戦略があるため、システム 1 の clz.getName() は com.demo.service.project1 で始まる必要があります。たとえば、project1 の値がリスナーに送信されます。 clz.getName() は com.demo .service.project1.MqDemoService (「.#」は次の複数の識別子と一致します。これは、rabbitMQ のトピック タイプ交換の機能です)。
この時点で、私たちが望んでいた目標は、冒頭の達成は達成されました。今後は、mq を使用して非同期呼び出しを行う必要があります。同期メソッドのように使用できます。
Spring での mq の使用方法については、ここでは詳しく説明しません。ドキュメント:
demo.mq.method.queue=com.demo.service.project2.# demo.mq.method.routekey=com.demo.service.project2.#
一連のデモ コードは、記録と参照用に後で提供されます
概要
この方法にはまだいくつかの問題があります。例:
//关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
最初の質問は、対応するコードを使用した後に検討する必要があります。 2 番目の質問は主に、コンシューマがパラメータのタイプやその他の状況を変更した場合、再公開後に古いメッセージと互換性があるかどうかです。現時点ではこれに関する良いアイデアはありません。これは単なるブレインストーミング用です。
以上がダボをシミュレートし、MQ 非同期呼び出しを行うには、rabbit mq を使用します。の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。