Skip to Content
32 Cheatsheets › Shell Scripting › Python DevOps Cheatsheet

Python for DevOps Cheatsheet

Table of Contents

  1. File Operations
  2. System & Process Management
  3. HTTP Requests & APIs
  4. JSON & YAML
  5. AWS Automation (boto3)
  6. Azure Automation
  7. SSH & Remote Operations
  8. Docker SDK
  9. Kubernetes Client
  10. Logging & Monitoring
  11. Task Scheduling
  12. Configuration Management
  13. DevOps Scripts
  14. Interview Scenarios

File Operations

1. Pathlib (Modern File Handling)

from pathlib import Path

# Directory creation: parents=True builds missing ancestors, exist_ok=True
# makes the call safe to repeat.
Path("/tmp/mydir").mkdir(parents=True, exist_ok=True)

# Existence test.
config_path = Path("/etc/config.conf")
if config_path.exists():
    print("File exists")

# Whole-file read and write in a single call each.
content = Path("/var/log/app.log").read_text()
Path("/tmp/output.txt").write_text("Hello DevOps")

# Glob a directory and report each match's name and size in bytes.
for log_path in Path("/var/log").glob("*.log"):
    print(log_path.name, log_path.stat().st_size)

# File metadata.
file_path = Path("/etc/hosts")
print(f"Size: {file_path.stat().st_size}")
print(f"Modified: {file_path.stat().st_mtime}")

# Path composition with the / operator.
log_dir = Path("/var/log") / "application"
log_file = log_dir / "app.log"

2. Traditional File Operations

import os
import shutil
from datetime import datetime  # required by the append example below

# Read a whole file; the with-block closes the handle even if read() fails.
with open('/etc/hosts', 'r') as f:
    content = f.read()

# Write a file (mode 'w' truncates any existing content).
with open('/tmp/output.txt', 'w') as f:
    f.write("Hello World\n")

# Append a timestamped line to a file.
with open('/var/log/app.log', 'a') as f:
    f.write(f"{datetime.now()}: Event logged\n")

# Copy a file.
shutil.copy('/etc/config.conf', '/backup/config.conf.bak')

# Move a file.
shutil.move('/tmp/old.txt', '/archive/old.txt')

# Delete a file.
# NOTE: the examples are independent of each other. If the move above has
# already run, /tmp/old.txt no longer exists and os.remove raises
# FileNotFoundError.
os.remove('/tmp/old.txt')

# Create a directory tree; exist_ok=True makes the call safe to repeat.
os.makedirs('/tmp/mydir/subdir', exist_ok=True)

# Remove a directory and everything under it.
shutil.rmtree('/tmp/mydir')

# List the entries of one directory (names only, not recursive).
for item in os.listdir('/var/log'):
    print(item)

# Walk a directory tree and print every *.conf file found.
for root, dirs, files in os.walk('/etc'):
    for file in files:
        if file.endswith('.conf'):
            print(os.path.join(root, file))

System & Process Management

3. Subprocess (Execute Commands)

import os
import subprocess

# Run a command and capture its output as text.
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)
print(result.returncode)

# Run through the shell (needed here for the pipe). Only use shell=True with
# fixed, trusted command strings.
result = subprocess.run('df -h | grep /dev/sda1', shell=True, capture_output=True, text=True)

# check=True turns a non-zero exit status into CalledProcessError.
try:
    subprocess.run(['systemctl', 'status', 'nginx'], check=True)
    print("Service is running")
except subprocess.CalledProcessError:
    print("Service is not running")

# timeout raises TimeoutExpired once the limit (in seconds) is exceeded.
try:
    subprocess.run(['long-running-command'], timeout=60)
except subprocess.TimeoutExpired:
    print("Command timed out")

# Pipe two processes together without a shell.
p1 = subprocess.Popen(['cat', '/var/log/syslog'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'error'], stdin=p1.stdout, stdout=subprocess.PIPE)
# Drop the parent's copy of the pipe so p1 receives SIGPIPE if p2 exits early.
p1.stdout.close()
output, _ = p2.communicate()
p1.wait()  # reap the first process so it does not linger as a zombie

# Pass a modified copy of the current environment to a child process.
env = os.environ.copy()
env['MY_VAR'] = 'value'
subprocess.run(['my-command'], env=env)

4. System Information

import platform
import psutil

# Platform info
print(platform.system())   # Linux
print(platform.release())  # 5.15.0
print(platform.machine())  # x86_64

# CPU info; interval=1 blocks for one second to measure utilisation.
print(f"CPU cores: {psutil.cpu_count()}")
print(f"CPU usage: {psutil.cpu_percent(interval=1)}%")

# Memory info (bytes converted to GiB)
mem = psutil.virtual_memory()
print(f"Total: {mem.total / (1024**3):.2f} GB")
print(f"Available: {mem.available / (1024**3):.2f} GB")
print(f"Used: {mem.percent}%")

# Disk info for the root filesystem
disk = psutil.disk_usage('/')
print(f"Total: {disk.total / (1024**3):.2f} GB")
print(f"Used: {disk.used / (1024**3):.2f} GB")
print(f"Free: {disk.free / (1024**3):.2f} GB")

# Network counters (bytes converted to MiB)
net = psutil.net_io_counters()
print(f"Bytes sent: {net.bytes_sent / (1024**2):.2f} MB")
print(f"Bytes recv: {net.bytes_recv / (1024**2):.2f} MB")

# Process management: report processes using more than 50% CPU.
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
    cpu = proc.info['cpu_percent']
    # psutil fills an attribute with None when it cannot be read (for example
    # access denied); comparing None with an int would raise TypeError.
    if cpu is not None and cpu > 50:
        print(f"{proc.info['name']}: CPU {cpu}%")

HTTP Requests & APIs

5. Requests Library

import requests
import json

# GET request
response = requests.get('https://api.github.com/users/octocat')
print(response.status_code)
print(response.json())

# POST request with a JSON body
data = {'key': 'value'}
response = requests.post('https://api.example.com/data', json=data)

# Custom headers
headers = {
    'Authorization': 'Bearer token123',
    'Content-Type': 'application/json'
}
response = requests.get('https://api.example.com', headers=headers)

# Query parameters
params = {'page': 1, 'per_page': 100}
response = requests.get('https://api.example.com/items', params=params)

# Error handling
try:
    response = requests.get('https://api.example.com', timeout=5)
    response.raise_for_status()  # Raise exception for 4xx/5xx
    data = response.json()
except requests.exceptions.HTTPError as e:
    print(f"HTTP error: {e}")
except requests.exceptions.ConnectionError:
    print("Connection failed")
except requests.exceptions.Timeout:
    print("Request timed out")

# Upload a file. The with-block guarantees the handle is closed after the
# request, even if the upload raises.
with open('report.pdf', 'rb') as report:
    files = {'file': report}
    response = requests.post('https://api.example.com/upload', files=files)

# Session (persists cookies across requests)
session = requests.Session()
session.get('https://api.example.com/login')
session.post('https://api.example.com/data', json=data)

JSON & YAML

6. JSON Operations

import json
from pprint import pprint

# String -> Python object.
json_string = '{"name": "server1", "ip": "10.0.0.1"}'
data = json.loads(json_string)
print(data['name'])

# Python object -> indented string.
data = {'servers': [{'name': 'web1', 'ip': '10.0.0.1'}]}
json_string = json.dumps(data, indent=2)

# File -> Python object.
with open('config.json', 'r') as f:
    config = json.load(f)

# Python object -> file.
with open('output.json', 'w') as f:
    json.dump(data, f, indent=2)

# Human-friendly dump of the Python structure.
pprint(data)

7. YAML Operations

import yaml

# Read a YAML file. safe_load only builds plain Python types, so it is the
# right choice for configuration that might not be fully trusted.
with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

