>  기사  >  Java  >  MQ 메시지를 보내고 받기 위해 HTTP 프로토콜을 구현하는 Java 코드 예제

MQ 메시지를 보내고 받기 위해 HTTP 프로토콜을 구현하는 Java 코드 예제

Y2J
Y2J원래의
2017-05-02 13:59:392531검색

이 글에서는 주로 HTTP 프로토콜을 사용하여 Java 환경에서 MQ 메시지를 주고받는 방법을 예제 코드를 통해 자세히 소개합니다. 필요한 친구들이 참고할 수 있도록

1.

프로젝트 POM 파일에 HTTP Java 클라이언트 종속성을 추가합니다.

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-client</artifactId>
  <version>9.3.4.RC1</version>
 </dependency>  
 <dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.1.11</version>
 </dependency>

2. 코드 구성(user.properties) 실행

구성 파일(user.properties)에 해당 내용을 설정해야 합니다. 자세한 내용은 애플리케이션 MQ 리소스를 참조하세요.

#您在控制台创建的Topic
Topic=xxx
#公测url
URL=http://publictest-rest.ons.aliyun.com
#阿里云身份验证码
Ak=xxx
#阿里云身份验证密钥
Sk=xxx
#MQ控制台创建的Producer ID
ProducerID=xxx
#MQ控制台创建的Consumer ID
ConsumerID=xxx

참고: URL의 키, 태그 및 POST 콘텐츠 유형에는 제한이 없습니다. 키와 태그가 동일하고 고유하다면 user.properties에 배치할 수 있습니다.

3. HTTP 메시지 전송 샘플 코드

您可以按以下说明设置相应参数并测试 HTTP 消息发送功能。

package com.aliyun.openservice.ons.http.demo;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Properties;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
public class HttpProducer {
  public static String SIGNATURE="Signature";
  public static String NUM="num";
  public static String CONSUMERID="ConsumerID";
  public static String PRODUCERID="ProducerID";
  public static String TIMEOUT="timeout";
  public static String TOPIC="Topic";
  public static String AK="AccessKey";
  public static String BODY="body"; 
  public static String MSGHANDLE="msgHandle";
  public static String TIME="time";
  public static void main(String[] args) throws Exception {
    HttpClient httpClient=new HttpClient(); 
    httpClient.setMaxConnectionsPerDestination(1);
    httpClient.start(); 
    Properties properties=new Properties();
    properties.load(HttpProducer.class.getClassLoader().getResourceAsStream("user.properties"));
    String topic=properties.getProperty("Topic"); //请在user.properties配置您的Topic
    String url=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/
    String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak
    String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk
    String pid=properties.getProperty("ProducerID");//请在user.properties配置您的Producer ID
    String date=String.valueOf(new Date().getTime()); 
    String sign=null;
    String body="hello ons http";
    String NEWLINE="\n";
    String signString;
    for (int i = 0; i < 10; i++) {
      date=String.valueOf(new Date().getTime());
      Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http");
      ContentProvider content=new StringContentProvider(body);
      req.content(content);
      signString=topic+NEWLINE+pid+NEWLINE+MD5.getInstance().getMD5String(body)+NEWLINE+date;
      System.out.println(signString);
      sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
      req.header(SIGNATURE, sign);
      req.header(AK, ak);
      req.header(PRODUCERID, pid);
      ContentResponse response;
      response=req.send();
      System.out.println("send msg:"+response.getStatus()+response.getContentAsString());
    } 
  }
}

4. HTTP 메시지 수신 샘플 코드

를 클릭하세요. 다음 지침에서는 해당 매개변수를 설정하고 HTTP 메시지 수신 기능을 테스트합니다.

