首頁 >php教程 >PHP源码 >php 定时脚本管理器

php 定时脚本管理器

PHP中文网
PHP中文网原創
2016-05-25 17:01:07 1016瀏覽

跳至          

server_ip   = $server_ip;
		$this->server_port = $server_port;
		$this->job_path    = $job_path;
		$this->pipes       = array();
		$this->processes   = array();
		$this->log_file    = $log_file;
		$this->share_mem_file = $share_mem_file;
	}

	// Prepare the server: open the listening socket and attach the shared-memory store.
	// Every failure prints a diagnostic to stdout and terminates the script.
	public function init(){
		// TCP/IPv4 socket that will accept client commands
		$listener = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
		if (!$listener){
			echo "socket_create() failed.\n";
			exit;
		}

		// SO_REUSEADDR is switched on before binding
		$reuse_enabled = socket_set_option($listener, SOL_SOCKET, SO_REUSEADDR, 1);
		if (!$reuse_enabled){
			echo "socket_set_option() failed.\n";
			exit;
		}

		// Bind to the configured address and port
		$bound = @socket_bind($listener, $this->server_ip, $this->server_port);
		if (!$bound){
			echo "socket_bind() failed.\n";
			exit;
		}

		// Start listening with a backlog of 5
		$listening = @socket_listen($listener, 5);
		if (!$listening){
			echo "socket_listen() failed.\n";
			exit;
		}

		// Keep the listening socket for start()
		$this->sock = $listener;

		// The shared-memory wrapper class lives in a separate file that must exist
		if (file_exists($this->share_mem_file)){
			include_once($this->share_mem_file);
		}else{
			echo "share memory file not exists.\n";
			exit;
		}

		// One shared-memory segment per server address/port pair
		$segment_name = 'shm_key_of_server_'.$this->server_ip.'_'.$this->server_port;
		$this->share_mem = new share_memory($segment_name);
		if (!$this->share_mem->attach()){
			echo "shm attach() failed.\n";
			exit;
		}
	}

	// Main accept loop: take one connection at a time, read a single command
	// (at most 1024 bytes), dispatch it, write one reply and close the connection.
	//
	// Commands: STATUS <job> | START <job> <file> | STOP <job> [0|1] |
	//           SERVERMEM | READ <job> | SERVERREAD
	// Anything else is answered with "UNKNOWN COMMAND" so the client is never
	// left blocked waiting for a reply.
	//
	// The loop only ends when accepting a connection fails.
	public function start(){
		while (TRUE){
			$this->server_echo("Waiting for new command...");

			$this->connect = @socket_accept($this->sock);
			if (!$this->connect){
				// There is no client socket to report to, so only log locally.
				$this->server_echo("socket_accept() failed.\n");
				break;
			}

			// Read the command; a failed or empty read only drops this client.
			$input = @socket_read($this->connect, 1024);
			if ($input === FALSE || trim($input) === ''){
				$this->server_echo("socket_read() failed.\n");
				@socket_write($this->connect, "socket_read() failed.\n");
				socket_close($this->connect);
				continue;
			}

			// Split into the command word and the (possibly empty) parameter string.
			$parts  = explode(' ', trim($input), 2);
			$cmd    = $parts[0];
			$params = isset($parts[1]) ? $parts[1] : '';

			// Log the request on the server side
			$this->server_echo(date('Y-m-d H:i:s e')."\n$cmd $params\n");

			switch ($cmd){
				case 'STATUS':	// report whether the job is running
					$res   = $this->backend_status($params);
					$reply = $res['msg'];
					break;
				case 'START':	// START <job> <file>
					$args = explode(' ', $params);
					if (count($args) < 2){
						// no script path was supplied
						$reply = 'PARAMS FAILED';
						break;
					}
					$jobname  = array_shift($args);
					$job_file = array_shift($args);
					// NOTE(review): $job_file comes straight from the client and is
					// appended to job_path without validation - confirm that only
					// trusted clients can reach this port.
					$res   = $this->backend_start($jobname, $this->job_path . $job_file);
					$reply = $res['msg'];
					break;
				case 'STOP':	// STOP <job> [0|1]; the flag defaults to 0 (forced stop)
					$args  = array_pad(explode(' ', $params), 2, '0');
					$res   = $this->backend_stop($args[0], $args[1]);
					$reply = $res['msg'];
					break;
				case 'SERVERMEM':	// memory used by this server process
					$reply = (string)$this->my_memory_get_usage();
					break;
				case 'READ':	// buffered output of a job
					$res   = $this->share_mem_read($params);
					$reply = $res['msg'];
					break;
				case 'SERVERREAD':	// recent server log lines
					$reply = implode('', $this->server_output_buffer);
					break;
				default:
					$this->server_echo("UNKNOWN COMMAND\n");
					$reply = 'UNKNOWN COMMAND';
					break;
			}

			if ($reply !== ''){
				socket_write($this->connect, $reply);
			}
			// One command per connection: release the descriptor before accepting again.
			socket_close($this->connect);
		}
	}

	// Resolve the PHP interpreter running this script by following the
	// /proc/<pid>/exe symlink of the current process.
	private function get_php_path(){
		$exe_link = '/proc/' . getmypid() . '/exe';
		return readlink($exe_link);
	}

	// Forced stop: delegates to stop_process() with the graceful flag off.
	private function force_stop_process($jobname){
		$graceful = false;
		$this->stop_process($jobname, $graceful);
	}

	// Graceful stop: delegates to stop_process() with the graceful flag on.
	private function graceful_stop_process($jobname){
		$graceful = true;
		$this->stop_process($jobname, $graceful);
	}

	// Terminate a job started with proc_open() and release everything tied to it.
	//
	// $jobname  - key of the job in $this->processes
	// $graceful - truthy: send SIGTERM (15); falsy: send SIGKILL (9)
	//
	// Unknown jobs are ignored. A table entry that is not a live process
	// resource (e.g. left behind by a failed proc_open) is simply forgotten.
	private function stop_process($jobname, $graceful){
		if (!isset($this->processes[$jobname])){
			return;
		}

		$process = $this->processes[$jobname];
		if (is_resource($process)){
			// Signal through the process handle instead of shelling out to `kill`.
			proc_terminate($process, $graceful ? 15 : 9);

			// Close any pipe that is still open before proc_close(), so no
			// descriptor outlives the job.
			if (isset($this->pipes[$jobname]) && is_array($this->pipes[$jobname])){
				foreach ($this->pipes[$jobname] as $pipe){
					if (is_resource($pipe)){
						fclose($pipe);
					}
				}
			}

			proc_close($process);
		}

		unset($this->pipes[$jobname]);
		unset($this->processes[$jobname]);
	}

	// Report whether a job is alive.
	// Returns array('status' => bool, 'msg' => 'UP'|'DOWN') and logs the outcome.
	private function backend_status($jobname){
		$down = array('status' => false, 'msg' => 'DOWN');

		// Unknown job name
		if (!isset($this->processes[$jobname])){
			$this->server_echo("DOWN. (process $jobname does not exist.)\n");
			return $down;
		}

		// A handle whose status cannot be read is discarded
		$info = proc_get_status($this->processes[$jobname]);
		if (!$info){
			$this->force_stop_process($jobname);
			$this->server_echo("DOWN. (proc_get_status failed.)\n");
			return $down;
		}

		$label = $info['running'] ? 'UP' : 'DOWN';
		$this->server_echo($label . "\n");
		return array('status' => ($label === 'UP'), 'msg' => $label);
	}

	// Launch a job script as a background process and start a detached reader
	// that copies its stdout into shared memory.
	//
	// $jobname    - name under which the job is tracked
	// $script_cmd - path of the PHP script to run; must exist
	//
	// Returns array('status' => bool, 'msg' => string).
	//
	// Process layout: the server forks an intermediate child, which forks the
	// reader and exits at once. The server reaps the intermediate child, so the
	// reader is not a direct child of the server and cannot become its zombie.
	private function backend_start($jobname, $script_cmd){
		// A job with this name may already be registered
		if (isset($this->processes[$jobname])){
			$status = proc_get_status($this->processes[$jobname]);
			if (!$status){
				$this->force_stop_process($jobname);
				$this->server_echo("FAILED. (proc_get_status failed.)\n");
				return  array('status' => false, 'msg' => "FAILED. (proc_get_status failed.)\n");
			}

			if ($status['running']){
				$this->server_echo("FAILED. (process $jobname has already exist.)\n");
				return  array('status' => false, 'msg' => "FAILED. (process $jobname has already exist.)\n");
			}

			// Registered but no longer running: release the stale handle first
			$this->force_stop_process($jobname);
		}

		if (!file_exists($script_cmd)){
			$this->server_echo("FAILED. ($script_cmd does not exist.)\n");
			return  array('status' => false, 'msg' => "FAILED. ($script_cmd does not exist.)\n");
		}

		// stdin/stdout are pipes, stderr is appended to the log file
		$descriptorspec = array(
			0 => array("pipe", "r"),
			1 => array("pipe", "w"),
			2 => array("file", $this->log_file, "a")
		);

		// The script path originates from a client request, so both parts of the
		// command line are shell-escaped.
		$php_path = $this->get_php_path();
		$command  = escapeshellarg($php_path) . ' ' . escapeshellarg($script_cmd);
		$this->processes[$jobname] = proc_open($command, $descriptorspec, $this->pipes[$jobname], dirname($script_cmd));

		if (!is_resource($this->processes[$jobname])){
			// Do not leave a non-resource entry behind for later status calls
			unset($this->processes[$jobname], $this->pipes[$jobname]);
			$this->server_echo("FAILED. (proc_open failed.)\n");
			return  array('status' => false, 'msg' => 'FAILED. (proc_open failed.)');
		}

		// The reader polls stdout without blocking
		$output_pipe = $this->pipes[$jobname][1];
		stream_set_blocking($output_pipe, 0);

		// Number of output lines kept in the shared buffer
		$buffer_lines = 10;

		// Create the shared variable that will hold the output buffer
		if (!$this->share_mem->put_var($jobname, array())){
			$this->force_stop_process($jobname);
			$this->server_echo("shm put_var() failed.\n");
			return  array('status' => false, 'msg' => "shm put_var() failed.\n");
		}
		// Nothing is ever written to the job's stdin
		fclose($this->pipes[$jobname][0]);

		$pid = pcntl_fork();
		if ($pid == -1){
			// Without a reader nothing would drain the job's stdout
			$this->force_stop_process($jobname);
			$this->server_echo("pcntl_fork() failed.\n");
			return  array('status' => false, 'msg' => "pcntl_fork() failed.\n");
		}

		if ($pid){
			// Server: reap the intermediate child. A non-zero exit status means
			// it could not fork the reader.
			pcntl_waitpid($pid, $wait_status);
			if (!pcntl_wifexited($wait_status) || pcntl_wexitstatus($wait_status) !== 0){
				$this->force_stop_process($jobname);
				$this->server_echo("pcntl_fork() failed.\n");
				return  array('status' => false, 'msg' => "pcntl_fork() failed.\n");
			}
			$this->server_echo("START OK\n");
			return  array('status' => true, 'msg' => "SUCESS");
		}

		// Intermediate child: fork the reader and leave. It must never return,
		// otherwise it would re-enter the server's accept loop.
		$reader_pid = pcntl_fork();
		if ($reader_pid == -1){
			$this->server_echo("pcntl_fork() failed.\n");
			exit(1);
		}
		if ($reader_pid){
			exit;
		}

		// Reader (grandchild)
		$this->collect_job_output($jobname, $output_pipe, $buffer_lines);
		exit;
	}

	// Reader loop: copy the job's stdout into the shared-memory buffer until the
	// pipe reports end-of-file. Keeps the last $max_lines + 1 entries, as the
	// original inline loop did.
	private function collect_job_output($jobname, $output_pipe, $max_lines){
		$output_buffer = $this->share_mem->get_var($jobname);
		if (!is_array($output_buffer)){
			$output_buffer = array();
		}
		$keep = $max_lines + 1;

		while (TRUE){
			$read   = array($output_pipe);
			$write  = NULL;
			$except = NULL;

			$ready = stream_select($read, $write, $except, 3);
			if ($ready === FALSE || $ready === 0){
				// select failed or timed out after 3 seconds: poll again
				continue;
			}

			$output = stream_get_contents($output_pipe);
			if ($output === FALSE || $output === ''){
				// readable but nothing to read: the job closed its stdout
				break;
			}

			// The first fragment continues the last buffered line; the rest are new lines
			$lines = explode("\n", $output);
			$last  = count($output_buffer) - 1;
			if ($last >= 0){
				$output_buffer[$last] .= array_shift($lines);
			}
			$output_buffer = array_slice(array_merge($output_buffer, $lines), -$keep, $keep);

			// Publish the updated buffer
			if (!$this->share_mem->put_var($jobname, $output_buffer)){
				$this->server_echo("shm put_var() failed.\n");
			}
		}
	}

	// Stop a registered job.
	// $graceful: truthy -> graceful_stop_process(), falsy -> force_stop_process().
	// Returns array('status' => bool, 'msg' => string) and logs the outcome.
	function backend_stop($jobname, $graceful=FALSE){
		// Unknown job name
		if (!isset($this->processes[$jobname])){
			$missing = "FAILED. (process $jobname does not exist.)\n";
			$this->server_echo($missing);
			return  array('status' => false, 'msg' => $missing);
		}

		// A handle whose status cannot be read is discarded
		$info = proc_get_status($this->processes[$jobname]);
		if (!$info){
			$this->force_stop_process($jobname);
			$unreadable = "FAILED. (proc_get_status failed.)\n";
			$this->server_echo($unreadable);
			return  array('status' => false, 'msg' => $unreadable);
		}

		if (!$graceful){
			$this->force_stop_process($jobname);
		}else{
			$this->graceful_stop_process($jobname);
		}

		$this->server_echo("OK\n");
		return  array('status' => true, 'msg' => 'SUCESS');
	}



	// Log a message: remember it in the in-memory history (last 20 entries)
	// and print it to stdout followed by a newline.
	private function server_echo($str){
		$this->server_output_buffer[] = $str;
		// Trim the history to its 20 most recent entries
		$recent = array_slice($this->server_output_buffer, -20);
		$this->server_output_buffer = $recent;
		echo $str, "\n";
	}

	// Resident memory (VmRSS) of the current process in bytes, read from
	// /proc/<pid>/status. Returns 0 when the file cannot be read or carries no
	// VmRSS line.
	private function my_memory_get_usage(){
		$status_file = '/proc/' . getmypid() . '/status';
		if (!is_readable($status_file)){
			return 0;
		}

		$status = file_get_contents($status_file);
		if ($status === FALSE){
			return 0;
		}

		if (!preg_match('/VmRSS\:\s+(\d+)\s+kB/', $status, $matches)){
			return 0;
		}

		// The kernel reports the value in kB
		return (int)$matches[1] * 1024;
	}

	// Fetch the buffered output of a job from shared memory.
	// Returns array('status' => bool, 'msg' => string); on success 'msg' is the
	// buffered lines joined by "\n" with a trailing "\n".
	private function share_mem_read($jobname){
		// Unknown job name
		if (!isset($this->processes[$jobname])){
			$this->server_echo("NULL. (process does not exist.)\n");
			return  array('status' => false, 'msg' => 'process does not exist');
		}

		// A handle whose status cannot be read is discarded
		if (!proc_get_status($this->processes[$jobname])){
			$this->force_stop_process($jobname);
			$this->server_echo("NULL. (proc_get_status failed.)\n");
			return  array('status' => false, 'msg' => 'proc_get_status failed');
		}

		// Nothing stored (or an empty buffer) is reported as a failure
		$lines = $this->share_mem->get_var($jobname);
		if (!$lines){
			return  array('status' => false, 'msg' => 'there have not share mem!');
		}

		$content = implode("\n", $lines) . "\n";
		return  array('status' => true, 'msg' => $content);
	}
}

                                       

                   

2. [文件] client.class.php ~ 2KB    

server_ip = $server_ip;
		$this->server_port = $server_port;
	}

	// Ask the server for a job's state by sending "STATUS <job>".
	// Returns the server's reply as produced by _cmd().
	public function status($jobname){
		$primitive = 'STATUS ' . $jobname;
		return $this->_cmd($primitive);
	}

	// Ask the server to launch a job by sending "START <job> <script>".
	// Returns the server's reply as produced by _cmd().
	public function start($jobname, $script_cmd){
		$primitive = 'START ' . $jobname . ' ' . $script_cmd;
		return $this->_cmd($primitive);
	}

	// Ask the server to stop a job by sending "STOP <job> <flag>", where the
	// flag is 1 for a graceful stop and 0 otherwise.
	// Returns the server's reply as produced by _cmd().
	public function stop($jobname, $graceful=FALSE){
		$flag = (int)(bool)$graceful;
		return $this->_cmd('STOP ' . $jobname . ' ' . $flag);
	}

	// Send "RESTART <job> <flag>", where the flag is 1 for graceful and 0 otherwise.
	// Returns the server's reply as produced by _cmd().
	// NOTE(review): the server listing published alongside this client has no
	// dedicated RESTART handler - confirm the server side before relying on this.
	public function restart($jobname, $graceful=FALSE){
		$flag = (int)(bool)$graceful;
		return $this->_cmd('RESTART ' . $jobname . ' ' . $flag);
	}

	// Query the server's memory usage by sending "SERVERMEM".
	// Returns the server's reply as produced by _cmd().
	public function servermem(){
		$primitive = 'SERVERMEM';
		return $this->_cmd($primitive);
	}

	// Read a job's buffered output by sending "READ <job>".
	//
	// Returns the reply with a single trailing newline removed, or FALSE when
	// the command could not be delivered. Replies without a trailing newline
	// (the server's error messages) are returned unchanged instead of losing
	// their last character.
	public function read($jobname){
		$reply = $this->_cmd("READ {$jobname}");
		if ($reply === FALSE){
			return FALSE;
		}

		if (substr($reply, -1) === "\n"){
			return substr($reply, 0, -1);
		}
		return $reply;
	}

	// Fetch the server's own recent log output by sending "SERVERREAD".
	// Returns the server's reply as produced by _cmd().
	public function serverread(){
		$primitive = 'SERVERREAD';
		return $this->_cmd($primitive);
	}


	// Send one command to the server and return its reply.
	//
	// Opens a fresh TCP connection, writes $primitive, reads a single chunk of
	// at most 1024 bytes and closes the socket. Longer replies are truncated to
	// that chunk.
	//
	// Returns the reply string, or FALSE when the socket cannot be created,
	// connected or written to (socket_read()'s own failure value is passed on).
	private function _cmd($primitive){
		$sock = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
		if (!$sock){
			return FALSE;
		}

		if (!@socket_connect($sock, $this->server_ip, $this->server_port)){
			// Release the descriptor instead of leaking it
			socket_close($sock);
			return FALSE;
		}

		if (@socket_write($sock, $primitive) === FALSE){
			socket_close($sock);
			return FALSE;
		}

		$rt = socket_read($sock, 1024);
		socket_close($sock);
		return $rt;
	}
}

                           

           

                   

