Skip to content

Scripting and Automation for Kubernetes/OpenShift

1. Shell Scripting Essentials

Kubernetes Deployment Script

#!/bin/bash
set -euo pipefail  # Exit on error, undefined vars, pipe failures

# Configuration — every value can be overridden from the environment,
# e.g. `NAMESPACE=staging ./deploy.sh`.
NAMESPACE="${NAMESPACE:-production}"
DEPLOYMENT_NAME="${DEPLOYMENT_NAME:-myapp}"
IMAGE="${IMAGE:-myapp:latest}"
REPLICAS="${REPLICAS:-3}"
TIMEOUT="${TIMEOUT:-300}"  # seconds to wait for the rollout

# ANSI colors for output; readonly since nothing should ever rebind them.
readonly RED='\033[0;31m'
readonly GREEN='\033[0;32m'
readonly YELLOW='\033[1;33m'
readonly NC='\033[0m' # No Color

# Logging helpers. Informational messages go to stdout; warnings and
# errors go to stderr so they survive stdout redirection. printf is used
# instead of the non-portable `echo -e`; %b expands the \033 escapes
# stored in the color variables.
log_info() {
    printf '%b[INFO]%b %s\n' "${GREEN}" "${NC}" "$1"
}

log_warn() {
    printf '%b[WARN]%b %s\n' "${YELLOW}" "${NC}" "$1" >&2
}

log_error() {
    printf '%b[ERROR]%b %s\n' "${RED}" "${NC}" "$1" >&2
}

# Verify the tooling and cluster connectivity needed by every later step.
# Exits the script (not just the function) on any missing prerequisite.
check_prerequisites() {
    log_info "Checking prerequisites..."

    command -v kubectl > /dev/null 2>&1 || {
        log_error "kubectl not found. Please install kubectl."
        exit 1
    }

    kubectl cluster-info > /dev/null 2>&1 || {
        log_error "Cannot connect to Kubernetes cluster."
        exit 1
    }

    log_info "Prerequisites check passed."
}

# Idempotently make sure ${NAMESPACE} exists before deploying into it.
ensure_namespace() {
    log_info "Ensuring namespace ${NAMESPACE} exists..."

    if ! kubectl get namespace "${NAMESPACE}" > /dev/null 2>&1; then
        kubectl create namespace "${NAMESPACE}"
        log_info "Namespace ${NAMESPACE} created."
    else
        log_info "Namespace ${NAMESPACE} already exists."
    fi
}

# Create the deployment on first run; on later runs only swap the image
# so existing spec customizations (resources, probes, ...) are preserved.
deploy_application() {
    log_info "Deploying ${DEPLOYMENT_NAME} to ${NAMESPACE}..."

    if ! kubectl get deployment "${DEPLOYMENT_NAME}" -n "${NAMESPACE}" > /dev/null 2>&1; then
        log_info "Creating new deployment..."
        kubectl create deployment "${DEPLOYMENT_NAME}" \
            --image="${IMAGE}" \
            --replicas="${REPLICAS}" \
            -n "${NAMESPACE}"
    else
        log_info "Updating existing deployment..."
        kubectl set image "deployment/${DEPLOYMENT_NAME}" \
            "${DEPLOYMENT_NAME}=${IMAGE}" \
            -n "${NAMESPACE}"
    fi
}

# Block until the rollout finishes or ${TIMEOUT} seconds elapse.
# Returns non-zero on failure/timeout so the caller can roll back.
wait_for_rollout() {
    log_info "Waiting for rollout to complete (timeout: ${TIMEOUT}s)..."

    if ! kubectl rollout status "deployment/${DEPLOYMENT_NAME}" \
        -n "${NAMESPACE}" \
        --timeout="${TIMEOUT}s"; then
        log_error "Rollout failed or timed out."
        return 1
    fi

    log_info "Rollout completed successfully."
}

# Verify deployment: compare ready vs. desired replica counts.
# Returns 0 when they match, 1 otherwise (or when kubectl itself fails).
verify_deployment() {
    log_info "Verifying deployment..."

    # Declare and assign separately: `local v=$(cmd)` always reports
    # local's own exit status (0) and would mask a kubectl failure.
    local ready_replicas desired_replicas

    ready_replicas=$(kubectl get deployment "${DEPLOYMENT_NAME}" \
        -n "${NAMESPACE}" \
        -o jsonpath='{.status.readyReplicas}') || return 1
    # .status.readyReplicas is omitted entirely when zero pods are ready,
    # which made the old numeric comparison fail on an empty string.
    ready_replicas=${ready_replicas:-0}

    desired_replicas=$(kubectl get deployment "${DEPLOYMENT_NAME}" \
        -n "${NAMESPACE}" \
        -o jsonpath='{.spec.replicas}') || return 1
    desired_replicas=${desired_replicas:-0}

    if [ "${ready_replicas}" -eq "${desired_replicas}" ]; then
        log_info "All ${ready_replicas} replicas are ready."
        return 0
    else
        log_error "Only ${ready_replicas}/${desired_replicas} replicas are ready."
        return 1
    fi
}

# Revert to the previous ReplicaSet revision. Fire-and-forget: the undo
# is initiated but this function does not wait for it to complete.
rollback() {
    log_warn "Rolling back deployment..."
    kubectl rollout undo "deployment/${DEPLOYMENT_NAME}" -n "${NAMESPACE}"
    log_info "Rollback initiated."
}

# Orchestrate the full deploy: preflight, namespace, deploy, wait, verify.
# On any rollout/verification failure, roll back and exit non-zero.
main() {
    log_info "Starting deployment process..."

    check_prerequisites
    ensure_namespace
    deploy_application

    if ! wait_for_rollout || ! verify_deployment; then
        log_error "Deployment failed!"
        rollback
        exit 1
    fi

    log_info "Deployment successful!"
    exit 0
}

# Trap errors and cleanup
# NOTE(review): without `set -E` the ERR trap does not fire inside
# functions or subshells — confirm whether per-function coverage is wanted.
trap 'log_error "Script failed at line $LINENO"' ERR

# Run main function
main "$@"

Backup Script for etcd

#!/bin/bash
set -euo pipefail

# etcd snapshot backup with verification, compression, retention pruning
# and optional S3 upload.
#
# Environment overrides: BACKUP_DIR, RETENTION_DAYS, ETCD_ENDPOINTS,
# ETCD_CACERT, ETCD_CERT, ETCD_KEY, AWS_S3_BUCKET (enables S3 upload).

# Configuration
BACKUP_DIR="${BACKUP_DIR:-/var/backups/etcd}"
RETENTION_DAYS="${RETENTION_DAYS:-7}"
ETCD_ENDPOINTS="${ETCD_ENDPOINTS:-https://127.0.0.1:2379}"
ETCD_CACERT="${ETCD_CACERT:-/etc/kubernetes/pki/etcd/ca.crt}"
ETCD_CERT="${ETCD_CERT:-/etc/kubernetes/pki/etcd/server.crt}"
ETCD_KEY="${ETCD_KEY:-/etc/kubernetes/pki/etcd/server.key}"

# Fail fast if etcdctl is missing, before any directory is created.
command -v etcdctl > /dev/null 2>&1 || {
    echo "etcdctl not found in PATH" >&2
    exit 1
}

# Retention must be a whole number of days; an empty or garbage value
# would silently break the find(1) expression below.
case "${RETENTION_DAYS}" in
    ''|*[!0-9]*)
        echo "RETENTION_DAYS must be a non-negative integer" >&2
        exit 1
        ;;
esac

# Create backup directory
mkdir -p "${BACKUP_DIR}"

# Generate backup filename with timestamp
BACKUP_FILE="${BACKUP_DIR}/etcd-backup-$(date +%Y%m%d-%H%M%S).db"

# Perform backup
echo "Starting etcd backup..."
ETCDCTL_API=3 etcdctl snapshot save "${BACKUP_FILE}" \
    --endpoints="${ETCD_ENDPOINTS}" \
    --cacert="${ETCD_CACERT}" \
    --cert="${ETCD_CERT}" \
    --key="${ETCD_KEY}"

# Verify backup integrity before compressing it.
echo "Verifying backup..."
ETCDCTL_API=3 etcdctl snapshot status "${BACKUP_FILE}" \
    --write-out=table

# Compress backup (produces ${BACKUP_FILE}.gz and removes the raw .db).
echo "Compressing backup..."
gzip "${BACKUP_FILE}"

# Remove old backups
echo "Removing backups older than ${RETENTION_DAYS} days..."
find "${BACKUP_DIR}" -name "etcd-backup-*.db.gz" -mtime "+${RETENTION_DAYS}" -delete

# Upload to S3 (optional; only when AWS_S3_BUCKET is set and non-empty)
if [ -n "${AWS_S3_BUCKET:-}" ]; then
    echo "Uploading to S3..."
    aws s3 cp "${BACKUP_FILE}.gz" "s3://${AWS_S3_BUCKET}/etcd-backups/"
fi

echo "Backup completed successfully: ${BACKUP_FILE}.gz"

Health Check Script

#!/bin/bash

# Health check for Kubernetes cluster: API server, node readiness,
# kube-system pods, (deprecated) component statuses, and node metrics.
# Returns 1 only when the API server itself is unreachable; other
# findings are reported but do not abort the remaining checks.
check_cluster_health() {
    echo "=== Cluster Health Check ==="

    # API server reachability first — nothing else works without it.
    echo -n "API Server: "
    if kubectl cluster-info > /dev/null 2>&1; then
        echo "✓ Healthy"
    else
        echo "✗ Unhealthy"
        return 1
    fi

    # Nodes: count lines whose STATUS column is not " Ready".
    # grep -vc replaces the old `grep -v | wc -l` pipeline; `|| true`
    # covers grep's exit status 1 when the count is zero.
    echo -n "Nodes: "
    local not_ready
    not_ready=$(kubectl get nodes --no-headers | grep -vc " Ready" || true)
    if [ "$not_ready" -eq 0 ]; then
        echo "✓ All nodes ready"
    else
        echo "✗ ${not_ready} nodes not ready"
        kubectl get nodes
    fi

    # System pods: anything not Running/Completed is suspect.
    echo -n "System Pods: "
    local failing_pods
    failing_pods=$(kubectl get pods -n kube-system --no-headers | \
        grep -vc "Running\|Completed" || true)
    if [ "$failing_pods" -eq 0 ]; then
        echo "✓ All system pods running"
    else
        echo "✗ ${failing_pods} pods not running"
        kubectl get pods -n kube-system | grep -v "Running\|Completed"
    fi

    # componentstatuses is deprecated since v1.19 and may error on newer
    # clusters — degrade gracefully instead of printing a raw API error.
    echo "Component Status:"
    kubectl get componentstatuses 2>/dev/null || echo "componentstatuses unavailable (deprecated API)"

    # Check resource usage (requires metrics-server).
    echo -e "\nResource Usage:"
    kubectl top nodes 2>/dev/null || echo "Metrics server not available"
}

check_cluster_health

2. Python Automation Scripts

Kubernetes Resource Manager

#!/usr/bin/env python3
"""
Kubernetes Resource Manager
Automates common Kubernetes operations
"""

import sys
import argparse
import logging
from kubernetes import client, config
from kubernetes.client.rest import ApiException

