Here are some important Python modules used for DevOps automation:
os module: The os module provides a way to interact with the operating system, including file operations, process management, and system information.
Requests and urllib3 modules: The Requests and urllib3 modules are used to send HTTP requests and handle HTTP responses.
Logging module: The logging module provides a way to log messages from Python applications.
boto3 module: boto3 is the Amazon Web Services (AWS) SDK for Python and provides an interface to AWS services such as EC2 and S3.
paramiko module: The paramiko module is a Python implementation of the SSH protocol, used for secure remote connections.
json module: The json module is used to encode and decode JSON data.
PyYAML module: The PyYAML module provides a way to parse and generate YAML data.
pandas module: The pandas module provides data analysis tools, including data manipulation and data visualization.
smtplib module: The smtplib module provides a way to send email messages from Python applications.
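As a small illustration of how the standard-library modules in this list combine, the sketch below gathers a few facts about the host with os, serializes them with json, and writes them out with logging. The function name host_report and the logger name are hypothetical and chosen only for this example.

```python
import json
import logging
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("host_report")  # hypothetical logger name


def host_report():
    """Collect a few facts about the current process and host."""
    return {
        "pid": os.getpid(),
        "cwd": os.getcwd(),
        "cpu_count": os.cpu_count(),
    }


report = host_report()
# Serialize the report to JSON so that it can be logged as one line
logger.info("host report: %s", json.dumps(report))
```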
Python Use Cases in DevOps
example code:
import boto3


def lambda_handler(event, context):
    ec2 = boto3.client('ec2')

    # Get all EBS snapshots owned by this account
    response = ec2.describe_snapshots(OwnerIds=['self'])

    # Get all active EC2 instance IDs
    # (collected for reference; the check below only looks at volume attachments)
    instances_response = ec2.describe_instances(
        Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
    )
    active_instance_ids = set()
    for reservation in instances_response['Reservations']:
        for instance in reservation['Instances']:
            active_instance_ids.add(instance['InstanceId'])

    # Delete each snapshot whose source volume is missing or not attached to any instance
    for snapshot in response['Snapshots']:
        snapshot_id = snapshot['SnapshotId']
        volume_id = snapshot.get('VolumeId')

        if not volume_id:
            # Delete the snapshot if it's not associated with any volume
            ec2.delete_snapshot(SnapshotId=snapshot_id)
            print(f"Deleted EBS snapshot {snapshot_id} as it was not attached to any volume.")
        else:
            # Check if the volume still exists
            try:
                volume_response = ec2.describe_volumes(VolumeIds=[volume_id])
                if not volume_response['Volumes'][0]['Attachments']:
                    ec2.delete_snapshot(SnapshotId=snapshot_id)
                    print(f"Deleted EBS snapshot {snapshot_id} as it was taken from a volume not attached to any running instance.")
            except ec2.exceptions.ClientError as e:
                if e.response['Error']['Code'] == 'InvalidVolume.NotFound':
                    # The volume associated with the snapshot is not found (it might have been deleted)
                    ec2.delete_snapshot(SnapshotId=snapshot_id)
                    print(f"Deleted EBS snapshot {snapshot_id} as its associated volume was not found.")
                else:
                    # Do not silently swallow unrelated errors (permissions, throttling, ...)
                    raise
Repo: https://github.com/PRATIKNALAWADE/AWS-Cost-Optimization/blob/main/ebs_snapshots.py
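Because deleting snapshots cannot be undone, it can help to separate the decision from the deletion so the result can be reviewed first. The following is a minimal sketch of that idea, not the repository's code: find_deletable_snapshots is a hypothetical helper that takes snapshot dictionaries shaped like the describe_snapshots response and a mapping from volume ID to that volume's attachment list, where a missing key is assumed to mean the volume no longer exists.

```python
def find_deletable_snapshots(snapshots, attachments_by_volume):
    """Return (snapshot_id, reason) pairs without deleting anything.

    snapshots: list of dicts with 'SnapshotId' and optionally 'VolumeId'.
    attachments_by_volume: dict mapping volume ID -> list of attachments;
        a volume ID that is absent is treated as a deleted volume.
    """
    deletable = []
    for snapshot in snapshots:
        snapshot_id = snapshot["SnapshotId"]
        volume_id = snapshot.get("VolumeId")
        if not volume_id:
            deletable.append((snapshot_id, "no source volume"))
        elif volume_id not in attachments_by_volume:
            deletable.append((snapshot_id, "volume not found"))
        elif not attachments_by_volume[volume_id]:
            deletable.append((snapshot_id, "volume not attached"))
    return deletable


# Made-up sample data for illustration only
sample_snapshots = [
    {"SnapshotId": "snap-1"},
    {"SnapshotId": "snap-2", "VolumeId": "vol-gone"},
    {"SnapshotId": "snap-3", "VolumeId": "vol-idle"},
    {"SnapshotId": "snap-4", "VolumeId": "vol-used"},
]
sample_attachments = {"vol-idle": [], "vol-used": [{"InstanceId": "i-123"}]}
plan = find_deletable_snapshots(sample_snapshots, sample_attachments)
```

The resulting plan can be printed or logged as a dry run before any delete_snapshot call is made.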
In a CI/CD pipeline, automation is key to ensuring that code changes are built, tested, and deployed consistently and reliably. Python can be used to interact with CI/CD tools like Jenkins, GitLab CI, or CircleCI, either by triggering jobs, handling webhook events, or interacting with various APIs to deploy applications.
Below is an example of how you can use Python to automate certain aspects of a CI/CD pipeline using Jenkins.
Scenario:
You have a Python script that needs to trigger a Jenkins job whenever a new commit is pushed to the main branch of a GitHub repository. The script will also pass some parameters to the Jenkins job, such as the Git commit ID and the branch name.
First, ensure that you have a Jenkins job configured to accept parameters. You will need the job name, Jenkins URL, and an API token for authentication.
Below is a Python script that triggers the Jenkins job with specific parameters:
import requests

# Jenkins server details
jenkins_url = 'http://your-jenkins-server.com'
job_name = 'your-job-name'
username = 'your-username'
api_token = 'your-api-token'

# Parameters to pass to the Jenkins job
branch_name = 'main'
commit_id = 'abc1234def5678'

# Construct the job URL
job_url = f'{jenkins_url}/job/{job_name}/buildWithParameters'

# Define the parameters to pass
params = {
    'BRANCH_NAME': branch_name,
    'COMMIT_ID': commit_id
}

# Trigger the Jenkins job (the timeout keeps the script from hanging if Jenkins is unreachable)
response = requests.post(job_url, auth=(username, api_token), params=params, timeout=10)

# Check the response
if response.status_code == 201:
    print('Jenkins job triggered successfully.')
else:
    print(f'Failed to trigger Jenkins job: {response.status_code}, {response.text}')
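Jenkins Details: jenkins_url, job_name, username, and api_token identify the Jenkins server and job and authenticate the request. Replace the placeholder values with your own.
Parameters: branch_name and commit_id are sent as the BRANCH_NAME and COMMIT_ID build parameters, so the Jenkins job must define parameters with those names.
Requests Library: requests.post sends the request to the job's buildWithParameters endpoint, using HTTP basic authentication with the username and API token.
Response Handling: a 201 status code means Jenkins accepted the request and queued the build; any other status is reported as a failure along with the response body.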
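Hard-coding the API token in the script is convenient for a demo but risky in practice. A minimal sketch of one alternative follows, assuming the credentials are supplied through environment variables; the variable names JENKINS_USER and JENKINS_API_TOKEN and both helper functions are hypothetical. The job name is also URL-quoted so that names containing spaces still produce a valid URL.

```python
import os
from urllib.parse import quote


def build_job_url(jenkins_url, job_name):
    """Build the buildWithParameters URL, quoting the job name."""
    base = jenkins_url.rstrip('/')
    return f"{base}/job/{quote(job_name, safe='')}/buildWithParameters"


def load_credentials(env=os.environ):
    """Read the username and API token from the environment.

    JENKINS_USER and JENKINS_API_TOKEN are assumed names, not a Jenkins convention.
    """
    try:
        return env['JENKINS_USER'], env['JENKINS_API_TOKEN']
    except KeyError as missing:
        raise RuntimeError(f'missing environment variable: {missing}') from None


example_url = build_job_url('http://your-jenkins-server.com/', 'my job')
```

The returned tuple can be passed directly as the auth argument of requests.post.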
To trigger this Python script automatically whenever a new commit is pushed to the main branch, you can configure a GitHub webhook that sends a POST request to your server (where this Python script is running) whenever a push event occurs.
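GitHub Webhook Configuration: in the repository settings on GitHub, add a webhook whose payload URL points to the /webhook route of the server running the application below, set the content type to application/json, and select the push event.
Handling the Webhook: a small Flask application receives the push event and triggers the Jenkins job when the push is to the main branch.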
from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

# Jenkins server details
jenkins_url = 'http://your-jenkins-server.com'
job_name = 'your-job-name'
username = 'your-username'
api_token = 'your-api-token'


@app.route('/webhook', methods=['POST'])
def github_webhook():
    payload = request.json

    # Extract the pushed ref and commit ID from the payload
    ref = payload['ref']  # e.g. 'refs/heads/main'
    commit_id = payload['after']

    # Only trigger the job for pushes to the main branch.
    # Comparing the full ref avoids matching tags or branches such as 'feature/main'.
    if ref == 'refs/heads/main':
        branch_name = 'main'
        job_url = f'{jenkins_url}/job/{job_name}/buildWithParameters'
        params = {
            'BRANCH_NAME': branch_name,
            'COMMIT_ID': commit_id
        }
        response = requests.post(job_url, auth=(username, api_token), params=params, timeout=10)

        if response.status_code == 201:
            return jsonify({'message': 'Jenkins job triggered successfully.'}), 201
        else:
            return jsonify({'message': 'Failed to trigger Jenkins job.'}), response.status_code

    return jsonify({'message': 'No action taken.'}), 200


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Deploy this Flask app on a server and ensure it is accessible via the public internet, so GitHub's webhook can send data to it.
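Since the endpoint is reachable from the public internet, anyone who knows the URL could send it a forged payload. GitHub can sign each delivery with a shared secret and send the result in the X-Hub-Signature-256 header as "sha256=" followed by the hex HMAC-SHA256 of the raw request body. A minimal sketch of checking that signature is shown below; the function name is_valid_signature and the sample secret are hypothetical.

```python
import hashlib
import hmac


def is_valid_signature(secret, body, signature_header):
    """Return True if signature_header matches the HMAC-SHA256 of body.

    secret and body are bytes; signature_header is the header value as a string.
    """
    if not signature_header or not signature_header.startswith('sha256='):
        return False
    expected = 'sha256=' + hmac.new(secret, body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking information through timing differences
    return hmac.compare_digest(expected, signature_header)


# Illustration with made-up values
demo_secret = b'example-secret'
demo_body = b'{"ref": "refs/heads/main"}'
demo_header = 'sha256=' + hmac.new(demo_secret, demo_body, hashlib.sha256).hexdigest()
```

In a Flask handler, the raw body is available from request.get_data() and the header from request.headers.get('X-Hub-Signature-256'); a request that fails the check can be rejected with a 403 response before the payload is parsed.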
This example illustrates how Python can be integrated into a CI/CD pipeline, interacting with tools like Jenkins to automate essential tasks.
In this example, we'll use Python to manage server configurations with Ansible. The script will run Ansible playbooks to ensure servers are configured consistently and orchestrate the deployment of multiple services.
Scenario:
You need to configure a set of servers to ensure they have the latest version of a web application, along with necessary dependencies and configurations. You want to use Ansible for configuration management and Python to trigger and manage Ansible playbooks.
playbooks/setup.yml:
This Ansible playbook installs necessary packages and configures the web server.
---
- name: Configure web servers
  hosts: web_servers
  become: yes
  tasks:
    - name: Install nginx
      apt:
        name: nginx
        state: present

    - name: Deploy web application
      copy:
        src: /path/to/local/webapp
        dest: /var/www/html/webapp
        owner: www-data
        group: www-data
        mode: '0644'

    - name: Ensure nginx is running
      service:
        name: nginx
        state: started
        enabled: yes
inventory/hosts:
Define your servers in the Ansible inventory file.
[web_servers]
server1.example.com
server2.example.com
The Python script will use the subprocess module to run Ansible commands and manage playbook execution.
import subprocess


def run_ansible_playbook(playbook_path, inventory_path):
    """
    Run an Ansible playbook using the subprocess module.

    :param playbook_path: Path to the Ansible playbook file.
    :param inventory_path: Path to the Ansible inventory file.
    :return: None
    """
    try:
        result = subprocess.run(
            ['ansible-playbook', '-i', inventory_path, playbook_path],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        print('Ansible playbook executed successfully.')
        print(result.stdout)
    except subprocess.CalledProcessError as e:
        print('Ansible playbook execution failed.')
        # Ansible reports task failures on stdout, so show both streams
        print(e.stdout)
        print(e.stderr)


if __name__ == '__main__':
    # Paths to the playbook and inventory files
    playbook_path = 'playbooks/setup.yml'
    inventory_path = 'inventory/hosts'

    # Run the Ansible playbook
    run_ansible_playbook(playbook_path, inventory_path)
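Ansible Playbook (setup.yml): installs nginx, copies the web application to /var/www/html/webapp, and ensures the nginx service is started and enabled on every host in the web_servers group.
Inventory File (hosts): lists the servers that belong to the web_servers group targeted by the playbook.
Python Script (run_ansible_playbook function): runs ansible-playbook with the given inventory and playbook through subprocess.run, prints the command's output on success, and prints the captured output if the command exits with a non-zero status.
To run the script:
python3 your_script_name.py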
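When the same script has to support dry runs or a subset of hosts, it can be easier to build the ansible-playbook argument list in one place. The sketch below is one possible way to do that; build_ansible_command is a hypothetical helper, and it relies only on the standard ansible-playbook options --check, --limit, and --extra-vars.

```python
import json


def build_ansible_command(playbook_path, inventory_path,
                          check=False, limit=None, extra_vars=None):
    """Return the argument list for an ansible-playbook invocation."""
    command = ['ansible-playbook', '-i', inventory_path, playbook_path]
    if check:
        # Dry run: report what would change without changing it
        command.append('--check')
    if limit:
        # Restrict the run to a host or group pattern
        command += ['--limit', limit]
    if extra_vars:
        # Extra variables are passed as a JSON string
        command += ['--extra-vars', json.dumps(extra_vars)]
    return command


dry_run = build_ansible_command(
    'playbooks/setup.yml', 'inventory/hosts',
    check=True, limit='server1.example.com',
)
```

The returned list can be passed to subprocess.run in place of the fixed argument list, which keeps arguments out of shell string interpolation.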
By integrating Python with Ansible, you can automate server configuration and orchestration tasks efficiently. Python scripts can manage and trigger Ansible playbooks, ensuring that server configurations are consistent and deployments are orchestrated seamlessly.
In a modern monitoring setup, you often need to collect metrics and logs from various services, analyze them, and push them to monitoring systems like Prometheus or Elasticsearch. Python can be used to gather and process this data, and set up automated alerts based on specific conditions.
Scenario:
You want to collect custom metrics and logs from your application and push them to Prometheus and Elasticsearch. Additionally, you'll set up automated alerts based on specific conditions.
To collect and expose custom metrics from your application, you can use the prometheus_client library in Python.
Install prometheus_client:
pip install prometheus_client
Python Script to Expose Metrics (metrics_server.py):
from prometheus_client import start_http_server, Counter
import random
import time

# Create a metric to track the number of requests.
# A Counter only ever increases, which is what rate() in Prometheus expects.
REQUESTS = Counter('app_requests_total', 'Total number of requests processed by the application')


def process_request():
    """Simulate processing a request."""
    REQUESTS.inc()  # Increment the request count


if __name__ == '__main__':
    # Start up the server to expose metrics
    start_http_server(8000)  # Metrics will be available at http://localhost:8000/metrics

    # Simulate processing requests
    while True:
        process_request()
        time.sleep(random.uniform(0.5, 1.5))  # Simulate random request intervals
To push logs to Elasticsearch, you can use the elasticsearch Python client.
Install elasticsearch:
pip install elasticsearch
Python Script to Send Logs (log_collector.py):
from datetime import datetime, timezone
from elasticsearch import Elasticsearch
import logging
import time

# Elasticsearch client setup (URL form, as expected by the 8.x client)
es = Elasticsearch('http://localhost:9200')
index_name = 'application-logs'

# Configure Python logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('log_collector')


def log_message(message):
    """Log a message and send it to Elasticsearch."""
    logger.info(message)
    # An ISO 8601 timestamp lets Elasticsearch map the field as a date
    es.index(
        index=index_name,
        document={'message': message, 'timestamp': datetime.now(timezone.utc).isoformat()},
    )


if __name__ == '__main__':
    while True:
        log_message('This is a sample log message.')
        time.sleep(5)  # Log every 5 seconds
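Calling the indexing code by hand for every message does not scale well once an application already uses the logging module. One possible alternative is a custom logging handler that turns each log record into a document. The sketch below only buffers the documents in a list so that it runs without an Elasticsearch server; the class name BufferingDocumentHandler and the field names are hypothetical, and a real handler would send each document to the index instead of appending it.

```python
import logging
from datetime import datetime, timezone


class BufferingDocumentHandler(logging.Handler):
    """Convert log records to dicts and keep them in memory."""

    def __init__(self):
        super().__init__()
        self.documents = []

    def emit(self, record):
        # Build one JSON-serializable document per log record
        self.documents.append({
            'timestamp': datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
        })


sketch_logger = logging.getLogger('log_collector_sketch')
sketch_logger.setLevel(logging.INFO)
sketch_handler = BufferingDocumentHandler()
sketch_logger.addHandler(sketch_handler)

sketch_logger.info('service started on port %d', 8080)
sketch_logger.debug('this record is below the INFO level and is dropped')
```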
To set up alerts, you need to define alerting rules based on the metrics and logs collected. Here’s an example of how you can configure alerts with Prometheus.
Prometheus Alerting Rules (prometheus_rules.yml):
groups:
  - name: example_alerts
    rules:
      - alert: HighRequestRate
        expr: rate(app_requests_total[1m]) > 5
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High request rate detected"
          description: "Request rate has been above 5 requests per second for the last 2 minutes."
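The expression rate(app_requests_total[1m]) yields the average per-second increase of the counter over the last minute, so the threshold of 5 is in requests per second. The sketch below shows the basic arithmetic with made-up samples. It is a simplification: Prometheus additionally handles counter resets and extrapolates to the edges of the window, which this hypothetical per_second_rate function does not.

```python
def per_second_rate(samples):
    """Average per-second increase between the first and last sample.

    samples: list of (timestamp_seconds, counter_value) pairs, oldest first.
    """
    if len(samples) < 2:
        return 0.0
    (start_time, start_value), (end_time, end_value) = samples[0], samples[-1]
    if end_time <= start_time:
        return 0.0
    return (end_value - start_value) / (end_time - start_time)


# Made-up counter samples taken 15 seconds apart over one minute
window = [(0, 100), (15, 190), (30, 280), (45, 370), (60, 460)]
current_rate = per_second_rate(window)
alert_condition = current_rate > 5
```

With these samples the counter grows by 360 over 60 seconds, so the condition holds; the alert itself would fire only after the condition has stayed true for the 2 minutes given in the rule.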
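Deploying Alerts:
Add the rule file to the Prometheus configuration file (typically prometheus.yml):
rule_files:
  - 'prometheus_rules.yml'
Then reload the Prometheus configuration by sending the process a SIGHUP:
kill -HUP $(pgrep prometheus)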
Grafana Setup:
Add Prometheus as a Data Source:
Go to Grafana's data source settings and add Prometheus.
Create Dashboards:
Create dashboards in Grafana to visualize the metrics exposed by your application. You can set up alerts in Grafana as well, based on the metrics from Prometheus.
Elasticsearch Alerting:
Install Elastic Stack Alerting Plugin:
If you're using Elasticsearch with Kibana, you can use Kibana's alerting features to create alerts based on log data. You can set thresholds and get notifications via email, Slack, or other channels.
Define Alert Conditions:
Use Kibana to define alert conditions based on your log data indices.
By using Python scripts to collect and process metrics and logs, and integrating them with tools like Prometheus and Elasticsearch, you can create a robust monitoring and alerting system. The examples provided show how to expose custom metrics, push logs, and set up alerts for various conditions. This setup ensures you can proactively monitor your application, respond to issues quickly, and maintain system reliability.
Routine maintenance tasks like backups, system updates, and log rotation are essential for keeping your infrastructure healthy. You can automate these tasks using Python scripts and schedule them with cron jobs. Below are examples of Python scripts for common routine maintenance tasks and how to set them up with cron.
Scenario:
Create a Python script to back up a directory to a backup location. This script will be scheduled to run daily to ensure that your data is regularly backed up.
Backup Script (backup_script.py):
import os
import shutil
from datetime import datetime

# Define source and backup directories
source_dir = '/path/to/source_directory'
backup_dir = '/path/to/backup_directory'

# Create a timestamped backup file name
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
backup_file = os.path.join(backup_dir, f'backup_{timestamp}.tar.gz')


def create_backup():
    """Create a backup of the source directory."""
    # make_archive appends '.tar.gz' itself, so strip it from the base name
    shutil.make_archive(backup_file.replace('.tar.gz', ''), 'gztar', source_dir)
    print(f'Backup created at {backup_file}')


if __name__ == '__main__':
    create_backup()
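A daily backup that is never cleaned up will eventually fill the backup location. The following sketch shows one possible retention step, assuming archives are named backup_<timestamp>.tar.gz as in the script above, so that sorting by name also sorts by age. The prune_backups function and the keep parameter are hypothetical, and the demo runs against a temporary directory with empty placeholder files.

```python
import os
import tempfile


def prune_backups(backup_dir, keep=7):
    """Delete all but the newest `keep` backup archives; return removed names."""
    archives = sorted(
        name for name in os.listdir(backup_dir)
        if name.startswith('backup_') and name.endswith('.tar.gz')
    )
    stale = archives[:-keep] if keep > 0 else archives
    for name in stale:
        os.remove(os.path.join(backup_dir, name))
    return stale


# Demo in a temporary directory with three placeholder archives
with tempfile.TemporaryDirectory() as tmp:
    for day in ('20240101', '20240102', '20240103'):
        open(os.path.join(tmp, f'backup_{day}-020000.tar.gz'), 'w').close()
    removed = prune_backups(tmp, keep=2)
    remaining = sorted(os.listdir(tmp))
```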
Scenario:
Create a Python script to update the system packages. This script will ensure that the system is kept up-to-date with the latest security patches and updates.
System Update Script (system_update.py):
import subprocess


def update_system():
    """Update the system packages."""
    try:
        subprocess.run(['sudo', 'apt-get', 'update'], check=True)
        subprocess.run(['sudo', 'apt-get', 'upgrade', '-y'], check=True)
        print('System updated successfully.')
    except subprocess.CalledProcessError as e:
        print(f'Failed to update the system: {e}')


if __name__ == '__main__':
    update_system()
Scenario:
Create a Python script to rotate log files, moving old logs to an archive directory and compressing them.
Log Rotation Script (log_rotation.py):
import gzip
import os
import shutil
from datetime import datetime

# Define log directory and archive directory
log_dir = '/path/to/log_directory'
archive_dir = '/path/to/archive_directory'


def rotate_logs():
    """Rotate log files by compressing them into the archive directory."""
    os.makedirs(archive_dir, exist_ok=True)
    for log_file in os.listdir(log_dir):
        log_path = os.path.join(log_dir, log_file)
        if os.path.isfile(log_path):
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            archive_file = os.path.join(archive_dir, f'{log_file}_{timestamp}.gz')
            # Write a gzip-compressed copy of the log, then remove the original
            with open(log_path, 'rb') as src, gzip.open(archive_file, 'wb') as dst:
                shutil.copyfileobj(src, dst)
            os.remove(log_path)
            print(f'Log rotated: {archive_file}')


if __name__ == '__main__':
    rotate_logs()
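For logs written by a Python application itself, rotation can also happen inside the process through the standard logging.handlers module, which avoids removing a file that the application still has open. The sketch below is an alternative technique rather than a replacement for the script above: it uses RotatingFileHandler with a deliberately tiny size limit and a temporary directory so that the rotation is visible after a few messages. The logger name and the limits are arbitrary choices for the demo.

```python
import logging
import logging.handlers
import os
import tempfile

demo_log_dir = tempfile.mkdtemp()  # stand-in for a real log directory
demo_log_path = os.path.join(demo_log_dir, 'app.log')

# Roll over once the file would reach about 200 bytes; keep at most 3 old files
rotating_handler = logging.handlers.RotatingFileHandler(
    demo_log_path, maxBytes=200, backupCount=3
)
rotation_logger = logging.getLogger('rotation_sketch')
rotation_logger.setLevel(logging.INFO)
rotation_logger.addHandler(rotating_handler)

for number in range(50):
    rotation_logger.info('message number %d', number)

rotating_handler.close()
rotated_files = sorted(os.listdir(demo_log_dir))
```

After the loop, the directory holds app.log plus numbered backups such as app.log.1, with older backups beyond backupCount discarded.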
You need to set up cron jobs to schedule these scripts to run at specific intervals. Use the crontab command to edit the cron schedule.
crontab -e
Daily Backup at 2 AM:
0 2 * * * /usr/bin/python3 /path/to/backup_script.py
Weekly System Update on Sunday at 3 AM:
0 3 * * 0 /usr/bin/python3 /path/to/system_update.py
Log Rotation Every Day at Midnight:
0 0 * * * /usr/bin/python3 /path/to/log_rotation.py
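Explanation:
Each crontab entry consists of five time fields (minute, hour, day of month, month, day of week) followed by the command to run. 0 2 * * * runs the backup every day at 02:00, 0 3 * * 0 runs the system update every Sunday at 03:00, and 0 0 * * * rotates the logs every day at midnight. Because system_update.py calls sudo, the account that owns the crontab needs passwordless sudo for apt-get, or the entry should be placed in root's crontab.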
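To make the layout of a crontab entry concrete, the sketch below splits an entry into its five time fields and the command. It is only an illustration of the format: split_cron_entry is a hypothetical helper that does not validate field values or handle special strings such as @daily.

```python
CRON_FIELDS = ('minute', 'hour', 'day_of_month', 'month', 'day_of_week')


def split_cron_entry(entry):
    """Split a crontab line into a dict of time fields and the command."""
    parts = entry.split(None, 5)
    if len(parts) < 6:
        raise ValueError('expected five time fields followed by a command')
    schedule = dict(zip(CRON_FIELDS, parts[:5]))
    return schedule, parts[5]


weekly_schedule, weekly_command = split_cron_entry(
    '0 3 * * 0 /usr/bin/python3 /path/to/system_update.py'
)
```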
Using Python scripts for routine tasks and maintenance helps automate critical processes such as backups, system updates, and log rotation. By scheduling these scripts with cron jobs, you ensure that these tasks are performed consistently and without manual intervention. This approach enhances the reliability and stability of your infrastructure, keeping it healthy and up-to-date.