Maison > Article > base de données > Exemples détaillés d'asymétrie de données entre MySQL et Elasticsearch
jdbc-input-plugin ne peut implémenter que l'ajout de base de données et l'écriture incrémentielle pour elasticsearch, mais souvent la base de données au niveau de la source jdbc peut effectuer des opérations de suppression ou de mise à jour de base de données. En conséquence, il existe une asymétrie entre la base de données et la base de données du moteur de recherche. Cet article présente principalement des informations pertinentes sur les solutions au problème d'asymétrie des données entre MySQL et Elasticsearch. Pour l'écriture incrémentielle d'Elasticsearch, la base de données au niveau de la source jdbc peut souvent effectuer des opérations de suppression ou de mise à jour de la base de données. Les solutions sont fournies ici. eux. Ensuite, j’espère que cela pourra aider tout le monde.
Bien sûr, si vous disposez d'une équipe de développement, vous pouvez écrire un programme pour synchroniser les opérations du moteur de recherche lors de la suppression ou de la mise à jour. Si vous ne disposez pas de cette capacité, vous pouvez essayer la méthode suivante.
Il y a un article sur la table de données ici, le champ mtime définit ON UPDATE CURRENT_TIMESTAMP donc l'heure de mise à jour de mtime changera à chaque fois
mysql> desc article; +-------------+--------------+------+-----+--------------------------------+-------+ | Field | Type | Null | Key | Default | Extra | +-------------+--------------+------+-----+--------------------------------+-------+ | id | int(11) | NO | | 0 | | | title | mediumtext | NO | | NULL | | | description | mediumtext | YES | | NULL | | | author | varchar(100) | YES | | NULL | | | source | varchar(100) | YES | | NULL | | | content | longtext | YES | | NULL | | | status | enum('Y','N')| NO | | 'N' | | | ctime | timestamp | NO | | CURRENT_TIMESTAMP | | | mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | | +-------------+--------------+------+-----+--------------------------------+-------+ 7 rows in set (0.00 sec)
logstash augmente mtime La règle de requête
jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/cms" jdbc_user => "cms" jdbc_password => "password" schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次 statement => "select * from article where mtime > :sql_last_value" use_column_value => true tracking_column => "mtime" tracking_column_type => "timestamp" record_last_run => true last_run_metadata_path => "/var/tmp/article-mtime.last" }
crée une table de corbeille, qui est utilisée pour résoudre le problème de suppression de base de données ou de désactivation de l'état = 'N'.
CREATE TABLE `elasticsearch_trash` ( `id` int(11) NOT NULL, `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
Créer un déclencheur pour la table d'articles
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW BEGIN -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。 IF NEW.status = 'N' THEN insert into elasticsearch_trash(id) values(OLD.id); END IF; -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。 IF NEW.status = 'Y' THEN delete from elasticsearch_trash where id = OLD.id; END IF; END CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW BEGIN -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。 insert into elasticsearch_trash(id) values(OLD.id); END
Ensuite, nous devons écrire un simple Le Shell s'exécute une fois par minute, récupère les données de la table de données elasticsearch_trash, puis utilise la commande curl pour appeler l'interface restful elasticsearch afin de supprimer les données récupérées.
Vous pouvez également développer des programmes associés. Voici un exemple de tâche planifiée au démarrage de Spring.
Entité
package cn.netkiller.api.domain.elasticsearch; import java.util.Date; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; @Entity @Table public class ElasticsearchTrash { @Id private int id; @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP") private Date ctime; public int getId() { return id; } public void setId(int id) { this.id = id; } public Date getCtime() { return ctime; } public void setCtime(Date ctime) { this.ctime = ctime; } }
Entrepôt
package cn.netkiller.api.repository.elasticsearch; import org.springframework.data.repository.CrudRepository; import com.example.api.domain.elasticsearch.ElasticsearchTrash; public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{ }
Tâche chronométrée
package cn.netkiller.api.schedule; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.example.api.domain.elasticsearch.ElasticsearchTrash; import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository; @Component public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); @Autowired private TransportClient client; @Autowired private ElasticsearchTrashRepository alasticsearchTrashRepository; public ScheduledTasks() { } @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务 public void cleanTrash() { for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) { DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get(); RestStatus status = response.status(); logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString()); if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) { alasticsearchTrashRepository.delete(elasticsearchTrash); } } } }
Spring Boot démarre le programme principal.
package cn.netkiller.api; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
Recommandations associées :
Qu'est-ce qu'Elasticsearch ? Où peut-on utiliser Elasticsearch ?
Tutoriel d'exemple d'indexation et d'opération de document Elasticsearch
Explication détaillée de l'exemple de tutoriel d'utilisation d'Elasticsearch au printemps
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!