# Configure logging
# Module-wide logger: INFO level, timestamped "<time> - LEVEL - message"
# lines written to the logging default stream (stderr).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class KubernetesManager:
    """Manage common Kubernetes resources within a single namespace.

    Thin convenience wrapper around the official Python client covering
    deployments, pods, ConfigMaps and Jobs. All methods log failures and
    return a sentinel (False / [] / None / 0) instead of raising.
    """

    def __init__(self, namespace='default'):
        """Initialize API clients.

        Tries a local kubeconfig first, then falls back to in-cluster
        service-account credentials (for running inside a pod).

        Args:
            namespace: Namespace all subsequent operations target.
        """
        try:
            config.load_kube_config()
        except config.ConfigException:
            # No usable kubeconfig — assume we are running in-cluster.
            # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
            config.load_incluster_config()

        self.namespace = namespace
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()
        self.batch_v1 = client.BatchV1Api()

    def list_deployments(self):
        """List deployments in the namespace, logging ready/total replicas.

        Returns:
            List of V1Deployment objects; empty list on API error.
        """
        try:
            deployments = self.apps_v1.list_namespaced_deployment(
                namespace=self.namespace
            )

            logger.info(f"Deployments in {self.namespace}:")
            for dep in deployments.items:
                # Both status fields are None until the controller has
                # acted, so default them to 0 for display.
                replicas = dep.status.replicas or 0
                ready = dep.status.ready_replicas or 0
                logger.info(f"  {dep.metadata.name}: {ready}/{replicas} ready")

            return deployments.items
        except ApiException as e:
            logger.error(f"Error listing deployments: {e}")
            return []

    def scale_deployment(self, name, replicas):
        """Scale a deployment to the given replica count.

        Sends a minimal strategic-merge patch instead of the previous
        read-modify-write of the whole object, avoiding lost-update races
        with concurrent writers.

        Args:
            name: Deployment name.
            replicas: Desired replica count.

        Returns:
            True on success, False on API error.
        """
        try:
            self.apps_v1.patch_namespaced_deployment(
                name=name,
                namespace=self.namespace,
                body={'spec': {'replicas': replicas}}
            )

            logger.info(f"Scaled {name} to {replicas} replicas")
            return True
        except ApiException as e:
            logger.error(f"Error scaling deployment: {e}")
            return False

    def restart_deployment(self, name):
        """Trigger a rolling restart of a deployment.

        Uses the same mechanism as `kubectl rollout restart`: bumping the
        restartedAt pod-template annotation. The timestamp is timezone-aware
        (datetime.utcnow() is deprecated and produced a naive timestamp).

        Returns:
            True on success, False on API error.
        """
        from datetime import datetime, timezone

        try:
            patch = {
                'spec': {
                    'template': {
                        'metadata': {
                            'annotations': {
                                'kubectl.kubernetes.io/restartedAt':
                                    datetime.now(timezone.utc).isoformat()
                            }
                        }
                    }
                }
            }

            self.apps_v1.patch_namespaced_deployment(
                name=name,
                namespace=self.namespace,
                body=patch
            )

            logger.info(f"Restarted deployment {name}")
            return True
        except ApiException as e:
            logger.error(f"Error restarting deployment: {e}")
            return False

    def get_pod_logs(self, pod_name, container=None, tail_lines=100):
        """Fetch recent logs from a pod.

        Args:
            pod_name: Pod to read from.
            container: Container name; None lets the API pick the only
                container (errors on multi-container pods).
            tail_lines: Number of trailing lines to return.

        Returns:
            Log text, or None on API error.
        """
        try:
            logs = self.core_v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=self.namespace,
                container=container,
                tail_lines=tail_lines
            )
            return logs
        except ApiException as e:
            logger.error(f"Error getting pod logs: {e}")
            return None

    def delete_failed_pods(self):
        """Delete all pods in phase Failed or Unknown.

        Returns:
            Number of pods for which deletion was requested; 0 on API error.
        """
        try:
            pods = self.core_v1.list_namespaced_pod(
                namespace=self.namespace
            )

            deleted_count = 0
            for pod in pods.items:
                if pod.status.phase in ['Failed', 'Unknown']:
                    logger.info(f"Deleting failed pod: {pod.metadata.name}")
                    self.core_v1.delete_namespaced_pod(
                        name=pod.metadata.name,
                        namespace=self.namespace
                    )
                    deleted_count += 1

            logger.info(f"Deleted {deleted_count} failed pods")
            return deleted_count
        except ApiException as e:
            logger.error(f"Error deleting failed pods: {e}")
            return 0

    def create_configmap(self, name, data):
        """Create a ConfigMap from a dict of string key/value pairs.

        Fails (returns False) if a ConfigMap with this name already exists.

        Returns:
            True on success, False on API error.
        """
        try:
            configmap = client.V1ConfigMap(
                metadata=client.V1ObjectMeta(name=name),
                data=data
            )

            self.core_v1.create_namespaced_config_map(
                namespace=self.namespace,
                body=configmap
            )

            logger.info(f"Created ConfigMap: {name}")
            return True
        except ApiException as e:
            logger.error(f"Error creating ConfigMap: {e}")
            return False

    def run_job(self, name, image, command):
        """Create a one-shot Job (restartPolicy=Never, up to 3 retries).

        Args:
            name: Job name, also used as the container name.
            image: Container image to run.
            command: Command list passed as the container entrypoint.

        Returns:
            True if the Job was created, False on API error. Does not
            wait for the Job to complete.
        """
        try:
            job = client.V1Job(
                metadata=client.V1ObjectMeta(name=name),
                spec=client.V1JobSpec(
                    template=client.V1PodTemplateSpec(
                        spec=client.V1PodSpec(
                            containers=[
                                client.V1Container(
                                    name=name,
                                    image=image,
                                    command=command
                                )
                            ],
                            restart_policy='Never'
                        )
                    ),
                    backoff_limit=3
                )
            )

            self.batch_v1.create_namespaced_job(
                namespace=self.namespace,
                body=job
            )

            logger.info(f"Created job: {name}")
            return True
        except ApiException as e:
            logger.error(f"Error creating job: {e}")
            return False

def main():
    """Parse CLI arguments and dispatch one action to KubernetesManager."""
    parser = argparse.ArgumentParser(description='Kubernetes Resource Manager')
    parser.add_argument('-n', '--namespace', default='default',
                       help='Kubernetes namespace')
    parser.add_argument('action', choices=[
        'list', 'scale', 'restart', 'logs', 'cleanup', 'job'
    ], help='Action to perform')
    parser.add_argument('--name', help='Resource name')
    parser.add_argument('--replicas', type=int, help='Number of replicas')
    parser.add_argument('--image', help='Container image')
    parser.add_argument('--command', nargs='+', help='Command to run')

    args = parser.parse_args()
    mgr = KubernetesManager(namespace=args.namespace)
    action = args.action

    # Early-return dispatch; actions needing extra flags validate first
    # and exit(1) with an error message when a flag is missing.
    if action == 'list':
        mgr.list_deployments()
        return
    if action == 'cleanup':
        mgr.delete_failed_pods()
        return
    if action == 'scale':
        if not args.name or args.replicas is None:
            logger.error("--name and --replicas required for scale")
            sys.exit(1)
        mgr.scale_deployment(args.name, args.replicas)
        return
    if action == 'restart':
        if not args.name:
            logger.error("--name required for restart")
            sys.exit(1)
        mgr.restart_deployment(args.name)
        return
    if action == 'logs':
        if not args.name:
            logger.error("--name required for logs")
            sys.exit(1)
        logs = mgr.get_pod_logs(args.name)
        if logs:
            print(logs)
        return
    if action == 'job':
        if not all([args.name, args.image, args.command]):
            logger.error("--name, --image, and --command required for job")
            sys.exit(1)
        mgr.run_job(args.name, args.image, args.command)


if __name__ == '__main__':
    main()

Resource Monitoring Script

#!/usr/bin/env python3
"""
Monitor Kubernetes resource usage and send alerts
"""

import time
import smtplib
from email.mime.text import MIMEText
from kubernetes import client, config

# NOTE(review): kubeconfig is loaded at import time, so merely importing
# this module requires a reachable cluster config — consider moving this
# into main(). Confirm before changing.
config.load_kube_config()

# Thresholds
CPU_THRESHOLD = 80  # percent
MEMORY_THRESHOLD = 85  # percent
DISK_THRESHOLD = 90  # percent  NOTE(review): not referenced anywhere in this script


def get_node_metrics():
    """Fetch per-node usage metrics from the metrics.k8s.io API.

    Returns:
        List of node metric objects, or an empty list when the metrics
        API is unavailable or the request fails.
    """
    custom_api = client.CustomObjectsApi()

    try:
        response = custom_api.list_cluster_custom_object(
            group="metrics.k8s.io",
            version="v1beta1",
            plural="nodes",
        )
    except Exception as e:
        print(f"Error getting metrics: {e}")
        return []

    return response['items']


# Multipliers for Kubernetes CPU quantity suffixes (fraction of a core).
_CPU_FACTORS = {'n': 1e-9, 'u': 1e-6, 'm': 1e-3}

# Multipliers for binary memory suffixes, normalized to MiB.
_MEM_FACTORS_MIB = {
    'Ki': 1.0 / 1024,
    'Mi': 1.0,
    'Gi': 1024.0,
    'Ti': 1024.0 * 1024,
}


def _parse_cpu_cores(quantity):
    """Convert a CPU quantity string ('250m', '123456789n', '2') to cores."""
    suffix = quantity[-1]
    if suffix in _CPU_FACTORS:
        return float(quantity[:-1]) * _CPU_FACTORS[suffix]
    return float(quantity)


def _parse_memory_mib(quantity):
    """Convert a memory quantity string ('1024Ki', '512Mi', '2Gi') to MiB."""
    for suffix, factor in _MEM_FACTORS_MIB.items():
        if quantity.endswith(suffix):
            return float(quantity[:-len(suffix)]) * factor
    # No recognized suffix: the API reports plain bytes.
    return float(quantity) / (1024 * 1024)


def check_resource_usage():
    """Check node CPU/memory usage and return a list of alert strings.

    The previous implementation assumed CPU usage always carried an 'n'
    (nanocores) suffix and memory always 'Ki'; the metrics API may use
    other suffixes, so quantities are now parsed generically.
    """
    alerts = []

    metrics = get_node_metrics()
    core_v1 = client.CoreV1Api()

    for metric in metrics:
        node_name = metric['metadata']['name']

        # Capacity comes from the core API; usage from metrics.k8s.io.
        node = core_v1.read_node(node_name)
        capacity = node.status.capacity

        cpu_usage = _parse_cpu_cores(metric['usage']['cpu'])
        cpu_capacity = _parse_cpu_cores(capacity['cpu'])
        cpu_percent = (cpu_usage / cpu_capacity) * 100

        memory_usage = _parse_memory_mib(metric['usage']['memory'])
        memory_capacity = _parse_memory_mib(capacity['memory'])
        memory_percent = (memory_usage / memory_capacity) * 100

        if cpu_percent > CPU_THRESHOLD:
            alerts.append(
                f"Node {node_name}: CPU usage {cpu_percent:.1f}% "
                f"exceeds threshold {CPU_THRESHOLD}%"
            )

        if memory_percent > MEMORY_THRESHOLD:
            alerts.append(
                f"Node {node_name}: Memory usage {memory_percent:.1f}% "
                f"exceeds threshold {MEMORY_THRESHOLD}%"
            )

    return alerts


def send_alert(alerts):
    """Email the collected alert lines to the admin address.

    Does nothing when the alert list is empty. Delivery failures are
    printed rather than raised.
    """
    if not alerts:
        return

    message = MIMEText('\n'.join(alerts))
    message['Subject'] = 'Kubernetes Resource Alert'
    message['From'] = 'alerts@example.com'
    message['To'] = 'admin@example.com'

    try:
        with smtplib.SMTP('localhost') as server:
            server.send_message(message)
    except Exception as e:
        print(f"Error sending alert: {e}")
    else:
        print("Alert sent successfully")


def main():
    """Poll resource usage forever, alerting every five minutes."""
    print("Starting resource monitoring...")

    while True:
        alerts = check_resource_usage()
        if not alerts:
            print("All resources within thresholds")
        else:
            print(f"Found {len(alerts)} alerts:")
            for alert in alerts:
                print(f"  - {alert}")
            send_alert(alerts)

        time.sleep(300)  # Check every 5 minutes


if __name__ == '__main__':
    main()

3. Ansible Playbooks

Deploy Application Playbook

---
# Deploy an application (ConfigMap, Secret, Deployment, Service) to a
# Kubernetes cluster via the kubernetes.core collection.
#
# Required external variable: db_password — supply via Ansible Vault or
# --extra-vars; it is intentionally NOT defined here.
- name: Deploy Application to Kubernetes
  hosts: localhost
  gather_facts: no

  vars:
    namespace: production
    app_name: myapp
    image: myapp:latest
    replicas: 3

  tasks:
    - name: Ensure namespace exists
      kubernetes.core.k8s:
        state: present
        definition:
          apiVersion: v1
          kind: Namespace
          metadata:
            name: "{{ namespace }}"

    - name: Create ConfigMap
      kubernetes.core.k8s:
        state: present
        definition:
          apiVersion: v1
          kind: ConfigMap
          metadata:
            name: "{{ app_name }}-config"
            namespace: "{{ namespace }}"
          data:
            DATABASE_URL: "postgres://db:5432/mydb"
            LOG_LEVEL: "info"

    - name: Create Secret
      kubernetes.core.k8s:
        state: present
        definition:
          apiVersion: v1
          kind: Secret
          metadata:
            name: "{{ app_name }}-secret"
            namespace: "{{ namespace }}"
          type: Opaque
          # stringData lets us pass the plain value; the API server
          # base64-encodes it into .data.
          stringData:
            DB_PASSWORD: "{{ db_password }}"

    - name: Deploy application
      kubernetes.core.k8s:
        state: present
        definition:
          apiVersion: apps/v1
          kind: Deployment
          metadata:
            name: "{{ app_name }}"
            namespace: "{{ namespace }}"
          spec:
            replicas: "{{ replicas }}"
            selector:
              matchLabels:
                app: "{{ app_name }}"
            template:
              metadata:
                labels:
                  app: "{{ app_name }}"
              spec:
                containers:
                - name: "{{ app_name }}"
                  image: "{{ image }}"
                  ports:
                  - containerPort: 8080
                  # Environment comes entirely from the ConfigMap/Secret
                  # created above.
                  envFrom:
                  - configMapRef:
                      name: "{{ app_name }}-config"
                  - secretRef:
                      name: "{{ app_name }}-secret"
                  resources:
                    requests:
                      memory: "256Mi"
                      cpu: "250m"
                    limits:
                      memory: "512Mi"
                      cpu: "500m"
                  livenessProbe:
                    httpGet:
                      path: /health
                      port: 8080
                    initialDelaySeconds: 30
                    periodSeconds: 10
                  readinessProbe:
                    httpGet:
                      path: /ready
                      port: 8080
                    initialDelaySeconds: 5
                    periodSeconds: 5

    - name: Create Service
      kubernetes.core.k8s:
        state: present
        definition:
          apiVersion: v1
          kind: Service
          metadata:
            name: "{{ app_name }}-service"
            namespace: "{{ namespace }}"
          spec:
            selector:
              app: "{{ app_name }}"
            ports:
            - protocol: TCP
              port: 80
              targetPort: 8080
            type: LoadBalancer

    - name: Wait for deployment to be ready
      kubernetes.core.k8s_info:
        kind: Deployment
        name: "{{ app_name }}"
        namespace: "{{ namespace }}"
      register: deployment
      # Guard both the empty resources list and the readyReplicas field,
      # which is absent until the first pod becomes ready — the previous
      # bare dereference errored out on early retries.
      until: >-
        deployment.resources | length > 0 and
        (deployment.resources[0].status.readyReplicas | default(0)) == replicas
      retries: 30
      delay: 10

    - name: Get service endpoint
      kubernetes.core.k8s_info:
        kind: Service
        name: "{{ app_name }}-service"
        namespace: "{{ namespace }}"
      register: service

    - name: Display service endpoint
      debug:
        # The LoadBalancer ingress IP may not be assigned yet; default()
        # avoids a template error in that window.
        msg: "Application deployed at: {{ service.resources[0].status.loadBalancer.ingress[0].ip | default('pending') }}"

Cluster Maintenance Playbook

---
# Rolling maintenance for control-plane hosts. Run with
# -e maintenance_mode=true to drain/uncordon around the update;
# otherwise only packages are updated and kubelet is restarted.
# kubectl commands run from the control machine (delegate_to: localhost).
- name: Kubernetes Cluster Maintenance
  hosts: k8s_masters
  become: yes

  tasks:
    # Evict workloads before touching the node. Only runs in
    # maintenance mode.
    - name: Drain node for maintenance
      command: kubectl drain {{ inventory_hostname }} --ignore-daemonsets --delete-emptydir-data
      delegate_to: localhost
      when: maintenance_mode | default(false)

    # Full distribution upgrade; Debian-family hosts only.
    - name: Update system packages
      apt:
        update_cache: yes
        upgrade: dist
      when: ansible_os_family == "Debian"

    # daemon_reload picks up any unit-file changes from the upgrade.
    - name: Restart kubelet
      systemd:
        name: kubelet
        state: restarted
        daemon_reload: yes

    # Blocks up to 5 minutes; a non-zero exit (timeout) fails the play
    # before the node would be uncordoned.
    - name: Wait for node to be ready
      command: kubectl wait --for=condition=Ready node/{{ inventory_hostname }} --timeout=300s
      delegate_to: localhost

    # Reverse of the drain above; same maintenance_mode gate.
    - name: Uncordon node
      command: kubectl uncordon {{ inventory_hostname }}
      delegate_to: localhost
      when: maintenance_mode | default(false)

    - name: Verify node status
      command: kubectl get node {{ inventory_hostname }}
      delegate_to: localhost
      register: node_status

    - name: Display node status
      debug:
        var: node_status.stdout_lines

4. Perl Scripts (Legacy Systems)

Log Parser

#!/usr/bin/perl
use strict;
use warnings;
use JSON;

# Parse Kubernetes pod logs for error patterns and summarize them by type.
# Usage: log_parser.pl [namespace]   (defaults to 'default')
my $namespace = $ARGV[0] || 'default';
my $error_count = 0;
my %error_types;

# The namespace comes from untrusted argv and was previously interpolated
# into a shell command via backticks. Reject anything that is not a valid
# DNS-1123 label, which also excludes all shell metacharacters.
die "Invalid namespace: $namespace\n"
    unless $namespace =~ /^[a-z0-9]([-a-z0-9]*[a-z0-9])?$/;

# List-form open('-|') execs kubectl directly, bypassing the shell, so no
# argument can be used for command injection.
open(my $pods_fh, '-|', 'kubectl', 'get', 'pods', '-n', $namespace, '-o', 'json')
    or die "Cannot run kubectl: $!\n";
my $pods_json = decode_json(do { local $/; <$pods_fh> });
close($pods_fh);

foreach my $pod (@{ $pods_json->{items} }) {
    my $pod_name = $pod->{metadata}{name};
    print "Checking pod: $pod_name\n";

    # Pod names are server-provided but still passed list-form for safety.
    # NOTE: unlike the old backticks version, kubectl's stderr is no
    # longer suppressed here.
    open(my $log_fh, '-|', 'kubectl', 'logs', $pod_name, '-n', $namespace, '--tail=1000')
        or next;

    while (my $line = <$log_fh>) {
        if ($line =~ /(ERROR|FATAL|Exception|failed)/i) {
            $error_count++;
            $error_types{$1}++;
            print "  Found error: $line";
        }
    }
    close($log_fh);
}

print "\n=== Summary ===\n";
print "Total errors found: $error_count\n";
print "Error types:\n";
foreach my $type (sort keys %error_types) {
    print "  $type: $error_types{$type}\n";
}

5. Configuration Management

Ansible Inventory

# Control-plane nodes (etcd + API server)
[k8s_masters]
master1 ansible_host=192.168.1.10
master2 ansible_host=192.168.1.11
master3 ansible_host=192.168.1.12

# Worker nodes running application pods
[k8s_workers]
worker1 ansible_host=192.168.1.20
worker2 ansible_host=192.168.1.21
worker3 ansible_host=192.168.1.22

# Umbrella group so plays can target the whole cluster at once
[k8s_cluster:children]
k8s_masters
k8s_workers

# Connection settings and shared variables for every cluster host
[k8s_cluster:vars]
ansible_user=ubuntu
ansible_ssh_private_key_file=~/.ssh/id_rsa
kubernetes_version=1.28.0

Ansible Role Structure

roles/
└── kubernetes/
    ├── tasks/
    │   ├── main.yml
    │   ├── install.yml
    │   ├── configure.yml
    │   └── deploy.yml
    ├── templates/
    │   ├── deployment.yml.j2
    │   ├── service.yml.j2
    │   └── configmap.yml.j2
    ├── vars/
    │   └── main.yml
    ├── defaults/
    │   └── main.yml
    └── handlers/
        └── main.yml

6. Interview Preparation - Scripting & Automation

Key Talking Points

1. Scripting Best Practices
- Error handling (set -e, try-catch)
- Logging and debugging
- Idempotency
- Input validation
- Documentation
- Testing

2. Automation Tools Comparison
- Shell: quick tasks, simple automation
- Python: complex logic, API interactions
- Ansible: configuration management, orchestration
- Terraform: Infrastructure as Code

3. Common Automation Tasks
- Deployment automation
- Backup and restore
- Health checks and monitoring
- Resource cleanup
- Certificate rotation
- Log aggregation

Common Interview Questions

Q: How do you ensure script idempotency? A: 1. Check current state before making changes 2. Use declarative approaches 3. Implement proper error handling 4. Use tools like Ansible that are idempotent by design 5. Test scripts multiple times

Q: Explain your approach to error handling in scripts. A: 1. Use set -euo pipefail in bash 2. Implement try-catch blocks 3. Log errors with context 4. Provide meaningful error messages 5. Implement rollback mechanisms 6. Set up alerting for failures

Q: How do you manage secrets in automation scripts? A: 1. Never hardcode secrets 2. Use environment variables 3. Integrate with secret managers (Vault, AWS Secrets Manager) 4. Use Kubernetes Secrets 5. Implement proper access controls 6. Audit secret access

Q: Describe your experience with Ansible for Kubernetes. A: Be ready to discuss: - Playbook structure - kubernetes.core collection - Inventory management - Variable precedence - Role organization - Integration with CI/CD

Hands-on Scenarios

Scenario 1: Write a deployment script - Check prerequisites - Deploy application - Wait for readiness - Verify deployment - Rollback on failure

Scenario 2: Create monitoring script - Check resource usage - Alert on thresholds - Log metrics - Send notifications

Scenario 3: Automate backup - Backup etcd - Backup persistent volumes - Upload to remote storage - Implement retention policy


Quick Reference

Useful Shell Commands

# Get all pods with high memory usage
kubectl top pods --all-namespaces --sort-by=memory

# Find pods in CrashLoopBackOff (their phase stays "Running", so a
# field-selector on status.phase cannot find them — grep the STATUS column)
kubectl get pods --all-namespaces | grep CrashLoopBackOff

# Find pods that have hard-failed (phase "Failed", e.g. Evicted)
kubectl get pods --all-namespaces --field-selector=status.phase=Failed

# Delete all evicted pods
kubectl get pods --all-namespaces -o json | \
  jq -r '.items[] | select(.status.reason=="Evicted") |
  "kubectl delete pod \(.metadata.name) -n \(.metadata.namespace)"' | sh

# Get pod resource requests/limits
kubectl get pods -o custom-columns=\
NAME:.metadata.name,\
CPU_REQ:.spec.containers[*].resources.requests.cpu,\
MEM_REQ:.spec.containers[*].resources.requests.memory

Python Kubernetes Client Examples

```python
# List pods
pods = core_v1.list_namespaced_pod(namespace='default')

# Create deployment
apps_v1.create_namespaced_deployment(namespace='default', body=deployment)

# Scale deployment
apps_v1.patch_namespaced_deployment_scale(
    name='myapp', namespace='default', body={'spec': {'replicas': 5}}
)

# Watch events
w = watch.Watch()
for event in w.stream(core_v1.list_namespaced_pod, namespace='default'):
    print(f"Event: {event['type']} {event['object'].metadata.name}")
```