Skip to content
Open
235 changes: 227 additions & 8 deletions .github/scripts/dependency_age.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
Expand All @@ -18,7 +19,6 @@
DEFAULT_MIN_AGE_HOURS = 48



@dataclass(frozen=True)
class Candidate:
version: str
Expand All @@ -28,6 +28,7 @@ class Candidate:
# Entry point for GitHub Actions workflows
# select-gradle: get newest Gradle release that is at least MIN_DEPENDENCY_AGE_HOURS hours old
# select-maven: get newest Maven artifact release that is at least MIN_DEPENDENCY_AGE_HOURS hours old
# validate-lockfiles: check that each new coordinate in the Gradle lockfiles is at least MIN_DEPENDENCY_AGE_HOURS hours old
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Dependency age helpers for GitHub workflows.")
subparsers = parser.add_subparsers(dest="command", required=True)
Expand All @@ -50,6 +51,15 @@ def parse_args() -> argparse.Namespace:
help="Case-insensitive regex fragment used to exclude prerelease versions.",
)

validate = subparsers.add_parser("validate-lockfiles", help="Validate age of new coordinates in Gradle lockfiles.")
validate.add_argument("--baseline-dir", required=True)
validate.add_argument("--current-dir", default=".")
validate.add_argument("--metadata-file", help="JSON file mapping group:artifact:version to a timestamp override.")
validate.add_argument("--search-url", default=MAVEN_SEARCH_URL)
validate.add_argument("--min-age-hours", type=int, default=default_min_age_hours())
validate.add_argument("--now")
validate.add_argument("--github-output", default=None)

return parser.parse_args()


Expand Down Expand Up @@ -98,7 +108,7 @@ def parse_datetime(value: Any) -> datetime:
except ValueError:
pass

# ISO 8601: normalise Z and +HHMM +HH:MM for fromisoformat
# ISO 8601: normalise Z and +HHMM -> +HH:MM for fromisoformat
text = re.sub(r"([+-])(\d{2})(\d{2})$", r"\1\2:\3", text.replace("Z", "+00:00"))
return datetime.fromisoformat(text).astimezone(timezone.utc)

Expand All @@ -120,8 +130,12 @@ def emit_outputs(outputs: dict[str, Any], github_output: str | None) -> None:
print(line)
if github_output:
with open(github_output, "a", encoding="utf-8") as handle:
for line in lines:
handle.write(f"{line}\n")
for key, value in outputs.items():
text = "" if value is None else str(value)
if "\n" in text:
handle.write(f"{key}<<__EOF__\n{text}\n__EOF__\n")
else:
handle.write(f"{key}={text}\n")


# load JSON from file or URL
Expand Down Expand Up @@ -156,7 +170,6 @@ def select_gradle_release(args: argparse.Namespace) -> int:

return emit_selection_result(
label="Gradle",
cutoff=cutoff,
github_output=args.github_output,
candidates=candidates,
not_found_reason=(
Expand Down Expand Up @@ -189,7 +202,6 @@ def select_maven_release(args: argparse.Namespace) -> int:

return emit_selection_result(
label=f"{args.group_id}:{args.artifact_id}",
cutoff=cutoff,
github_output=args.github_output,
candidates=candidates,
not_found_reason=(
Expand Down Expand Up @@ -246,7 +258,7 @@ def load_maven_documents(
return docs


# parse a version string into a tuple of ints for numeric comparison (e.g. "3.9.11" → (3, 9, 11))
# parse a version string into a sortable tuple for comparison; numeric segments sort before non-numeric
def _version_sort_key(version: str) -> tuple:
segments = []
for segment in re.split(r"([.\-])", version):
Expand All @@ -272,7 +284,6 @@ def _version_sort_key(version: str) -> tuple:
def emit_selection_result(
*,
label: str,
cutoff: datetime,
github_output: str | None,
candidates: list[Candidate],
not_found_reason: str,
Expand Down Expand Up @@ -336,12 +347,220 @@ def emit_selection_result(
return 0


# check that every new coordinate in the Gradle lockfiles is at least min_age_hours old
def validate_lockfiles(args: argparse.Namespace) -> int:
    """Validate that every new lockfile coordinate meets the minimum-age policy.

    Diffs the baseline lockfile snapshot against the current tree, resolves a
    publish timestamp for every added coordinate, reverts any lockfile that
    contains a too-new or unverifiable coordinate, and emits workflow outputs
    (cutoff_at, reverted_files, summary). Returns a process exit code:
    0 on success (even with reverts), 1 when the baseline snapshot looks broken.
    """
    cutoff = now_utc(args.now) - timedelta(hours=args.min_age_hours)
    baseline_dir = Path(args.baseline_dir)
    current_dir = Path(args.current_dir)
    metadata = load_metadata_overrides(args.metadata_file)

    # Guard against a silent snapshot failure: if baseline is empty but current has lockfiles,
    # every coordinate would appear "new" and the age check would be meaningless
    baseline_has_lockfiles = baseline_dir.exists() and any(baseline_dir.rglob("gradle.lockfile"))
    current_has_lockfiles = any(current_dir.rglob("gradle.lockfile"))
    if not baseline_has_lockfiles and current_has_lockfiles:
        print("::error::Baseline has no lockfiles but current directory does — the snapshot step may have failed.")
        emit_outputs({"cutoff_at": format_datetime(cutoff), "reverted_files": 0}, args.github_output)
        return 1

    changed = changed_lockfile_coordinates(baseline_dir=baseline_dir, current_dir=current_dir)
    if not changed:
        print("No dependency version changes detected across Gradle lockfiles.")
        emit_outputs({"cutoff_at": format_datetime(cutoff), "reverted_files": 0}, args.github_output)
        return 0

    # group added coordinates by lockfile so violations can be reported per file
    changed_by_file: dict[str, list[str]] = {}
    for relative_path, gav in changed:
        changed_by_file.setdefault(relative_path, []).append(gav)

    # resolve each coordinate's timestamp at most once, even if it appears in several lockfiles
    timestamp_cache: dict[str, tuple[datetime | None, str | None]] = {}
    too_new = "too_new"
    unverified = "unverified"
    violations_by_file: dict[str, list[tuple[str, str]]] = {}
    for relative_path, gavs in sorted(changed_by_file.items()):
        for gav in gavs:
            if gav not in timestamp_cache:
                timestamp_cache[gav] = resolve_gav_timestamp(gav=gav, metadata=metadata, search_url=args.search_url)
            # the detailed failure reason is not reported here; warnings below use the kind only
            published_at, _reason = timestamp_cache[gav]
            if published_at is None:
                violations_by_file.setdefault(relative_path, []).append((gav, unverified))
            elif published_at > cutoff:
                violations_by_file.setdefault(relative_path, []).append((gav, too_new))
            else:
                print(f"Verified {gav} (published {format_datetime(published_at)}, cutoff {format_datetime(cutoff)})")

    if violations_by_file:
        revert_lockfiles_to_baseline(violations_by_file=violations_by_file, baseline_dir=baseline_dir, current_dir=current_dir)
        for relative_path, entries in sorted(violations_by_file.items()):
            for gav, kind in entries:
                print(f"::warning file={relative_path}::{gav}: {'Cannot verify age' if kind == unverified else 'Too new'}. Reverted lockfile to baseline.")

    reverted_files = len(violations_by_file)
    summary = build_validation_summary(violations_by_file=violations_by_file, min_age_hours=args.min_age_hours)
    emit_outputs({"cutoff_at": format_datetime(cutoff), "reverted_files": reverted_files, "summary": summary}, args.github_output)
    print(f"Validated {len(changed)} changed coordinate(s) across {len(changed_by_file)} lockfile(s). {reverted_files} lockfile(s) reverted.")
    return 0


# build summary of reverted dependencies for PR descriptions
def build_validation_summary(*, violations_by_file: dict[str, list[tuple[str, str]]], min_age_hours: int) -> str:
    """Build a Markdown summary of reverted dependencies for PR descriptions.

    Returns an empty string when there are no violations. Each offending
    coordinate is listed exactly once, with the reason it was rejected
    (kind "too_new" or "unverified").
    """
    if not violations_by_file:
        return ""
    summary_messages = {
        "too_new": f"Did not meet {min_age_hours}h dependency age requirement",
        "unverified": "Cannot verify age in Maven Central",
    }
    # note: plain literals here — only the message above interpolates a value
    lines = [
        "## Dependency age policy",
        "",
        "The following dependencies were reverted:",
        "",
    ]
    # deduplicate: the same coordinate may violate in several lockfiles
    seen: set[str] = set()
    for entries in violations_by_file.values():
        for gav, kind in entries:
            if gav not in seen:
                seen.add(gav)
                lines.append(f"- `{gav}` — {summary_messages[kind]}")
    return "\n".join(lines)


# restore each violating lockfile to its baseline copy to keep the file consistent
def revert_lockfiles_to_baseline(
    *,
    violations_by_file: dict[str, list[tuple[str, str]]],
    baseline_dir: Path,
    current_dir: Path,
) -> None:
    """Restore every lockfile that has violations to its baseline state.

    Lockfiles with a baseline counterpart are overwritten with that copy;
    lockfiles that are entirely new (no baseline counterpart) are removed.
    """
    for rel in sorted(violations_by_file):
        source = baseline_dir / rel
        target = current_dir / rel
        if source.exists():
            content = source.read_text(encoding="utf-8")
            target.write_text(content, encoding="utf-8")
            print(f"Reverted {rel} to baseline.")
        else:
            target.unlink(missing_ok=True)
            print(f"Removed new lockfile {rel} (no baseline copy to restore).")


# look up the publish timestamp for a group:artifact:version coordinate in Maven Central
# returns (datetime, None) on success, (None, reason) when the timestamp cannot be determined
# retries once on any error
def resolve_gav_timestamp(
    *,
    gav: str,
    metadata: dict[str, Any],
    search_url: str,
) -> tuple[datetime | None, str | None]:
    # metadata overrides take precedence over any remote lookup
    if gav in metadata:
        return parse_metadata_override(gav, metadata[gav])

    group_id, artifact_id, version = gav.split(":", 2)
    params = {
        "q": f'g:"{group_id}" AND a:"{artifact_id}" AND v:"{version}"',
        "core": "gav",
        "rows": 20,
        "wt": "json",
    }
    url = f"{search_url}?{urllib.parse.urlencode(params)}"

    # up to two attempts; only the most recent failure reason is kept
    payload = None
    fetch_error = None
    for _ in range(2):
        try:
            payload = load_json(None, url)
        except urllib.error.HTTPError as exc:
            fetch_error = f"Maven Central search failed (HTTP {exc.code})."
        except (urllib.error.URLError, TimeoutError, OSError, ValueError):
            fetch_error = "Maven Central search was unreachable."
        else:
            fetch_error = None
            break

    if fetch_error:
        return None, fetch_error

    # scan the result docs for an exact version match
    for doc in payload.get("response", {}).get("docs", []):
        if doc.get("v") != version:
            continue
        timestamp = doc.get("timestamp")
        if timestamp is None:
            return None, "Maven Central search result did not include a publish timestamp."
        try:
            return parse_datetime(timestamp), None
        except (ValueError, TypeError) as exc:
            return None, f"Maven Central returned an unparseable timestamp for {gav}: {exc}"
    return None, f"{gav} was not found in Maven Central. Add an entry in --metadata-file to bypass."


# load optional metadata overrides from a JSON file (group:artifact:version -> timestamp)
def load_metadata_overrides(path: str | None) -> dict[str, Any]:
if not path:
return {}
return load_json(path, None)


# parse a single metadata override value: a timestamp string/number, or a dict with a timestamp key
def parse_metadata_override(gav: str, override: Any) -> tuple[datetime | None, str | None]:
if isinstance(override, dict):
for key in ("timestamp", "published_at", "timestamp_ms"):
if key in override:
try:
return parse_datetime(override[key]), None
except (ValueError, TypeError) as exc:
return None, f"Metadata override for {gav} has an invalid timestamp: {exc}"
return None, f"Metadata override for {gav} is missing a timestamp key (expected: timestamp, published_at, or timestamp_ms)."
if isinstance(override, (int, float, str)):
try:
return parse_datetime(override), None
except (ValueError, TypeError) as exc:
return None, f"Metadata override for {gav} has an invalid timestamp: {exc}"
return None, f"Unsupported metadata override format for {gav}."


# diff baseline and current lockfile directories; return (relative_path, gav) for each added or changed coordinate
def changed_lockfile_coordinates(*, baseline_dir: Path, current_dir: Path) -> list[tuple[str, str]]:
    """List every coordinate present in a current lockfile but absent from its baseline."""
    baseline_sets = collect_lockfiles(baseline_dir)
    current_sets = collect_lockfiles(current_dir)
    all_paths = sorted(set(baseline_sets) | set(current_sets))
    return [
        (rel, gav)
        for rel in all_paths
        for gav in sorted(current_sets.get(rel, set()) - baseline_sets.get(rel, set()))
    ]


# recursively find all gradle.lockfile paths under root and parse them into sets of coordinates
def collect_lockfiles(root: Path) -> dict[str, set[str]]:
    """Map each gradle.lockfile's path (relative to *root*) to its coordinate set."""
    if not root.exists():
        return {}
    result: dict[str, set[str]] = {}
    for lockfile in root.rglob("gradle.lockfile"):
        result[str(lockfile.relative_to(root))] = parse_lockfile(lockfile)
    return result


# parse a lockfile into a set of group:artifact:version coordinates (skipping comments and empty lines)
def parse_lockfile(path: Path) -> set[str]:
    """Extract the group:artifact:version coordinates declared in *path*."""
    coords: set[str] = set()
    for raw in path.read_text(encoding="utf-8").splitlines():
        entry = raw.strip()
        # ignore blanks and '#' comment lines
        if not entry or entry.startswith("#"):
            continue
        # a lockfile line is "<coordinate>=<configurations>"; keep only well-formed coordinates
        candidate, _, _ = entry.partition("=")
        if candidate.count(":") == 2:
            coords.add(candidate)
    return coords


def main() -> int:
    """Parse CLI arguments and dispatch to the handler for the selected subcommand."""
    args = parse_args()
    handlers = {
        "select-gradle": select_gradle_release,
        "select-maven": select_maven_release,
        "validate-lockfiles": validate_lockfiles,
    }
    handler = handlers.get(args.command)
    if handler is None:
        raise ValueError(f"Unsupported command: {args.command}")
    return handler(args)


Expand Down
Loading