Using RabbitMQ to simulate Dubbo and make asynchronous MQ calls

Jun 26, 2017 am 11:22 AM
Recently, while refactoring an old system, I ran into a scenario that called for RabbitMQ. The existing approach required a lot of configuration on both the sending and consuming ends, which felt cumbersome. Then I thought of Dubbo's @Reference annotation: could I build a similar framework so that calling MQ is as simple and convenient as calling a synchronous interface? I looked through the relevant material, read the Dubbo source code, and came up with an approach.

In general, the goal is to work like Dubbo: the consumer side exposes an interface (you can even reuse the interface defined for a Dubbo service, so that one service can be called either synchronously through Dubbo or asynchronously through MQ), and the sending side injects an object through a custom annotation and calls its methods. The framework then converts each call into an asynchronous MQ message and sends it to the consumer.

For example, the server has an interface:

public interface MqDemoService {
    void dealById(Long id);
}

and has an implementation:

@Slf4j
@Component("mqDemoServiceImpl")
@Service(version = "1.0.0")
public class MqDemoServiceImpl implements MqDemoService {
    @Override
    public void dealById(Long id) {
        log.info("Executing dealById");
    }
}

Here:

@Slf4j is a Lombok annotation
@Service is the Dubbo provider-side annotation

Interested readers can look these up on their own.

On the sending end, there is a custom annotation:

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface AsyncInvoker {
}

The calling controller then looks like this:

@Slf4j
@Controller
public class MqDemoController {
  @AsyncInvoker
  private MqDemoService mqDemoService;

  @RequestMapping(value = "/deal", method = RequestMethod.POST)
  public void deal() {
    mqDemoService.dealById(1L);
  }
}

Note the field mqDemoService annotated with @AsyncInvoker in the controller. When a method is called on the object injected through this annotation, the call is sent through MQ and becomes asynchronous.

The goal is now clear. The problems to solve are the following:

1. How to define the message format so that the consumer can determine which method to call
2. How to inject an instance into a field annotated with @AsyncInvoker on the sending end
3. How the receiving end calls the matching interface implementation after receiving a message
4. How to tell MQ queues apart when there are multiple consumer services

1. How to define the message format so that the receiving end can determine which method to call

Here I simply defined a transfer object based on the parameters that a Java reflection call requires:

@Data
public class MqMethodMeta {
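  // Fully qualified name of the interface to call (including the package, used for reflection)
  private String interfaceName;
  // Name of the method to call
  private String methodName;
  // Arguments of the method call
  private Object[] args;
  // Parameter type names of the method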
  private String[] paramTypeNames;
}
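
To illustrate why these four fields are enough for a reflective call, here is a minimal sketch with no MQ involved. The names MetaRoundTrip, Meta, Greeter, describe and dispatch are hypothetical and not part of the original code; Meta is a plain-Java stand-in for MqMethodMeta.

```java
import java.lang.reflect.Method;

class MetaRoundTrip {
  // Hypothetical stand-in for MqMethodMeta (plain fields instead of Lombok @Data).
  static class Meta {
    String interfaceName;
    String methodName;
    Object[] args;
    String[] paramTypeNames;
  }

  // Hypothetical service interface and implementation, for illustration only.
  interface Greeter {
    String greet(Long id);
  }

  static class GreeterImpl implements Greeter {
    @Override
    public String greet(Long id) {
      return "hello " + id;
    }
  }

  // Sender side: capture everything the receiver needs to locate the method.
  static Meta describe(Class<?> iface, Method method, Object[] args) {
    Meta meta = new Meta();
    meta.interfaceName = iface.getName();
    meta.methodName = method.getName();
    meta.args = args;
    Class<?>[] types = method.getParameterTypes();
    meta.paramTypeNames = new String[types.length];
    for (int i = 0; i < types.length; i++) {
      meta.paramTypeNames[i] = types[i].getName();
    }
    return meta;
  }

  // Receiver side: resolve the interface and method by name, then invoke on the target.
  static Object dispatch(Meta meta, Object target) throws Exception {
    Class<?> iface = Class.forName(meta.interfaceName);
    Class<?>[] types = new Class<?>[meta.paramTypeNames.length];
    for (int i = 0; i < types.length; i++) {
      // Class.forName only resolves reference types, not primitives such as long.
      types[i] = Class.forName(meta.paramTypeNames[i]);
    }
    Method method = iface.getMethod(meta.methodName, types);
    return method.invoke(target, meta.args);
  }

  public static void main(String[] args) throws Exception {
    Method greet = Greeter.class.getMethod("greet", Long.class);
    Meta meta = describe(Greeter.class, greet, new Object[] {42L});
    System.out.println(dispatch(meta, new GreeterImpl())); // prints "hello 42"
  }
}
```

Storing the parameter type names alongside the arguments lets the receiver pick the right overload without inspecting the argument values. In this sketch that only works for reference types, which fits the restriction to a single Long parameter discussed later.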

2. How to inject an instance into a field annotated with @AsyncInvoker on the sending end

In this scenario, the sender only depends on the consumer's interface, not on its implementation. So how does @AsyncInvoker get an object to inject?

The answer is a dynamic proxy.
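
As a minimal sketch of the idea, independent of Spring and RabbitMQ, the JDK's java.lang.reflect.Proxy can build an implementation of an interface at runtime and route every call through one handler. The names ProxyDemo, OrderService, SENT and createProxy are hypothetical; the SENT list stands in for the message queue.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProxyDemo {
  // Hypothetical interface; the caller only ever sees this, never an implementation.
  interface OrderService {
    void dealById(Long id);
  }

  // Stands in for the message queue: records what would have been sent.
  static final List<String> SENT = new ArrayList<>();

  // Builds an implementation of the given interface at runtime.
  static <T> T createProxy(Class<T> iface) {
    InvocationHandler handler = (proxy, method, args) -> {
      // Every interface call lands here instead of in a real implementation.
      SENT.add(iface.getSimpleName() + "." + method.getName() + Arrays.toString(args));
      return null; // suitable for void methods only
    };
    Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    return iface.cast(proxy);
  }

  public static void main(String[] args) {
    OrderService service = createProxy(OrderService.class);
    service.dealById(1L);
    System.out.println(SENT); // prints "[OrderService.dealById[1]]"
  }
}
```

The caller holds an ordinary OrderService reference, so the calling code looks the same whether the call is handled locally or turned into a message.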

And how does Spring know that a field annotated with @AsyncInvoker should be injected with a dynamic proxy?

The answer is Spring's BeanPostProcessor interface. It lets you insert custom logic before and after a bean is initialized. I won't go into detail here; readers who need more can search for it.

With that, the idea takes shape. The code is as follows:

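import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

@Slf4j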
@Component
public class AsyncInvokerBeanProcessor implements BeanPostProcessor {
  // Cache of generated proxies, reused when several controllers inject the same interface type.
  private final ConcurrentMap<String, Object> proxyMap = new ConcurrentHashMap<>();

  // Spring AMQP object used to send messages
  @Autowired
  private RabbitTemplate rabbitTemplate;

  // BeanPostProcessor callback: custom logic that runs before each bean is initialized.
  // Here it injects a dynamic proxy into every field annotated with @AsyncInvoker, for example in each controller.
  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    // Find the fields annotated with @AsyncInvoker on this bean
    Field[] fields = bean.getClass().getDeclaredFields();
    for (Field field : fields) {
      try {
        if (!field.isAccessible()) {
          field.setAccessible(true);
        }
        AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class);
        if (asyncInvoker != null) {
          // Create the proxy and assign it to the field
          Object value = createProxy(field.getType());
          if (value != null) {
            field.set(bean, value);
          }
        }
      } catch (Throwable e) {
        log.error("Failed to init remote mq service at field " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e);
      }
    }
    return bean;
  }

  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    return bean;
  }

  private Object createProxy(Class<?> clz) {
    String interfaceName;
    if (clz.isInterface()) {
      interfaceName = clz.getName();
    } else {
      throw new IllegalStateException("The @AsyncInvoker property type " + clz.getName() + " is not an interface.");
    }

    Object proxy = proxyMap.get(interfaceName);
    if (proxy == null) {
      Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
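          log.debug("Invoking dynamic proxy! method: {}, args: {}", method, args);
          if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) {
            // Unchecked, so the proxy does not wrap it in UndeclaredThrowableException
            throw new UnsupportedOperationException("MQ Service currently supports only methods with a single Long parameter");
          }
          // Build the MQ transfer object inside the proxy and send it.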
          MqMethodMeta mqMethodMeta = new MqMethodMeta();
          mqMethodMeta.setInterfaceName(clz.getName());
          mqMethodMeta.setMethodName(method.getName());
          mqMethodMeta.setArgs(args);
          String[] paramTypeNames = new String[args.length];
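          // The source listing is cut off inside this loop; the rest of the class is a reconstruction.
          for (int i = 0; i < args.length; i++) {
            paramTypeNames[i] = method.getParameterTypes()[i].getName();
          }
          mqMethodMeta.setParamTypeNames(paramTypeNames);
          // Note clz.getName() here: it is used as the routing key (see problem 4)
          rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
          return null;
        }
      });
      // Keep the proxy that was registered first if another thread got there before us
      proxy = proxyMap.putIfAbsent(interfaceName, newProxy);
      if (proxy == null) {
        proxy = newProxy;
      }
    }
    return proxy;
  }
}

3. How the receiving end calls the matching interface implementation after receiving a message

Calling the interface on the receiving end is simple: take the MqMethodMeta object and make a reflective call. Here is the code:

@Slf4j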
public class AsyncMethodListener implements ApplicationContextAware {
  private ApplicationContext applicationContext;

  @RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "${demo.mq.method.queue}", durable = "true"),
      exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"),
      key = "${demo.mq.method.routekey}"
  ))
  public void messageHandle(@Payload MqMethodMeta message) {
    try {
      log.info("Received message: {}", message);
      Class<?> clz = Class.forName(message.getInterfaceName());
      String methodName = message.getMethodName();
      Object[] args = message.getArgs();
      Class<?>[] paramTypes = new Class<?>[message.getParamTypeNames().length];
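      // The source listing is cut off inside this loop; the rest of the class is a reconstruction.
      for (int i = 0; i < paramTypes.length; i++) {
        paramTypes[i] = Class.forName(message.getParamTypeNames()[i]);
      }
      // Look up the bean that implements the interface and invoke the method reflectively
      Object bean = applicationContext.getBean(clz);
      Method method = clz.getMethod(methodName, paramTypes);
      method.invoke(bean, args);
    } catch (Exception e) {
      log.error("Failed to handle message: " + message, e);
    }
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }
}

4. How to tell MQ queues apart when there are multiple consumer services

This uses RabbitMQ's topic exchange.
First, make the queue and routing key configurable in the consumer listener:

@RabbitListener(bindings = @QueueBinding(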
      value = @Queue(value = "${demo.mq.method.queue}", durable = "true"),
      exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"),
      key = "${demo.mq.method.routekey}"
  ))

Note that ${demo.mq.method.queue} and ${demo.mq.method.routekey} here are read from the configuration file.

For example, the configuration in system 1 is as follows:

demo.mq.method.queue=com.demo.service.project1.#
demo.mq.method.routekey=com.demo.service.project1.#

The configuration in system 2 is as follows:

demo.mq.method.queue=com.demo.service.project2.#
demo.mq.method.routekey=com.demo.service.project2.#

Now look at the code in the sender:

// Note clz.getName() here: it is what solves problem 4
rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);

Note the clz.getName() here. Because our systems follow a consistent package-naming strategy, clz.getName() for any interface of system 1 always starts with com.demo.service.project1, for example com.demo.service.project1.MqDemoService, so the message is always routed to the listener in project1. (The trailing ".#" in the routing pattern matches any number of following words, which is a feature of topic exchanges in RabbitMQ.)
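
To make the routing rule concrete, here is a simplified model of topic matching in plain Java. It is a sketch of the documented semantics ("*" matches exactly one word, "#" matches zero or more words), not RabbitMQ's actual implementation, and the names TopicMatch and matches are hypothetical.

```java
class TopicMatch {
  // Returns true if the routing key matches the binding pattern.
  static boolean matches(String pattern, String routingKey) {
    String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
    String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
    return match(p, 0, k, 0);
  }

  // Compares pattern words from index i against key words from index j.
  private static boolean match(String[] p, int i, String[] k, int j) {
    if (i == p.length) {
      return j == k.length; // pattern used up: the key must be used up too
    }
    if (p[i].equals("#")) {
      // '#' may absorb zero or more words: try every possible split point.
      for (int next = j; next <= k.length; next++) {
        if (match(p, i + 1, k, next)) {
          return true;
        }
      }
      return false;
    }
    if (j == k.length) {
      return false; // pattern still expects a word but the key has none left
    }
    if (p[i].equals("*") || p[i].equals(k[j])) {
      return match(p, i + 1, k, j + 1);
    }
    return false;
  }

  public static void main(String[] args) {
    String key = "com.demo.service.project1.MqDemoService";
    System.out.println(matches("com.demo.service.project1.#", key)); // prints "true"
    System.out.println(matches("com.demo.service.project2.#", key)); // prints "false"
  }
}
```

With the two configurations above, a message whose routing key is an interface name under com.demo.service.project1 matches only the binding of system 1, so system 2 never sees it.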

At this point, the goal we set at the beginning has been achieved. From now on, whenever you need to make an asynchronous call through MQ, you can use it just like a synchronous method.

The use of MQ in Spring is not covered in detail here; you can refer to the documentation:

http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/

A set of demo code will be provided later for reference.

Summary

This approach still has some problems. For example:

1. Because of the current business scenario, asynchronous callbacks are not considered. If needed, this could be combined with RabbitMQ's own asynchronous callback mechanism; I have not thought this through yet.
2. Because of concerns about version updates on the consumer side, only methods with a single integer parameter (Long in the code above) are currently supported.

The first problem can be addressed when a business case requires it. If you have ideas, feel free to discuss.

The second problem is mainly about compatibility: if the parameter types or other details change on the consumer side, then after redeployment the consumer still has to handle old messages that may remain in MQ. I don't have a good idea for this yet, so it is offered here for brainstorming.