Rumah  >  Artikel  >  pangkalan data  >  Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

WBOY
WBOYke hadapan
2023-05-26 20:28:311574semak imbas

Pengenalan kepada terusan

Asal usul sejarah terusan

Pada zaman awal, Syarikat Alibaba telah ditubuhkan di Hangzhou dan Amerika Syarikat. Contoh pangkalan data digunakan di semua bilik komputer, tetapi disebabkan keperluan perniagaan untuk menyegerakkan data merentas bilik komputer, terusan dilahirkan, yang terutamanya berdasarkan pencetus untuk mendapatkan perubahan tambahan. Bermula pada tahun 2010, Alibaba mula mencuba analisis log pangkalan data secara beransur-ansur untuk mendapatkan data yang diubah secara berperingkat untuk penyegerakan, yang mengakibatkan perniagaan langganan dan penggunaan tambahan.

Versi mysql sumber data semasa yang disokong oleh terusan termasuk: 5.1.x, 5.5.x, 5.6.x, 5.7.x, 8.0.x.

senario aplikasi terusan

Perniagaan pada masa ini secara amnya berdasarkan langganan dan penggunaan tambahan log terutamanya termasuk:

  1. Berdasarkan Pangkalan Data analisis log tambahan, menyediakan langganan dan penggunaan data tambahan

  2. Sandaran masa nyata pangkalan data pencerminan pangkalan data

  3. Pembinaan indeks dan penyelenggaraan masa nyata (split Indeks heterogen, indeks terbalik, dsb.)

  4. Pemuatan semula cache perniagaan

  5. Pemprosesan data tambahan dengan logik perniagaan

  6. Prinsip kerja canal

Sebelum memperkenalkan prinsip canal, mari kita fahami prinsip replikasi master-slave mysql.

prinsip replikasi master-slave mysql

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

  • mysql master menulis operasi perubahan data ke dalam binari log binari Dalam log, kandungan yang direkodkan dipanggil peristiwa log binari, yang boleh dilihat melalui arahan acara binlog show

  • hamba mysql akan menyalin peristiwa log binari dalam log binari induk ke Its log geganti log geganti

  • hamba mysql membaca semula dan melaksanakan acara dalam log geganti, memetakan perubahan data kepada jadual pangkalan datanya sendiri

Memahami kerja prinsip mysql, kita boleh meneka secara kasar bahawa kanal juga harus menggunakan logik yang sama untuk melaksanakan fungsi langganan data tambahan Kemudian mari kita lihat bagaimana saluran sebenarnya berfungsi?

Bagaimana canal berfungsi

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

  • canal mensimulasikan protokol interaksi hamba mysql dan menyamar sebagai hamba mysql . Hantar protokol dump ke mysql master

  • mysql master menerima permintaan dump dan mula menolak log binari ke slave (iaitu terusan)

  • objek log binari analisis terusan (data ialah aliran bait)

Berdasarkan prinsip dan kaedah ini, adalah mungkin untuk melengkapkan pemerolehan dan analisis log tambahan pangkalan data, menyediakan langganan dan penggunaan data tambahan , dan merealisasikan keupayaan pemindahan data tambahan masa nyata mysql.

Memandangkan canal adalah rangka kerja sedemikian dan ditulis dalam bahasa java tulen, maka kami akan mula belajar cara menggunakannya dan mengaplikasikannya pada kerja sebenar kami.

persediaan persekitaran docker canal

Disebabkan populariti semasa teknologi kontena, artikel ini menggunakan docker untuk membina persekitaran pembangunan dengan cepat dan cara tradisional membina persekitaran adalah dalam kami Selepas mempelajari cara membina persekitaran kontena buruh pelabuhan, anda juga boleh berjaya membinanya sendiri. Oleh kerana artikel ini menerangkan tentang terusan, ia tidak akan membincangkan terlalu banyak tentang docker. Ia terutamanya akan memperkenalkan konsep asas dan penggunaan arahan docker. Jika anda ingin berkomunikasi dengan lebih ramai pakar teknologi kontena, anda boleh menambahkan saya di WeChat liyingjiese dan menyatakan "Tambah kumpulan". Kumpulan ini mengandungi amalan terbaik syarikat utama di seluruh dunia dan trend industri terkini setiap minggu.

Apa itu docker

