I’ve been attempting to implement certificate pinning for an internal service mesh using a self-signed root CA, but the pins are not being enforced. The goal is to restrict egress from certain agents to only services presenting certificates from our specific CA, thereby adding a layer of control against unauthorized proxies or TLS inspection points.
My current setup involves a self-signed root CA (`internal-ca.crt`) used to issue server certificates. I am pinning the public key hash of the CA certificate itself (SPKI pin). The agent is a Python application using the `requests` library with a custom `HTTPAdapter`. However, connections are still succeeding when I present a server certificate signed by a different, unauthorized CA.
Here is the pinning logic I’ve implemented:
```python
import hashlib
import ssl
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager
class PinnedAdapter(HTTPAdapter):
def __init__(self, pin):
self.pin = pin
super().__init__()
def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = self._create_ssl_context()
super().init_poolmanager(*args, **kwargs)
def _create_ssl_context(self):
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_REQUIRED
# Custom verify callback to check pin
def verify_callback(conn, cert, err):
# Calculate SPKI hash
der = cert.public_bytes(encoding=ssl.Encoding.DER)
spki_hash = hashlib.sha256(der).digest()
return spki_hash == self.pin
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.set_verify_callback(verify_callback)
return ctx
```
I am loading the expected pin as follows:
```python
from cryptography import x509
from cryptography.hazmat.primitives import serialization
with open("internal-ca.crt", "rb") as f:
pem_data = f.read()
cert = x509.load_pem_x509_certificate(pem_data)
public_key = cert.public_key()
spki = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
expected_pin = hashlib.sha256(spki).digest()
```
Key questions:
1. Is the `verify_callback` the correct place for pin validation, or should pinning occur after the TLS handshake completes?
2. Does disabling `check_hostname` interfere with the pinning mechanism in this context?
3. How should key management be handled for pin rotation? Is there a standard pattern for distributing new pins to agents before CA renewal?
The broader context is agent identity and attestation: I want to ensure that only my authorized services, with certificates from my specific CA, can communicate with these agents, even if another trusted CA (like the system root store) attempts to sign a rogue certificate.
Don't roll your own crypto. Unless you have a spec.
You're likely hitting a trust-on-first-use issue with the default SSL context. Your adapter's `_create_ssl_context` method is missing. More importantly, pinning the CA's SPKI alone doesn't automatically reject certificates from other CAs unless you've also modified the trust store. The default `ssl.create_default_context()` trusts public root CAs. A server presenting a cert from, say, Let's Encrypt, will still be validated by the system's trust chain.
You need to create a custom SSL context that loads *only* your `internal-ca.crt` as the trusted CA. Then layer your pinning check on top of that. Try something like this in your `_create_ssl_context`:
```python
def _create_ssl_context(self):
ctx = ssl.create_default_context()
ctx.load_verify_locations('internal-ca.crt')
ctx.verify_mode = ssl.CERT_REQUIRED
# You'll also need to add a verify callback to check the pin.
return ctx
```
Without that, your pin is just an unused variable. The connection succeeds because the TLS handshake itself is still trusting the public CA that signed the illegitimate certificate.
segment first
That's precisely the root issue. user375 is correct about the trust store, but the proposed fix is incomplete. `ssl.create_default_context()` still loads the system's default CA certificates. You must call `ctx.load_default_certs()` *before* `load_verify_locations` to clear them, otherwise you're just adding your CA to the existing bundle.
Here's the adjusted context creation:
```python
def _create_ssl_context(self):
ctx = ssl.create_default_context()
ctx.load_default_certs() # This loads the system CAs
ctx.load_verify_locations('internal-ca.crt') # This replaces the trust store
ctx.verify_mode = ssl.CERT_REQUIRED
return ctx
```
Even then, you're only enforcing CA trust, not pinning. The pin check needs to be in a `verify_callback` function attached to the context, otherwise the SPKI hash is just a decorative variable. The callback receives the server certificate chain; you must extract the issuing CA's SPKI there and match it against your pin.
Audit everything, trust no syscall.
Close, but still wrong. `load_default_certs()` doesn't clear the store, it just loads the default system CAs. You're adding your CA on top again. You need to create a blank context.
```python
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ctx.load_verify_locations('internal-ca.crt')
ctx.verify_mode = ssl.CERT_REQUIRED
```
That's it. The `Purpose` flag gives you an empty store. All this dancing around with callbacks and pinning feels like a waste when you could just control the trust root.
You're both missing the core audit problem. An empty trust store pins the CA implicitly, but you've lost forensic logging. Pinning with a callback creates a distinct, verifiable event in your agent logs that a specific SPKI was matched.
If you just restrict the CA and an engineer later adds another trusted cert to the file, the mesh degrades silently. No alert. A pinning failure is a loud, specific security event. That's the point.
Priya
Your `_create_ssl_context` method is incomplete in the snippet, but that's the critical failure point. The default context trusts the system's CA store, so any publicly-trusted certificate will validate.
You need to combine the approaches mentioned: a context that trusts *only* your CA, plus the pinning callback for explicit enforcement and logging. Here's a working version of the method:
```python
def _create_ssl_context(self):
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ctx.load_verify_locations('internal-ca.crt')
ctx.verify_mode = ssl.CERT_REQUIRED
def verify_callback(conn, cert, errno, depth, ok):
if depth == 0: # server certificate
spki = cert.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
spki_hash = hashlib.sha256(spki).digest()
if spki_hash != self.pin:
return False
return ok
ctx.set_verify_callback(verify_callback)
return ctx
```
This enforces both the CA trust root and the specific pin, giving you the audit trail user138 mentioned. Without the callback, you're just doing CA restriction, not pinning.
r
Spot on with the callback. That's the right pattern for logging. One caveat - make sure `self.pin` is in binary format (the digest), not hex, or the comparison will always fail.
Also, watch the `ok` flag. If CA validation fails, `ok` will be False already. Your callback just adds the pin check on top. That's perfect.
No cloud, no problem.
You've got bigger problems than pinning logic. You're trying to use certificate pinning as an agent control layer, but your egress story is broken if you're not even restricting the trust root first.
This is exactly how you get agent sprawl talking to things you never approved. The pin is your last line of defense, not the first. Fix the CA trust store, then add the pin for the alerting like user138 said. Otherwise you'll never see the failures in your logs.
And you're pinning the CA's key. Why not pin the actual service keys? If you're worried about rogue proxies, pin the leaf. A rogue CA cert shouldn't exist in your fleet anyway.
Risk is not a feature toggle.