ACME Protocol Implementation - Free Certificates
ACME protocol implementation enables automated certificate issuance (Let’s Encrypt–style) for internal or public PKI. This page covers what it takes to deploy an ACME server, integrate clients, and run certificate automation so renewals are zero-touch and reliable.
ACME Protocol Implementation
Section titled “ACME Protocol Implementation”Why This Matters
Section titled “Why This Matters”For executives: ACME is why Let’s Encrypt can issue 300+ million certificates at essentially zero cost. For organizations with internal PKI, implementing ACME server eliminates manual certificate operations that cost $50-200 per certificate in labor. This is strategic infrastructure that transforms PKI from operational burden to invisible automation.
For security leaders: Manual certificate operations don’t scale and create security risks (expired certificates, weak processes, no audit trail). ACME-based automation forces consistent security processes, provides complete audit trails, and eliminates human error from certificate lifecycle. This is how you achieve secure PKI at scale.
For engineers: You implement ACME client integration constantly (cert-manager, certbot, Kubernetes, service mesh). Understanding ACME protocol helps you debug validation failures, implement custom challenge types, and build internal automation. When certificate renewal fails at 3 AM, understanding ACME helps you fix it.
Common scenario: Your organization needs to issue thousands of certificates across cloud environments. Manual processes don’t scale. Public CAs don’t meet internal use cases (air-gapped networks, custom validation, regulatory requirements). You need internal ACME server providing Let’s Encrypt-style automation for internal infrastructure.
ACME (Automated Certificate Management Environment) is the protocol that revolutionized PKI by enabling fully automated certificate issuance and renewal without human intervention. Implementing an ACME server involves building a complete certificate authority with account management, order processing, challenge validation (DNS-01, HTTP-01, TLS-ALPN-01), certificate issuance, and renewal workflows. The reference implementation (Boulder, used by Let’s Encrypt) demonstrates production-grade architecture with multi-tier validation, rate limiting, database design for high availability, monitoring, and security controls. Organizations implement ACME servers for private PKI, regulatory compliance requiring private CAs, custom validation logic, specialized certificate types, and air-gapped environments. The protocol’s elegant design separates concerns (account ↔ order ↔ authorization ↔ challenge), uses cryptographic account binding, and supports automated domain validation at scale.
Key Insight: ACME succeeds because it inverts traditional PKI assumptions—instead of building for human-in-the-loop manual processes with automation as an afterthought, it assumes fully automated machine-driven workflows with human intervention as the exception. This fundamental design choice enables modern cloud-native architectures and short-lived certificate strategies.
Overview
Section titled “Overview”ACME (RFC 8555) defines a protocol between certificate applicants and certificate authorities that enables fully automated certificate lifecycle management. The protocol uses HTTPS + JSON and cryptographic signatures for all operations.
Core ACME Concepts:
- Account - Identity used across all ACME operations, bound to public key
- Order - Request for certificate covering specific identifiers
- Authorization - Proof of control required for each identifier
- Challenge - Method used to prove control (DNS, HTTP, TLS-ALPN)
- Certificate - Final artifact delivered after successful validation
Protocol Flow:
Client Server | | |--- Create Account (with public key) ------------------->| |<-- Account URL + status --------------------------------| | | |--- Create Order (with identifiers) -------------------->| |<-- Order URL + authorization URLs ----------------------| | | |--- Fetch Authorization (for each identifier) ---------->| |<-- Challenge options (DNS-01, HTTP-01, TLS-ALPN-01) ----| | | |--- Complete Challenge (place validation token) -------->| |<-- Challenge accepted ----------------------------------| | | |--- Notify CA (challenge ready) ------------------------>| |<-- Server validates asynchronously ---------------------| | | |--- Poll Authorization (check status) ------------------>| |<-- Status: valid ---------------------------------------| | | |--- Finalize Order (submit CSR) ------------------------>| |<-- Order status: processing ----------------------------| | | |--- Poll Order (wait for certificate) ------------------>| |<-- Certificate URL -------------------------------------| | | |--- Download Certificate ------------------------------->| |<-- Certificate chain -----------------------------------|Server Architecture
Section titled “Server Architecture”Component Overview
Section titled “Component Overview”Production ACME servers consist of multiple specialized components:
┌─────────────────┐ │ Load Balancer │ └────────┬────────┘ │ ┌──────────────┼──────────────┐ │ │ │ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │ ACME │ │ ACME │ │ ACME │ │ API │ │ API │ │ API │ │ Server │ │ Server │ │ Server │ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ └──────────────┼──────────────┘ │ ┌────────▼────────┐ │ Message Queue │ │ (RabbitMQ) │ └────────┬────────┘ │ ┌──────────────┼──────────────┐ │ │ │ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │Challenge│ │Challenge│ │Challenge│ │Validator│ │Validator│ │Validator│ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ └──────────────┼──────────────┘ │ ┌────────▼────────┐ │ PostgreSQL │ │ (Primary + │ │ Replicas) │ └────────┬────────┘ │ ┌────────▼────────┐ │ CA Signer │ │ (with HSM) │ └─────────────────┘Components:
- API Server - Handles ACME requests, enforces rate limits, manages sessions
- Validator - Performs challenge validation asynchronously
- Signer - Interfaces with HSM for certificate signing
- Database - Stores accounts, orders, authorizations, certificates
- Message Queue - Decouples API from validation for scalability
- Monitoring - Tracks metrics, alerts, audit logs
Database Schema
Section titled “Database Schema”Core tables for ACME server:
-- Accounts tableCREATE TABLE accounts ( id BIGSERIAL PRIMARY KEY, key_id TEXT NOT NULL UNIQUE, -- JWK thumbprint jwk JSONB NOT NULL, -- Account public key contact TEXT[], -- Email addresses status VARCHAR(20) NOT NULL, -- valid, deactivated, revoked terms_agreed BOOLEAN NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW(), CONSTRAINT valid_status CHECK (status IN ('valid', 'deactivated', 'revoked')));
CREATE INDEX idx_accounts_key_id ON accounts(key_id);CREATE INDEX idx_accounts_status ON accounts(status);
-- Orders tableCREATE TABLE orders ( id BIGSERIAL PRIMARY KEY, account_id BIGINT NOT NULL REFERENCES accounts(id), status VARCHAR(20) NOT NULL, -- pending, ready, processing, valid, invalid expires TIMESTAMP NOT NULL, identifiers JSONB NOT NULL, -- Array of {type, value} not_before TIMESTAMP, not_after TIMESTAMP, error JSONB, certificate_serial TEXT, -- Once issued created_at TIMESTAMP NOT NULL DEFAULT NOW(), CONSTRAINT valid_status CHECK (status IN ('pending', 'ready', 'processing', 'valid', 'invalid')));
CREATE INDEX idx_orders_account ON orders(account_id);CREATE INDEX idx_orders_status ON orders(status);CREATE INDEX idx_orders_expires ON orders(expires);
-- Authorizations tableCREATE TABLE authorizations ( id BIGSERIAL PRIMARY KEY, account_id BIGINT NOT NULL REFERENCES accounts(id), identifier_type VARCHAR(10) NOT NULL, -- dns, ip identifier_value TEXT NOT NULL, status VARCHAR(20) NOT NULL, -- pending, valid, invalid, deactivated, expired, revoked expires TIMESTAMP NOT NULL, wildcard BOOLEAN NOT NULL DEFAULT FALSE, created_at TIMESTAMP NOT NULL DEFAULT NOW(), validated_at TIMESTAMP, CONSTRAINT valid_status CHECK (status IN ('pending', 'valid', 'invalid', 'deactivated', 'expired', 'revoked')));
CREATE INDEX idx_authz_account ON authorizations(account_id);CREATE INDEX idx_authz_identifier ON authorizations(identifier_type, identifier_value);CREATE INDEX idx_authz_status ON authorizations(status);CREATE UNIQUE INDEX idx_authz_unique ON authorizations(account_id, identifier_type, identifier_value, wildcard) WHERE status IN ('pending', 'valid');
-- Challenges tableCREATE TABLE challenges ( id BIGSERIAL PRIMARY KEY, authorization_id BIGINT NOT NULL REFERENCES authorizations(id), type VARCHAR(20) NOT NULL, -- http-01, dns-01, tls-alpn-01 status VARCHAR(20) NOT NULL, -- pending, processing, valid, invalid token TEXT NOT NULL, validated TIMESTAMP, error JSONB, validation_record JSONB, -- Store validation details CONSTRAINT valid_type CHECK (type IN ('http-01', 'dns-01', 'tls-alpn-01')), CONSTRAINT valid_status CHECK (status IN ('pending', 'processing', 'valid', 'invalid')));
CREATE INDEX idx_challenges_authz ON challenges(authorization_id);CREATE INDEX idx_challenges_status ON challenges(status);CREATE INDEX idx_challenges_type ON challenges(type);
-- Order-Authorization junctionCREATE TABLE order_authorizations ( order_id BIGINT NOT NULL REFERENCES orders(id), authorization_id BIGINT NOT NULL REFERENCES authorizations(id), PRIMARY KEY (order_id, authorization_id));
-- Certificates tableCREATE TABLE certificates ( serial TEXT PRIMARY KEY, der BYTEA NOT NULL, -- Certificate in DER format issued_at TIMESTAMP NOT NULL DEFAULT NOW(), expires TIMESTAMP NOT NULL, account_id BIGINT NOT NULL REFERENCES accounts(id), order_id BIGINT REFERENCES orders(id), revoked BOOLEAN NOT NULL DEFAULT FALSE, revoked_at TIMESTAMP, revocation_reason INT);
CREATE INDEX idx_certificates_account ON certificates(account_id);CREATE INDEX idx_certificates_expires ON certificates(expires);CREATE INDEX idx_certificates_revoked ON certificates(revoked);
-- Rate limiting tableCREATE TABLE rate_limits ( id BIGSERIAL PRIMARY KEY, account_id BIGINT REFERENCES accounts(id), ip_address INET, limit_name VARCHAR(50) NOT NULL, count INT NOT NULL DEFAULT 1, window_start TIMESTAMP NOT NULL, window_end TIMESTAMP NOT NULL);
CREATE INDEX idx_rate_limits_account ON rate_limits(account_id, limit_name, window_end);CREATE INDEX idx_rate_limits_ip ON rate_limits(ip_address, limit_name, window_end);Account Management
Section titled “Account Management”Account Creation
Section titled “Account Creation”ACME accounts are bound to public keys, not usernames/passwords:
from josepy import jwkfrom acme import messages, clientimport json
class ACMEAccountManager: """Handle ACME account operations"""
def create_account(self, jwk_key, contact_emails, terms_agreed): """Create new ACME account"""
# Calculate key ID (JWK thumbprint) key_id = self.calculate_key_id(jwk_key)
# Check for existing account existing = self.db.execute( "SELECT id FROM accounts WHERE key_id = %s", (key_id,) ).fetchone()
if existing: raise ConflictError(f"Account already exists: {key_id}")
# Verify terms agreed if not terms_agreed: raise ValueError("Must agree to terms of service")
# Insert account account_id = self.db.execute( """ INSERT INTO accounts (key_id, jwk, contact, status, terms_agreed) VALUES (%s, %s, %s, 'valid', TRUE) RETURNING id """, (key_id, json.dumps(jwk_key.to_json()), contact_emails) ).fetchone()[0]
self.db.commit()
# Generate account URL account_url = f"{self.base_url}/acme/acct/{account_id}"
self.audit_log.log_account_created(account_id, key_id, contact_emails)
return { 'id': account_id, 'key': jwk_key, 'contact': contact_emails, 'status': 'valid', 'orders': f"{account_url}/orders" }, account_url
def calculate_key_id(self, jwk_key): """Calculate JWK thumbprint for account identification""" import hashlib import base64
# Get canonical JWK representation canonical = jwk_key.thumbprint()
# SHA-256 hash digest = hashlib.sha256(canonical).digest()
# Base64url encode key_id = base64.urlsafe_b64encode(digest).decode().rstrip('=')
return key_idAccount Key Rollover
Section titled “Account Key Rollover”Allowing account key changes while maintaining account history:
def rollover_account_key(self, account_id, old_jwk, new_jwk, signed_request): """Roll over account key to new key"""
# Verify request signed by old key if not self.verify_signature(signed_request, old_jwk): raise UnauthorizedError("Invalid signature with old key")
# Verify inner request signed by new key inner = json.loads(signed_request['payload']) if not self.verify_signature(inner, new_jwk): raise UnauthorizedError("Invalid signature with new key")
# Calculate new key ID new_key_id = self.calculate_key_id(new_jwk)
# Check new key not already in use existing = self.db.execute( "SELECT id FROM accounts WHERE key_id = %s", (new_key_id,) ).fetchone()
if existing: raise ConflictError("New key already associated with account")
# Update account self.db.execute( """ UPDATE accounts SET key_id = %s, jwk = %s WHERE id = %s """, (new_key_id, json.dumps(new_jwk.to_json()), account_id) )
self.db.commit() self.audit_log.log_key_rollover(account_id, old_jwk, new_jwk)
return {'status': 'valid', 'key': new_jwk}Order Processing
Section titled “Order Processing”Creating Orders
Section titled “Creating Orders”class ACMEOrderProcessor: """Process ACME orders"""
def create_order(self, account_id, identifiers, not_before=None, not_after=None): """Create new certificate order"""
# Validate identifiers for identifier in identifiers: if identifier['type'] not in ['dns', 'ip']: raise ValueError(f"Unsupported identifier type: {identifier['type']}")
# Validate DNS name format if identifier['type'] == 'dns': if not self.is_valid_domain(identifier['value']): raise ValueError(f"Invalid domain: {identifier['value']}")
# Check for rate limits self.check_rate_limits(account_id, 'orders_per_account')
# Set expiry (7 days standard) expires = datetime.utcnow() + timedelta(days=7)
# Create order order_id = self.db.execute( """ INSERT INTO orders (account_id, status, expires, identifiers, not_before, not_after) VALUES (%s, 'pending', %s, %s, %s, %s) RETURNING id """, (account_id, expires, json.dumps(identifiers), not_before, not_after) ).fetchone()[0]
# Create authorizations for each unique identifier authz_ids = [] for identifier in identifiers: authz_id = self.create_authorization( account_id, identifier['type'], identifier['value'] ) authz_ids.append(authz_id)
# Link to order self.db.execute( "INSERT INTO order_authorizations (order_id, authorization_id) VALUES (%s, %s)", (order_id, authz_id) )
self.db.commit()
# Generate order URL order_url = f"{self.base_url}/acme/order/{order_id}"
return { 'status': 'pending', 'expires': expires.isoformat(), 'identifiers': identifiers, 'authorizations': [ f"{self.base_url}/acme/authz/{authz_id}" for authz_id in authz_ids ], 'finalize': f"{order_url}/finalize" }, order_url
def create_authorization(self, account_id, identifier_type, identifier_value): """Create authorization with challenges"""
# Check for existing valid authorization existing = self.db.execute( """ SELECT id FROM authorizations WHERE account_id = %s AND identifier_type = %s AND identifier_value = %s AND status = 'valid' AND expires > NOW() """, (account_id, identifier_type, identifier_value) ).fetchone()
if existing: return existing[0]
# Create new authorization expires = datetime.utcnow() + timedelta(days=7)
authz_id = self.db.execute( """ INSERT INTO authorizations (account_id, identifier_type, identifier_value, status, expires) VALUES (%s, %s, %s, 'pending', %s) RETURNING id """, (account_id, identifier_type, identifier_value, expires) ).fetchone()[0]
# Create challenges self.create_challenges_for_authorization(authz_id, identifier_type)
return authz_id
def create_challenges_for_authorization(self, authz_id, identifier_type): """Create appropriate challenges for identifier type"""
import secrets
# Generate random token token = secrets.token_urlsafe(32)
if identifier_type == 'dns': # DNS identifiers get all challenge types challenge_types = ['http-01', 'dns-01', 'tls-alpn-01'] elif identifier_type == 'ip': # IP identifiers only get http-01 and tls-alpn-01 challenge_types = ['http-01', 'tls-alpn-01']
for challenge_type in challenge_types: self.db.execute( """ INSERT INTO challenges (authorization_id, type, status, token) VALUES (%s, %s, 'pending', %s) """, (authz_id, challenge_type, token) )Finalizing Orders
Section titled “Finalizing Orders”def finalize_order(self, order_id, account_id, csr_der): """Finalize order by submitting CSR"""
# Load order order = self.db.execute( "SELECT * FROM orders WHERE id = %s AND account_id = %s", (order_id, account_id) ).fetchone()
if not order: raise NotFoundError("Order not found")
if order['status'] != 'ready': raise OrderNotReadyError(f"Order status is {order['status']}, must be 'ready'")
# Parse CSR from cryptography import x509 from cryptography.hazmat.backends import default_backend
csr = x509.load_der_x509_csr(csr_der, default_backend())
# Verify CSR identifiers match order csr_names = self.extract_identifiers_from_csr(csr) order_names = set( identifier['value'] for identifier in json.loads(order['identifiers']) )
if csr_names != order_names: raise ValueError("CSR identifiers don't match order")
# Update order status self.db.execute( "UPDATE orders SET status = 'processing' WHERE id = %s", (order_id,) ) self.db.commit()
# Queue certificate generation self.queue_certificate_generation(order_id, csr_der)
return {'status': 'processing'}
def generate_certificate(self, order_id, csr_der): """Generate and sign certificate"""
from cryptography import x509 from cryptography.hazmat.primitives import hashes from datetime import datetime, timedelta
# Load order order = self.db.execute("SELECT * FROM orders WHERE id = %s", (order_id,)).fetchone()
# Parse CSR csr = x509.load_der_x509_csr(csr_der, default_backend())
# Build certificate builder = x509.CertificateBuilder() builder = builder.subject_name(csr.subject) builder = builder.issuer_name(self.issuer_cert.subject) builder = builder.public_key(csr.public_key()) builder = builder.serial_number(x509.random_serial_number())
# Set validity not_before = order.get('not_before') or datetime.utcnow() not_after = order.get('not_after') or (datetime.utcnow() + timedelta(days=90)) builder = builder.not_valid_before(not_before) builder = builder.not_valid_after(not_after)
# Copy extensions from CSR for extension in csr.extensions: builder = builder.add_extension( extension.value, critical=extension.critical )
# Add required extensions builder = self.add_required_extensions(builder)
# Sign certificate certificate = builder.sign( private_key=self.issuer_key, algorithm=hashes.SHA256(), backend=default_backend() )
# Store certificate serial = format(certificate.serial_number, 'x') self.db.execute( """ INSERT INTO certificates (serial, der, expires, account_id, order_id) VALUES (%s, %s, %s, %s, %s) """, (serial, certificate.public_bytes(serialization.Encoding.DER), not_after, order['account_id'], order_id) )
# Update order self.db.execute( """ UPDATE orders SET status = 'valid', certificate_serial = %s WHERE id = %s """, (serial, order_id) )
self.db.commit()
return certificateChallenge Validation
Section titled “Challenge Validation”DNS-01 Challenge
Section titled “DNS-01 Challenge”The most secure and versatile validation method:
import dns.resolverimport hashlibimport base64
class DNS01Validator: """Validate DNS-01 challenges"""
def validate(self, challenge, authorization, account_jwk): """Perform DNS-01 validation"""
domain = authorization['identifier_value'] token = challenge['token']
# Calculate expected TXT record value expected_value = self.calculate_key_authorization(token, account_jwk)
# Hash the key authorization digest = hashlib.sha256(expected_value.encode()).digest() expected_record = base64.urlsafe_b64encode(digest).decode().rstrip('=')
# Query DNS record_name = f"_acme-challenge.{domain}"
try: # Query multiple nameservers for redundancy nameservers = ['8.8.8.8', '1.1.1.1', '9.9.9.9'] found = False
for nameserver in nameservers: resolver = dns.resolver.Resolver() resolver.nameservers = [nameserver] resolver.timeout = 5 resolver.lifetime = 5
try: answers = resolver.resolve(record_name, 'TXT')
for rdata in answers: txt_value = rdata.strings[0].decode()
if txt_value == expected_record: found = True break
if found: break
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): continue
if not found: return False, f"Expected TXT record '{expected_record}' not found at {record_name}"
# Store validation record validation_record = { 'nameservers_queried': nameservers, 'record_name': record_name, 'expected_value': expected_record, 'validated_at': datetime.utcnow().isoformat() }
return True, validation_record
except dns.exception.Timeout: return False, f"DNS query timeout for {record_name}"
except Exception as e: return False, f"DNS validation error: {str(e)}"
def calculate_key_authorization(self, token, account_jwk): """Calculate key authorization for challenge"""
# JWK thumbprint thumbprint = account_jwk.thumbprint()
# key_authorization = token || '.' || base64url(thumbprint) key_auth = f"{token}.{base64.urlsafe_b64encode(thumbprint).decode().rstrip('=')}"
return key_authHTTP-01 Challenge
Section titled “HTTP-01 Challenge”For validation via web server:
import requests
class HTTP01Validator: """Validate HTTP-01 challenges"""
def validate(self, challenge, authorization, account_jwk): """Perform HTTP-01 validation"""
domain = authorization['identifier_value'] token = challenge['token']
# Calculate expected response expected_response = self.calculate_key_authorization(token, account_jwk)
# Build validation URL url = f"http://{domain}/.well-known/acme-challenge/{token}"
try: # Fetch with redirects allowed (up to 10) response = requests.get( url, timeout=10, allow_redirects=True, headers={'User-Agent': 'ACME-Server/1.0'} )
if response.status_code != 200: return False, f"HTTP {response.status_code} when fetching {url}"
# Verify content type content_type = response.headers.get('Content-Type', '') if not content_type.startswith('text/plain') and not content_type.startswith('application/octet-stream'): return False, f"Invalid content type: {content_type}"
# Check response body body = response.text.strip()
if body != expected_response: return False, f"Response mismatch. Expected: {expected_response}, Got: {body}"
# Validation successful validation_record = { 'url': url, 'status_code': response.status_code, 'expected_response': expected_response, 'validated_at': datetime.utcnow().isoformat(), 'redirect_chain': [resp.url for resp in response.history] + [response.url] }
return True, validation_record
except requests.Timeout: return False, f"Timeout fetching {url}"
except requests.ConnectionError as e: return False, f"Connection error: {str(e)}"
except Exception as e: return False, f"Validation error: {str(e)}"TLS-ALPN-01 Challenge
Section titled “TLS-ALPN-01 Challenge”For systems that can only serve HTTPS:
import sslimport socketfrom cryptography import x509from cryptography.hazmat.primitives import hashes
class TLSALPN01Validator: """Validate TLS-ALPN-01 challenges"""
ACME_TLS_1_PROTOCOL = 'acme-tls/1'
def validate(self, challenge, authorization, account_jwk): """Perform TLS-ALPN-01 validation"""
domain = authorization['identifier_value'] token = challenge['token']
# Calculate expected value key_auth = self.calculate_key_authorization(token, account_jwk) key_auth_hash = hashlib.sha256(key_auth.encode()).digest()
try: # Create SSL context with ALPN context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE context.set_alpn_protocols([self.ACME_TLS_1_PROTOCOL])
# Connect with socket.create_connection((domain, 443), timeout=10) as sock: with context.wrap_socket(sock, server_hostname=domain) as ssock:
# Verify ALPN protocol selected_protocol = ssock.selected_alpn_protocol() if selected_protocol != self.ACME_TLS_1_PROTOCOL: return False, f"Wrong ALPN protocol: {selected_protocol}"
# Get certificate cert_der = ssock.getpeercert(binary_form=True) cert = x509.load_der_x509_certificate(cert_der, default_backend())
# Verify certificate contains ACME extension try: acme_ext = cert.extensions.get_extension_for_oid( x509.ObjectIdentifier('1.3.6.1.5.5.7.1.31') # id-pe-acmeIdentifier )
# Extension value should be SHA-256 hash of key authorization if acme_ext.value.value != key_auth_hash: return False, "ACME extension value mismatch"
except x509.ExtensionNotFound: return False, "ACME extension not found in certificate"
# Verify SAN matches domain try: san_ext = cert.extensions.get_extension_for_class( x509.SubjectAlternativeName )
san_names = [name.value for name in san_ext.value] if domain not in san_names: return False, f"Domain {domain} not in certificate SANs"
except x509.ExtensionNotFound: return False, "No SAN extension in certificate"
# Validation successful validation_record = { 'domain': domain, 'alpn_protocol': selected_protocol, 'certificate_fingerprint': cert.fingerprint(hashes.SHA256()).hex(), 'validated_at': datetime.utcnow().isoformat() }
return True, validation_record
except socket.timeout: return False, f"Connection timeout to {domain}:443"
except ssl.SSLError as e: return False, f"SSL error: {str(e)}"
except Exception as e: return False, f"Validation error: {str(e)}"Validation Worker
Section titled “Validation Worker”Asynchronous validation processing:
import pikaimport json
class ValidationWorker: """Process challenge validations asynchronously"""
def __init__(self, rabbitmq_url, db_connection): self.db = db_connection self.validators = { 'dns-01': DNS01Validator(), 'http-01': HTTP01Validator(), 'tls-alpn-01': TLSALPN01Validator() }
# Setup RabbitMQ params = pika.URLParameters(rabbitmq_url) self.connection = pika.BlockingConnection(params) self.channel = self.connection.channel() self.channel.queue_declare(queue='challenge_validations', durable=True)
def start(self): """Start consuming validation requests"""
self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume( queue='challenge_validations', on_message_callback=self.handle_validation )
print('Validation worker started...') self.channel.start_consuming()
def handle_validation(self, ch, method, properties, body): """Process single validation request"""
try: request = json.loads(body)
challenge_id = request['challenge_id'] authz_id = request['authorization_id'] account_jwk = request['account_jwk']
# Load challenge and authorization challenge = self.load_challenge(challenge_id) authorization = self.load_authorization(authz_id)
# Update status to processing self.update_challenge_status(challenge_id, 'processing')
# Perform validation validator = self.validators[challenge['type']] success, result = validator.validate(challenge, authorization, account_jwk)
if success: # Mark challenge as valid self.update_challenge_status( challenge_id, 'valid', validation_record=result )
# Check if all challenges for authorization are valid if self.all_challenges_valid(authz_id): self.update_authorization_status(authz_id, 'valid')
# Check if all authorizations for orders are valid self.check_order_readiness(authz_id) else: # Mark challenge as invalid self.update_challenge_status( challenge_id, 'invalid', error=result )
# Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e: print(f"Validation error: {e}") # Requeue for retry ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def check_order_readiness(self, authz_id): """Check if any orders are now ready for finalization"""
# Find orders using this authorization orders = self.db.execute( """ SELECT DISTINCT o.id FROM orders o JOIN order_authorizations oa ON o.id = oa.order_id WHERE oa.authorization_id = %s AND o.status = 'pending' """, (authz_id,) ).fetchall()
for order_row in orders: order_id = order_row[0]
# Check if all authorizations are valid all_valid = self.db.execute( """ SELECT COUNT(*) = COUNT(CASE WHEN a.status = 'valid' THEN 1 END) FROM order_authorizations oa JOIN authorizations a ON oa.authorization_id = a.id WHERE oa.order_id = %s """, (order_id,) ).fetchone()[0]
if all_valid: self.db.execute( "UPDATE orders SET status = 'ready' WHERE id = %s", (order_id,) ) self.db.commit()Rate Limiting
Section titled “Rate Limiting”Rate Limit Implementation
Section titled “Rate Limit Implementation”from datetime import datetime, timedelta
class RateLimiter: """Enforce ACME rate limits"""
def __init__(self, db): self.db = db self.limits = { 'new_accounts_per_ip': {'limit': 10, 'window': timedelta(hours=3)}, 'orders_per_account': {'limit': 300, 'window': timedelta(hours=3)}, 'certificates_per_domain': {'limit': 50, 'window': timedelta(days=7)}, 'failed_validations': {'limit': 5, 'window': timedelta(hours=1)}, 'new_orders_per_ip': {'limit': 20, 'window': timedelta(minutes=10)} }
def check_limit(self, limit_name, identifier_value, identifier_type='account_id'): """Check if rate limit exceeded"""
config = self.limits[limit_name] window_start = datetime.utcnow() - config['window']
# Clean old entries self.db.execute( "DELETE FROM rate_limits WHERE window_end < NOW()" )
# Count current usage if identifier_type == 'account_id': current_count = self.db.execute( """ SELECT COALESCE(SUM(count), 0) FROM rate_limits WHERE account_id = %s AND limit_name = %s AND window_start >= %s """, (identifier_value, limit_name, window_start) ).fetchone()[0] else: # IP address current_count = self.db.execute( """ SELECT COALESCE(SUM(count), 0) FROM rate_limits WHERE ip_address = %s AND limit_name = %s AND window_start >= %s """, (identifier_value, limit_name, window_start) ).fetchone()[0]
if current_count >= config['limit']: retry_after = self.calculate_retry_after( identifier_value, identifier_type, limit_name ) raise RateLimitExceededError( f"Rate limit exceeded for {limit_name}", retry_after=retry_after )
# Increment counter self.increment_counter(identifier_value, identifier_type, limit_name, config['window'])
def increment_counter(self, identifier_value, identifier_type, limit_name, window): """Increment rate limit counter"""
window_end = datetime.utcnow() + window
if identifier_type == 'account_id': self.db.execute( """ INSERT INTO rate_limits (account_id, limit_name, count, window_start, window_end) VALUES (%s, %s, 1, NOW(), %s) """, (identifier_value, limit_name, window_end) ) else: self.db.execute( """ INSERT INTO rate_limits (ip_address, limit_name, count, window_start, window_end) VALUES (%s, %s, 1, NOW(), %s) """, (identifier_value, limit_name, window_end) )
self.db.commit()
def calculate_retry_after(self, identifier_value, identifier_type, limit_name): """Calculate when limit will reset"""
if identifier_type == 'account_id': oldest = self.db.execute( """ SELECT MIN(window_end) FROM rate_limits WHERE account_id = %s AND limit_name = %s """, (identifier_value, limit_name) ).fetchone()[0] else: oldest = self.db.execute( """ SELECT MIN(window_end) FROM rate_limits WHERE ip_address = %s AND limit_name = %s """, (identifier_value, limit_name) ).fetchone()[0]
if oldest: return int((oldest - datetime.utcnow()).total_seconds()) return 60High Availability Deployment
Section titled “High Availability Deployment”Multi-Region Architecture
Section titled “Multi-Region Architecture”apiVersion: apps/v1kind: Deploymentmetadata: name: acme-api-serverspec: replicas: 3 selector: matchLabels: app: acme-api template: metadata: labels: app: acme-api spec: containers: - name: acme-api image: acme-server:latest ports: - containerPort: 8080 env: - name: DATABASE_URL valueFrom: secretKeyRef: name: acme-secrets key: database-url - name: RABBITMQ_URL valueFrom: secretKeyRef: name: acme-secrets key: rabbitmq-url resources: requests: memory: "256Mi" cpu: "250m" limits: memory: "512Mi" cpu: "500m" livenessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: httpGet: path: /ready port: 8080 initialDelaySeconds: 10 periodSeconds: 5---apiVersion: v1kind: Servicemetadata: name: acme-api-servicespec: selector: app: acme-api ports: - protocol: TCP port: 443 targetPort: 8080 type: LoadBalancer---apiVersion: apps/v1kind: Deploymentmetadata: name: acme-validatorspec: replicas: 5 selector: matchLabels: app: acme-validator template: metadata: labels: app: acme-validator spec: containers: - name: validator image: acme-validator:latest env: - name: DATABASE_URL valueFrom: secretKeyRef: name: acme-secrets key: database-url - name: RABBITMQ_URL valueFrom: secretKeyRef: name: acme-secrets key: rabbitmq-url resources: requests: memory: "128Mi" cpu: "100m" limits: memory: "256Mi" cpu: "200m"Database Replication
Section titled “Database Replication”-- PostgreSQL streaming replication setup
-- On primary:-- postgresql.confwal_level = replicamax_wal_senders = 3max_replication_slots = 3synchronous_commit = onsynchronous_standby_names = 'standby1'
-- pg_hba.confhost replication replicator standby1_ip/32 md5host replication replicator standby2_ip/32 md5
-- Create replication userCREATE USER replicator REPLICATION LOGIN ENCRYPTED PASSWORD 'secure_password';
-- On replicas:-- recovery.confstandby_mode = onprimary_conninfo = 'host=primary_ip port=5432 user=replicator password=secure_password'trigger_file = '/tmp/postgresql.trigger'Monitoring and Observability
Section titled “Monitoring and Observability”Metrics Collection
Section titled “Metrics Collection”from prometheus_client import Counter, Histogram, Gaugeimport time
# Define metricsorders_created = Counter('acme_orders_created_total', 'Total orders created')certificates_issued = Counter('acme_certificates_issued_total', 'Total certificates issued')validations_attempted = Counter('acme_validations_attempted_total', 'Total validation attempts', ['type', 'result'])validation_duration = Histogram('acme_validation_duration_seconds', 'Validation duration', ['type'])active_orders = Gauge('acme_active_orders', 'Currently active orders')rate_limit_hits = Counter('acme_rate_limit_hits_total', 'Rate limit exceeded', ['limit_name'])
class InstrumentedACMEServer: """ACME server with Prometheus metrics"""
def create_order(self, account_id, identifiers): """Create order with metrics"""
orders_created.inc() active_orders.inc()
try: order = super().create_order(account_id, identifiers) return order except RateLimitExceededError as e: rate_limit_hits.labels(limit_name='orders_per_account').inc() raise
def validate_challenge(self, challenge_id): """Validate with timing metrics"""
challenge = self.load_challenge(challenge_id) challenge_type = challenge['type']
start = time.time()
try: result = super().validate_challenge(challenge_id)
duration = time.time() - start validation_duration.labels(type=challenge_type).observe(duration)
if result: validations_attempted.labels(type=challenge_type, result='success').inc() else: validations_attempted.labels(type=challenge_type, result='failure').inc()
return result
except Exception as e: validations_attempted.labels(type=challenge_type, result='error').inc() raise
def issue_certificate(self, order_id): """Issue certificate with metrics"""
cert = super().issue_certificate(order_id) certificates_issued.inc() active_orders.dec() return certLogging
Section titled “Logging”import loggingimport json
class StructuredLogger: """JSON structured logging for ACME server"""
def __init__(self, name): self.logger = logging.getLogger(name) handler = logging.StreamHandler() handler.setFormatter(JsonFormatter()) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO)
def log_order_created(self, order_id, account_id, identifiers): self.logger.info('order_created', extra={ 'order_id': order_id, 'account_id': account_id, 'identifier_count': len(identifiers), 'identifiers': identifiers })
def log_validation_attempt(self, challenge_id, challenge_type, domain, result): self.logger.info('validation_attempted', extra={ 'challenge_id': challenge_id, 'challenge_type': challenge_type, 'domain': domain, 'result': result })
def log_certificate_issued(self, order_id, serial, common_name): self.logger.info('certificate_issued', extra={ 'order_id': order_id, 'serial': serial, 'common_name': common_name })
def log_rate_limit_exceeded(self, account_id, limit_name, ip_address): self.logger.warning('rate_limit_exceeded', extra={ 'account_id': account_id, 'limit_name': limit_name, 'ip_address': ip_address })
class JsonFormatter(logging.Formatter): """Format logs as JSON"""
def format(self, record): log_data = { 'timestamp': self.formatTime(record), 'level': record.levelname, 'message': record.getMessage(), 'logger': record.name }
# Add extra fields if hasattr(record, 'extra'): log_data.update(record.extra)
return json.dumps(log_data)Security Considerations
Section titled “Security Considerations”Account Security
Section titled “Account Security”- Account keys should be at least 2048-bit RSA or 256-bit ECDSA
- Implement account key rollover to allow key rotation
- Rate limit account creation per IP to prevent abuse
- Log all account operations for audit
Validation Security
Section titled “Validation Security”- Perform validation from multiple vantage points
- Implement validation timeout (maximum 60 seconds)
- Store validation records for compliance
- Use DNS resolvers that support DNSSEC when possible
Certificate Security
Section titled “Certificate Security”- Never issue certificates longer than 398 days
- Enforce key size minimums (2048-bit RSA, 256-bit ECDSA)
- Check Certificate Transparency logs
- Implement CAA checking
- Use OCSP Must-Staple extension
Infrastructure Security
Section titled “Infrastructure Security”- HSM for CA signing keys
- Database encryption at rest
- TLS 1.3 for all communications
- Regular security audits and penetration testing
- Separate validation network from API network
Common Pitfalls
Section titled “Common Pitfalls”Validation Race Conditions
Section titled “Validation Race Conditions”Problem: Client removes challenge response before all validators check
Solution: Multiple validators with staggered timing, validation record storage
Database Hotspots
Section titled “Database Hotspots”Problem: Single order table becomes bottleneck at scale
Solution: Partition by time range, archive completed orders, use read replicas
Challenge Token Reuse
Section titled “Challenge Token Reuse”Problem: Using same token for multiple challenges enables attacks
Solution: Generate unique token per challenge, expire after use
Missing CAA Checks
Section titled “Missing CAA Checks”Problem: Issuing certificates for domains with CAA records forbidding issuance
Solution: Check CAA records before every issuance, respect iodef reporting
Insufficient Rate Limiting
Section titled “Insufficient Rate Limiting”Problem: Single account can overwhelm validation infrastructure
Solution: Multiple rate limit tiers, exponential backoff, IP-based limits
Real-World Examples
Section titled “Real-World Examples”Let’s Encrypt (Boulder)
Section titled “Let’s Encrypt (Boulder)”The largest ACME CA, issuing 3+ million certificates daily:
- Go-based implementation for performance
- Multi-region deployment with anycast
- Comprehensive rate limiting (50 orders/account/hour, 300 pending/account)
- Multiple validation perspectives (4+ geographically distributed validators)
- Integration with Certificate Transparency
- Publicly audited code: Github - Boulder
Lessons: Horizontal scaling critical, validation must be geographically diverse, rate limiting essential, observability non-negotiable.
Sectigo (SCM)
Section titled “Sectigo (SCM)”Enterprise ACME CA with private deployments:
- Customizable validation workflows for internal networks
- Integration with enterprise directory services (LDAP, AD)
- Custom challenge types for air-gapped environments
- Policy-driven issuance with approval gates
- Support for client certificates and code signing
Lessons: ACME protocol extensible for enterprise needs, internal validation methods necessary, policy layer enables governance.
HashiCorp Vault PKI
Section titled “HashiCorp Vault PKI”ACME implementation for internal certificates:
- Integrated with Vault’s authentication methods
- Dynamic certificate lifetimes based on requester
- Automated renewal via Vault agents
- Multi-tenancy with namespace isolation
- Audit logging through Vault audit devices
Lessons: ACME works for private PKI, integration with existing auth simplifies adoption, short lifetimes reduce operational burden.
Further Reading
Section titled “Further Reading”Standards and RFCs
Section titled “Standards and RFCs”- RFC 8555: ACME Protocol
- RFC 8657: CAA Record Extensions for ACME
- RFC 8737: ACME TLS ALPN Challenge Extension
- RFC 8738: ACME IP Identifier Validation Extension
- Let’s Encrypt Integration Guide: Letsencrypt - Integration Guide
Related Pages
Section titled “Related Pages”- Certificate Issuance Workflows - Complete workflow patterns
- ACME Protocol - ACME standard deep dive
- Certificate Lifecycle Management - Lifecycle automation
- CA Architecture - CA design principles
- HSM Integration - Hardware security for CA keys
Implementation Resources
Section titled “Implementation Resources”- Boulder (Let’s Encrypt): Github - Boulder
- Certbot (ACME client): Github - Certbot
- acme.sh (Bash ACME client): Github - Acme.Sh
- Pebble (Test ACME server): Github - Pebble
- ACME Protocol Specification: Ietf - Rfc8555
Last Updated: 2025-11-09
Maintenance Notes: Monitor for ACME protocol updates (new challenge types, extensions), update Boulder implementation examples, add emerging validation methods, track Let’s Encrypt operational metrics