# Write a YAML file in block style (default_flow_style=False).
data = {
    'database': {
        'host': 'localhost',
        'port': 5432,
        'users': ['admin', 'app']
    }
}
with open('output.yaml', 'w') as f:
    yaml.dump(data, f, default_flow_style=False)

# Parse a YAML string. YAML is whitespace-sensitive: the document must keep
# its line breaks and indentation to be parseable.
yaml_string = """
servers:
  - name: web1
    ip: 10.0.0.1
  - name: web2
    ip: 10.0.0.2
"""
config = yaml.safe_load(yaml_string)

AWS Automation (boto3)

8. EC2 Operations

import boto3

# Low-level client bound to one region.
ec2 = boto3.client('ec2', region_name='us-east-1')

# Enumerate instances: the response groups them by reservation, so flatten
# the two levels before printing.
response = ec2.describe_instances()
all_instances = (
    inst
    for reservation in response['Reservations']
    for inst in reservation['Instances']
)
for instance in all_instances:
    print(f"{instance['InstanceId']}: {instance['State']['Name']}")

# Power an instance on, then off.
ec2.start_instances(InstanceIds=['i-1234567890abcdef0'])
ec2.stop_instances(InstanceIds=['i-1234567890abcdef0'])

# Launch a single tagged instance.
name_tag = {
    'ResourceType': 'instance',
    'Tags': [{'Key': 'Name', 'Value': 'MyServer'}]
}
response = ec2.run_instances(
    ImageId='ami-0c55b159cbfafe1f0',
    InstanceType='t2.micro',
    KeyName='MyKeyPair',
    MinCount=1,
    MaxCount=1,
    SecurityGroupIds=['sg-12345678'],
    SubnetId='subnet-12345678',
    TagSpecifications=[name_tag]
)

# Higher-level resource interface: iterate running instances as objects.
ec2_resource = boto3.resource('ec2')
running_filter = {'Name': 'instance-state-name', 'Values': ['running']}
instances = ec2_resource.instances.filter(Filters=[running_filter])
for instance in instances:
    print(f"{instance.id}: {instance.instance_type}")

9. S3 Operations

# Low-level S3 client.
s3 = boto3.client('s3')

# Print the name of every bucket in the list_buckets response.
response = s3.list_buckets()
for bucket in response['Buckets']:
    print(bucket['Name'])

# Transfer a file up to the bucket, then back down.
s3.upload_file('local.txt', 'mybucket', 'remote.txt')
s3.download_file('mybucket', 'remote.txt', 'local.txt')

# List keys under a prefix; 'Contents' is read with a default so an empty
# result prints nothing instead of raising.
response = s3.list_objects_v2(Bucket='mybucket', Prefix='logs/')
for obj in response.get('Contents', []):
    print(obj['Key'])

# Remove a single object.
s3.delete_object(Bucket='mybucket', Key='old-file.txt')

# Higher-level resource interface.
s3_resource = boto3.resource('s3')
bucket = s3_resource.Bucket('mybucket')

# Upload with user-defined metadata attached to the object.
upload_options = {'Metadata': {'author': 'devops-team'}}
bucket.upload_file('local.txt', 'remote.txt', ExtraArgs=upload_options)

# Iterate objects under a prefix as resource objects.
for obj in bucket.objects.filter(Prefix='logs/2024/'):
    print(f"{obj.key}: {obj.size} bytes")

10. Lambda Operations

# Low-level Lambda client.
lambda_client = boto3.client('lambda')

# Invoke a function with InvocationType='RequestResponse' and decode the
# JSON payload it returns.
request_body = json.dumps({'key': 'value'})
response = lambda_client.invoke(
    FunctionName='my-function',
    InvocationType='RequestResponse',
    Payload=request_body
)
payload_stream = response['Payload']
result = json.loads(payload_stream.read())

# Print name and runtime for each function in the list_functions response.
response = lambda_client.list_functions()
for func in response['Functions']:
    print(f"{func['FunctionName']}: {func['Runtime']}")

# Replace a function's code with the contents of a local zip archive.
with open('function.zip', 'rb') as f:
    lambda_client.update_function_code(
        FunctionName='my-function',
        ZipFile=f.read()
    )

Azure Automation

11. Azure SDK

from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.storage import StorageManagementClient

# One credential object is shared by every management client below.
credential = DefaultAzureCredential()
subscription_id = 'your-subscription-id'

# Compute management client.
compute_client = ComputeManagementClient(credential, subscription_id)

# Print name and size of every VM in one resource group.
resource_group = 'myResourceGroup'
vms = compute_client.virtual_machines.list(resource_group)
for vm in vms:
    print(f"{vm.name}: {vm.hardware_profile.vm_size}")

# Start a VM; begin_start returns an object whose wait() is called next to
# block until the operation is done.
async_vm_start = compute_client.virtual_machines.begin_start(resource_group, 'myVM')
async_vm_start.wait()

# Stop a VM via begin_deallocate and wait for completion.
async_vm_stop = compute_client.virtual_machines.begin_deallocate(resource_group, 'myVM')
async_vm_stop.wait()

# Storage management client: print the name of every storage account.
storage_client = StorageManagementClient(credential, subscription_id)
accounts = storage_client.storage_accounts.list()
for account in accounts:
    print(account.name)

SSH & Remote Operations

12. Paramiko (SSH Client)

import paramiko

# Connect to a server with a password.
ssh = paramiko.SSHClient()
# SECURITY NOTE: AutoAddPolicy accepts any unknown host key without
# verification. Acceptable for a lab; in production load known hosts
# (ssh.load_system_host_keys()) and keep the default rejecting policy.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('10.0.0.1', username='admin', password='password')

# Execute a single command and print its stdout.
stdin, stdout, stderr = ssh.exec_command('uptime')
print(stdout.read().decode())

# Execute several commands over the same connection.
commands = [
    'sudo systemctl status nginx',
    'df -h',
    'free -h'
]
for cmd in commands:
    stdin, stdout, stderr = ssh.exec_command(cmd)
    print(f"\n{cmd}:")
    print(stdout.read().decode())

ssh.close()

# Reconnect using a private key instead of a password.
key = paramiko.RSAKey.from_private_key_file('/home/user/.ssh/id_rsa')
ssh.connect('10.0.0.1', username='admin', pkey=key)

# SFTP file transfer. try/finally closes both the SFTP channel and the SSH
# connection even if a transfer fails.
try:
    sftp = ssh.open_sftp()
    try:
        sftp.put('local.txt', '/remote/path/file.txt')
        sftp.get('/remote/path/file.txt', 'local.txt')
    finally:
        sftp.close()
finally:
    ssh.close()

Docker SDK

13. Docker Operations

import docker

# Create a Docker client from the environment (DOCKER_HOST etc.).
client = docker.from_env()

# List running containers.
for container in client.containers.list():
    print(f"{container.name}: {container.status}")

# List all containers (including stopped ones).
for container in client.containers.list(all=True):
    print(container.name)

# Run a container in the background.
container = client.containers.run(
    'nginx:latest',
    name='my-nginx',
    ports={'80/tcp': 8080},
    detach=True,
    environment={'ENV': 'production'}
)

# Execute a command in the container. This must happen while the container
# still exists and is running, i.e. before stop()/remove() below.
result = container.exec_run('nginx -t')
print(result.output.decode())

# Get the last 100 log lines (also before the container is removed).
logs = container.logs(tail=100)
print(logs.decode())

# Stop, then remove the container.
container.stop()
container.remove()

# List images.
for image in client.images.list():
    print(image.tags)

# Build an image from the current directory; rm=True removes intermediate
# containers. The second return value yields the build log.
image, logs = client.images.build(
    path='.',
    tag='myapp:latest',
    rm=True
)
for line in logs:
    print(line)

# Pull an image.
image = client.images.pull('nginx:alpine')

# Push an image.
client.images.push('myregistry/myapp:latest')

Kubernetes Client

14. Kubernetes Python Client

from kubernetes import client, config
from kubernetes.stream import stream

# Load kubeconfig before creating any API object.
config.load_kube_config()

# Core API handle (pods, logs, exec).
v1 = client.CoreV1Api()

# Pods across every namespace.
pods = v1.list_pod_for_all_namespaces()
for pod in pods.items:
    print(f"{pod.metadata.namespace}/{pod.metadata.name}: {pod.status.phase}")

# Pods in a single namespace.
pods = v1.list_namespaced_pod('default')
for pod in pods.items:
    print(pod.metadata.name)

# Create a deployment, assembled from the inside out: container -> pod
# template -> deployment spec -> deployment.
apps_v1 = client.AppsV1Api()
nginx_container = client.V1Container(
    name='nginx',
    image='nginx:latest',
    ports=[client.V1ContainerPort(container_port=80)]
)
pod_template = client.V1PodTemplateSpec(
    metadata=client.V1ObjectMeta(labels={'app': 'nginx'}),
    spec=client.V1PodSpec(containers=[nginx_container])
)
deployment_spec = client.V1DeploymentSpec(
    replicas=3,
    selector=client.V1LabelSelector(match_labels={'app': 'nginx'}),
    template=pod_template
)
deployment = client.V1Deployment(
    metadata=client.V1ObjectMeta(name='nginx-deployment'),
    spec=deployment_spec
)
apps_v1.create_namespaced_deployment('default', deployment)

# Delete the deployment again.
apps_v1.delete_namespaced_deployment('nginx-deployment', 'default')

# Last 100 log lines of a pod.
logs = v1.read_namespaced_pod_log('pod-name', 'default', tail_lines=100)
print(logs)

# Run a command inside a pod through the streaming exec endpoint.
exec_command = ['sh', '-c', 'echo Hello from pod']
resp = stream(
    v1.connect_get_namespaced_pod_exec,
    'pod-name',
    'default',
    command=exec_command,
    stderr=True,
    stdin=False,
    stdout=True,
    tty=False
)
print(resp)

Logging & Monitoring

15. Logging

import json
import logging
from logging.handlers import RotatingFileHandler

# Root configuration: INFO and above, with timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)
logger.info("Application started")
logger.warning("Warning message")
logger.error("Error occurred")

# Size-based rotation: roll over at 10 MB and keep five old files.
handler = RotatingFileHandler(
    '/var/log/myapp.log',
    maxBytes=10*1024*1024,
    backupCount=5
)
plain_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(plain_format)
logger.addHandler(handler)

# A logger can fan out to several handlers at once.
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('/var/log/app.log')
for extra_handler in (console_handler, file_handler):
    logger.addHandler(extra_handler)


class JsonFormatter(logging.Formatter):
    """Formatter that renders each log record as a single JSON object."""

    def format(self, record):
        """Return *record* serialised as a JSON string."""
        return json.dumps({
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module
        })


# Switch the rotating file handler over to structured (JSON) output.
handler.setFormatter(JsonFormatter())

16. Prometheus Metrics

import time

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Metric definitions: a counter, a gauge and a histogram.
requests_total = Counter('requests_total', 'Total requests')
active_connections = Gauge('active_connections', 'Active connections')
request_duration = Histogram('request_duration_seconds', 'Request duration')

# Record some values.
requests_total.inc()
active_connections.set(42)

# Time the enclosed block and record the duration in the histogram.
with request_duration.time():
    # Your code here
    time.sleep(1)

# Start the metrics HTTP server on port 8000.
# Metrics available at http://localhost:8000/metrics
start_http_server(8000)

Task Scheduling

17. Schedule Library

import schedule
import time


def backup_databases():
    """Placeholder job: announce the database backup."""
    print("Running database backup...")
    # Backup logic here


def cleanup_logs():
    """Placeholder job: announce the log cleanup."""
    print("Cleaning up old logs...")
    # Cleanup logic here


# Register jobs: daily at 02:00, hourly, and every ten minutes.
schedule.every().day.at("02:00").do(backup_databases)
schedule.every().hour.do(cleanup_logs)
schedule.every(10).minutes.do(lambda: print("Health check"))

# Scheduler loop: run whatever is due, then sleep for a minute.
while True:
    schedule.run_pending()
    time.sleep(60)

18. APScheduler (Advanced)

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = BlockingScheduler()


@scheduler.scheduled_job('interval', minutes=5)
def check_health():
    """Interval job, registered to run every five minutes."""
    print("Health check...")


@scheduler.scheduled_job(CronTrigger(hour=2, minute=0))
def nightly_backup():
    """Cron job, registered for 02:00."""
    print("Running backup...")


# Hand control to the scheduler.
scheduler.start()

Configuration Management

19. ConfigParser (INI Files)

import configparser

# Load an INI file and pull typed values out of the [database] section.
config = configparser.ConfigParser()
config.read('/etc/myapp/config.ini')

db_section = config['database']
db_host = db_section['host']
db_port = config.getint('database', 'port')

# Replace the [database] section and write the result to disk.
# All values are given as strings.
new_database_settings = {
    'host': 'localhost',
    'port': '5432',
    'user': 'admin'
}
config['database'] = new_database_settings

with open('config.ini', 'w') as f:
    config.write(f)

20. Environment Variables

import os

from dotenv import load_dotenv

# Load variables from a .env file.
load_dotenv()

# Optional settings fall back to a default when the variable is unset.
db_host = os.getenv('DB_HOST', 'localhost')
raw_port = os.getenv('DB_PORT', 5432)
db_port = int(raw_port)

# Required setting: indexing os.environ raises KeyError if it is missing.
api_key = os.environ['API_KEY']

# Set a variable on the current process's environment mapping.
os.environ['MY_VAR'] = 'value'

DevOps Scripts

21. Health Check Script

#!/usr/bin/env python3
import requests
import sys


def check_service(url):
    """Probe *url* and report whether it answered with HTTP 200.

    Prints a one-line verdict and returns True for a 200 response, False for
    any other status or for any exception raised while probing.
    """
    try:
        response = requests.get(url, timeout=5)
        if response.status_code != 200:
            print(f"✗ {url} returned {response.status_code}")
            return False
        print(f"✓ {url} is healthy")
        return True
    except Exception as e:
        print(f"✗ {url} failed: {str(e)}")
        return False


services = [
    'http://localhost:8080/health',
    'http://localhost:9090/metrics',
    'http://database:5432'
]

# Probe every endpoint (no short-circuit) so each one gets a printed verdict,
# then exit with status 1 if any probe failed.
results = []
for url in services:
    results.append(check_service(url))

if False in results:
    sys.exit(1)

22. Log Analyzer

#!/usr/bin/env python3
from collections import Counter
import re

# Matches the client IP and the status code of an nginx access-log line.
# Compiled once at import time instead of on every line of the log.
_ACCESS_LINE = re.compile(r'(\d+\.\d+\.\d+\.\d+).*"[A-Z]+ .* HTTP/\d\.\d" (\d+)')


def analyze_logs(log_file):
    """Print an error count, the top client IPs and the status-code totals.

    Args:
        log_file: Path of the log file to read.

    A line counts as an error when it contains the substring 'ERROR'. IP and
    status statistics are collected only from lines that match the access-log
    pattern. The report is written to stdout; nothing is returned.
    """
    error_count = 0
    status_codes = Counter()
    ip_addresses = Counter()

    with open(log_file, 'r') as f:
        for line in f:
            if 'ERROR' in line:
                error_count += 1

            # Parse nginx logs
            match = _ACCESS_LINE.search(line)
            if match:
                ip, status = match.groups()
                ip_addresses[ip] += 1
                status_codes[status] += 1

    print(f"Total errors: {error_count}")

    print("\nTop 10 IPs:")
    for ip, count in ip_addresses.most_common(10):
        print(f" {ip}: {count}")

    print("\nStatus codes:")
    for status, count in status_codes.most_common():
        print(f" {status}: {count}")


# Guarded so the function can be imported without reading the log file.
if __name__ == '__main__':
    analyze_logs('/var/log/nginx/access.log')

23. Deployment Script

#!/usr/bin/env python3
import shlex
import subprocess
import sys
import time


def run_command(cmd, description):
    """Run *cmd* through the shell, echo its stdout, and abort on failure.

    Args:
        cmd: Shell command line to execute.
        description: Human-readable label printed before the command runs.

    Returns:
        The command's captured standard output.

    Exits the process with status 1 (after printing stderr) when the command
    returns a non-zero exit code.
    """
    print(f"\n>>> {description}")
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    print(result.stdout)

    if result.returncode != 0:
        print(f"ERROR: {result.stderr}")
        sys.exit(1)

    return result.stdout


def deploy_application(version):
    """Pull, build, test and (re)start the myapp container at *version*.

    Args:
        version: Image tag to build and run (becomes ``myapp:<version>``).

    Exits the process with status 1 if any step or the final health check
    fails.
    """
    # The commands go through a shell, so quote the tag to keep a stray
    # space or metacharacter in *version* from altering the command line.
    image = shlex.quote(f'myapp:{version}')

    # Pull latest code
    run_command('git pull origin main', 'Pulling latest code')

    # Build Docker image
    run_command(
        f'docker build -t {image} .',
        f'Building Docker image version {version}'
    )

    # Run tests
    run_command(
        f'docker run --rm {image} pytest',
        'Running tests'
    )

    # Stop old container ("|| true": it may not exist on a first deploy)
    run_command(
        'docker stop myapp || true',
        'Stopping old container'
    )

    # Remove it as well: a stopped container still owns the name "myapp",
    # and the `docker run --name myapp` below would fail on the conflict.
    run_command(
        'docker rm myapp || true',
        'Removing old container'
    )

    # Start new container
    run_command(
        f'docker run -d --name myapp -p 8080:8080 {image}',
        'Starting new container'
    )

    # Health check: give the application a moment to start.
    time.sleep(5)
    try:
        import requests
        # A timeout keeps a hung application from blocking the deploy forever.
        response = requests.get('http://localhost:8080/health', timeout=5)
        if response.status_code == 200:
            print("\n✓ Deployment successful!")
        else:
            print("\n✗ Health check failed!")
            sys.exit(1)
    except Exception as e:
        print(f"\n✗ Deployment failed: {e}")
        sys.exit(1)


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: deploy.py <version>")
        sys.exit(1)

    deploy_application(sys.argv[1])

Interview Scenarios

Scenario 1: Automated EC2 Snapshot Backup

import boto3
from datetime import datetime, timedelta, timezone


