Rumah > Artikel > pembangunan bahagian belakang > PHP melaksanakan pemprosesan data masa nyata Kafka Stream sumber terbuka
Kafka Stream, sebagai enjin pengkomputeran strim, boleh memproses data masa nyata dengan cepat dan menyediakan keupayaan pemprosesan yang diedarkan di luar kotak. Sebagai bahasa pembangunan yang popular, PHP juga boleh menggunakan ciri bahasa yang baik dan perpustakaan sambungan untuk melaksanakan pemprosesan data Kafka Stream.
Artikel ini akan memperkenalkan cara menggunakan PHP untuk membangunkan pemprosesan data masa nyata Kafka Stream, dan menggunakan contoh untuk menunjukkan cara menggunakan PHP untuk menganalisis data masa nyata yang dijana oleh mod pemerhati.
Kafka Stream ialah enjin pengkomputeran aliran yang pantas dan stabil yang boleh memproses data masa nyata dengan pasti dan menyediakan pengedaran di luar kotak kuasa pemprosesan. Kafka Stream ialah kaedah pemprosesan data yang cekap dan fleksibel dengan menggunakan mesej dalam topik Kafka, menghantarnya ke aplikasi untuk diproses, dan kemudian menghantar hasil yang diproses kembali ke topik Kafka.
Dalam PHP, melalui perpustakaan Kafka-PHP yang disediakan secara rasmi oleh Kafka Stream, kami boleh dengan mudah mengintegrasikan aplikasi PHP dengan Kafka Stream Untuk menyepadukan . Berikut ialah versi Kafka Stream yang disokong oleh pustaka Kafka-PHP:
Kafka-PH perpustakaan menyediakan ciri teras berikut:
Selain itu, perpustakaan Kafka-PHP juga menyediakan sokongan untuk sambungan Swoole PHP Prestasi aplikasi PHP boleh dipertingkatkan lagi dengan menggunakan sambungan Swoole.
Corak Pemerhati ialah corak reka bentuk tingkah laku yang mentakrifkan hubungan pergantungan satu-ke-banyak antara objek Apabila keadaannya berubah, semua objek yang bergantung padanya diberitahu dan dikemas kini secara automatik. Corak pemerhati digunakan secara meluas dalam pemantauan acara, pengaturcaraan UI dan bidang lain, dan boleh mencapai penghantaran dan pemprosesan mesej yang cekap.
Yang berikut akan menggunakan kod sampel untuk menunjukkan cara menggunakan PHP untuk membangunkan pemprosesan data masa nyata Kafka Stream dan memohon mod pemerhati untuk analisis data.
4.1 Melaksanakan penerbit Kafka
Pertama, kita perlu mencipta penerbit untuk menghantar mesej kepada topik Kafka. Berikut ialah kod contoh pengeluar Kafka yang ringkas:
<?php require_once __DIR__ . '/vendor/autoload.php'; use RdKafkaConf; use RdKafkaProducer; use RdKafkaProducerTopic; $conf = new Conf(); $conf->set('metadata.broker.list', 'kafka:9092'); $producer = new Producer($conf); $topic = $producer->newTopic('topic1'); for ($i = 0; $i < 10; $i++) { $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i"); } ?>
Dalam kod di atas, kami menggunakan kelas Producer yang disediakan oleh pustaka sambungan RdKafka untuk melaksanakan pengeluar Kafka dan menghantar mesej kepada Kafka bernama 'topic1' dalam topik. Apabila melaksanakan pengeluar Kafka, kita perlu memberi perhatian untuk menyediakan konfigurasi sambungan gugusan Kafka untuk memastikan bahawa gugusan Kafka boleh disambungkan dengan betul.
4.2 Melaksanakan pengguna Kafka
Seterusnya, kita perlu mencipta pengguna Kafka untuk menggunakan data daripada topik Kafka. Berikut ialah kod sampel pengguna Kafka yang ringkas:
<?php require_once __DIR__ . '/vendor/autoload.php'; use RdKafkaConf; use RdKafkaConsumer; use RdKafkaTopicPartition; $conf = new Conf(); $conf->set('metadata.broker.list', 'kafka:9092'); $consumer = new Consumer($conf); $consumer->addBrokers('kafka:9092'); $topic = $consumer->newTopic('topic1'); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 1000); if ($message === null) { continue; } if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) { echo "Received message: {$message->payload} "; } } $consumer->close(); ?>
Dalam kod di atas, kami menggunakan kelas Pengguna yang disediakan oleh perpustakaan sambungan RdKafka untuk melaksanakan pengguna Kafka, menggunakan daripada topik Kafka bernama data 'topic1' dan mencetak data ke konsol. Ambil perhatian bahawa apabila melaksanakan pengguna Kafka, kita perlu menetapkan topik penggunaan dan offset untuk memulakan penggunaan.
4.3 Melaksanakan Corak Pemerhati
Kini kita boleh menggunakan data daripada topik Kafka, tetapi bagaimana untuk menggunakan corak pemerhati untuk menganalisis data? Berikut ialah kod contoh corak pemerhati mudah:
<?php require_once __DIR__ . '/vendor/autoload.php'; use SplObserver; use SplSubject; class Producer implements SplSubject { private array $observers = []; public function attach(SplObserver $observer):void { array_push($this->observers, $observer); } public function detach(SplObserver $observer):void { if (($key = array_search($observer, $this->observers, true)) !== false) { array_splice($this->observers, $key, 1); } } public function notify():void { foreach ($this->observers as $observer) { $observer->update($this); } } public function produce(string $message):void { echo "Producing message: {$message} "; $this->notify(); } } class Consumer implements SplObserver { public function update(SplSubject $subject):void { echo "Consuming message: {$subject} "; } } $producer = new Producer(); $producer->attach(new Consumer()); $producer->produce('Message 1'); ?>
Dalam kod di atas, kami mentakrifkan kelas utama bernama Producer, melaksanakan antara muka SplSubject dan menyediakan kaedah pengurusan pemerhati yang dilampirkan, tanggalkan, maklumkan dan hasilkan. Kami juga menentukan kelas pemerhati bernama Pengguna, melaksanakan antara muka SplObserver dan menyediakan kaedah kemas kini untuk memproses mesej. Akhir sekali, kami mencipta tika Pengeluar dan melampirkan tika Pengguna sebagai pemerhati, melaksanakan kaedah hasil sekali dan mencetuskan kaedah kemas kini Pengguna.
4.4 Laksanakan pemprosesan data mod pemerhati Kafka Stream
Akhir sekali, kami menggabungkan kod dalam tiga langkah sebelumnya untuk melaksanakan pemprosesan data mod pemerhati Kafka Stream. Berikut ialah kod sampel pemprosesan data Kafka Stream yang mudah:
<?php require_once __DIR__ . '/vendor/autoload.php'; use RdKafkaConf; use RdKafkaConsumer; use RdKafkaProducer; use RdKafkaTopicPartition; use SplSubject; use SplObserver; class KafkaStream implements SplSubject { private array $observers; private Conf $conf; private Producer $producer; private Consumer $consumer; public function __construct(string $bootstrap_servers) { $this->conf = new Conf(); $this->conf->set('metadata.broker.list', $bootstrap_servers); $this->producer = new Producer($this->conf); $this->consumer = new Consumer($this->conf); $this->observers = []; } public function attach(SplObserver $observer):void { array_push($this->observers, $observer); } public function detach(SplObserver $observer):void { if (($key = array_search($observer, $this->observers, true)) !== false) { array_splice($this->observers, $key, 1); } } public function notify():void { foreach ($this->observers as $observer) { $observer->update($this); } } public function produce(string $message, string $topic):void { echo "Producing message: {$message} "; $this->producer->newTopic($topic)->produce(RD_KAFKA_PARTITION_UA, 0, $message); $this->notify(); } public function consume(string $topic):void { $topic_partition = new TopicPartition($topic, 0); $this->consumer->assign([$topic_partition]); $this->consumer->seek($topic_partition, 0); while (true) { $message = $this->consumer->consume(0, 1000); if ($message === null) { continue; } if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) { echo "Error: {$message->errstr()}, exiting. "; break; } echo "Consuming message: {$message->payload} "; } $this->consumer->close(); } } class Consumer implements SplObserver { public function update(SplSubject $subject):void { echo "Processing message: {$subject} "; } } $bootstrap_servers = 'kafka:9092'; $kafka_stream = new KafkaStream($bootstrap_servers); $kafka_stream->attach(new Consumer()); $kafka_stream->produce('Message 1', 'topic1'); $kafka_stream->consume('topic1'); ?>
Dalam kod di atas, kami mentakrifkan kelas bernama KafkaStream, melaksanakan antara muka SplSubject dan menyediakan kaedah teras pemprosesan Kafka Stream menghasilkan dan menggunakan, serta sebagai kaedah pengurusan pemerhati lampirkan, tanggalkan dan maklumkan. Kami juga menentukan kelas pemerhati bernama Pengguna, melaksanakan antara muka SplObserver dan menyediakan kaedah kemas kini untuk memproses mesej. Akhir sekali, kami mencipta tika KafkaStream dan melampirkan tika Pengguna sebagai pemerhati, melaksanakan kaedah hasil sekali untuk menghasilkan mesej, dan menggunakan dan memproses mesej dalam kaedah penggunaan.
Artikel ini memperkenalkan cara menggunakan PHP untuk membangunkan pemprosesan data masa nyata Kafka Stream dan menunjukkan cara menggunakan corak pemerhati untuk menganalisis data masa nyata. Kafka Stream dan corak Pemerhati ialah gabungan alatan yang berkuasa yang boleh membantu kami memproses data masa nyata berskala besar dengan cepat dan mencapai penghantaran dan pemprosesan mesej yang cekap.
Atas ialah kandungan terperinci PHP melaksanakan pemprosesan data masa nyata Kafka Stream sumber terbuka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!