Python for DevOps Cheatsheet
Table of Contents
- File Operations
- System & Process Management
- HTTP Requests & APIs
- JSON & YAML
- AWS Automation (boto3)
- Azure Automation
- SSH & Remote Operations
- Docker SDK
- Kubernetes Client
- Logging & Monitoring
- Task Scheduling
- Configuration Management
- DevOps Scripts
- Interview Scenarios
File Operations
1. Pathlib (Modern File Handling)
from pathlib import Path
# Create directory (parents=True builds missing ancestors; exist_ok=True
# makes the call a no-op when the directory is already there)
Path("/tmp/mydir").mkdir(parents=True, exist_ok=True)

# Check if file exists
if Path("/etc/config.conf").exists():
    print("File exists")

# Read file
content = Path("/var/log/app.log").read_text()

# Write file
Path("/tmp/output.txt").write_text("Hello DevOps")

# Iterate files
for file in Path("/var/log").glob("*.log"):
    print(file.name, file.stat().st_size)

# Get file info (stat once and reuse the result instead of hitting the
# filesystem for every attribute)
file_path = Path("/etc/hosts")
file_info = file_path.stat()
print(f"Size: {file_info.st_size}")
print(f"Modified: {file_info.st_mtime}")
# Join paths
log_file = Path("/var/log") / "application" / "app.log"
2. Traditional File Operations
import os
import shutil
from datetime import datetime  # used to timestamp the appended log entry

# Read file
with open('/etc/hosts', 'r') as f:
    content = f.read()

# Write file (mode 'w' truncates any existing content)
with open('/tmp/output.txt', 'w') as f:
    f.write("Hello World\n")

# Append to file
with open('/var/log/app.log', 'a') as f:
    f.write(f"{datetime.now()}: Event logged\n")

# Copy file
shutil.copy('/etc/config.conf', '/backup/config.conf.bak')

# Move file
shutil.move('/tmp/old.txt', '/archive/old.txt')

# Delete file (a different path on purpose: /tmp/old.txt no longer exists
# once it has been moved, so removing it would raise FileNotFoundError)
os.remove('/tmp/stale.txt')

# Create directory
os.makedirs('/tmp/mydir/subdir', exist_ok=True)

# Remove directory and everything under it
shutil.rmtree('/tmp/mydir')

# List directory
for item in os.listdir('/var/log'):
    print(item)
# Walk directory tree
for root, dirs, files in os.walk('/etc'):
for file in files:
if file.endswith('.conf'):
print(os.path.join(root, file))
System & Process Management
3. Subprocess (Execute Commands)
import os  # needed for os.environ in the environment-variable example
import subprocess

# Run command and get output
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)
print(result.returncode)

# Run with shell (required here because of the pipe; never build the
# command string from untrusted input)
result = subprocess.run('df -h | grep /dev/sda1', shell=True, capture_output=True, text=True)

# Check command success: check=True raises CalledProcessError on a
# non-zero exit status
try:
    subprocess.run(['systemctl', 'status', 'nginx'], check=True)
    print("Service is running")
except subprocess.CalledProcessError:
    print("Service is not running")

# Run with timeout
try:
    subprocess.run(['long-running-command'], timeout=60)
except subprocess.TimeoutExpired:
    print("Command timed out")

# Pipe commands
p1 = subprocess.Popen(['cat', '/var/log/syslog'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'error'], stdin=p1.stdout, stdout=subprocess.PIPE)
# Drop our copy of the pipe's read end so p1 gets SIGPIPE if p2 exits early
p1.stdout.close()
output, _ = p2.communicate()
p1.wait()  # reap the first process so it does not linger as a zombie

# Environment variables: copy the current environment and extend it
env = os.environ.copy()
env['MY_VAR'] = 'value'
subprocess.run(['my-command'], env=env)
4. System Information
import platform
import psutil
# Platform info (the trailing comments show example output values)
print(platform.system()) # Linux
print(platform.release()) # 5.15.0
print(platform.machine()) # x86_64
# CPU info
print(f"CPU cores: {psutil.cpu_count()}")
# interval=1 is passed to cpu_percent; presumably it samples over that many
# seconds, so this call would block for about a second — confirm in psutil docs
print(f"CPU usage: {psutil.cpu_percent(interval=1)}%")
# Memory info: byte counts are divided by 1024**3 before being printed as GB
mem = psutil.virtual_memory()
print(f"Total: {mem.total / (1024**3):.2f} GB")
print(f"Available: {mem.available / (1024**3):.2f} GB")
print(f"Used: {mem.percent}%")
# Disk info for the filesystem mounted at '/'
disk = psutil.disk_usage('/')
print(f"Total: {disk.total / (1024**3):.2f} GB")
print(f"Used: {disk.used / (1024**3):.2f} GB")
print(f"Free: {disk.free / (1024**3):.2f} GB")
# Network stats: byte counters are divided by 1024**2 before being printed as MB
net = psutil.net_io_counters()
print(f"Bytes sent: {net.bytes_sent / (1024**2):.2f} MB")
print(f"Bytes recv: {net.bytes_recv / (1024**2):.2f} MB")
# Process management
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
if proc.info['cpu_percent'] > 50:
print(f"{proc.info['name']}: CPU {proc.info['cpu_percent']}%")
HTTP Requests & APIs
5. Requests Library
import requests
import json
# GET request (always pass a timeout: without one, requests can wait
# forever on an unresponsive server)
response = requests.get('https://api.github.com/users/octocat', timeout=5)
print(response.status_code)
print(response.json())

# POST request (json= serializes the dict and sets the Content-Type header)
data = {'key': 'value'}
response = requests.post('https://api.example.com/data', json=data, timeout=5)

# Headers
headers = {
    'Authorization': 'Bearer token123',
    'Content-Type': 'application/json'
}
response = requests.get('https://api.example.com', headers=headers, timeout=5)

# Query parameters
params = {'page': 1, 'per_page': 100}
response = requests.get('https://api.example.com/items', params=params, timeout=5)

# Error handling
try:
    response = requests.get('https://api.example.com', timeout=5)
    response.raise_for_status()  # Raise exception for 4xx/5xx
    data = response.json()
except requests.exceptions.HTTPError as e:
    print(f"HTTP error: {e}")
except requests.exceptions.ConnectionError:
    print("Connection failed")
except requests.exceptions.Timeout:
    print("Request timed out")

# Upload file (the with-block closes the handle even if the request fails)
with open('report.pdf', 'rb') as upload:
    files = {'file': upload}
    response = requests.post('https://api.example.com/upload', files=files, timeout=30)

# Session (persist cookies)
session = requests.Session()
session.get('https://api.example.com/login', timeout=5)
session.post('https://api.example.com/data', json=data)
JSON & YAML
6. JSON Operations
import json
# Parse JSON string
json_string = '{"name": "server1", "ip": "10.0.0.1"}'
data = json.loads(json_string)
print(data['name'])

# Convert to JSON string
data = {'servers': [{'name': 'web1', 'ip': '10.0.0.1'}]}
json_string = json.dumps(data, indent=2)

# Read JSON file (JSON is UTF-8 by specification, so do not rely on the
# platform's default encoding)
with open('config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

# Write JSON file
with open('output.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2)
# Pretty print JSON
from pprint import pprint
pprint(data)
7. YAML Operations
import yaml
# Read YAML file (safe_load builds plain Python objects only; never use
# yaml.load on untrusted input)
with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

# Write YAML file
data = {
    'database': {
        'host': 'localhost',
        'port': 5432,
        'users': ['admin', 'app']
    }
}
with open('output.yaml', 'w') as f:
    yaml.dump(data, f, default_flow_style=False)

# Parse YAML string. Indentation is significant in YAML: each "ip" must be
# nested under its list item, otherwise it is parsed as a top-level key.
yaml_string = """
servers:
  - name: web1
    ip: 10.0.0.1
  - name: web2
    ip: 10.0.0.2
"""
config = yaml.safe_load(yaml_string)
AWS Automation (boto3)
8. EC2 Operations
import boto3
# Create EC2 client
ec2 = boto3.client('ec2', region_name='us-east-1')

# List instances. describe_instances returns results in pages, so use a
# paginator instead of reading only the first response.
for page in ec2.get_paginator('describe_instances').paginate():
    for reservation in page['Reservations']:
        for instance in reservation['Instances']:
            print(f"{instance['InstanceId']}: {instance['State']['Name']}")

# Start instance
ec2.start_instances(InstanceIds=['i-1234567890abcdef0'])

# Stop instance
ec2.stop_instances(InstanceIds=['i-1234567890abcdef0'])

# Create instance
response = ec2.run_instances(
    ImageId='ami-0c55b159cbfafe1f0',
    InstanceType='t2.micro',
    KeyName='MyKeyPair',
    MinCount=1,
    MaxCount=1,
    SecurityGroupIds=['sg-12345678'],
    SubnetId='subnet-12345678',
    TagSpecifications=[{
        'ResourceType': 'instance',
        'Tags': [{'Key': 'Name', 'Value': 'MyServer'}]
    }]
)

# Using resource interface (higher level)
ec2_resource = boto3.resource('ec2')
instances = ec2_resource.instances.filter(
    Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
)
for instance in instances:
print(f"{instance.id}: {instance.instance_type}")
9. S3 Operations
# Create S3 client
s3 = boto3.client('s3')

# List buckets
response = s3.list_buckets()
for bucket in response['Buckets']:
    print(bucket['Name'])

# Upload file
s3.upload_file('local.txt', 'mybucket', 'remote.txt')

# Download file
s3.download_file('mybucket', 'remote.txt', 'local.txt')

# List objects. A single list_objects_v2 call returns at most one page of
# keys, so paginate to see every object under the prefix.
pages = s3.get_paginator('list_objects_v2').paginate(Bucket='mybucket', Prefix='logs/')
for page in pages:
    # 'Contents' is absent from a page that holds no keys
    for obj in page.get('Contents', []):
        print(obj['Key'])

# Delete object
s3.delete_object(Bucket='mybucket', Key='old-file.txt')

# Using resource interface
s3_resource = boto3.resource('s3')
bucket = s3_resource.Bucket('mybucket')

# Upload with metadata
bucket.upload_file(
    'local.txt',
    'remote.txt',
    ExtraArgs={'Metadata': {'author': 'devops-team'}}
)
# Iterate objects
for obj in bucket.objects.filter(Prefix='logs/2024/'):
print(f"{obj.key}: {obj.size} bytes")
10. Lambda Operations
# Create Lambda client
lambda_client = boto3.client('lambda')

# Invoke function synchronously and decode its JSON result
response = lambda_client.invoke(
    FunctionName='my-function',
    InvocationType='RequestResponse',
    Payload=json.dumps({'key': 'value'})
)
result = json.loads(response['Payload'].read())

# List functions
response = lambda_client.list_functions()
for func in response['Functions']:
    # Functions deployed as container images have no 'Runtime' entry, so
    # look it up defensively instead of indexing
    print(f"{func['FunctionName']}: {func.get('Runtime', 'n/a')}")
# Update function code
with open('function.zip', 'rb') as f:
lambda_client.update_function_code(
FunctionName='my-function',
ZipFile=f.read()
)
Azure Automation
11. Azure SDK
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.storage import StorageManagementClient
# Authenticate
credential = DefaultAzureCredential()
subscription_id = 'your-subscription-id'

# Create compute client
compute_client = ComputeManagementClient(credential, subscription_id)

# List VMs
resource_group = 'myResourceGroup'
vms = compute_client.virtual_machines.list(resource_group)
for vm in vms:
    print(f"{vm.name}: {vm.hardware_profile.vm_size}")

# Start VM (begin_* returns a poller; wait() blocks on the operation)
async_vm_start = compute_client.virtual_machines.begin_start(
    resource_group,
    'myVM'
)
async_vm_start.wait()

# Stop VM via begin_deallocate
async_vm_stop = compute_client.virtual_machines.begin_deallocate(
    resource_group,
    'myVM'
)
async_vm_stop.wait()

# Storage operations
storage_client = StorageManagementClient(credential, subscription_id)

# List storage accounts
accounts = storage_client.storage_accounts.list()
for account in accounts:
print(account.name)
SSH & Remote Operations
12. Paramiko (SSH Client)
import paramiko
# Connect to server
ssh = paramiko.SSHClient()
# SECURITY: AutoAddPolicy silently trusts any unknown host key, which leaves
# the connection open to man-in-the-middle attacks. Acceptable for a lab;
# in production load known hosts and reject unknown keys instead.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# SECURITY: placeholder credentials; never hard-code real passwords
ssh.connect('10.0.0.1', username='admin', password='password')

# Execute command
stdin, stdout, stderr = ssh.exec_command('uptime')
print(stdout.read().decode())

# Multiple commands
commands = [
    'sudo systemctl status nginx',
    'df -h',
    'free -h'
]
for cmd in commands:
    stdin, stdout, stderr = ssh.exec_command(cmd)
    print(f"\n{cmd}:")
    print(stdout.read().decode())
ssh.close()

# Using SSH key
key = paramiko.RSAKey.from_private_key_file('/home/user/.ssh/id_rsa')
ssh.connect('10.0.0.1', username='admin', pkey=key)

# SFTP file transfer
sftp = ssh.open_sftp()
sftp.put('local.txt', '/remote/path/file.txt')
sftp.get('/remote/path/file.txt', 'local.txt')
sftp.close()
Docker SDK
13. Docker Operations
import docker
# Create Docker client
client = docker.from_env()

# List containers
for container in client.containers.list():
    print(f"{container.name}: {container.status}")

# List all containers (including stopped)
for container in client.containers.list(all=True):
    print(container.name)

# Run container
container = client.containers.run(
    'nginx:latest',
    name='my-nginx',
    ports={'80/tcp': 8080},
    detach=True,
    environment={'ENV': 'production'}
)

# Execute command in container (must happen while it is still running)
result = container.exec_run('nginx -t')
print(result.output.decode())

# Get logs (also before removal; a removed container has nothing to query)
logs = container.logs(tail=100)
print(logs.decode())

# Stop container
container.stop()

# Remove container
container.remove()

# List images
for image in client.images.list():
    print(image.tags)

# Build image
image, logs = client.images.build(
    path='.',
    tag='myapp:latest',
    rm=True
)
for line in logs:
    print(line)

# Pull image
image = client.images.pull('nginx:alpine')
# Push image
client.images.push('myregistry/myapp:latest')
Kubernetes Client
14. Kubernetes Python Client
from kubernetes import client, config
# Load kubeconfig
config.load_kube_config()

# Create API client
v1 = client.CoreV1Api()

# List pods
pods = v1.list_pod_for_all_namespaces()
for pod in pods.items:
    print(f"{pod.metadata.namespace}/{pod.metadata.name}: {pod.status.phase}")

# List pods in namespace
pods = v1.list_namespaced_pod('default')
for pod in pods.items:
    print(pod.metadata.name)

# Create deployment
apps_v1 = client.AppsV1Api()
deployment = client.V1Deployment(
    # State the object type explicitly so the manifest is self-describing
    api_version='apps/v1',
    kind='Deployment',
    metadata=client.V1ObjectMeta(name='nginx-deployment'),
    spec=client.V1DeploymentSpec(
        replicas=3,
        # The selector must match the labels on the pod template below
        selector=client.V1LabelSelector(
            match_labels={'app': 'nginx'}
        ),
        template=client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels={'app': 'nginx'}),
            spec=client.V1PodSpec(
                containers=[
                    client.V1Container(
                        name='nginx',
                        image='nginx:latest',
                        ports=[client.V1ContainerPort(container_port=80)]
                    )
                ]
            )
        )
    )
)
apps_v1.create_namespaced_deployment('default', deployment)

# Delete deployment
apps_v1.delete_namespaced_deployment('nginx-deployment', 'default')

# Get pod logs
logs = v1.read_namespaced_pod_log('pod-name', 'default', tail_lines=100)
print(logs)

# Execute command in pod
from kubernetes.stream import stream
exec_command = ['sh', '-c', 'echo Hello from pod']
resp = stream(
    v1.connect_get_namespaced_pod_exec,
    'pod-name',
    'default',
    command=exec_command,
    stderr=True,
    stdin=False,
    stdout=True,
    tty=False
)
print(resp)
Logging & Monitoring
15. Logging
import logging
from logging.handlers import RotatingFileHandler
# Basic configuration: basicConfig is called with level INFO and a format
# of timestamp - logger name - level - message
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger named after the current module
logger = logging.getLogger(__name__)
logger.info("Application started")
logger.warning("Warning message")
logger.error("Error occurred")
# File logging with rotation: maxBytes is 10*1024*1024 and backupCount is 5
handler = RotatingFileHandler(
'/var/log/myapp.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))
logger.addHandler(handler)
# Multiple handlers: one logger can fan out to several destinations
# NOTE(review): basicConfig above has presumably already put a console
# handler on the root logger, so adding console_handler here may print each
# record twice — confirm whether that is intended
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('/var/log/app.log')
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# Structured logging (JSON)
import json
class JsonFormatter(logging.Formatter):
    """Formatter that renders each log record as a single JSON object."""

    def format(self, record):
        """Return *record* serialized as a JSON string.

        The object always has the keys ``timestamp``, ``level``,
        ``message`` and ``module``. An ``exception`` key holding the
        formatted traceback is added only when the record carries
        exception info, so tracebacks from ``logger.exception`` are kept.
        """
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            # getMessage() applies the %-style arguments to the message
            'message': record.getMessage(),
            'module': record.module,
        }
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)
        return json.dumps(log_data)
handler.setFormatter(JsonFormatter())
16. Prometheus Metrics
from prometheus_client import Counter, Gauge, Histogram, start_http_server
import time
# Create metrics
requests_total = Counter('requests_total', 'Total requests')
active_connections = Gauge('active_connections', 'Active connections')
request_duration = Histogram('request_duration_seconds', 'Request duration')

# Update metrics
requests_total.inc()
active_connections.set(42)
# Everything inside the with-block is timed and observed by the histogram
with request_duration.time():
    # Your code here
    time.sleep(1)

# Start metrics server
start_http_server(8000)
# Metrics available at http://localhost:8000/metrics
Task Scheduling
17. Schedule Library
import schedule
import time
def backup_databases():
    """Scheduled job stub: announce the database backup run."""
    print("Running database backup...")
    # Backup logic here
def cleanup_logs():
    """Scheduled job stub: announce the old-log cleanup run."""
    print("Cleaning up old logs...")
    # Cleanup logic here
# Schedule tasks
schedule.every().day.at("02:00").do(backup_databases)
schedule.every().hour.do(cleanup_logs)
schedule.every(10).minutes.do(lambda: print("Health check"))
# Run scheduler
while True:
schedule.run_pending()
time.sleep(60)
18. APScheduler (Advanced)
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = BlockingScheduler()
@scheduler.scheduled_job('interval', minutes=5)
def check_health():
    """Job registered on the scheduler with a 5-minute interval trigger."""
    print("Health check...")
@scheduler.scheduled_job(CronTrigger(hour=2, minute=0))
def nightly_backup():
    """Job registered on the scheduler with a cron trigger at 02:00."""
    print("Running backup...")
scheduler.start()
Configuration Management
19. ConfigParser (INI Files)
import configparser
# Read config
config = configparser.ConfigParser()
# NOTE: ConfigParser.read() skips files it cannot open instead of raising,
# so a missing file only shows up as a KeyError on the lookup below
config.read('/etc/myapp/config.ini')
db_host = config['database']['host']
# getint converts the stored string to an int
db_port = config.getint('database', 'port')
# Write config: assigning a dict creates or replaces the whole section.
# The values here are all written as strings.
config['database'] = {
'host': 'localhost',
'port': '5432',
'user': 'admin'
}
with open('config.ini', 'w') as f:
config.write(f)
20. Environment Variables
import os
from dotenv import load_dotenv
# Load from .env file
load_dotenv()
# Get environment variables; os.getenv returns the second argument when the
# variable is unset
db_host = os.getenv('DB_HOST', 'localhost')
# Environment values are strings, so convert explicitly; the int default
# 5432 passes through int() unchanged
db_port = int(os.getenv('DB_PORT', 5432))
# Indexing os.environ raises KeyError when the variable is missing; use it
# for settings that must be present
api_key = os.environ['API_KEY'] # Raises error if not found
# Set environment variable
os.environ['MY_VAR'] = 'value'
DevOps Scripts
21. Health Check Script
#!/usr/bin/env python3
import requests
import sys
def check_service(url):
    """Probe *url* with an HTTP GET and report whether it is healthy.

    A service counts as healthy only when it answers with status 200
    within 5 seconds. One result line is printed per call.

    Returns:
        True if the endpoint returned 200, otherwise False. Connection
        errors, timeouts and invalid URLs are reported as False.
    """
    try:
        response = requests.get(url, timeout=5)
    except requests.exceptions.RequestException as e:
        # Catch only request failures so genuine programming errors are
        # not hidden behind a "service unhealthy" message
        print(f"✗ {url} failed: {e}")
        return False

    if response.status_code == 200:
        print(f"✓ {url} is healthy")
        return True

    print(f"✗ {url} returned {response.status_code}")
    return False
services = [
'http://localhost:8080/health',
'http://localhost:9090/metrics',
'http://database:5432'
]
results = [check_service(url) for url in services]
if not all(results):
sys.exit(1)
22. Log Analyzer
#!/usr/bin/env python3
from collections import Counter
import re
def analyze_logs(log_file):
    """Print a summary of an nginx-style access log.

    The report contains the number of lines mentioning ``ERROR``, the ten
    most frequent client IPs, and the count per HTTP status code.

    Args:
        log_file: Path of the log file to read.
    """
    # Compiled once, outside the loop: client IP, then the quoted request
    # line, then the status code that follows it
    access_pattern = re.compile(
        r'(\d+\.\d+\.\d+\.\d+).*"[A-Z]+ .* HTTP/\d\.\d" (\d+)'
    )
    error_count = 0
    status_codes = Counter()
    ip_addresses = Counter()

    # Access logs can contain arbitrary bytes sent by clients;
    # errors='replace' stops one bad byte from aborting the whole analysis
    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            if 'ERROR' in line:
                error_count += 1
            # Parse nginx logs
            match = access_pattern.search(line)
            if match:
                ip, status = match.groups()
                ip_addresses[ip] += 1
                status_codes[status] += 1

    print(f"Total errors: {error_count}")
    print("\nTop 10 IPs:")
    for ip, count in ip_addresses.most_common(10):
        print(f" {ip}: {count}")
    print("\nStatus codes:")
    for status, count in status_codes.most_common():
        print(f" {status}: {count}")
analyze_logs('/var/log/nginx/access.log')
23. Deployment Script
#!/usr/bin/env python3
import subprocess
import sys
import time
def run_command(cmd, description):
    """Run a shell command, echo its output, and abort the script on failure.

    Args:
        cmd: Command line, interpreted by the shell (``shell=True``), so
            shell operators such as ``||`` work. Callers must quote any
            untrusted values they interpolate into it.
        description: Human-readable label printed before the command runs.

    Returns:
        The command's captured standard output as text.

    Raises:
        SystemExit: With exit code 1 when the command exits non-zero.
    """
    print(f"\n>>> {description}")
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    print(result.stdout)
    if result.returncode != 0:
        print(f"ERROR: {result.stderr}")
        sys.exit(1)
    return result.stdout
def deploy_application(version):
    """Build, test and (re)start the ``myapp`` container for *version*.

    Steps: pull the latest code, build the image, run the test suite in
    it, replace the running container, then probe the health endpoint.
    Any failing step terminates the script with exit code 1.

    Args:
        version: Image tag to build and deploy. It comes from the command
            line, so it is shell-quoted before being placed in a command.
    """
    import shlex

    # Quote the user-supplied tag so it cannot inject shell syntax
    image = shlex.quote(f'myapp:{version}')

    # Pull latest code
    run_command('git pull origin main', 'Pulling latest code')

    # Build Docker image
    run_command(
        f'docker build -t {image} .',
        f'Building Docker image version {version}'
    )

    # Run tests
    run_command(
        f'docker run --rm {image} pytest',
        'Running tests'
    )

    # Stop old container ("|| true": no old container is not an error)
    run_command(
        'docker stop myapp || true',
        'Stopping old container'
    )

    # Remove it as well; a stopped container still owns the name "myapp"
    # and would make the "docker run --name myapp" below fail
    run_command(
        'docker rm myapp || true',
        'Removing old container'
    )

    # Start new container
    run_command(
        f'docker run -d --name myapp -p 8080:8080 {image}',
        'Starting new container'
    )

    # Health check (give the application a moment to start listening)
    time.sleep(5)
    try:
        import requests
        # The timeout keeps a hung application from stalling the deploy
        response = requests.get('http://localhost:8080/health', timeout=5)
        if response.status_code == 200:
            print("\n✓ Deployment successful!")
        else:
            print("\n✗ Health check failed!")
            sys.exit(1)
    except Exception as e:
        print(f"\n✗ Deployment failed: {e}")
        sys.exit(1)
if __name__ == '__main__':
if len(sys.argv) != 2:
print("Usage: deploy.py <version>")
sys.exit(1)
deploy_application(sys.argv[1])
Interview Scenarios
Scenario 1: Automated EC2 Snapshot Backup
import boto3
from datetime import datetime, timedelta
def backup_ec2_volumes(retention_days=7):
    """Snapshot every EBS volume tagged ``Backup=true`` and prune old backups.

    New snapshots are tagged ``AutomatedBackup=true`` and with their source
    ``VolumeId``. Afterwards, snapshots owned by this account that carry
    the ``AutomatedBackup=true`` tag and are older than the retention
    window are deleted.

    Args:
        retention_days: Age in days after which an automated snapshot is
            deleted. Defaults to 7.
    """
    from datetime import timezone

    ec2 = boto3.client('ec2')
    now = datetime.now(timezone.utc)

    # Get volumes with Backup tag (paginate: one response may not list all)
    volume_pages = ec2.get_paginator('describe_volumes').paginate(
        Filters=[{'Name': 'tag:Backup', 'Values': ['true']}]
    )
    for page in volume_pages:
        for volume in page['Volumes']:
            volume_id = volume['VolumeId']
            # Tag at creation time so a failure between "create" and "tag"
            # cannot leave an untagged snapshot that is never cleaned up
            snapshot = ec2.create_snapshot(
                VolumeId=volume_id,
                Description=f'Automated backup {now.isoformat()}',
                TagSpecifications=[{
                    'ResourceType': 'snapshot',
                    'Tags': [
                        {'Key': 'AutomatedBackup', 'Value': 'true'},
                        {'Key': 'VolumeId', 'Value': volume_id}
                    ]
                }]
            )
            print(f"Created snapshot {snapshot['SnapshotId']} for {volume_id}")

    # Delete snapshots older than the retention window. StartTime is
    # timezone-aware, so compare it with an aware "now" rather than
    # stripping tzinfo and comparing against local wall-clock time.
    cutoff_date = now - timedelta(days=retention_days)
    snapshot_pages = ec2.get_paginator('describe_snapshots').paginate(
        OwnerIds=['self'],
        Filters=[{'Name': 'tag:AutomatedBackup', 'Values': ['true']}]
    )
    for page in snapshot_pages:
        for snapshot in page['Snapshots']:
            if snapshot['StartTime'] >= cutoff_date:
                continue
            snapshot_id = snapshot['SnapshotId']
            print(f"Deleting old snapshot {snapshot_id}")
            try:
                ec2.delete_snapshot(SnapshotId=snapshot_id)
            except ec2.exceptions.ClientError as e:
                # One undeletable snapshot (e.g. still in use) must not
                # stop the rest of the cleanup
                print(f"Could not delete {snapshot_id}: {e}")