3. [文件] share_memory.php ~ 3KB     

shm_key 	= $this->_gen_key($shm_name);
		$this->shm_size = $shm_size;
    }

	/*
	 * Attach to the shared-memory segment and obtain its semaphore.
	 *
	 * Both handles are stored on the object only when both calls succeed.
	 *
	 * @return bool TRUE on success, FALSE on failure
	 */
	public function attach(){
		try {
			// shm_attach()/sem_get() report failure by returning FALSE
			$shm_id = shm_attach($this->shm_key, $this->shm_size);
			if ($shm_id === FALSE){
				return FALSE;
			}

			// Semaphore that serialises access to the segment
			$sem = sem_get($this->shm_key, 1);
			if ($sem === FALSE){
				shm_detach($shm_id);
				return FALSE;
			}
		} catch (Exception $e) {
			return FALSE;
		}

		$this->shm_id = $shm_id;
		$this->sem    = $sem;
		return TRUE;
	}

	/*
	 * Detach from the shared-memory segment.
	 *
	 * @return bool result of shm_detach(): TRUE on success, FALSE on failure
	 */
	public function detach(){
		return (bool)shm_detach($this->shm_id);
	}

	/*
	 * Remove the shared-memory segment.
	 *
	 * @return bool result of shm_remove(): TRUE on success, FALSE on failure
	 */
	public function remove(){
		return (bool)shm_remove($this->shm_id);
	}
	
	/*
	 * Store a value in shared memory under the key derived from $varname.
	 * The write is performed while holding the semaphore.
	 *
	 * @param string $varname variable name
	 * @param mixed  $value   value to store
	 *
	 * @return bool TRUE when written, FALSE when the semaphore could not be
	 *              acquired or the write failed
	 */
	public function put_var($varname, $value){
		$varkey = $this->_gen_key($varname);

		if (!sem_acquire($this->sem)){
			return FALSE;
		}
		$result = shm_put_var($this->shm_id, $varkey, $value);
		sem_release($this->sem);

		// Always a real boolean, never NULL
		return $result ? TRUE : FALSE;
	}

	/*
	 * Read a value from shared memory.
	 *
	 * The read holds the same semaphore that put_var()/remove_var() use, and
	 * the key is checked first so that a missing variable yields FALSE without
	 * a warning.
	 *
	 * @param string $varname variable name
	 *
	 * @return mixed the stored value, or FALSE when the variable does not
	 *               exist or the semaphore could not be acquired
	 */
	public function get_var($varname){
		$varkey = $this->_gen_key($varname);

		if (!sem_acquire($this->sem)){
			return FALSE;
		}
		$value = FALSE;
		if (shm_has_var($this->shm_id, $varkey)){
			$value = shm_get_var($this->shm_id, $varkey);
		}
		sem_release($this->sem);

		return $value;
	}

	/*
	 * Delete a variable from shared memory while holding the semaphore.
	 *
	 * @param string $varname variable name
	 *
	 * @return mixed whatever shm_remove_var() returns
	 */
	public function remove_var($varname){
		$slot = $this->_gen_key($varname);

		sem_acquire($this->sem);
		$removed = shm_remove_var($this->shm_id, $slot);
		sem_release($this->sem);

		return $removed;
	}
	
	/*
	 * Derive a numeric key from a string: hex characters 8..15 of its MD5
	 * digest, converted to a number. Collisions are assumed to be rare.
	 *
	 * @param string $str source string
	 *
	 * @return int|float value of hexdec() on the 8-character slice
	 */
	private function _gen_key($str){
		$digest = md5($str);
		$slice  = substr($digest, 8, 8);
		return hexdec($slice);
	}
}

                           

           

                   

4. [文件] job_test.php ~ 507B     

<?php

// Demo job: inserts the current Unix timestamp into test.user every 10
// seconds and prints one result line per attempt to stdout.
// The HTML entities left by the page export (&#39;) are restored to real
// single quotes so the script parses.
// NOTE(review): relies on the legacy mysql_* extension, which was removed in
// PHP 7 - confirm the target PHP version or port to mysqli/PDO.

error_reporting(E_ALL);
ini_set('display_errors', true);
date_default_timezone_set("Asia/Shanghai");
$p = mysql_connect('127.0.0.1', 'root', 'root');
mysql_select_db('test');
mysql_query('set names utf8');

while(true){
	$time = time();
	$sql = "insert into user(`time`) values('{$time}')";
	$back = mysql_query($sql);
	if($back){
		echo date('Y-m-d H:i:s') . ':插入成功' . "\n";
	}else{
		echo date('Y-m-d H:i:s') . ':插入失败' . "\n";
	}

	sleep(10);
}

                           

       

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn