This is a walkthrough of python-tuf’s refresh(), which implements the client workflow of the TUF specification. It is meant for readers who want to get into the code behind refresh(): it will not give you a complete understanding of every detail, but it is a good starting point for newcomers.
refresh() is available here:
def refresh(self) -> None:
    """Refresh top-level metadata.

    Downloads, verifies, and loads metadata for the top-level roles in the
    specified order (root -> timestamp -> snapshot -> targets) implementing
    all the checks required in the TUF client workflow.

    A ``refresh()`` can be done only once during the lifetime of an Updater.
    If ``refresh()`` has not been explicitly called before the first
    ``get_targetinfo()`` call, it will be done implicitly at that time.

    The metadata for delegated roles is not updated by ``refresh()``:
    that happens on demand during ``get_targetinfo()``. However, if the
    repository uses `consistent_snapshot
    <https://theupdateframework.github.io/specification/latest/#consistent-snapshots>`_,
    then all metadata downloaded by the Updater will use the same consistent
    repository state.

    Raises:
        OSError: New metadata could not be written to disk
        RepositoryError: Metadata failed to verify in some way
        DownloadError: Download of a metadata file failed in some way
    """
    self._load_root()
    self._load_timestamp()
    self._load_snapshot()
    self._load_targets(Targets.type, Root.type)
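For context, refresh() is a method of tuf.ngclient.Updater. A minimal, illustrative way of reaching it looks like this (the path and URL are placeholders, and the metadata directory must already contain a trusted root.json):

from tuf.ngclient import Updater

# Illustrative usage; values are placeholders for a real repository setup.
updater = Updater(
    metadata_dir="/path/to/local/metadata",
    metadata_base_url="https://example.com/metadata/",
)
updater.refresh()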
`refresh()` does 4 things:
- Loads the root metadata: _load_root()
- Loads the timestamp metadata: _load_timestamp()
- Loads the snapshot metadata: _load_snapshot()
- Loads the targets metadata: _load_targets()
Load root
This is section 5.3 of the TUF specification (https://theupdateframework.github.io/specification/v1.0.28/index.html#load-trusted-root) and is implemented in _load_root().
On its first two lines, _load_root() works out which root versions to try: the lower bound is the version right after the currently trusted root, and the upper bound is derived from the maximum number of root rotations, a limit the user can configure.
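In the current python-tuf source the bounds are computed roughly like this (a simplified sketch; max_root_rotations is the UpdaterConfig option that sets the limit):

# Simplified sketch of how _load_root() derives the version bounds
# (see Updater._load_root in python-tuf for the real code):
lower_bound = self._trusted_set.root.version + 1
upper_bound = lower_bound + self.config.max_root_rotations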
_load_root() then attempts to download the metadata for each version between lower_bound and upper_bound, and if it receives a 403 or 404 error, it considers that to mean the last successful download is the newest available metadata:
for next_version in range(lower_bound, upper_bound):
    try:
        data = self._download_metadata(
            Root.type,
            self.config.root_max_length,
            next_version,
        )
        self._trusted_set.update_root(data)
        self._persist_metadata(Root.type, data)
    except exceptions.DownloadHTTPError as exception:
        if exception.status_code not in {403, 404}:
            raise
        # 404/403 means current root is newest available
        break
For each attempt to download the latest root metadata, _load_root() does three things:
Step 1: Download root metadata from the repository
It downloads the remote metadata file using the fetcher, a class for downloading data over the network. _load_root() first invokes _download_metadata(), which invokes fetcher.download_bytes(), which in turn invokes fetcher.download_file(), which downloads the remote root metadata file in chunks. After downloading each chunk, it checks whether the amount of data received exceeds the limit the client is configured with.
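A minimal sketch of this chunked, size-limited download pattern (illustrative only; fetch_limited() below is a hypothetical helper, not the python-tuf fetcher API):

import urllib.request

# Illustrative sketch: download in chunks while enforcing a size limit.
def fetch_limited(url: str, max_length: int, chunk_size: int = 4096) -> bytes:
    received = b""
    with urllib.request.urlopen(url) as response:
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            received += chunk
            # stop as soon as the configured limit is exceeded
            if len(received) > max_length:
                raise ValueError(f"Downloaded more than {max_length} bytes")
    return received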
Step 2: Verify downloaded data and load it as the newest in the client
If step 1 succeeded, _load_root() proceeds to update the root of the trusted set. In this step, the python-tuf client verifies the downloaded data from step 1 and adds it as the new root metadata. This is done in the trusted set’s update_root(), which calls into the _load_data() method:
if Timestamp.type in self._trusted_set:
    raise RuntimeError("Cannot update root after timestamp")
logger.debug("Updating root")

new_root, new_root_bytes, new_root_signatures = self._load_data(
    Root, data, self.root
)
This method calls either _load_from_metadata() or _load_from_simple_envelope(), depending on the envelope type configured for the trusted metadata.
Both of these methods do two things:
- Parse the bytes into the respective type – either a Metadata or a SimpleEnvelope
- If there is a delegator, the delegator verifies the signatures of the parsed Metadata or SimpleEnvelope. In this case, the client does pass a delegator, which is self.root on line 183:
if Timestamp.type in self._trusted_set:
    raise RuntimeError("Cannot update root after timestamp")
logger.debug("Updating root")

new_root, new_root_bytes, new_root_signatures = self._load_data(
    Root, data, self.root
)
What this means is that the delegator delegates trust to a delegate – which in this case is the new metadata. To fully assign trust to the new metadata, python-tuf must verify the delegate.
So let’s talk about what it means to verify a delegate, which is what happens on these lines depending on the envelope type:
if delegator:
    if role_name is None:
        role_name = role.type
    delegator.verify_delegate(role_name, md.signed_bytes, md.signatures)

if delegator:
    if role_name is None:
        role_name = role.type
    delegator.verify_delegate(
        role_name, envelope.pae(), envelope.signatures
    )
TUF has the concept of delegation: a role can allow other roles to carry out one or more of the tasks it is trusted for. For example, a role trusted to sign a particular type of metadata can assign other roles to do the same. This forms a chain of trust that starts with the root role. The root role has full permissions for all metadata and delegates responsibilities to the other roles in TUF, which are:
- Targets role
- Snapshot role
- Timestamp role
- Mirrors role
When, for example, a targets role has been delegated to, that targets role can in turn delegate full or limited trust to other roles. Since the targets role is only trusted to sign targets metadata, it also cannot delegate trust for any other kind of metadata.
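As an illustration, a delegation inside targets metadata looks roughly like this in the specification’s file format (the key ID, key value and paths below are placeholders):

"delegations": {
  "keys": {
    "<keyid>": {
      "keytype": "ed25519",
      "scheme": "ed25519",
      "keyval": { "public": "..." }
    }
  },
  "roles": [
    {
      "name": "project1",
      "keyids": ["<keyid>"],
      "threshold": 1,
      "paths": ["project1/*"],
      "terminating": false
    }
  ]
}

Here the delegating targets role says: metadata signed for the role "project1" is trusted if a threshold of the listed keys signed it, and only for target paths matching "project1/*".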
So how does that relate to the step in question in the Python TUF client?
The delegation verification happens in verify_delegate(), which is a wrapper around get_verification_result(), which verifies the keys of the new root metadata. get_verification_result() checks which of the current root key IDs have signatures on the new root metadata and then verifies each signature against it:
for keyid in role.keyids:
    try:
        key = self.get_key(keyid)
    except ValueError:
        logger.info("No key for keyid %s", keyid)
        continue

    if keyid not in signatures:
        unsigned[keyid] = key
        logger.info("No signature for keyid %s", keyid)
        continue

    sig = signatures[keyid]
    try:
        key.verify_signature(sig, payload)
        signed[keyid] = key
    except sslib_exceptions.UnverifiedSignatureError:
        unsigned[keyid] = key
        logger.info("Key %s failed to verify %s", keyid, delegated_role)
Specifically, what get_verification_result() does on lines 407 to 421 is:
- Line 407: Loop through the current root metadata's key IDs.
- Line 409: Get the key corresponding to the current key ID.
- Lines 414-415: If the new metadata's signatures do not contain the current key ID, count it as "unsigned". This is used for counting the threshold.
- Line 421: The key for the key ID in the current iteration – that is, the current metadata's key – verifies the signature over the new root metadata.
To verify a signature, get_verification_result() calls verify_signature() on line 421 and passes a signature and the raw bytes of the new root metadata. verify_signature() is a method of the securesystemslib library, which defines the abstract method here:
@abstractmethod
def verify_signature(self, signature: Signature, data: bytes) -> None:
    """Raises if verification of signature over data fails.

    Args:
        signature: Signature object.
        data: Payload bytes.

    Raises:
        UnverifiedSignatureError: Failed to verify signature.
        VerificationError: Signature verification process error. If you
            are only interested in the verify result, just handle
            UnverifiedSignatureError: it contains VerificationError as well
    """
    raise NotImplementedError
And then implements the methods for different key types here:
- RSA, Ed25519, ECDSA keys: https://github.com/secure-systems-lab/securesystemslib/blob/1188325ee579ac86ed2055cb52c26a5f2dcd869d/securesystemslib/signer/_key.py#L403
- Sigstore: https://github.com/secure-systems-lab/securesystemslib/blob/1188325ee579ac86ed2055cb52c26a5f2dcd869d/securesystemslib/signer/_sigstore_signer.py#L61
- SPHINCS+: https://github.com/secure-systems-lab/securesystemslib/blob/1188325ee579ac86ed2055cb52c26a5f2dcd869d/securesystemslib/signer/_spx_signer.py#L72
- OpenPGP: https://github.com/secure-systems-lab/securesystemslib/blob/1188325ee579ac86ed2055cb52c26a5f2dcd869d/securesystemslib/signer/_gpg_signer.py#L42
Essentially, all verify_signature() methods verify that a key from the existing root metadata has signed the new metadata.
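After each key has been checked, what matters is the count: the delegate is only trusted if at least the role's threshold of authorized keys produced valid signatures, and verify_delegate() raises if that is not the case. A simplified, self-contained sketch of that final decision (illustrative, not the actual python-tuf code, which wraps this in a VerificationResult object):

# Illustrative sketch of the threshold decision behind verify_delegate().
def meets_threshold(signed_keyids: set, threshold: int) -> bool:
    # signed_keyids: key IDs whose signatures over the payload verified
    return len(signed_keyids) >= threshold

# Example: a root role with threshold 2 and two valid signatures passes.
assert meets_threshold({"keyid-a", "keyid-b"}, threshold=2)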
If that verification passes – that is, if the new root metadata has been signed by a threshold of the current root keys – we go back to update_root(). At this point, these lines pass without errors:
new_root, new_root_bytes, new_root_signatures = self._load_data(
    Root, data, self.root
)
if new_root.version != self.root.version + 1:
    raise exceptions.BadVersionNumberError(
        f"Expected root version {self.root.version + 1}"
        f" instead got version {new_root.version}"
    )
Python-tuf now proceeds to these two lines:
# Verify that new root is signed by itself
new_root.verify_delegate(Root.type, new_root_bytes, new_root_signatures)
This is similar to what happened earlier, where python-tuf checked whether the current metadata's keys had signed the new root metadata. This time, python-tuf verifies that the new metadata's keys have signed the new metadata.
If they have, python-tuf updates the trusted root metadata to the newly downloaded version, and update_root() returns, which brings us back to this part:
for next_version in range(lower_bound, upper_bound):
    try:
        data = self._download_metadata(
            Root.type,
            self.config.root_max_length,
            next_version,
        )
        self._trusted_set.update_root(data)
        self._persist_metadata(Root.type, data)
At this point, we assume that python-tuf has completed line 332. The final step is line 333:
for next_version in range(lower_bound, upper_bound):
    try:
        data = self._download_metadata(
            Root.type,
            self.config.root_max_length,
            next_version,
        )
        self._trusted_set.update_root(data)
        self._persist_metadata(Root.type, data)
_persist_metadata() simply writes the new metadata to disk in an atomic manner.
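Atomic here means the metadata is first written to a temporary file and then moved into place, so an interrupted write cannot leave a half-written file behind. A minimal sketch of that pattern (illustrative, not the python-tuf implementation):

import os
import tempfile

# Illustrative sketch of an atomic write: write to a temporary file in the
# same directory, then atomically replace the destination file.
def write_atomically(path: str, data: bytes) -> None:
    directory = os.path.dirname(path) or "."
    fd, temp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as temp_file:
            temp_file.write(data)
        os.replace(temp_path, path)
    except Exception:
        os.remove(temp_path)
        raise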
With that, we have completed the walkthrough of the first step of refresh(), which is _load_root().
Load timestamp metadata
The next step is _load_timestamp():
self._load_root()
self._load_timestamp()
self._load_snapshot()
self._load_targets(Targets.type, Root.type)
Here, python-tuf loads the timestamp metadata. It first attempts to load it locally (lines 343-348):
def _load_timestamp(self) -> None:
    """Load local and remote timestamp metadata."""
    try:
        data = self._load_local_metadata(Timestamp.type)
        self._trusted_set.update_timestamp(data)
    except (OSError, exceptions.RepositoryError) as e:
        # Local timestamp does not exist or is invalid
        logger.debug("Local timestamp not valid as final: %s", e)

    # Load from remote (whether local load succeeded or not)
    data = self._download_metadata(
        Timestamp.type, self.config.timestamp_max_length
    )
    try:
        self._trusted_set.update_timestamp(data)
    except exceptions.EqualVersionNumberError:
        # If the new timestamp version is the same as current, discard the
        # new timestamp. This is normal and it shouldn't raise any error.
        return

    self._persist_metadata(Timestamp.type, data)
It then downloads it from the remote repository (lines 351-353), regardless of whether the local load succeeded.
After that, it updates the timestamp metadata in the trusted set (lines 354-359) with a call to update_timestamp(). This is where the verification of sections 5.4.2, 5.4.3 and 5.4.4 of the TUF specification happens.
update_timestamp() must be called before the snapshot metadata is updated, and it first checks that this is the case:
if Snapshot.type in self._trusted_set:
    raise RuntimeError("Cannot update timestamp after snapshot")
It then checks if the current root metadata is expired:
if self.root.is_expired(self.reference_time):
    raise exceptions.ExpiredMetadataError("Final root.json is expired")
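is_expired() itself is just a time comparison against the metadata's expires field; conceptually it boils down to something like this (a sketch, not the exact python-tuf source):

from datetime import datetime, timezone

# Sketch: metadata counts as expired once the reference time (normally "now")
# has reached the "expires" timestamp in the signed metadata.
def is_expired(expires: datetime, reference_time: datetime) -> bool:
    return reference_time >= expires

print(is_expired(datetime(2030, 1, 1, tzinfo=timezone.utc),
                 datetime.now(timezone.utc)))  # False until 2030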
Next, it parses and verifies the raw metadata bytes using the same _load_data() helper method that the _load_root() step used (see the Load root walkthrough above).
Assuming that passes, update_timestamp() then verifies the content of the new timestamp metadata:
# If an existing trusted timestamp is updated,
# check for a rollback attack
if Timestamp.type in self._trusted_set:
    # Prevent rolling back timestamp version
    if new_timestamp.version < self.timestamp.version:
        raise exceptions.BadVersionNumberError(
            f"New timestamp version {new_timestamp.version} must"
            f" be >= {self.timestamp.version}"
        )
    # Keep using old timestamp if versions are equal.
    if new_timestamp.version == self.timestamp.version:
        raise exceptions.EqualVersionNumberError

    # Prevent rolling back snapshot version
    snapshot_meta = self.timestamp.snapshot_meta
    new_snapshot_meta = new_timestamp.snapshot_meta
    if new_snapshot_meta.version < snapshot_meta.version:
        raise exceptions.BadVersionNumberError(
            f"New snapshot version must be >= {snapshot_meta.version}"
            f", got version {new_snapshot_meta.version}"
        )
First, it checks if the new timestamp metadata’s version is lower than the current timestamp metadata’s version and throws an exception if it is:
if new_timestamp.version < self.timestamp.version:
    raise exceptions.BadVersionNumberError(
        f"New timestamp version {new_timestamp.version} must"
        f" be >= {self.timestamp.version}"
    )
Next, it checks if it is the same version and throws an exception if it is:
# Keep using old timestamp if versions are equal.
if new_timestamp.version == self.timestamp.version:
    raise exceptions.EqualVersionNumberError
Next, it checks if the snapshot version referenced by the new timestamp metadata is lower than the snapshot version referenced by the current timestamp metadata, and throws an error if it is:
# Prevent rolling back snapshot version
snapshot_meta = self.timestamp.snapshot_meta
new_snapshot_meta = new_timestamp.snapshot_meta
if new_snapshot_meta.version < snapshot_meta.version:
    raise exceptions.BadVersionNumberError(
        f"New snapshot version must be >= {snapshot_meta.version}"
        f", got version {new_snapshot_meta.version}"
    )
Now that python-tuf has verified the new timestamp metadata, it replaces the trusted timestamp metadata with it:
self._trusted_set[Timestamp.type] = new_timestamp
Finally, update_timestamp() does a last check to see if the timestamp has expired and throws an exception if it has.
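In the source this final check is a small helper, the same _check_final_timestamp() we will see again in update_snapshot(); roughly (a sketch, not the verbatim code):

# Sketch of the final expiry check (simplified):
def _check_final_timestamp(self) -> None:
    if self.timestamp.is_expired(self.reference_time):
        raise exceptions.ExpiredMetadataError("timestamp.json is expired")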
If update_timestamp() has not thrown an exception at this point, it is complete and the trusted set is updated. Now it is time to update the file on disk, which _load_timestamp() does with a call to _persist_metadata():
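self._persist_metadata(Timestamp.type, data)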
python-tuf has now updated the timestamp metadata.
Load snapshot metadata
Next, python-tuf's refresh method updates the snapshot metadata by invoking self._load_snapshot():
self._load_root()
self._load_timestamp()
self._load_snapshot()
self._load_targets(Targets.type, Root.type)
This method is very similar to _load_timestamp() in that it does the following things (a simplified sketch follows the list):
- It attempts to load the local snapshot metadata.
- If step 1 fails, it downloads the snapshot metadata from the remote repository.
- After having loaded the snapshot metadata, _load_snapshot() passes it to update_snapshot(), which parses and verifies the metadata and then updates the trusted set.
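A simplified sketch of that flow, based on the current python-tuf source but abbreviated (the real method also handles consistent_snapshot versioning and uses the snapshot length from the timestamp metadata when available):

# Abbreviated sketch of Updater._load_snapshot(), not the verbatim source:
def _load_snapshot(self) -> None:
    try:
        # 1. Try the local copy first; trusted=True skips the timestamp
        #    length/hash check for data that was already verified before.
        data = self._load_local_metadata(Snapshot.type)
        self._trusted_set.update_snapshot(data, trusted=True)
    except (OSError, exceptions.RepositoryError):
        # 2. Local copy missing or invalid: download from the repository.
        data = self._download_metadata(
            Snapshot.type, self.config.snapshot_max_length
        )
        # 3. Verify against the timestamp's snapshot_meta, update the
        #    trusted set and persist the new snapshot to disk.
        self._trusted_set.update_snapshot(data)
        self._persist_metadata(Snapshot.type, data)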
Verify and update metadata
python-tuf verifies the new snapshot metadata in update_snapshot(). This method first checks that the timestamp metadata has already been loaded into the trusted set and that the targets metadata has not; if either check fails, update_snapshot() raises an exception:
if Timestamp.type not in self._trusted_set:
    raise RuntimeError("Cannot update snapshot before timestamp")
if Targets.type in self._trusted_set:
    raise RuntimeError("Cannot update snapshot after targets")
We will see in a bit why update_snapshot() requires the timestamp metadata to first be updated before updating the snapshot metadata.
Next, update_snapshot() checks if the timestamp has expired:
self._check_final_timestamp()
Next, update_snapshot() invokes a method called verify_length_and_hashes() and passes the data to it:
snapshot_meta = self.timestamp.snapshot_meta

# Verify non-trusted data against the hashes in timestamp, if any.
# Trusted snapshot data has already been verified once.
if not trusted:
    snapshot_meta.verify_length_and_hashes(data)
Here we see why the TUF client must first update the timestamp before updating the snapshot. On line 311, it assigns self.timestamp.snapshot_meta to the snapshot_meta variable. The timestamp metadata file has a field called "meta" that contains hashes, a "length" value and a version number for the snapshot metadata. This is a reference to the new snapshot metadata, which the client can use to verify it, because at this point the client has already downloaded the newer timestamp metadata.
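An example timestamp.json in the style of the one in the specification shows how the "meta" field references the snapshot metadata (signatures, hashes and lengths below are placeholders):

{
  "signatures": [ "..." ],
  "signed": {
    "_type": "timestamp",
    "spec_version": "1.0.0",
    "version": 1,
    "expires": "2030-01-01T00:00:00Z",
    "meta": {
      "snapshot.json": {
        "version": 1,
        "length": 515,
        "hashes": { "sha256": "..." }
      }
    }
  }
}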
This is the data that the TUF client now uses to verify the length and hashes of the new snapshot metadata. On the current lines in the execution, "snapshot_meta" is the "snapshot.json" entry from the "meta" field in timestamp.json, and "data" is the new snapshot metadata:
snapshot_meta = self.timestamp.snapshot_meta

# Verify non-trusted data against the hashes in timestamp, if any.
# Trusted snapshot data has already been verified once.
if not trusted:
    snapshot_meta.verify_length_and_hashes(data)
verify_length_and_hashes() first does a simple length comparison between the new metadata bytes and the meta["snapshot.json"]["length"] value from the timestamp.json file.
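Conceptually, that length check looks like this (a simplified sketch, not the verbatim python-tuf code; in the real implementation the length field is optional, the check is skipped when it is absent, and a LengthOrHashMismatchError is raised on mismatch):

# Simplified sketch of the length comparison in verify_length_and_hashes().
def verify_length(data: bytes, expected_length: int) -> None:
    observed_length = len(data)
    if observed_length != expected_length:
        raise ValueError(
            f"Observed length {observed_length} does not match "
            f"expected length {expected_length}"
        )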
Python-tuf verifies the hashes in _verify_hashes(). This method loops through the hashes in the timestamp metadata's meta["snapshot.json"]["hashes"] and does the following with each hash:
It first creates a digest_object on line 671 or 675 and uses it to hash the new snapshot metadata. If the snapshot metadata is raw bytes, it is hashed on line 672; if it is a file object, it is hashed on line 675.
It then creates the hexadecimal representation of the hashed data on line 684.
It then compares that hexadecimal representation with the hash value in the current loop iteration.
In other words, in each loop iteration it hashes the new raw snapshot bytes using the algorithm named in that iteration and then compares the hexadecimal digest with the hexadecimal value in meta["snapshot.json"]["hashes"]. The code is available below:
def _verify_hashes(
    data: Union[bytes, IO[bytes]], expected_hashes: Dict[str, str]
) -> None:
    """Verify that the hash of ``data`` matches ``expected_hashes``."""
    is_bytes = isinstance(data, bytes)
    for algo, exp_hash in expected_hashes.items():
        try:
            if is_bytes:
                digest_object = sslib_hash.digest(algo)
                digest_object.update(data)
            else:
                # if data is not bytes, assume it is a file object
                digest_object = sslib_hash.digest_fileobject(data, algo)
        except (
            sslib_exceptions.UnsupportedAlgorithmError,
            sslib_exceptions.FormatError,
        ) as e:
            raise LengthOrHashMismatchError(
                f"Unsupported algorithm '{algo}'"
            ) from e

        observed_hash = digest_object.hexdigest()
        if observed_hash != exp_hash:
            raise LengthOrHashMismatchError(
                f"Observed hash {observed_hash} does not match "
                f"expected hash {exp_hash}"
            )
If the comparison on line 685 fails, _verify_hashes() raises an exception; for the verification to succeed, no hash comparison may fail.
If _verify_hashes() passes without raising an exception, update_snapshot() has verified the length and the hashes of the new snapshot metadata, and execution continues.
update_snapshot() now parses the new snapshot data, verifies that every file listed in the existing snapshot metadata is still present in the new snapshot metadata, and verifies that none of those files has a lower version than in the existing metadata:
new_snapshot, _, _ = self._load_data(Snapshot, data, self.root)

# version not checked against meta version to allow old snapshot to be
# used in rollback protection: it is checked when targets is updated

# If an existing trusted snapshot is updated, check for rollback attack
if Snapshot.type in self._trusted_set:
    for filename, fileinfo in self.snapshot.meta.items():
        new_fileinfo = new_snapshot.meta.get(filename)

        # Prevent removal of any metadata in meta
        if new_fileinfo is None:
            raise exceptions.RepositoryError(
                f"New snapshot is missing info for '{filename}'"
            )

        # Prevent rollback of any metadata versions
        if new_fileinfo.version < fileinfo.version:
            raise exceptions.BadVersionNumberError(
                f"Expected {filename} version "
                f"{new_fileinfo.version}, got {fileinfo.version}."
            )
The snapshot files are listed in the "meta" field of snapshot.json; see the file format in the specification: https://theupdateframework.github.io/specification/v1.0.28/index.html#file-formats-snapshot
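An example snapshot.json in the style of the one in the specification (signatures elided; versions illustrative):

{
  "signatures": [ "..." ],
  "signed": {
    "_type": "snapshot",
    "spec_version": "1.0.0",
    "version": 1,
    "expires": "2030-01-01T00:00:00Z",
    "meta": {
      "targets.json": { "version": 1 },
      "project1.json": { "version": 1 },
      "project2.json": { "version": 1 }
    }
  }
}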
Each of the listed files has a version. If we consider the above JSON to be the existing snapshot metadata file, then the next snapshot metadata must list the same three files (targets.json, project1.json and project2.json), each with a version of at least 1.
If this verification passes, then update_snapshot() loads the new snapshot metadata into the trusted set and checks if it has expired. If it hasn't expired, it returns without raising an exception, and _load_snapshot() stores the new metadata on disk. At this point, the refresh cycle has completed downloading the new snapshot metadata and has successfully verified it.