Heim  >  Artikel  >  Datenbank  >  Detaillierte Beispiele für Datenasymmetrie zwischen MySQL und Elasticsearch

Detaillierte Beispiele für Datenasymmetrie zwischen MySQL und Elasticsearch

小云云
小云云Original
2017-12-22 13:42:122378Durchsuche

jdbc-input-plugin kann nur das Anhängen von Datenbanken und inkrementelles Schreiben für Elasticsearch implementieren, aber häufig führt die Datenbank auf der JDBC-Quellenseite möglicherweise Datenbanklösch- oder -aktualisierungsvorgänge aus. Dadurch besteht eine Asymmetrie zwischen der Datenbank und der Suchmaschinendatenbank. In diesem Artikel werden hauptsächlich relevante Informationen zu Lösungen für das inkrementelle Schreiben von Elasticsearch vorgestellt. Die Datenbank auf der JDBC-Quelle führt häufig Datenbanklösch- oder Aktualisierungsvorgänge durch. Hier finden Sie Lösungen Als nächstes hoffe ich, dass es allen helfen kann.

Wenn Sie ein Entwicklungsteam haben, können Sie natürlich ein Programm schreiben, um Suchmaschinenvorgänge beim Löschen oder Aktualisieren zu synchronisieren. Wenn Sie nicht über diese Fähigkeit verfügen, können Sie die folgende Methode ausprobieren.

Hier gibt es einen Datentabellenartikel. Das mtime-Feld definiert ON UPDATE CURRENT_TIMESTAMP, sodass sich der Zeitpunkt der Aktualisierung von mtime jedes Mal ändert


mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field    | Type     | Null | Key | Default            | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id     | int(11)   | NO  |   | 0               |    |
| title    | mediumtext  | NO  |   | NULL              |    |
| description | mediumtext  | YES |   | NULL              |    |
| author   | varchar(100) | YES |   | NULL              |    |
| source   | varchar(100) | YES |   | NULL              |    |
| content   | longtext   | YES |   | NULL              |    |
| status   | enum('Y','N')| NO  |   | 'N'              |    |
| ctime    | timestamp  | NO  |   | CURRENT_TIMESTAMP       |    |
| mtime    | timestamp  | YES |   | ON UPDATE CURRENT_TIMESTAMP  |    |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)

logstash erhöht mtime Die Abfrageregel


jdbc {
  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
  jdbc_user => "cms"
  jdbc_password => "password"
  schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次
  statement => "select * from article where mtime > :sql_last_value"
  use_column_value => true
  tracking_column => "mtime"
  tracking_column_type => "timestamp" 
  record_last_run => true
  last_run_metadata_path => "/var/tmp/article-mtime.last"
 }

erstellt eine Papierkorbtabelle, die zur Lösung des Problems der Datenbanklöschung oder des Deaktivierungsstatus = 'N' verwendet wird.


CREATE TABLE `elasticsearch_trash` (
 `id` int(11) NOT NULL,
 `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Erstellen Sie einen Trigger für die Artikeltabelle


CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
 -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。
 IF NEW.status = 'N' THEN
 insert into elasticsearch_trash(id) values(OLD.id);
 END IF;
 -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。
  IF NEW.status = 'Y' THEN
 delete from elasticsearch_trash where id = OLD.id;
 END IF;
END

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
 -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。
 insert into elasticsearch_trash(id) values(OLD.id);
END

Als nächstes müssen wir einen einfachen Auslöser schreiben Die Shell wird einmal pro Minute ausgeführt, ruft Daten aus der Datentabelle „Elasticsearch_trash“ ab und ruft dann mit dem Befehl „curl“ die Elasticsearch-Restful-Schnittstelle auf, um die wiederhergestellten Daten zu löschen.

Sie können auch verwandte Programme entwickeln. Hier ist ein Beispiel für eine geplante Spring-Boot-Aufgabe.

Entität


package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class ElasticsearchTrash {
 @Id
 private int id;

 @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
 private Date ctime;

 public int getId() {
 return id;
 }

 public void setId(int id) {
 this.id = id;
 }

 public Date getCtime() {
 return ctime;
 }

 public void setCtime(Date ctime) {
 this.ctime = ctime;
 }

}

Lager


package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{


}

Zeitgesteuerte Aufgabe


package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component
public class ScheduledTasks {
 private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

 @Autowired
 private TransportClient client;

 @Autowired
 private ElasticsearchTrashRepository alasticsearchTrashRepository;

 public ScheduledTasks() {
 }

 @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务
 public void cleanTrash() {
 for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
  DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
  RestStatus status = response.status();
  logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
  if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
  alasticsearchTrashRepository.delete(elasticsearchTrash);
  }
 }
 }
}

Spring Boot startet das Hauptprogramm.


package cn.netkiller.api;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

 public static void main(String[] args) {
 SpringApplication.run(Application.class, args);
 }
}

Verwandte Empfehlungen:

Was ist Elasticsearch? Wo kann Elasticsearch eingesetzt werden?

Beispiel-Tutorial für Elasticsearch-Index und Dokumentbetrieb

Detaillierte Erläuterung des Beispiel-Tutorials zur Verwendung von Elasticsearch im Frühjahr

Das obige ist der detaillierte Inhalt vonDetaillierte Beispiele für Datenasymmetrie zwischen MySQL und Elasticsearch. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn