server_ip = $server_ip; $this->server_port = $server_port; $this->job_path = $job_path; $this->pipes = array(); $this->processes = array(); $this->log_file = $log_file; $this->share_mem_file = $share_mem_file; } //初始化socket public function init(){ // 开始监听 $sock = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if (!$sock){ echo "socket_create() failed.\n"; exit; } if (!socket_set_option($sock, SOL_SOCKET, SO_REUSEADDR, 1)){ echo "socket_set_option() failed.\n"; exit; } //绑定端口和ip if (!($ret = @socket_bind($sock, $this->server_ip, $this->server_port))){ echo "socket_bind() failed.\n"; exit; } //等待监听 if (!($ret = @socket_listen($sock, 5))){ echo "socket_listen() failed.\n"; exit; } //设置 $this->sock = $sock; //设置共享内存 if(!file_exists($this->share_mem_file)){ echo "share memory file not exists.\n"; exit; }else{ include_once($this->share_mem_file); } //初始化共享内存 $this->share_mem = new share_memory('shm_key_of_server_'.$this->server_ip.'_'.$this->server_port); if(!$this->share_mem->attach()){ echo "shm attach() failed.\n"; exit; } } //开始监听 public function start(){ // 循环处理 while (TRUE){ // 等待连接 $this->server_echo("Waiting for new command..."); $this->connect = @socket_accept($this->sock); if (!$this->connect){ $this->server_echo("socket_accept() failed.\n"); socket_write($this->connect, "socket_accept() failed.\n"); break; } // 读取输入 if (!($input = @socket_read($this->connect, 1024))) { $this->server_echo("socket_read() failed.\n"); socket_write($this->connect, "socket_read() failed.\n"); break; } // 分析并执行命令 $input_arr = explode(' ', trim($input)); if (count($input_arr) > 1){ list($cmd, $params) = explode(' ', trim($input), 2); }else{ $cmd = $input; $params = ''; } //输出服务器内容 $this->server_echo(date('Y-m-d H:i:s e')."\n$cmd $params\n"); //遍历功能 switch ($cmd){ case 'STATUS': // 获取进程状态 $jobname = $params; $res = $this->backend_status($jobname); socket_write($this->connect, $res['msg']); break; case 'START': // 开启进程 $params = explode(' ', $params); $params_len = count($params); if ($params_len == 1){ // 没有输入程序路径 socket_write($this->connect, 'PARAMS FAILED'); break; } $jobname = array_shift($params); $job_file = array_shift($params); $script_cmd = $this->job_path . $job_file; $res = $this->backend_start($jobname, $script_cmd); socket_write($this->connect, $res['msg']); break; case 'STOP': // 结束进程 STOP NAME 0 list($jobname, $graceful) = explode(' ', $params); $res = $this->backend_stop($jobname, $graceful); socket_write($this->connect, $res['msg']); break; case 'SERVERMEM': // 读取服务器内存占用情况 $mem = $this->my_memory_get_usage(); socket_write($this->connect, $mem); break; case 'READ': $jobname = $params; $res= $this->share_mem_read($jobname); socket_write($this->connect, $res['msg']); break; case 'SERVERREAD': socket_write($this->connect, implode('', $this->server_output_buffer)); break; } } } // 获取运行当前脚本的PHP解析器路径 private function get_php_path(){ return readlink('/proc/'.getmypid().'/exe'); } // 强制结束进程 private function force_stop_process($jobname){ $this->stop_process($jobname, FALSE); } // 优雅结束进程 private function graceful_stop_process($jobname){ $this->stop_process($jobname, TRUE); } // 结束进程,并释放相关资源 private function stop_process($jobname, $graceful){ if (!$graceful) { // 强制结束proc_open打开的进程 $status = proc_get_status($this->processes[$jobname]); exec('kill -9 '.$status['pid'].' 2>/dev/null >&- >/dev/null'); } proc_terminate($this->processes[$jobname]); proc_close($this->processes[$jobname]); unset($this->processes[$jobname]); } // 查看进程状态 private function backend_status($jobname){ if (!isset($this->processes[$jobname])){ // 进程不存在 $this->server_echo("DOWN. (process $jobname does not exist.)\n"); return array('status' => false, 'msg' => 'DOWN'); } $status = proc_get_status($this->processes[$jobname]); if (!$status){ $this->force_stop_process($jobname); $this->server_echo("DOWN. (proc_get_status failed.)\n"); return array('status' => false, 'msg' => 'DOWN'); } if ($status['running']){ $this->server_echo("UP\n"); return array('status' => true, 'msg' => 'UP'); }else{ $this->server_echo("DOWN\n"); return array('status' => false, 'msg' => 'DOWN'); } } // 开启进程 private function backend_start($jobname, $script_cmd){ // 检查进程名是否已经存在 if (isset($this->processes[$jobname])){ // 取进程状态 $status = proc_get_status($this->processes[$jobname]); if (!$status){ $this->force_stop_process($jobname); $this->server_echo("FAILED. (proc_get_status failed.)\n"); return array('status' => false, 'msg' => "FAILED. (proc_get_status failed.)\n"); } // 检查进程是否正在运行 if ($status['running']){ $this->server_echo("FAILED. (process $jobname has already exist.)\n"); return array('status' => false, 'msg' => "FAILED. (process $jobname has already exist.)\n"); }else{ // 停止 $this->force_stop_process($jobname); } } if (!file_exists($script_cmd)){ // 文件不存在 $this->server_echo("FAILED. ($script_cmd does not exist.)\n"); return array('status' => false, 'msg' => "FAILED. ($script_cmd does not exist.)\n"); } // 执行后台进程 $descriptorspec = array( 0 => array("pipe", "r"), 1 => array("pipe", "w"), 2 => array("file", $this->log_file, "a") ); $php_path = $this->get_php_path(); $this->processes[$jobname] = proc_open("{$php_path} {$script_cmd}", $descriptorspec, $this->pipes[$jobname], dirname($script_cmd)); if (!is_resource($this->processes[$jobname])){ $this->server_echo("FAILED. (proc_open failed.)\n"); return array('status' => false, 'msg' => 'FAILED. (proc_open failed.)'); } // 非阻塞模式读取 $output_pipe = $this->pipes[$jobname][1]; stream_set_blocking($output_pipe, 0); // 记录缓冲区行数 $extra_settings[$jobname] = array( 'bufferlines' => 10 ); // 创建共享变量用于存储输出缓冲 $output_buffer = array(); if (!$this->share_mem->put_var($jobname, $output_buffer)){ $this->server_echo("shm put_var() failed.\n"); return array('status' => false, 'msg' => "shm put_var() failed.\n"); } fclose($this->pipes[$jobname][0]); //新建一个子进程用于读取进程输出 $pid = pcntl_fork(); if ($pid == -1){ $this->server_echo("pcntl_fork() failed.\n"); return array('status' => false, 'msg' => "pcntl_fork() failed.\n"); }else if ($pid){ //父进程 $child_pids[$jobname] = $pid; pcntl_waitpid($t_pid, $status); $this->server_echo("START OK\n"); return array('status' => true, 'msg' => "SUCESS"); }else{ // 新建一个孙子进程用于避免僵尸进程 $t_pid = pcntl_fork(); if ($t_pid == -1){ $this->server_echo("pcntl_fork() failed.\n"); return array('status' => false, 'msg' => "pcntl_fork() failed.\n"); }else if ($t_pid){ // 父进程 exit; }else{ //取出共享内存中的输出缓冲 $output_buffer = $this->share_mem->get_var($jobname); while (TRUE){ $read = array($output_pipe); $write = NULL; $except = NULL; if (FALSE === ($num_changed_streams = stream_select($read, $write, $except, 3))){ continue; }elseif ($num_changed_streams > 0){ $output = stream_get_contents($output_pipe); // 缓存输出 if ($output !== ''){ $buffer_lines = $extra_settings[$jobname]['bufferlines'] + 1; $output_lines = explode("\n", $output); $old_len = count($output_buffer); if ($old_len > 0){ $output_buffer[$old_len-1] .= array_shift($output_lines); } $output_buffer = array_merge($output_buffer, $output_lines); $output_buffer = array_slice($output_buffer, -$buffer_lines, $buffer_lines); // 更新共享变量 if (!$this->share_mem->put_var($jobname, $output_buffer)){ $this->server_echo("shm put_var() failed.\n"); } }else{ break; } } } exit; } } } // 结束进程 // $is_restart 是否是重启进程,如果是,则SOCKET不输出 function backend_stop($jobname, $graceful=FALSE){ if (!isset($this->processes[$jobname])){ $this->server_echo("FAILED. (process $jobname does not exist.)\n"); return array('status' => false, 'msg' => "FAILED. (process $jobname does not exist.)\n"); } $status = proc_get_status($this->processes[$jobname]); if (!$status){ $this->force_stop_process($jobname); $this->server_echo("FAILED. (proc_get_status failed.)\n"); return array('status' => false, 'msg' => "FAILED. (proc_get_status failed.)\n"); } if ($graceful){ $this->graceful_stop_process($jobname); }else{ $this->force_stop_process($jobname); } $this->server_echo("OK\n"); return array('status' => true, 'msg' => 'SUCESS'); } // 服务器输出 private function server_echo($str){ $this->server_output_buffer[] = $str; $this->server_output_buffer = array_slice($this->server_output_buffer, -20, 20); echo $str . "\n"; } // 返回进程占用的实际内存值 private function my_memory_get_usage(){ $pid = getmypid(); $status = file_get_contents("/proc/{$pid}/status"); preg_match('/VmRSS\:\s+(\d+)\s+kB/', $status, $matches); $vmRSS = $matches[1]; return $vmRSS*1024; } // 读取进程输出缓冲区 private function share_mem_read($jobname){ if (!isset($this->processes[$jobname])){ // 进程不存在 $this->server_echo("NULL. (process does not exist.)\n"); return array('status' => false, 'msg' => 'process does not exist'); } $status = proc_get_status($this->processes[$jobname]); if (!$status){ $this->force_stop_process($jobname); $this->server_echo("NULL. (proc_get_status failed.)\n"); return array('status' => false, 'msg' => 'proc_get_status failed'); } // 取出共享内存中的输出缓冲 $output_buffer = $this->share_mem->get_var($jobname); if ($output_buffer){ $content = implode("\n", $output_buffer)."\n"; return array('status' => true, 'msg' => $content); }else{ return array('status' => false, 'msg' => 'there have not share mem!'); } } }
server_ip = $server_ip; $this->server_port = $server_port; } // 查询进程状态 返回:UP(正常)、DOWN(当机) public function status($jobname){ return $this->_cmd("STATUS {$jobname}"); } // 开启新进程 OK(成功)、FAILED(失败) public function start($jobname, $script_cmd){ return $this->_cmd("START {$jobname} {$script_cmd}"); } // 结束进程 返回:OK(成功)、FAILED(失败) public function stop($jobname, $graceful=FALSE){ $p2 = $graceful ? 1 : 0; return $this->_cmd("STOP {$jobname} {$p2}"); } // 重启进程 OK(成功)、FAILED(失败) public function restart($jobname, $graceful=FALSE){ $p2 = $graceful ? 1 : 0; return $this->_cmd("RESTART {$jobname} {$p2}"); } // 读取进程服务器的输出缓冲 返回:进程服务器使用的内存 public function servermem(){ return $this->_cmd("SERVERMEM"); } // 读取进程输出缓冲 // 返回:进程输出缓冲区内容 public function read($jobname){ return substr($this->_cmd("READ {$jobname}"), 0, -1); } // 读取进程服务器的输出缓冲 // 返回:进程服务器输出缓冲区内容 public function serverread(){ return $this->_cmd("SERVERREAD"); } // 执行命令并返回结果 private function _cmd($primitive){ if (!($sock = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP))){ return FALSE; } if (!@socket_connect($sock, $this->server_ip, $this->server_port)){ return FALSE; } socket_write($sock, $primitive); $rt = socket_read($sock, 1024); socket_close($sock); return $rt; } }
shm_key = $this->_gen_key($shm_name); $this->shm_size = $shm_size; } /* * 连接到共享内存 * * @return bool 成功TRUE,失败FALSE */ public function attach(){ try { $this->shm_id = shm_attach($this->shm_key, $this->shm_size); // 初始化信号量 $this->sem = sem_get($this->shm_key, 1); } catch (Exception $e) { return FALSE; } return TRUE; } /* * 从共享内存断开 * * @return bool 成功TRUE,失败FALSE */ public function detach(){ shm_detach($this->shm_id); return TRUE; } /* * 删除共享内存 * * @return bool 成功TRUE,失败FALSE */ public function remove(){ shm_remove($this->shm_id); return TRUE; } /* * 将变量名及对应值写入共享内存中 * * @param string $varname 变量名 * @param string $value 变量值 * * @return bool 写入成功TRUE,失败FALSE */ public function put_var($varname, $value){ $varkey = $this->_gen_key($varname); sem_acquire($this->sem); $result = shm_put_var($this->shm_id, $varkey, $value); sem_release($this->sem); if ($result) return TRUE; } /* * 从共享内存中取出变量值 * * @param string $varname 变量名 * * @return mixed 变量名对应的值,失败返回FALSE */ public function get_var($varname){ $varkey = $this->_gen_key($varname); return shm_get_var($this->shm_id, $varkey); } /* * 从共享内存中删除变量 * * @param string $varname 变量名 * * @return mixed 删除成功TRUE,失败FALSE */ public function remove_var($varname){ $varkey = $this->_gen_key($varname); sem_acquire($this->sem); $result = shm_remove_var($this->shm_id, $varkey); sem_release($this->sem); return $result; } /* * 生成指定字符串对应的键 * * @param string $name 字符串 * * @return int 键 */ private function _gen_key($str){ // 假设碰撞机率比较低 return hexdec(substr(md5($str), 8, 8)); } }
<?php error_reporting(E_ALL); ini_set('display_errors', true); date_default_timezone_set("Asia/Shanghai"); $p = mysql_connect('127.0.0.1', 'root', 'root'); mysql_select_db('test'); mysql_query('set names utf8'); while(true){ $time = time(); $sql = "insert into user(`time`) values('{$time}')"; $back = mysql_query($sql); if($back){ echo date('Y-m-d H:i:s') . ':插入成功' . "\n"; }else{ echo date('Y-m-d H:i:s') . ':插入失败' . "\n"; } sleep(10); }