Saya percaya bahawa kebanyakan orang telah menggunakan vmware mesin maya Apabila menggunakan vmware untuk membina persekitaran, anda hanya perlu menyediakan sistem imej biasa berjaya dipasang. Persekitaran perisian dan konfigurasi aplikasi yang selebihnya masih dikendalikan dalam mesin maya kerana kami beroperasi pada mesin tempatan Selain itu, vmware menggunakan lebih banyak sumber mesin hos, yang boleh menyebabkan mesin hos menjadi beku, dan imej sistem itu sendiri Juga mengambil terlalu banyak ruang.

Untuk memudahkan semua orang memahami dengan cepat docker, mari kita bandingkan dengan vmware untuk pengenalan Docker menyediakan platform untuk memulakan, membungkus dan menjalankan apl, yang mengasingkan apl (aplikasi) daripada asas. infrastruktur (infrastruktur). Dua konsep terpenting dalam docker ialah imej (serupa dengan imej sistem dalam vmware) dan bekas (serupa dengan sistem yang dipasang dalam vmware).

Apakah itu imej (cermin)

  • Himpunan fail dan data meta (sistem fail akar)

  • Berlapis, dan setiap lapisan boleh menambah, menukar atau memadam fail untuk menjadi imej baharu

  • Imej yang berbeza boleh berkongsi lapisan yang sama

  • Imej itu sendiri adalah baca sahaja

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Apakah itu bekas (bekas)

  • Buat daripada imej (salinan)

  • Buat lapisan bekas (boleh dibaca dan boleh ditulis) di atas lapisan imej

  • Berorientasikan objek analogi: kelas dan kejadian

  • Imej bertanggungjawab untuk penyimpanan dan pengedaran apl dan bekas bertanggungjawab untuk menjalankan apl

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Pengenalan rangkaian Docker

Docker mempunyai tiga jenis rangkaian:

  • jambatan: rangkaian jambatan. Secara lalai, bekas docker mula menggunakan bridge, rangkaian jambatan yang dibuat semasa pemasangan docker Setiap kali bekas docker dimulakan semula, alamat IP yang sepadan akan diperolehi mengikut urutan Ini akan menyebabkan alamat IP docker berubah selepas dimulakan semula.

  • tiada: Tiada rangkaian yang ditentukan. Menggunakan --network=none, bekas docker tidak akan menetapkan IP LAN.

  • hos: rangkaian hos. Jika --network=host digunakan, bekas Docker akan berkongsi rangkaian dengan hos dan kedua-duanya boleh berkomunikasi antara satu sama lain. Apabila menjalankan perkhidmatan web mendengar pada port 8080 dalam bekas, bekas itu secara automatik dipetakan ke port 8080 hos.

Buat rangkaian tersuai: (tetapkan ip tetap)

docker network create --subnet=172.18.0.0/16 mynetwork

Lihat ls rangkaian docker jenis rangkaian sedia ada:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Bina persekitaran terusan

Lampirkan alamat muat turun dan pemasangan docker ==> muat turun docker .

Muat turun imej terusandocker pull canal/canal-server:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Muat turun imej mysql docker pull mysql Yang dimuat turun adalah seperti yang ditunjukkan di bawah:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Lihat imej docker imej yang dimuat turun:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Seterusnya, jana bekas mysql dan bekas pelayan terusan melalui imej:

##生成mysql容器
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e mysql_root_password=root mysql
##生成canal-server容器
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## 命令介绍
--net mynetwork #使用自定义网络
--ip #指定分配ip

Lihat ps docker kontena yang berjalan dalam docker:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

pengubahsuaian konfigurasi mysql

Di atas hanyalah persediaan awal persekitaran asas, tetapi bagaimana untuk membuat terusan menyamar sebagai salve dan mendapatkan log binari dalam mysql dengan betul?

Untuk mysql binaan sendiri, anda perlu mendayakan fungsi penulisan binlog dahulu, konfigurasikan binlog-format ke mod baris, buka bin_log dengan mengubah suai fail konfigurasi mysql, gunakan find / -name my.cnf untuk mencari my.cnf, dan ubah suai kandungan fail seperti berikut:

[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=row # 选择row模式
server_id=1 # 配置mysql replaction需要定义,不要和canal的slaveid重复

Masukkan bekas mysql docker exec -it mysql bash。

Buat saluran akaun yang dipautkan ke mysql dan berikan kebenaran sebagai hamba mysql, jika anda sudah mempunyai akaun boleh terus memberikan:

mysql -uroot -proot
# 创建账号
create user canal identified by 'canal'; 
# 授予权限
grant select, replication slave, replication client on *.* to 'canal'@'%';
-- grant all privileges on *.* to 'canal'@'%' ;
# 刷新并应用
flush privileges;

Mulakan semula pangkalan data Akhir sekali, cukup uji sama ada konfigurasi my.cnf berkuat kuasa:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

show variables like 'log_bin';
show variables like 'log_bin';
show master status;

konfigurasi pelayan terusan pengubahsuaian

Masukkan bekas pelayan terusan docker exec -it canal-server bash.

Edit konfigurasi pelayan terusan vi canal-server/conf/example/instance.properties:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Untuk konfigurasi lanjut, sila rujuk ==>canal配置说明 。

Mulakan semula bekas pelayan terusan docker restart canal-server Masukkan bekas untuk melihat log permulaan:

docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Pada ketika ini, kerja persekitaran kami sudah sedia!

Tarik data dan simpannya secara serentak ke elasticsearch

Elasticsearch dalam artikel ini juga dibina berdasarkan persekitaran docker, jadi pembaca boleh melaksanakan arahan berikut:

# 下载对镜像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# 创建容器并运行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine

The persekitaran sudah sedia, kini Tiba masanya untuk memulakan bahagian praktikal pengekodan kami, bagaimana untuk mendapatkan data binlog selepas analisis terusan melalui aplikasi. Pertama, kami membina aplikasi demo terusan berdasarkan but spring. Strukturnya adalah seperti yang ditunjukkan di bawah:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

student.java

package com.example.canal.study.pojo;
import lombok.data;
import java.io.serializable;
// @data 用户生产getter、setter方法
@data
public class student implements serializable {
private string id;
private string name;
private int age;
private string sex;
private string city;
}

canalconfig.java

package com.example.canal.study.common;
import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.client.canalconnectors;
import org.apache.http.httphost;
import org.elasticsearch.client.restclient;
import org.elasticsearch.client.resthighlevelclient;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import java.net.inetsocketaddress;
/**
* @author haha
*/
@configuration
public class canalconfig {
// @value 获取 application.properties配置中端内容
@value("${canal.server.ip}")
private string canalip;
@value("${canal.server.port}")
private integer canalport;
@value("${canal.destination}")
private string destination;
@value("${elasticsearch.server.ip}")
private string elasticsearchip;
@value("${elasticsearch.server.port}")
private integer elasticsearchport;
@value("${zookeeper.server.ip}")
private string zkserverip;
// 获取简单canal-server连接
@bean
public canalconnector canalsimpleconnector() {
 canalconnector canalconnector = canalconnectors.newsingleconnector(new inetsocketaddress(canalip, canalport), destination, "", "");
 return canalconnector;
}
// 通过连接zookeeper获取canal-server连接
@bean
public canalconnector canalhaconnector() {
 canalconnector canalconnector = canalconnectors.newclusterconnector(zkserverip, destination, "", "");
 return canalconnector;
}
// elasticsearch 7.x客户端
@bean
public resthighlevelclient resthighlevelclient() {
 resthighlevelclient client = new resthighlevelclient(
   restclient.builder(new httphost(elasticsearchip, elasticsearchport))
 );
 return client;
}
}

canaldataparser.java

Memandangkan terdapat banyak kod dalam kelas ini, bahagian yang lebih penting diekstrak dalam artikel ini boleh didapati daripada github:

public static class twotuple<a, b> {
 public final a eventtype;
 public final b columnmap;
 public twotuple(a a, b b) {
  eventtype = a;
  columnmap = b;
 }
}
public static list<twotuple<eventtype, map>> printentry(list<entry> entrys) {
 list<twotuple<eventtype, map>> rows = new arraylist<>();
 for (entry entry : entrys) {
  // binlog event的事件事件
  long executetime = entry.getheader().getexecutetime();
  // 当前应用获取到该binlog锁延迟的时间
  long delaytime = system.currenttimemillis() - executetime;
  date date = new date(entry.getheader().getexecutetime());
  simpledateformat simpledateformat = new simpledateformat("yyyy-mm-dd hh:mm:ss");
  // 当前的entry(binary log event)的条目类型属于事务
  if (entry.getentrytype() == entrytype.transactionbegin || entry.getentrytype() == entrytype.transactionend) {
   if (entry.getentrytype() == entrytype.transactionbegin) {
    transactionbegin begin = null;
    try {
     begin = transactionbegin.parsefrom(entry.getstorevalue());
    } catch (invalidprotocolbufferexception e) {
     throw new runtimeexception("parse event has an error , data:" + entry.tostring(), e);
    }
    // 打印事务头信息,执行的线程id,事务耗时
    logger.info(transaction_format,
      new object[]{entry.getheader().getlogfilename(),
        string.valueof(entry.getheader().getlogfileoffset()),
        string.valueof(entry.getheader().getexecutetime()),
        simpledateformat.format(date),
        entry.getheader().getgtid(),
        string.valueof(delaytime)});
    logger.info(" begin ----> thread id: {}", begin.getthreadid());
    printxainfo(begin.getpropslist());
   } else if (entry.getentrytype() == entrytype.transactionend) {
    transactionend end = null;
    try {
     end = transactionend.parsefrom(entry.getstorevalue());
    } catch (invalidprotocolbufferexception e) {
     throw new runtimeexception("parse event has an error , data:" + entry.tostring(), e);
    }
    // 打印事务提交信息,事务id
    logger.info("----------------\n");
    logger.info(" end ----> transaction id: {}", end.gettransactionid());
    printxainfo(end.getpropslist());
    logger.info(transaction_format,
      new object[]{entry.getheader().getlogfilename(),
        string.valueof(entry.getheader().getlogfileoffset()),
        string.valueof(entry.getheader().getexecutetime()), simpledateformat.format(date),
        entry.getheader().getgtid(), string.valueof(delaytime)});
   }
   continue;
  }
  // 当前entry(binary log event)的条目类型属于原始数据
  if (entry.getentrytype() == entrytype.rowdata) {
   rowchange rowchage = null;
   try {
    // 获取储存的内容
    rowchage = rowchange.parsefrom(entry.getstorevalue());
   } catch (exception e) {
    throw new runtimeexception("parse event has an error , data:" + entry.tostring(), e);
   }
   // 获取当前内容的事件类型
   eventtype eventtype = rowchage.geteventtype();
   logger.info(row_format,
     new object[]{entry.getheader().getlogfilename(),
       string.valueof(entry.getheader().getlogfileoffset()), entry.getheader().getschemaname(),
       entry.getheader().gettablename(), eventtype,
       string.valueof(entry.getheader().getexecutetime()), simpledateformat.format(date),
       entry.getheader().getgtid(), string.valueof(delaytime)});
   // 事件类型是query或数据定义语言ddl直接打印sql语句,跳出继续下一次循环
   if (eventtype == eventtype.query || rowchage.getisddl()) {
    logger.info(" sql ----> " + rowchage.getsql() + sep);
    continue;
   }
   printxainfo(rowchage.getpropslist());
   // 循环当前内容条目的具体数据
   for (rowdata rowdata : rowchage.getrowdataslist()) {
    list<canalentry.column> columns;
    // 事件类型是delete返回删除前的列内容,否则返回改变后列的内容
    if (eventtype == canalentry.eventtype.delete) {
     columns = rowdata.getbeforecolumnslist();
    } else {
     columns = rowdata.getaftercolumnslist();
    }
    hashmap<string, object> map = new hashmap<>(16);
    // 循环把列的name与value放入map中
    for (column column: columns){
     map.put(column.getname(), column.getvalue());
    }
    rows.add(new twotuple<>(eventtype, map));
   }
  }
 }
 return rows;
}

elasticutils.java

package com.example.canal.study.common;
import com.alibaba.fastjson.json;
import com.example.canal.study.pojo.student;
import lombok.extern.slf4j.slf4j;
import org.elasticsearch.client.resthighlevelclient;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
import org.elasticsearch.action.docwriterequest;
import org.elasticsearch.action.delete.deleterequest;
import org.elasticsearch.action.delete.deleteresponse;
import org.elasticsearch.action.get.getrequest;
import org.elasticsearch.action.get.getresponse;
import org.elasticsearch.action.index.indexrequest;
import org.elasticsearch.action.index.indexresponse;
import org.elasticsearch.action.update.updaterequest;
import org.elasticsearch.action.update.updateresponse;
import org.elasticsearch.client.requestoptions;
import org.elasticsearch.common.xcontent.xcontenttype;
import java.io.ioexception;
import java.util.map;
/**
* @author haha
*/
@slf4j
@component
public class elasticutils {
@autowired
private resthighlevelclient resthighlevelclient;
/**
 * 新增
 * @param student 
 * @param index 索引
 */
public void savees(student student, string index) {
 indexrequest indexrequest = new indexrequest(index)
   .id(student.getid())
   .source(json.tojsonstring(student), xcontenttype.json)
   .optype(docwriterequest.optype.create);
 try {
  indexresponse response = resthighlevelclient.index(indexrequest, requestoptions.default);
  log.info("保存数据至elasticsearch成功:{}", response.getid());
 } catch (ioexception e) {
  log.error("保存数据至elasticsearch失败: {}", e);
 }
}
/**
 * 查看
 * @param index 索引
 * @param id _id
 * @throws ioexception
 */
public void getes(string index, string id) throws ioexception {
 getrequest getrequest = new getrequest(index, id);
 getresponse response = resthighlevelclient.get(getrequest, requestoptions.default);
 map<string, object> fields = response.getsource();
 for (map.entry<string, object> entry : fields.entryset()) {
  system.out.println(entry.getkey() + ":" + entry.getvalue());
 }
}
/**
 * 更新
 * @param student
 * @param index 索引
 * @throws ioexception
 */
public void updatees(student student, string index) throws ioexception {
 updaterequest updaterequest = new updaterequest(index, student.getid());
 updaterequest.upsert(json.tojsonstring(student), xcontenttype.json);
 updateresponse response = resthighlevelclient.update(updaterequest, requestoptions.default);
 log.info("更新数据至elasticsearch成功:{}", response.getid());
}
/**
 * 根据id删除数据
 * @param index 索引
 * @param id _id
 * @throws ioexception
 */
public void deletees(string index, string id) throws ioexception {
 deleterequest deleterequest = new deleterequest(index, id);
 deleteresponse response = resthighlevelclient.delete(deleterequest, requestoptions.default);
 log.info("删除数据至elasticsearch成功:{}", response.getid());
}
}

binlogelasticsearch.java

package com.example.canal.study.action;
import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.protocol.canalentry;
import com.alibaba.otter.canal.protocol.message;
import com.example.canal.study.common.canaldataparser;
import com.example.canal.study.common.elasticutils;
import com.example.canal.study.pojo.student;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.stereotype.component;
import java.io.ioexception;
import java.util.list;
import java.util.map;
/**
* @author haha
*/
@slf4j
@component
public class binlogelasticsearch {
@autowired
private canalconnector canalsimpleconnector;
@autowired
private elasticutils elasticutils;
//@qualifier("canalhaconnector")使用名为canalhaconnector的bean
@autowired
@qualifier("canalhaconnector")
private canalconnector canalhaconnector;
public void binlogtoelasticsearch() throws ioexception {
 opencanalconnector(canalhaconnector);
 // 轮询拉取数据
 integer batchsize = 5 * 1024;
 while (true) {
  message message = canalhaconnector.getwithoutack(batchsize);
//   message message = canalsimpleconnector.getwithoutack(batchsize);
  long id = message.getid();
  int size = message.getentries().size();
  log.info("当前监控到binlog消息数量{}", size);
  if (id == -1 || size == 0) {
   try {
    // 等待2秒
    thread.sleep(2000);
   } catch (interruptedexception e) {
    e.printstacktrace();
   }
  } else {
   //1. 解析message对象
   list<canalentry.entry> entries = message.getentries();
   list<canaldataparser.twotuple<canalentry.eventtype, map>> rows = canaldataparser.printentry(entries);
   for (canaldataparser.twotuple<canalentry.eventtype, map> tuple : rows) {
    if(tuple.eventtype == canalentry.eventtype.insert) {
     student student = createstudent(tuple);
     // 2。将解析出的对象同步到elasticsearch中
     elasticutils.savees(student, "student_index");
     // 3.消息确认已处理
//     canalsimpleconnector.ack(id);
     canalhaconnector.ack(id);
    }
    if(tuple.eventtype == canalentry.eventtype.update){
     student student = createstudent(tuple);
     elasticutils.updatees(student, "student_index");
     // 3.消息确认已处理
//     canalsimpleconnector.ack(id);
     canalhaconnector.ack(id);
    }
    if(tuple.eventtype == canalentry.eventtype.delete){
     elasticutils.deletees("student_index", tuple.columnmap.get("id").tostring());
     canalhaconnector.ack(id);
    }
   }
  }
 }
}
/**
 * 封装数据至student
 * @param tuple
 * @return
 */
private student createstudent(canaldataparser.twotuple<canalentry.eventtype, map> tuple){
 student student = new student();
 student.setid(tuple.columnmap.get("id").tostring());
 student.setage(integer.parseint(tuple.columnmap.get("age").tostring()));
 student.setname(tuple.columnmap.get("name").tostring());
 student.setsex(tuple.columnmap.get("sex").tostring());
 student.setcity(tuple.columnmap.get("city").tostring());
 return student;
}
/**
 * 打开canal连接
 *
 * @param canalconnector
 */
private void opencanalconnector(canalconnector canalconnector) {
 //连接canalserver
 canalconnector.connect();
 // 订阅destination
 canalconnector.subscribe();
}
/**
 * 关闭canal连接
 *
 * @param canalconnector
 */
private void closecanalconnector(canalconnector canalconnector) {
 //关闭连接canalserver
 canalconnector.disconnect();
 // 注销订阅destination
 canalconnector.unsubscribe();
}
}

canaldemoapplication.java(spring boot启动类)

package com.example.canal.study;
import com.example.canal.study.action.binlogelasticsearch;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.applicationarguments;
import org.springframework.boot.applicationrunner;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
/**
* @author haha
*/
@springbootapplication
public class canaldemoapplication implements applicationrunner {
@autowired
private binlogelasticsearch binlogelasticsearch;
public static void main(string[] args) {
 springapplication.run(canaldemoapplication.class, args);
}
// 程序启动则执行run方法
@override
public void run(applicationarguments args) throws exception {
 binlogelasticsearch.binlogtoelasticsearch();
}
}

application.properties

server.port=8081
spring.application.name = canal-demo
canal.server.ip = 192.168.124.5
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = 192.168.124.5:2181
zookeeper.sasl.client = false
elasticsearch.server.ip = 192.168.124.5
elasticsearch.server.port = 9200

canal集群高可用的搭建

通过上面的学习,我们知道了单机直连方式的canala应用。在当今互联网时代,单实例模式逐渐被集群高可用模式取代,那么canala的多实例集群方式如何搭建呢!

基于zookeeper获取canal实例

准备zookeeper的docker镜像与容器:

docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server

1、机器准备:

  • 运行canal的容器ip: 172.18.0.4 , 172.18.0.8

  • zookeeper容器ip:172.18.0.3:2181

  • mysql容器ip:172.18.0.6:3306

2、按照部署和配置,在单台机器上各自完成配置,演示时instance name为example。

3、修改canal.properties,加上zookeeper配置并修改canal端口:

canal.port=11113
canal.zkservers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

4、创建example目录,并修改instance.properties:

canal.instance.mysql.slaveid = 1235 
#之前的canal slaveid是1234,保证slaveid不重复即可
canal.instance.master.address = 172.18.0.6:3306

注意: 两台机器上的instance目录的名字需要保证完全一致,ha模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置。

启动两个不同容器的canal,启动后,可以通过tail -100f logs/example/example.log查看启动日志,只会看到一台机器上出现了启动成功的日志。

比如我这里启动成功的是 172.18.0.4:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

查看一下zookeeper中的节点信息,也可以知道当前工作的节点为172.18.0.4:11111:

[zk: localhost:2181(connected) 15] get /otter/canal/destinations/example/running 
{"active":true,"address":"172.18.0.4:11111","cid":1}

客户端链接, 消费数据

可以通过指定zookeeper地址和canal的instance name,canal client会自动从zookeeper中的running节点获取当前服务的工作节点,然后与其建立链接:

[zk: localhost:2181(connected) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}

对应的客户端编码可以使用如下形式,上文中的canalconfig.java中的canalhaconnector就是一个ha连接:

canalconnector connector = canalconnectors.newclusterconnector("172.18.0.3:2181", "example", "", "");

链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端ip,链接的端口信息等(聪明的你,应该也可以发现,canal client也可以支持ha功能):

[zk: localhost:2181(connected) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientid":1001}

数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点(下次你重启client时,会从这最后一个位点继续进行消费):

[zk: localhost:2181(connected) 5] get /otter/canal/destinations/example/1001/cursor

{"@type":"com.alibaba.otter.canal.protocol.position.logposition","identity":{"slaveid":-1,"sourceaddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalname":"binlog.000004","position":2169,"timestamp":1562672817000}}

停止正在工作的172.18.0.4的canal server:

docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh

这时172.18.0.8会立马启动example instance,提供新的数据服务:

[zk: localhost:2181(connected) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}

与此同时,客户端也会随着canal server的切换,通过获取zookeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成。

异常与总结

elasticsearch-head无法访问elasticsearch

es与es-head是两个独立的进程,当es-head访问es服务时,会存在一个跨域问题。所以我们需要修改es的配置文件,增加一些配置项来解决这个问题,如下:

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml 
# 文件末尾加上如下配置
http.cors.enabled: true
http.cors.allow-origin: "*"

修改完配置文件后需重启es服务。

elasticsearch-head查询报406 not acceptable

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

解决方法:

1、进入head安装目录;

2、cd _site/

3、编辑vendor.js 共有两处

#6886行 contenttype: "application/x-www-form-urlencoded
改成 contenttype: "application/json;charset=utf-8"
 #7574行 var inspectdata = s.contenttype === "application/x-www-form-urlencoded" &&
改成 var inspectdata = s.contenttype === "application/json;charset=utf-8" &&

使用elasticsearch-rest-high-level-clientorg.elasticsearch.action.index.indexrequest.ifseqno

#pom中除了加入依赖
<dependency>
<groupid>org.elasticsearch.client</groupid>
<artifactid>elasticsearch-rest-high-level-client</artifactid>
<version>7.1.1</version>
</dependency>
#还需加入
<dependency>
<groupid>org.elasticsearch</groupid>
<artifactid>elasticsearch</artifactid>
<version>7.1.1</version>
</dependency>

相关参考: 。

为什么elasticsearch要在7.x版本不能使用type?

参考: 为什么elasticsearch要在7.x版本去掉type?

使用spring-data-elasticsearch.jar报org.elasticsearch.client.transport.nonodeavailableexception

由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底层采用es官方transportclient,而es官方计划放弃transportclient,工具以es官方推荐的resthighlevelclient进行调用请求。 可参考 resthighlevelclient api 。

设置docker容器开启启动

如果创建时未指定 --restart=always ,可通过update 命令
docker update --restart=always [containerid]

docker for mac network host模式不生效

host模式是为了性能,但是这却对docker的隔离性造成了破坏,导致安全性降低。 在性能场景下,可以用--netwokr host开启host模式,但需要注意的是,如果你用windows或mac本地启动容器的话,会遇到host模式失效的问题。原因是host模式只支持linux宿主机。

参见官方文档:    。

客户端连接zookeeper报authenticate using sasl(unknow error)

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

  • zookeeper.jar tidak konsisten dengan versi zookeeper dalam dokcer

  • zookeeper.jar menggunakan versi sebelum 3.4.6 Versi

Ralat ini bermakna penjaga zoo, sebagai aplikasi luaran, perlu memohon sumber daripada sistem Apabila memohon sumber, ia perlu disahkan, dan sasl ialah kaedah pengesahan . Kami perlu mencari cara untuk memintasnya. Elakkan menunggu dan meningkatkan kecekapan.

Tambah system.setproperty("zookeeper.sasl.client", "false");, pada kod projek Jika ia adalah projek but spring, anda boleh menambah application.properties pada zookeeper.sasl.client=false.

Rujukan: peningkatan penggunaan cpu oleh semakan sasl yang tidak perlu.

Jika anda menukar versi zookeeper.jar yang bergantung kepada canal.client.jar

Muat turun kod sumber rasmi terusan ke klon git tempatan, dan kemudian ubah suai fail pom.xml di bawah modul pelanggan kandungan zookeeper, dan kemudian pasang semula mvn:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Ganti pakej yang bergantung kepada projek anda dengan pakej yang baru mvn install dihasilkan:

Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal

Atas ialah kandungan terperinci Bagaimana untuk melaksanakan fungsi penghantaran data tambahan masa nyata MySQL berdasarkan Docker dan Canal. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam