jdbc-input-plugin kann nur das Anhängen von Datenbanken und inkrementelles Schreiben für Elasticsearch implementieren, aber häufig führt die Datenbank auf der JDBC-Quellenseite möglicherweise Datenbanklösch- oder -aktualisierungsvorgänge aus. Dadurch besteht eine Asymmetrie zwischen der Datenbank und der Suchmaschinendatenbank. In diesem Artikel werden hauptsächlich relevante Informationen zu Lösungen für das inkrementelle Schreiben von Elasticsearch vorgestellt. Die Datenbank auf der JDBC-Quelle führt häufig Datenbanklösch- oder Aktualisierungsvorgänge durch. Hier finden Sie Lösungen Als nächstes hoffe ich, dass es allen helfen kann.
Wenn Sie ein Entwicklungsteam haben, können Sie natürlich ein Programm schreiben, um Suchmaschinenvorgänge beim Löschen oder Aktualisieren zu synchronisieren. Wenn Sie nicht über diese Fähigkeit verfügen, können Sie die folgende Methode ausprobieren.
Hier gibt es einen Datentabellenartikel. Das mtime-Feld definiert ON UPDATE CURRENT_TIMESTAMP, sodass sich der Zeitpunkt der Aktualisierung von mtime jedes Mal ändert
mysql> desc article; +-------------+--------------+------+-----+--------------------------------+-------+ | Field | Type | Null | Key | Default | Extra | +-------------+--------------+------+-----+--------------------------------+-------+ | id | int(11) | NO | | 0 | | | title | mediumtext | NO | | NULL | | | description | mediumtext | YES | | NULL | | | author | varchar(100) | YES | | NULL | | | source | varchar(100) | YES | | NULL | | | content | longtext | YES | | NULL | | | status | enum('Y','N')| NO | | 'N' | | | ctime | timestamp | NO | | CURRENT_TIMESTAMP | | | mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | | +-------------+--------------+------+-----+--------------------------------+-------+ 7 rows in set (0.00 sec)
logstash erhöht mtime Die Abfrageregel
jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/cms" jdbc_user => "cms" jdbc_password => "password" schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次 statement => "select * from article where mtime > :sql_last_value" use_column_value => true tracking_column => "mtime" tracking_column_type => "timestamp" record_last_run => true last_run_metadata_path => "/var/tmp/article-mtime.last" }
erstellt eine Papierkorbtabelle, die zur Lösung des Problems der Datenbanklöschung oder des Deaktivierungsstatus = 'N' verwendet wird.
CREATE TABLE `elasticsearch_trash` ( `id` int(11) NOT NULL, `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
Erstellen Sie einen Trigger für die Artikeltabelle
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW BEGIN -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。 IF NEW.status = 'N' THEN insert into elasticsearch_trash(id) values(OLD.id); END IF; -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。 IF NEW.status = 'Y' THEN delete from elasticsearch_trash where id = OLD.id; END IF; END CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW BEGIN -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。 insert into elasticsearch_trash(id) values(OLD.id); END
Als nächstes müssen wir einen einfachen Auslöser schreiben Die Shell wird einmal pro Minute ausgeführt, ruft Daten aus der Datentabelle „Elasticsearch_trash“ ab und ruft dann mit dem Befehl „curl“ die Elasticsearch-Restful-Schnittstelle auf, um die wiederhergestellten Daten zu löschen.
Sie können auch verwandte Programme entwickeln. Hier ist ein Beispiel für eine geplante Spring-Boot-Aufgabe.
Entität
package cn.netkiller.api.domain.elasticsearch; import java.util.Date; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; @Entity @Table public class ElasticsearchTrash { @Id private int id; @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP") private Date ctime; public int getId() { return id; } public void setId(int id) { this.id = id; } public Date getCtime() { return ctime; } public void setCtime(Date ctime) { this.ctime = ctime; } }
Lager
package cn.netkiller.api.repository.elasticsearch; import org.springframework.data.repository.CrudRepository; import com.example.api.domain.elasticsearch.ElasticsearchTrash; public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{ }
Zeitgesteuerte Aufgabe
package cn.netkiller.api.schedule; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.example.api.domain.elasticsearch.ElasticsearchTrash; import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository; @Component public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); @Autowired private TransportClient client; @Autowired private ElasticsearchTrashRepository alasticsearchTrashRepository; public ScheduledTasks() { } @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务 public void cleanTrash() { for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) { DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get(); RestStatus status = response.status(); logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString()); if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) { alasticsearchTrashRepository.delete(elasticsearchTrash); } } } }
Spring Boot startet das Hauptprogramm.
package cn.netkiller.api; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
Verwandte Empfehlungen:
Was ist Elasticsearch? Wo kann Elasticsearch eingesetzt werden?
Beispiel-Tutorial für Elasticsearch-Index und Dokumentbetrieb
Detaillierte Erläuterung des Beispiel-Tutorials zur Verwendung von Elasticsearch im Frühjahr
Das obige ist der detaillierte Inhalt vonDetaillierte Beispiele für Datenasymmetrie zwischen MySQL und Elasticsearch. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!