package com.aliyun.openservice.ons.http.demo;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservice.ons.mqtt.demo.MqttProducer;
import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
public class HttpConsumer {
  public static String SIGNATURE="Signature";
  public static String NUM="num";
  public static String CONSUMERID="ConsumerID";
  public static String PRODUCERID="ProducerID";
  public static String TIMEOUT="timeout";
  public static String TOPIC="Topic";
  public static String AK="AccessKey";
  public static String BODY="body"; 
  public static String MSGHANDLE="msgHandle";
  public static String TIME="time";
  public static void main(String[] args) throws Exception {
    HttpClient httpClient=new HttpClient(); 
    httpClient.setMaxConnectionsPerDestination(1);
    httpClient.start(); 
    Properties properties=new Properties();
    properties.load(HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties"));
    String topic=properties.getProperty("Topic"); //请在user.properties配置您的topic
    String url=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/
    String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak
    String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk
    String cid=properties.getProperty("ConsumerID");//请在user.properties配置您的Consumer ID
    String date=String.valueOf(new Date().getTime()); 
    String sign=null;
    String NEWLINE="\n";
    String signString;
    System.out.println(NEWLINE+NEWLINE);
    while (true) { 
      try {
        date=String.valueOf(new Date().getTime());
        Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&num="+32);
        req.method(HttpMethod.GET);
        ContentResponse response;
        signString=topic+NEWLINE+cid+NEWLINE+date;
        sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
        req.header(SIGNATURE, sign);
        req.header(AK, ak);
        req.header(CONSUMERID, cid);
        long start=System.currentTimeMillis();
        response=req.send();
        System.out.println("get cost:"+(System.currentTimeMillis()-start)/1000 
                  +"  "+response.getStatus()+"  "+response.getContentAsString()); 
        List<SimpleMessage> list = null;
        if (response.getContentAsString()!=null&&!response.getContentAsString().isEmpty()) {
           list=JSON.parseArray(response.getContentAsString(), SimpleMessage.class);
        }
        if (list==null||list.size()==0) {
          Thread.sleep(100);
          continue;
        } 
        System.out.println("size is :"+list.size());
        for (SimpleMessage simpleMessage : list) {
          date=String.valueOf(new Date().getTime());
          System.out.println("receive msg:"+simpleMessage.getBody()+"  born time "+simpleMessage.getBornTime());
          req=httpClient.POST(url+"message/?msgHandle="+simpleMessage.getMsgHandle()+"&topic="+topic+"&time="+date);
          req.method(HttpMethod.DELETE);
          signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date;
          sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
          req.header(SIGNATURE, sign);
          req.header(AK, ak);
          req.header(CONSUMERID, cid);
          response=req.send();  
          System.out.println("delete msg:"+response.toString());
        } 
        Thread.sleep(100);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}

5. HTTP 샘플 프로그램 도구 클래스

(1) 메시지 캡슐화 클래스: SimpleMessage.java

package com.aliyun.openservice.ons.http.demo;
public class SimpleMessage {
  private String body;
  private String msgId;
  private String bornTime;
  private String msgHandle;
  private int reconsumeTimes;
  private String tag;
  public void setTag(String tag) {
    this.tag = tag;
  }
  public String getTag() {
    return tag;
  }
  public int getReconsumeTimes() {
    return reconsumeTimes;
  }
  public void setReconsumeTimes(int reconsumeTimes) {
    this.reconsumeTimes = reconsumeTimes;
  }
  public void setMsgHandle(String msgHandle) {
    this.msgHandle = msgHandle;
  }
  public String getMsgHandle() {
    return msgHandle;
  }
  public String getBody() {
    return body;
  }
  public void setBody(String body) {
    this.body = body;
  }
  public String getMsgId() {
    return msgId;
  }
  public void setMsgId(String msgId) {
    this.msgId = msgId;
  }
  public String getBornTime() {
    return bornTime;
  }
  public void setBornTime(String bornTime) {
    this.bornTime = bornTime;
  }
}

(2) 문자열 서명 클래스: MD5.java

package com.aliyun.openservice.ons.http.demo;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.LoggerFactory;
public class MD5 {
  private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class);
  private static char[] digits = { &#39;0&#39;, &#39;1&#39;, &#39;2&#39;, &#39;3&#39;, &#39;4&#39;, &#39;5&#39;, &#39;6&#39;, &#39;7&#39;, &#39;8&#39;, &#39;9&#39;, &#39;a&#39;, &#39;b&#39;, &#39;c&#39;, &#39;d&#39;, &#39;e&#39;, &#39;f&#39; };
  private static Map<Character, Integer> rDigits = new HashMap<Character, Integer>(16);
  static {
    for (int i = 0; i < digits.length; ++i) {
      rDigits.put(digits[i], i);
    }
  }
  private static MD5 me = new MD5();
  private MessageDigest mHasher;
  private final ReentrantLock opLock = new ReentrantLock();
  private MD5() {
    try {
      this.mHasher = MessageDigest.getInstance("md5");
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
  public static MD5 getInstance() {
    return me;
  }
  public String getMD5String(String content) {
    return this.bytes2string(this.hash(content));
  }
  public String getMD5String(byte[] content) {
    return this.bytes2string(this.hash(content));
  }
  public byte[] getMD5Bytes(byte[] content) {
    return this.hash(content);
  }
  public byte[] hash(String str) {
    this.opLock.lock();
    try {
      byte[] bt = this.mHasher.digest(str.getBytes("utf-8"));
      if (null == bt || bt.length != 16) {
        throw new IllegalArgumentException("md5 need");
      }
      return bt;
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("unsupported utf-8 encoding", e);
    } finally {
      this.opLock.unlock();
    }
  }
  public byte[] hash(byte[] data) {
    this.opLock.lock();
    try {
      byte[] bt = this.mHasher.digest(data);
      if (null == bt || bt.length != 16) {
        throw new IllegalArgumentException("md5 need");
      }
      return bt;
    } finally {
      this.opLock.unlock();
    }
  }
  public String bytes2string(byte[] bt) {
    int l = bt.length;
    char[] out = new char[l << 1];
    for (int i = 0, j = 0; i < l; i++) {
      out[j++] = digits[(0xF0 & bt[i]) >>> 4];
      out[j++] = digits[0x0F & bt[i]];
    }
    if (log.isDebugEnabled()) {
      log.debug("[hash]" + new String(out));
    }
    return new String(out);
  }
  public byte[] string2bytes(String str) {
    if (null == str) {
      throw new NullPointerException("Argument is not allowed empty");
    }
    if (str.length() != 32) {
      throw new IllegalArgumentException("String length must equals 32");
    }
    byte[] data = new byte[16];
    char[] chs = str.toCharArray();
    for (int i = 0; i < 16; ++i) {
      int h = rDigits.get(chs[i * 2]).intValue();
      int l = rDigits.get(chs[i * 2 + 1]).intValue();
      data[i] = (byte) ((h & 0x0F) << 4 | l & 0x0F);
    }
    return data;
  }
}

이 기사가 도움이 되길 바랍니다

위 내용은 MQ 메시지를 보내고 받기 위해 HTTP 프로토콜을 구현하는 Java 코드 예제의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.