This tutorial shows how to compute order income statistics conveniently.
Computing order income statistics is a common problem for e-commerce-style apps. Typical requirements include daily/monthly/annual reports of user earnings. Report data like this poses considerable challenges for table design and program design: the query time of a conventional aggregation statement grows steadily as the income table gets larger. So we need to think about two questions: how should the income tables be designed so that queries stay efficient, and what kind of design makes income statistics simple?
Daily income report: a single row records a user's income for the day. This reduces the amount of data scanned when computing a user's daily/monthly/annual income. Taking a single user as an example, splitting by user produces at most 31 rows per month, which is a controllable growth rate. If we queried the raw income detail table instead, its row count corresponds one-to-one with the user's orders, so with a large order volume the table becomes very large. In the early stage, while the user base is still growing, this approach avoids running statistics over large data volumes; later, if growth makes even the daily report data large, sharding the table can be considered.
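To make the growth argument concrete, here is a minimal plain-Java sketch (the class and field names are hypothetical, not from the original article) of summing a user's monthly income from its at-most-31 daily rows instead of scanning every order:

```java
import java.util.List;

public class DailyIncomeReport {
    // One row per user per day: the day's income, pre-aggregated at write time.
    public record DailyRow(String userId, int dayOfMonth, long incomeCents) {}

    // A monthly total only has to scan at most 31 daily rows per user,
    // instead of every order the user ever placed.
    public static long monthlyTotalCents(List<DailyRow> dailyRows) {
        return dailyRows.stream().mapToLong(DailyRow::incomeCents).sum();
    }

    public static void main(String[] args) {
        List<DailyRow> rows = List.of(
                new DailyRow("u1", 1, 1000),
                new DailyRow("u1", 2, 2500));
        System.out.println(monthlyTotalCents(rows)); // prints 3500
    }
}
```

The same shape applies to annual totals: at most 366 daily rows per user, regardless of how many orders were placed.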
Having summarized the above problems, I started working on data collection and finally chose canal + RocketMQ as the heterogeneous-data solution. Let's briefly introduce these two technical frameworks:
Note: I use Alibaba Cloud's full stack; both MQ and MySQL are Alibaba Cloud services. A self-built setup may differ in places, and I will try to point out the differences below.
conf: stores the core configuration files.
We need to focus on two files under the conf folder: the core configuration file conf/canal.properties and the single-instance configuration file conf/example/instance.properties.
```properties
# tcp, kafka, RocketMQ. The default is tcp; to use RocketMQ, change it to RocketMQ mode
canal.serverMode = RocketMQ
# For Alibaba Cloud RocketMQ, configure the following two keys (ak/sk)
canal.aliyun.accessKey =xxxxxxx
canal.aliyun.secretKey =xxxxxxx
# Name(s) of the monitored instance(s). The default is example; multiple instances are comma-separated, as in this example
canal.destinations = example,sign
# For Alibaba Cloud RocketMQ, change canal.mq.accessChannel to cloud (the default is local)
canal.mq.accessChannel = cloud
# MQ address. Note: no http:// prefix, but the port number is required
canal.mq.servers =
# RocketMQ instance id
canal.mq.namespace =
```
```properties
# MySQL address
canal.instance.master.address=
# The next two values are obtained after enabling the database binlog: run `show master status`
# in the database query console; canal.instance.master.journal.name corresponds to the File column
# and canal.instance.master.position to the Position column
canal.instance.master.journal.name=
canal.instance.master.position=
# Database credentials
canal.instance.dbUsername=
canal.instance.dbPassword=
# Tables whose changes should be monitored
canal.instance.filter.regex=xxx.t_user_order,xxx.t_user_cash_out
# Producer group used when sending to MQ
canal.mq.producerGroup =
# Topic that messages are sent to
canal.mq.topic=
```
Note: for the format of the monitored-table filter, refer to canal's documentation on filter writing rules.
```shell
cd /canal/bin
./start.sh
```
At this point you will find a new logs directory under the canal directory; inside it you can see the main canal log file and the example instance's startup log.
If the canal log contains `the canal server is running now ......` and the example log contains `init table filter : ^tablename xxxxxxxxx , the next step is binlog dump`, you have taken a big step forward: canal monitoring is running normally.
If you use Alibaba Cloud's RocketMQ, the consumer configuration code can follow the official documentation directly; a self-built RocketMQ can likewise follow a simple consumer example that subscribes to the corresponding TOPIC. The data sent by canal is consumed in the following format:
```json
{
    "data": [
        {
            // a single changed row; if several rows change at the same time there are multiple such JSON objects
        }
    ],
    "database": "database containing the monitored table",
    "es": "timestamp of the table change",
    "id": "id generated by canal",
    "isDdl": "boolean, whether this is a DDL statement",
    "mysqlType": {
        // the table structure
    },
    "old": "for UPDATE events, the values before the change",
    "pkNames": [
        "the table's primary key, e.g. \"id\""
    ],
    "sql": "the executed SQL",
    "sqlType": {
        // sqlType of each field; mysqlType is usually sufficient
    },
    "table": "name of the monitored table",
    "ts": "time canal sent the record",
    "type": "modification type, e.g. INSERT, UPDATE, DELETE"
}
```
The MQ consumer code mainly uses reflection to map each message onto its corresponding table entity:
```java
// body is the message payload sent by canal
public Action process(String body) {
    boolean result = Boolean.FALSE;
    JSONObject data = JSONObject.parseObject(body);
    log.info("database operation log: data:{}", data.toString());
    Class c = null;
    try {
        // the order table and the income table are both monitored here,
        // for order statistics and daily income report statistics respectively
        c = Class.forName(getClassName(data.getString("table")));
    } catch (ClassNotFoundException e) {
        log.error("error {}", e);
    }
    if (null != c) {
        JSONArray dataArray = data.getJSONArray("data");
        if (dataArray != null) {
            // convert the "data" part into a list of the reflected entity type
            List list = dataArray.toJavaList(c);
            if (CollUtil.isNotEmpty(list)) {
                // handle UPDATE and INSERT with separate logic
                String type = data.getString("type");
                if ("UPDATE".equals(type)) {
                    result = uppHistory(list);
                } else if ("INSERT".equals(type)) {
                    result = saveHistory(list);
                }
            }
        }
    }
    return result ? Action.CommitMessage : Action.ReconsumeLater;
}

/**
 * @description: build the class name for reflection
 * @author: chenyunxuan
 */
private String getClassName(String tableName) {
    StringBuilder sb = new StringBuilder();
    // decide which table the data belongs to
    if (tableName.equals("t_user_income_detail")) {
        sb.append("cn.mc.core.model.order");
    } else if (tableName.equals("t_user_cash_out")) {
        sb.append("cn.mc.sync.model");
    }
    String className = StrUtil.toCamelCase(tableName).substring(1);
    return sb.append(".").append(className).toString();
}

/**
 * @description: write into the statistics table matching the entity type
 * @author: chenyunxuan
 */
private <T> Boolean saveHistory(List<T> orderList) {
    boolean result = Boolean.FALSE;
    Object dataType = orderList.get(0);
    // use instanceof to route each entity type into its own handling logic
    if (dataType instanceof TUserIncomeDetail) {
        result = userOrderHistoryService.saveIncomeDaily(orderList);
    } else if (dataType instanceof UserCashOut) {
        result = userCashOutHistoryService.delSaveHistoryList(orderList);
    }
    return result;
}
```
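For readers without Hutool on the classpath, the table-name-to-class-name conversion used by `getClassName` can be sketched in plain Java. This is a sketch that mirrors what `StrUtil.toCamelCase` produces for these inputs, not Hutool's actual implementation:

```java
public class TableNameMapper {
    // Convert an underscore table name such as t_user_cash_out to camel case
    // (tUserCashOut); the caller then drops the leading prefix letter -> UserCashOut.
    public static String toCamelCase(String name) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : name.toCharArray()) {
            if (c == '_') {
                upperNext = true;          // skip the underscore, uppercase what follows
            } else {
                sb.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return sb.toString();
    }

    // Equivalent of StrUtil.toCamelCase(tableName).substring(1) in the consumer code.
    public static String toClassName(String tableName) {
        return toCamelCase(tableName).substring(1);
    }

    public static void main(String[] args) {
        System.out.println(toClassName("t_user_cash_out")); // prints UserCashOut
    }
}
```

Note that for `t_user_cash_out` this yields `UserCashOut`, matching the `instanceof UserCashOut` branch above.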
Pseudocode for saveIncomeDaily:
```java
public synchronized Boolean saveIncomeDaily(List orderList) {
    // loop over the income detail records
    .......
    // query the daily income report by creation date and user id
    // to see whether a row for the current day already exists
    if (no row for the day exists) {
        // create the day's income report row
        .....
    }
    // because a missing daily row is immediately written as an empty row for the day,
    // the rest of the flow is always the update path
    // update the day's data
    .......
    return Boolean.TRUE;
}
```
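The "create an empty row if missing, then always update" pattern above can be sketched with a `Map` standing in for the daily report table. The names here are hypothetical and the real code updates a database row, but the control flow is the same:

```java
import java.util.HashMap;
import java.util.Map;

public class DailyIncomeUpsert {
    // Key: userId + "_" + date; value: accumulated income for that day, in cents.
    private final Map<String, Long> dailyTable = new HashMap<>();

    // synchronized, like saveIncomeDaily, so concurrent consumers cannot
    // create two rows for the same user and day.
    public synchronized void addIncome(String userId, String date, long cents) {
        String key = userId + "_" + date;
        // Create today's empty row if it does not exist yet ...
        dailyTable.putIfAbsent(key, 0L);
        // ... so the rest of the flow is always the update path.
        dailyTable.put(key, dailyTable.get(key) + cents);
    }

    public long get(String userId, String date) {
        return dailyTable.getOrDefault(userId + "_" + date, 0L);
    }

    public static void main(String[] args) {
        DailyIncomeUpsert table = new DailyIncomeUpsert();
        table.addIncome("u1", "2020-12-01", 100);
        table.addIncome("u1", "2020-12-01", 250);
        System.out.println(table.get("u1", "2020-12-01")); // prints 350
    }
}
```

Collapsing the two branches into a single update path keeps the consumer logic simple and makes repeated deliveries of the same day's records easy to reason about.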
Note: the code should log generously, to make reconciliation easier if abnormal income data ever appears.
This completes a canal + RocketMQ based heterogeneous solution for daily income report statistics. The next article will focus on the second problem mentioned here, reducing the aggregation SQL that gets generated. Stay tuned.
The above is the detailed content of Convenient statistical order income (1).