def backup_ec2_volumes():
    """Snapshot every volume tagged Backup=true and prune old snapshots.

    For each matching volume a snapshot is created and tagged
    AutomatedBackup=true plus the source VolumeId. Afterwards every snapshot
    owned by this account that carries AutomatedBackup=true and started more
    than seven days ago is deleted.
    """
    ec2 = boto3.client('ec2')

    # Get volumes with Backup tag
    volumes = ec2.describe_volumes(
        Filters=[{'Name': 'tag:Backup', 'Values': ['true']}]
    )

    for volume in volumes['Volumes']:
        volume_id = volume['VolumeId']

        # Create snapshot
        snapshot = ec2.create_snapshot(
            VolumeId=volume_id,
            Description=f'Automated backup {datetime.now().isoformat()}'
        )

        # Tag snapshot so the cleanup pass below can find it
        ec2.create_tags(
            Resources=[snapshot['SnapshotId']],
            Tags=[
                {'Key': 'AutomatedBackup', 'Value': 'true'},
                {'Key': 'VolumeId', 'Value': volume_id}
            ]
        )

        print(f"Created snapshot {snapshot['SnapshotId']} for {volume_id}")

    # Delete snapshots older than 7 days
    snapshots = ec2.describe_snapshots(
        OwnerIds=['self'],
        Filters=[{'Name': 'tag:AutomatedBackup', 'Values': ['true']}]
    )

    # The original stripped tzinfo from StartTime and compared it with naive
    # local time, which skews the cutoff by the host's UTC offset. Compare
    # against an aware UTC cutoff instead. This relies on boto3 returning
    # StartTime as a timezone-aware datetime.
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=7)

    for snapshot in snapshots['Snapshots']:
        if snapshot['StartTime'] < cutoff_date:
            print(f"Deleting old snapshot {snapshot['SnapshotId']}")
            ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'])


if __name__ == '__main__':
    backup_ec2_volumes()

Scenario 2: Monitor and Restart Failed Containers

import docker
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def monitor_containers():
    """Poll Docker once a minute and restart stopped containers.

    Every container that is not 'running' is logged. It is restarted only if
    its restart policy name is something other than 'no'. Restart failures
    are logged and do not stop the loop. Never returns.
    """
    client = docker.from_env()

    while True:
        for container in client.containers.list(all=True):
            if container.status == 'running':
                continue

            logger.warning(f"Container {container.name} is {container.status}")

            # Check if it should be running (has restart policy)
            restart_policy = container.attrs['HostConfig']['RestartPolicy']['Name']
            if restart_policy == 'no':
                continue

            logger.info(f"Attempting to restart {container.name}")
            try:
                container.restart()
                logger.info(f"Successfully restarted {container.name}")
            except Exception as e:
                logger.error(f"Failed to restart {container.name}: {e}")

        time.sleep(60)  # Check every minute


monitor_containers()

Scenario 3: Kubernetes Pod Scaler Based on Custom Metrics

from kubernetes import client, config
import time

config.load_kube_config()
apps_v1 = client.AppsV1Api()
v1 = client.CoreV1Api()


def get_queue_depth():
    """Return the current queue depth (placeholder: a random int in 0..100)."""
    # Get queue depth from external system
    # This is a placeholder
    import random
    return random.randint(0, 100)


def scale_deployment(deployment_name, namespace, replicas):
    """Set the replica count of a deployment and print a confirmation."""
    current = apps_v1.read_namespaced_deployment(deployment_name, namespace)
    current.spec.replicas = replicas
    apps_v1.patch_namespaced_deployment(deployment_name, namespace, current)
    print(f"Scaled {deployment_name} to {replicas} replicas")


def autoscale_based_on_queue():
    """Every 30 seconds, resize the 'worker' deployment to fit queue depth.

    Depth above 50 targets 10 replicas, above 20 targets 5, otherwise 2.
    The deployment is patched only when the target differs from the current
    replica count. Never returns.
    """
    deployment_name = 'worker'
    namespace = 'default'
    # (exclusive lower bound on queue depth, replicas), highest bound first.
    scale_steps = ((50, 10), (20, 5))
    minimum_replicas = 2

    while True:
        queue_depth = get_queue_depth()

        # Scale logic: the first bound the depth exceeds wins.
        target_replicas = minimum_replicas
        for bound, step_replicas in scale_steps:
            if queue_depth > bound:
                target_replicas = step_replicas
                break

        # Get current replicas
        deployment = apps_v1.read_namespaced_deployment(deployment_name, namespace)
        if deployment.spec.replicas != target_replicas:
            scale_deployment(deployment_name, namespace, target_replicas)

        time.sleep(30)


autoscale_based_on_queue()

Total Examples: 60+ Python DevOps scenarios

Last updated on