우리는 conf/canal.properties 핵심 구성 파일과 conf/example/에 집중해야 합니다. conf 폴더 instance.properties 단일 모니터링 노드 구성 파일
conf/canal.properties
# tcp, kafka, RocketMQ,这里默认是tcp读取模式,采用RocketMQ需要将其改变为RocketMQ模式
canal.serverMode = RocketMQ
# 如果是aliyun的RocketMQ需要配置以下两个KEY,ak/sk
canal.aliyun.accessKey =xxxxxxx
canal.aliyun.secretKey =xxxxxxx
# 监控的节点名称.这个默认就是example如果有多节点可以逗号隔开,如下方的例子
canal.destinations = example,sign
# 如果是aliyun的RocketMQ需要修改canal.mq.accessChannel为cloud默认为local
canal.mq.accessChannel = cloud
#MQ的地址,需要注意这里是不带http://,但是需要带端口号
canal.mq.servers =
#rocketmq实例id
canal.mq.namespace =
conf/example/instance.properties
#mysql地址
canal.instance.master.address=
#以下两个参数需要在开启数据库binlog日志后得到,在数据库查询界面输入查询语句`show master status`,canal.instance.master.journal.name对应File参数,canal.instance.master.position对应Position参数
canal.instance.master.journal.name=
canal.instance.master.position=
#数据库的账号密码
canal.instance.dbUsername=
canal.instance.dbPassword=
#需要监控变动的表
canal.instance.filter.regex=xxx.t_user_order,xxx.t_user_cash_out
#定义发送的mq生产组
canal.mq.producerGroup =
#定义发送到mq的指定主题
canal.mq.topic=
참고: 쓰기 규칙 형식의 경우 모니터링 테이블은 모니터링 테이블 작성 규칙을 참조하세요
Startup
cd /canal/bin
./start.sh
이때 canal 디렉토리에 추가 로그 파일을 찾아 들어가시면 운하 메인 로그 파일을 보실 수 있습니다. 예시 노드 시작 로그입니다.
canal日志中出现
the canal server is running now ......
example日志中出现
init table filter : ^tablename
xxxxxxxxx , the next step is binlog dump
는 성공했다는 의미입니다. 큰 발전을 이루었으며 이제 운하 모니터링이 정상적으로 실행되고 있습니다.
RocketMQ部分
如果用的aliyun的RocketMQ,配置代码部分直接可参考文档
自建的RocketMQ也可参照简单的消费例子监控对应的TOPIC即可
消费Canal发来的数据,格式如下:
{
"data":[
{
//单个修改后表数据,如果同一时间有多个表变动会有多个该JSON对象 }
],
"database":"监控的表所在数据库",
"es":表变动时间,
"id":canal生成的id,
"isDdl":Boolean类型,表示是否DDL语句,
"mysqlType":{
表结构
},
"old":如果是修改类型会填充修改前的值,
"pkNames":[
该表的主键,如"id"
],
"sql":"执行的SQL",
"sqlType":{
字段对应的sqlType,一般使用mysqlType即可
},
"table":"监控的表名",
"ts":canal记录发送时间,
"type":"表的修改类型,入INSERT,UPDATE,DELETE"
}
MQ消费代码主要用了反射,映射到对应的表
//这里的body就是Canal发来的数据
public Action process(String body) {
boolean result = Boolean.FALSE;
JSONObject data = JSONObject.parseObject(body);
log.info("数据库操作日志记录:data:{}",data.toString());
Class c = null;
try {
//这里监控了订单和收益表分别做订单统计和收益日报统计
c = Class.forName(getClassName(data.getString("table")));
} catch (ClassNotFoundException e) {
log.error("error {}",e);
}
if (null != c) {
JSONArray dataArray = data.getJSONArray("data");
if (dataArray != null) {
//把获取到的data部分转换为反射后的实体集合
List list = dataArray.toJavaList(c);
if (CollUtil.isNotEmpty(list)) {
//对修改和写入操作分别进行逻辑操作
String type = data.getString("type");
if ("UPDATE".equals(type)) {
result = uppHistory(list);
} else if ("INSERT".equals(type)) {
result = saveHistory(list);
}
}
}
}
return result ? Action.CommitMessage : Action.ReconsumeLater;
}
/**
* @description: 获取反射ClassName
* @author: chenyunxuan
*/
private String getClassName(String tableName) {
StringBuilder sb = new StringBuilder();
//判断是哪张表的数据
if (tableName.equals("t_user_income_detail")) {
sb.append("cn.mc.core.model.order");
} else if (tableName.equals("t_user_cash_out")) {
sb.append("cn.mc.sync.model");
}
String className = StrUtil.toCamelCase(tableName).substring(1);
return sb.append(".").append(className).toString();
}
/**
* @description: 写入对应类型的统计表
* @author: chenyunxuan
*/
private <T> Boolean saveHistory(List<T> orderList) {
boolean result = Boolean.FALSE;
Object dataType = orderList.get(0);
//用instanceof判断类型进入不同的逻辑处理代码
if (dataType instanceof TUserIncomeDetail) {
result = userOrderHistoryService.saveIncomeDaily(orderList);
} else if (dataType instanceof UserCashOut) {
result = userCashOutHistoryService.delSaveHistoryList(orderList);
}
return result;
}
saveIncomeDaily伪代码
public synchronized Boolean saveIncomeDaily(List orderList) {
//循环收益明细记录
.......
//通过创建时间和用户id查询收益日报表中是否有当日数据
if(不存在当日数据){
//创建当日的收益日报表记录
.....
}
//因为不存在当日记录也会立即写入当日的空数据,所以下面的流程都是走更新流程
//更新当日数据
.......
return Boolean.TRUE;
}
注:代码中应该多打一些日志,方便产生异常收益数据后的校对
后记
至此一个基于canal
+RocketMQ
的收益日报统计异构方案就完成了,下一篇会围绕本文提到的第二个问题减少聚合SQL的产生展开.敬请关注.
相关免费学习推荐:java基础教程