Heim >Backend-Entwicklung >PHP-Tutorial >PHP implementiert die Open-Source-Pulsar-Echtzeit-Datenverarbeitung

PHP implementiert die Open-Source-Pulsar-Echtzeit-Datenverarbeitung

PHPz
PHPzOriginal
2023-06-18 09:08:201887Durchsuche

Mit der Entwicklung der Internet-Technologie und dem explosionsartigen Wachstum des Datenvolumens ist die Verarbeitung riesiger Datenmengen zu einem der Probleme geworden, denen sich Internetunternehmen heute stellen müssen. Herkömmliche Datenverarbeitungslösungen, insbesondere Stapelverarbeitungslösungen, können den Anforderungen an Echtzeit und Hochverfügbarkeit nicht mehr gerecht werden. Derzeit ist die Echtzeit-Datenverarbeitung zu einer der besten Lösungen geworden. Als Entwickler müssen wir auch auf den eleganten und effizienten Umgang mit großen Datenmengen achten.

Pulsar ist ein Echtzeit-Datenverarbeitungs-Framework von Yahoo. Es nutzt eine mehrschichtige Architektur, um die Datenverarbeitung effizienter und skalierbarer zu gestalten. Es unterstützt mehrere Clientsprachen, darunter Java, Python, Ruby und PHP. Als sehr beliebte Sprache verfügt PHP über eine einfache Syntax und einen geringen Lernaufwand. Sie ist für viele Unternehmen zu einer der bevorzugten Sprachen für die Entwicklung von Echtzeit-Datenverarbeitungsanwendungen geworden. In diesem Artikel wird erläutert, wie Sie mit PHP die Echtzeit-Datenverarbeitung von Open-Source-Pulsar implementieren.

Vorbereitung

Bevor Sie Pulsar verwenden, müssen Sie Pulsar zunächst herunterladen und installieren. Relevante Softwarepakete und Dokumentationen können von der offiziellen Website von Pulsar bezogen und auf dem lokalen Computer oder auf einem Knoten im Cluster für lokale Entwicklung und Tests installiert werden.

Während des PHP-Entwicklungsprozesses müssen Sie das Client-SDK pulsar-client-php verwenden. Es kann über Tools wie Composer installiert werden. Der spezifische Prozess ist wie folgt:

// 安装pulsar-client-php
composer require apache/pulsar

Nach Abschluss der Installation folgt die grundlegende Konfiguration für die Verwendung von Pulsar.

use ApachePulsarAuthenticationAuthenticationFactory;
use ApachePulsarClientBuilder;
use ApachePulsarProducerConfiguration;
use ApachePulsarSerializationSerialization;

// 配置生产者的信息
$clientBuilder = new ClientBuilder();
$clientBuilder->setServiceUrl('pulsar://localhost:6650');
$clientBuilder->setAuthentication(
    AuthenticationFactory::token('your-token-string')
);

$producerConf = new ProducerConfiguration();
$producerConf->setTopic('your-topic-name');
$producerConf->setSendTimeout(3000);
$producerConf->setSerialization(Serialization::JSON);

// 创建生产者实例
$producer = $clientBuilder->build()->createProducer($producerConf);
$producer->send('your message');

Im obigen Code erstellen wir zunächst den Pulsar-Produzenten über die Klasse ClientBuilder. Beim Erstellen eines Produzenten müssen wir die Methode setServiceUrl festlegen, um die URL des Pulsar-Dienstes anzugeben, und die Methode setAuthentication, um die Authentifizierung durchzuführen. Darüber hinaus müssen Sie die Konfigurationsinformationen des Herstellers festlegen, z. B. Thema, Zeitüberschreitung usw. ClientBuilder类来创建Pulsar的生产者。在创建生产者的时候,我们需要设置setServiceUrl方法来指定Pulsar Service的URL,setAuthentication方法来进行身份验证。另外需要设置生产者的配置信息,如话题、超时等。

Pulsar的使用

Pulsar提供了Producer和Consumer两种基本的组件实现实时数据处理。Producer用于将数据发送到指定的Pulsar topic,而Consumer则从topic中消费数据。下面我们将详细介绍如何使用这两种组件来完成实时数据处理。

Producer

首先,我们通过以下步骤来创建一个Producer实例:

// 导入命名空间
use ApachePulsarClientBuilder;

// 创建Pulsar client实例
$clientBuilder = new ClientBuilder();
$client = $clientBuilder->serviceUrl('pulsar://localhost:6650')->build();

// 创建Producer对象
$producer = $client->createProducer(
    [
        'topic' => 'your-topic',
    ]
);

在创建生产者时,需要设置生产者所属的Pulsar topic。此外,还有其他可选项,如“producerName”、“initialSequenceId”、“sendTimeout”等。这些选项可以根据需要进行配置。

下面我们来看一下如何向Pulsar topic发送消息:

// 对Pulsar topic发送消息
$result = $producer->send('your-message');

send方法返回一个MessageId对象。如果消息之前已经发送过,则返回对应的MessageId。如果消息发送失败,则抛出PulsarClientException异常。