backup_ec2_volumes()
Scenario 2: Monitor and Restart Failed Containers
import docker
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def monitor_containers(interval=60):
    """Poll Docker forever and restart stopped containers that should be up.

    A container is restarted only when it is not running AND has a restart
    policy configured, which is taken as the sign that it is meant to stay
    up. This function never returns.

    Args:
        interval: Seconds to sleep between polls. Defaults to 60.
    """
    client = docker.from_env()
    while True:
        for container in client.containers.list(all=True):
            if container.status == 'running':
                continue
            logger.warning("Container %s is %s", container.name, container.status)

            # Check if it should be running (has restart policy). The
            # policy name is "no" or an empty string when none is set, and
            # the keys may be missing, so look them up defensively.
            host_config = container.attrs.get('HostConfig') or {}
            policy = host_config.get('RestartPolicy') or {}
            if policy.get('Name', '') in ('', 'no'):
                continue

            logger.info("Attempting to restart %s", container.name)
            try:
                container.restart()
                logger.info("Successfully restarted %s", container.name)
            except Exception as e:
                # Best effort: log and carry on with the other containers
                logger.error("Failed to restart %s: %s", container.name, e)
        time.sleep(interval)  # Check every minute by default
monitor_containers()
Scenario 3: Kubernetes Pod Scaler Based on Custom Metrics
from kubernetes import client, config
import time
config.load_kube_config()
apps_v1 = client.AppsV1Api()
v1 = client.CoreV1Api()
def get_queue_depth():
    """Return the current queue depth.

    Placeholder implementation: returns a random integer between 0 and
    100 inclusive. Replace it with a query against the real queue.
    """
    # Get queue depth from external system
    # This is a placeholder
    import random
    return random.randint(0, 100)
def scale_deployment(deployment_name, namespace, replicas):
    """Set the replica count of a Deployment.

    Args:
        deployment_name: Name of the Deployment to scale.
        namespace: Namespace the Deployment lives in.
        replicas: Desired number of replicas.
    """
    # Patch only the field being changed. Sending back the whole object
    # read earlier risks conflicts and can overwrite concurrent edits.
    patch = {'spec': {'replicas': replicas}}
    apps_v1.patch_namespaced_deployment(deployment_name, namespace, patch)
    print(f"Scaled {deployment_name} to {replicas} replicas")
def autoscale_based_on_queue():
    """Scale the ``worker`` Deployment in ``default`` by queue depth, forever.

    Every 30 seconds the queue depth is sampled and mapped to a target:
    more than 50 gives 10 replicas, more than 20 gives 5, otherwise 2.
    The Deployment is patched only when the target differs from its
    current replica count. This function never returns.
    """
    deployment_name = 'worker'
    namespace = 'default'
    while True:
        queue_depth = get_queue_depth()

        # Scale logic
        if queue_depth > 50:
            target_replicas = 10
        elif queue_depth > 20:
            target_replicas = 5
        else:
            target_replicas = 2

        # Get current replicas
        deployment = apps_v1.read_namespaced_deployment(deployment_name, namespace)
        current_replicas = deployment.spec.replicas

        # Skip the API write when nothing would change
        if current_replicas != target_replicas:
            scale_deployment(deployment_name, namespace, target_replicas)
        time.sleep(30)
autoscale_based_on_queue()
Total Examples: 60+ Python DevOps scenarios
Last updated on