Maison >Opération et maintenance >Nginx >Comment Python obtient-il le log Nginx correspondant à la demande de tâche en temps réel ?
Pendant le processus de test des exigences du projet, vous devez envoyer des demandes de cas d'utilisation au serveur Nginx, puis vérifier le journal Nginx correspondant pour déterminer s'il existe un contenu caractéristique à déterminer si la tâche est exécutée avec succès. Afin d'améliorer l'efficacité, ce processus doit être automatisé.
Python 3.6.5
Conception et mise en œuvre du code
#!/usr/bin/env python # -*- coding:utf-8 -*- """ @CreateTime: 2021/06/26 9:05 @Author : shouke """ import time import threading import subprocess from collections import deque def collect_nginx_log(): global nginx_log_queue global is_tasks_compete global task_status args = "tail -0f /usr/local/openresty/nginx/logs/access.log" while task_status != "req_log_got": with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc: log_for_req = "" outs, errs = "", "" try: outs, errs = proc.communicate(timeout=2) except subprocess.TimeoutExpired: print("获取nginx日志超时,正在重试") proc.kill() try: outs, errs = proc.communicate(timeout=5) except subprocess.TimeoutExpired: print("获取nginx日志超时,再次超时,停止重试") break finally: for line in outs.split(" "): flag = ""client_ip":"10.118.0.77"" # 特征 if flag in line: # 查找包含特征内容的日志 log_for_req += line if task_status == "req_finished": nginx_log_queue.append(log_for_req) task_status = "req_log_got" def run_tasks(task_list): """ 运行任务 :param task_list 任务列表 """ global nginx_log_queue global is_tasks_compete global task_status for task in task_list: thread = threading.Thread(target=collect_nginx_log, name="collect_nginx_log") thread.start() time.sleep(1) # 执行任务前,让收集日志线程先做好准备 print("正在执行任务:%s" % task.get("name")) # 执行Nginx任务请求 # ... task_status = "req_finished" time_to_wait = 0.1 while task_status != "req_log_got": # 请求触发的nginx日志收集未完成 time.sleep(time_to_wait) time_to_wait += 0.01 else:# 获取到用例请求触发的nginx日志 if nginx_log_queue: nginx_log = nginx_log_queue.popleft() task_status = "req_ready" # 解析日志 # do something here # ... else: print("存储请求日志的队列为空") # do something here # ... if __name__ == "__main__": nginx_log_queue = deque() is_tasks_compete = False # 所有任务是否执行完成 task_status = "req_ready" # req_ready,req_finished,req_log_got # 存放执行次任务任务的一些状态 print("###########################任务开始###########################") tast_list = [{"name":"test_task", "other":"..."}] run_tasks(tast_list) is_tasks_compete = True current_active_thread_num = len(threading.enumerate()) while current_active_thread_num != 1: time.sleep(2) current_active_thread_num = len(threading.enumerate()) print("###########################任务完成###########################")
Remarque :
# 🎜🎜#1. Pourquoi les codes ci-dessus ne sont-ils pas complétés en une seule étape et directementtail -0f /usr/local/openresty/nginx/logs/access.log | grep "feature content"
? En effet, le journal Nginx ne peut pas être obtenu en procédant ainsi tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征内容"
呢?这是因为这样做无法获取到Nginx的日志
2、实践时发现,第一次执行proc.communicate(timeout=2)
获取日志时,总是无法获取,会超时,需要二次获取,并且timeout
设置太小时(实践时尝试过设置为1秒
2 En pratique, il a été constaté que lorsque proc.communicate(timeout=2)
est exécuté pour la première fois. Il est temps d'obtenir le journal, il est toujours Impossible d'obtenir, il expirera et une deuxième acquisition sera nécessaire. Si le timeout
est trop petit (en pratique, j'ai essayé de le régler sur 1 seconde), cela entraînera également l'échec de la deuxième exécution. Obtenez les journaux Nginx.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!