Consumer

与生产者一样,Pulsar Consumer的创建也是分为多个步骤。

// 导入命名空间
use ApachePulsarClientBuilder;

// 创建Pulsar client实例
$clientBuilder = new ClientBuilder();
$client = $clientBuilder->serviceUrl('pulsar://localhost:6650')->build();

// 创建Consumer对象
$consumer = $client->subscribe(
    [
        'topic' => 'your-topic',
        'subscriptionName' => 'your-subscription-name',
    ]
);

在创建Consumer时,我们需要设置订阅的Pulsar topic和订阅名称。另外有其他可选项,如设置“receiverQueueSize”、“ackTimeout”、“subscriptionType”等。

下面我们将看到如何从指定的Pulsar topic中获取消息:

// 从topic中消费消息
$message = $consumer->receive();

// 对消息进行处理
echo 'Received message with ID: ' . $message->getMessageId() . PHP_EOL;

// markAsReceived表示通知Pulsar这条消息已经被处理
$consumer->acknowledge($message);

在调用receive()方法时,程序会保持等待状态,直到有消息从指定的Pulsar topic中返回。当有消息返回时,程序会继续执行,对消息进行处理。

调用acknowledge()方法后,Pulsar才会将消息从该订阅的队列中删除。如果没有调用acknowledge()

Verwendung von Pulsar

Pulsar bietet zwei grundlegende Komponenten, Produzent und Verbraucher, um eine Echtzeit-Datenverarbeitung zu erreichen. Der Produzent wird verwendet, um Daten an das angegebene Pulsar-Thema zu senden, während der Verbraucher Daten aus dem Thema verbraucht. Im Folgenden stellen wir detailliert vor, wie diese beiden Komponenten verwendet werden, um die Datenverarbeitung in Echtzeit abzuschließen.

Produzent

Zuerst erstellen wir eine Produzenteninstanz durch die folgenden Schritte:

rrreee

Beim Erstellen eines Produzenten müssen Sie das Pulsar-Thema festlegen, zu dem der Produzent gehört. Darüber hinaus gibt es weitere Optionen wie „producerName“, „initialSequenceId“, „sendTimeout“ usw. Diese Optionen können nach Bedarf konfiguriert werden.

Sehen wir uns an, wie man eine Nachricht an das Pulsar-Thema sendet: 🎜rrreee🎜Die Sendemethode gibt ein MessageId-Objekt zurück. Wenn die Nachricht schon einmal gesendet wurde, wird die entsprechende MessageId zurückgegeben. Wenn die Nachricht nicht gesendet werden kann, wird eine PulsarClientException-Ausnahme ausgelöst. 🎜

Verbraucher

🎜Wie beim Produzenten ist die Erstellung des Pulsar-Verbrauchers in mehrere Schritte unterteilt. 🎜rrreee🎜Beim Erstellen eines Verbrauchers müssen wir das abonnierte Pulsar-Thema und den Abonnementnamen festlegen. Es gibt weitere Optionen, z. B. das Festlegen von „receiverQueueSize“, „ackTimeout“, „subscriptionType“ usw. 🎜🎜 Unten sehen wir, wie man Nachrichten vom angegebenen Pulsar-Thema erhält: 🎜rrreee🎜 Beim Aufruf der Methode receive() bleibt das Programm im Wartezustand, bis eine Nachricht vom angegebenen Thema eingeht Rückkehr zum Pulsar-Thema. Wenn eine Nachricht zurückgegeben wird, führt das Programm die Nachricht weiter aus und verarbeitet sie. 🎜🎜Pulsar löscht die Nachricht erst aus der Warteschlange des Abonnements, nachdem die Methode acknowledge() aufgerufen wurde. Wenn die Methode acknowledge() nicht aufgerufen wird, bleibt die Nachricht in der Warteschlange, bis die Nachricht abläuft (Standard ist 1 Stunde). 🎜🎜Zusammenfassung🎜🎜In diesem Artikel haben wir vorgestellt, wie man mit PHP die Echtzeit-Datenverarbeitung des Open-Source-Pulsar implementiert. Wir begannen mit der Einrichtung der Pulsar-Umgebung und beschrieben Schritt für Schritt, wie man die Producer- und Consumer-Komponenten von Pulsar nutzt, um eine Echtzeit-Datenverarbeitung zu implementieren. 🎜🎜Pulsar verwendet eine mehrschichtige Architektur und kann eine groß angelegte Echtzeit-Datenverarbeitung gut unterstützen. Derzeit wird Pulsar von vielen Internetunternehmen wie Alibaba, Meituan, Baidu usw. verwendet. 🎜🎜Wir glauben, dass Sie durch das Studium der in diesem Artikel vorgestellten Inhalte bereits verstehen können, wie Sie PHP und Pulsar nutzen können, um die Datenverarbeitung in Echtzeit effizienter und eleganter zu gestalten. 🎜

Das obige ist der detaillierte Inhalt vonPHP implementiert die Open-Source-Pulsar-Echtzeit-Datenverarbeitung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn