最近在研究用php连kafka.
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:
<code> Watches broker state, if broker changes, the client will refresh broker and topic metadata stored in the client? </code>
请使用过该工具的赐教思路,不甚感激!
补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao
现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:
// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);
read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
<code> if ($readable > 0) { $remainingBytes = $len; $data = $chunk = ''; while ($remainingBytes > 0) { $chunk = fread($this->stream, $remainingBytes); if ($chunk === false) { $this->close(); throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)'); } if (strlen($chunk) === 0) { // Zero bytes because of EOF? if (feof($this->stream)) { $this->close(); throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)'); } // Otherwise wait for bytes $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec); if ($readable !== 1) { throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go'); } continue; // attempt another read } $data .= $chunk; $remainingBytes -= strlen($chunk); }</code>
通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码
呼高手!
最近在研究用php连kafka.
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:
<code> Watches broker state, if broker changes, the client will refresh broker and topic metadata stored in the client? </code>
请使用过该工具的赐教思路,不甚感激!
补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao
现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:
// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);
read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
<code> if ($readable > 0) { $remainingBytes = $len; $data = $chunk = ''; while ($remainingBytes > 0) { $chunk = fread($this->stream, $remainingBytes); if ($chunk === false) { $this->close(); throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)'); } if (strlen($chunk) === 0) { // Zero bytes because of EOF? if (feof($this->stream)) { $this->close(); throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)'); } // Otherwise wait for bytes $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec); if ($readable !== 1) { throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go'); } continue; // attempt another read } $data .= $chunk; $remainingBytes -= strlen($chunk); }</code>
通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码
呼高手!
consumer应该只能通过循环来判断broker是否有变化,然后更新说。