diff --git a/backend/.env.dist.local b/backend/.env.dist.local index eb67a512a7..680b6275d8 100755 --- a/backend/.env.dist.local +++ b/backend/.env.dist.local @@ -185,9 +185,9 @@ ENRICHER_BATCH_SIZE=100 ENRICHER_REPO_UPDATE_INTERVAL_HOURS=24 ENRICHER_IDLE_SLEEP_SEC=60 -OSSPCKGS_GCP_PROJECT= -OSSPCKGS_GCS_BUCKET= -OSSPCKGS_GCP_CREDENTIALS_B64= +OSSPCKGS_GCP_PROJECT=local-dev +OSSPCKGS_GCS_BUCKET=local-dev +OSSPCKGS_GCP_CREDENTIALS_B64=e30= # osv-sync (Temporal-scheduled; see services/apps/packages_worker/src/osv/schedule.ts) # OSV_ECOSYSTEMS uses OSV's canonical bucket case (npm lowercase, Maven titlecase) because @@ -199,3 +199,12 @@ OSV_ECOSYSTEMS=npm,Maven OSV_TMP_DIR=/tmp/osv OSV_BATCH_SIZE=500 OSV_DERIVE_BATCH_SIZE=1000 +# maven enricher + +POM_FETCHER_BATCH_SIZE=2000 +POM_FETCHER_CONCURRENCY=10 +POM_FETCHER_NON_CRITICAL_BATCH_SIZE=500 +POM_FETCHER_NON_CRITICAL_CONCURRENCY=20 +POM_FETCHER_REFRESH_DAYS=1 +POM_FETCHER_GROUP_DELAY_MS=100 +POM_FETCHER_MAVEN_BASE_URL=https://maven-central.storage-download.googleapis.com/maven2 \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 78422e559e..2e5b4aff0a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1336,6 +1336,12 @@ importers: semver: specifier: ^7.6.0 version: 7.6.0 + axios: + specifier: ^1.16.1 + version: 1.16.1 + fast-xml-parser: + specifier: ^5.8.0 + version: 5.8.0 tsx: specifier: ^4.7.1 version: 4.7.3 @@ -3744,6 +3750,9 @@ packages: '@nangohq/types@0.69.22': resolution: {integrity: sha512-3p7KMZ3GDXrt+wo5BKn/ouEX93TPTBtHRzFWq8AIRLl9aaOi3T0CraHz94NlHye1od5N2mWeN04sCu9f4WTyxA==} + '@nodable/entities@2.1.0': + resolution: {integrity: sha512-nyT7T3nbMyBI/lvr6L5TyWbFJAI9FTgVRakNoBqCD+PmID8DzFrrNdLLtHMwMszOtqZa8PAOV24ZqDnQrhQINA==} + '@nodelib/fs.scandir@2.1.5': resolution: {integrity: sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==} engines: {node: '>= 8'} @@ -5391,6 +5400,9 @@ packages: axios@1.13.5: resolution: {integrity: sha512-cz4ur7Vb0xS4/KUN0tPWe44eqxrIu31me+fbang3ijiNscE129POzipJJA6zniq2C/Z6sJCjMimjS8Lc/GAs8Q==} + axios@1.16.1: + resolution: {integrity: sha512-caYkukvroVPO8KrzuJEb50Hm07KwfBZPEC3VeFHTsqWHvKTsy54hjJz9BS/cdaypROE2rH6xvm9mHX4fgWkr3A==} + axios@1.6.8: resolution: {integrity: sha512-v/ZHtJDU39mDpyBoFVkETcd/uNdxrWRrg3bKpOKzXFA6Bvqopts6ALSMU3y6ijYxbw2B+wPrIv46egTzJXCLGQ==} @@ -6646,6 +6658,9 @@ packages: fast-uri@3.0.6: resolution: {integrity: sha512-Atfo14OibSv5wAp4VWNsFYE1AchQRTv9cBGWET4pZWHzYshFSS9NQI6I57rdKn9croWVMbYFbLhJ+yJvmZIIHw==} + fast-xml-builder@1.2.0: + resolution: {integrity: sha512-00aAWieqff+ZJhsXA4g1g7M8k+7AYoMUUHF+/zFb5U6Uv/P0Vl4QZo84/IcufzYalLuEj9928bXN9PbbFzMF0Q==} + fast-xml-parser@4.2.5: resolution: {integrity: sha512-B9/wizE4WngqQftFPmdaMYlXoJlJOYxGQOanC77fq9k8+Z0v5dDSVh+3glErdIROP//s/jgb7ZuxKfB8nVyo0g==} hasBin: true @@ -6658,6 +6673,10 @@ packages: resolution: {integrity: sha512-JeaA2Vm9ffQKp9VjvfzObuMCjUYAp5WDYhRYL5LrBPY/jUDlUtOvDfot0vKSkB9tuX885BDHjtw4fZadD95wnA==} hasBin: true + fast-xml-parser@5.8.0: + resolution: {integrity: sha512-6bIM7fsJxeo3uXv7OncQYsBAMPJ7V16Slahl/6M98C/i2q+vB1+4a0MtrvYwDFEUrwDSbAmeLDRXsOBwrL7yAg==} + hasBin: true + fastest-levenshtein@1.0.16: resolution: {integrity: sha512-eRnCtTTtGZFpQCwhJiUOuxPQWRXVKYDn0b2PeHfXL6/Zi53SLAzAHfVhVWK2AryC/WH05kGfxhFIPvTF0SXQzg==} engines: {node: '>= 4.9.1'} @@ -6752,6 +6771,15 @@ packages: debug: optional: true + follow-redirects@1.16.0: + resolution: {integrity: sha512-y5rN/uOsadFT/JfYwhxRS5R7Qce+g3zG97+JrtFZlC9klX/W5hD7iiLzScI4nZqUS7DNUdhPgw4xI8W2LuXlUw==} + engines: {node: '>=4.0'} + peerDependencies: + debug: '*' + peerDependenciesMeta: + debug: + optional: true + for-each@0.3.3: resolution: {integrity: sha512-jqYfLp7mo9vIyQf8ykW2v7A+2N4QjeCeI5+Dz9XraiO1ign81wjiH7Fb9vSOWvQfNtmSa4H2RoQTrrXivdUZmw==} @@ -8569,6 +8597,10 @@ packages: resolution: {integrity: sha512-RjhtfwJOxzcFmNOi6ltcbcu4Iu+FL3zEj83dk4kAS+fVpTxXLO1b38RvJgT/0QwvV/L3aY9TAnyv0EOqW4GoMQ==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + path-expression-matcher@1.5.0: + resolution: {integrity: sha512-cbrerZV+6rvdQrrD+iGMcZFEiiSrbv9Tfdkvnusy6y0x0GKBXREFg/Y65GhIfm0tnLntThhzCnfKwp1WRjeCyQ==} + engines: {node: '>=14.0.0'} + path-is-absolute@1.0.1: resolution: {integrity: sha512-AVbw3UJ2e9bq64vSaS9Am0fje1Pa8pbGqTTsmXfaIiMpnr5DlDhfJOuLj9Sf95ZPVDAUerDfEk88MPmPe7UCQg==} engines: {node: '>=0.10.0'} @@ -8798,6 +8830,10 @@ packages: proxy-from-env@1.1.0: resolution: {integrity: sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==} + proxy-from-env@2.1.0: + resolution: {integrity: sha512-cJ+oHTW1VAEa8cJslgmUZrc+sjRKgAKl3Zyse6+PV38hZe/V6Z14TbCuXcan9F9ghlz4QrFr2c92TNF82UkYHA==} + engines: {node: '>=10'} + pseudomap@1.0.2: resolution: {integrity: sha512-b/YwNhb8lk1Zz2+bXXpS/LK9OisiZZ1SNsSLxN1x2OXVEhW2Ckr/7mWE5vrC1ZTiJlD9g19jWszTmJsB+oEpFQ==} @@ -9497,6 +9533,9 @@ packages: strnum@2.1.2: resolution: {integrity: sha512-l63NF9y/cLROq/yqKXSLtcMeeyOfnSQlfMSlzFt/K73oIaD8DGaQWd7Z34X9GPiKqP5rbSh84Hl4bOlLcjiSrQ==} + strnum@2.3.0: + resolution: {integrity: sha512-ums3KNd42PGyx5xaoVTO1mjU1bH3NpY4vsrVlnv9PNGqQj8wd7rJ6nEypLrJ7z5vxK5RP0yMLo6J/Gsm62DI5Q==} + stubs@3.0.0: resolution: {integrity: sha512-PdHt7hHUJKxvTCgbKX9C1V/ftOcjJQgz8BZwNfV5c4B6dcGqlpelTbJ999jBGZ2jYiPAwcX5dP6oBwVlBlUbxw==} @@ -10164,6 +10203,10 @@ packages: resolution: {integrity: sha512-PSNhEJDejZYV7h50BohL09Er9VaIefr2LMAf3OEmpCkjOi34eYyQYAXUTjEQtZJTKcF0E2UKTh+osDLsgNim9Q==} engines: {node: '>=8'} + xml-naming@0.1.0: + resolution: {integrity: sha512-k8KO9hrMyNk6tUWqUfkTEZbezRRpONVOzUTnc97VnCvyj6Tf9lyUR9EDAIeiVLv56jsMcoXEwjW8Kv5yPY52lw==} + engines: {node: '>=16.0.0'} + xml2js@0.4.19: resolution: {integrity: sha512-esZnJZJOiJR9wWKMyuvSE1y6Dq5LCuJanqhxslH2bxM6duahNZ+HMpCLhBQGZkbX6xRf8x1Y2eJlgt2q3qo49Q==} @@ -10368,8 +10411,8 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sso-oidc': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -10563,11 +10606,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sso-oidc@3.572.0(@aws-sdk/client-sts@3.572.0)': + '@aws-sdk/client-sso-oidc@3.572.0': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -10606,7 +10649,6 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: - - '@aws-sdk/client-sts' - aws-crt '@aws-sdk/client-sso@3.556.0': @@ -10782,11 +10824,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sts@3.572.0': + '@aws-sdk/client-sts@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -10825,6 +10867,7 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: + - '@aws-sdk/client-sso-oidc' - aws-crt '@aws-sdk/client-sts@3.985.0': @@ -10990,7 +11033,7 @@ snapshots: '@aws-sdk/credential-provider-ini@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/credential-provider-env': 3.568.0 '@aws-sdk/credential-provider-process': 3.572.0 '@aws-sdk/credential-provider-sso': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) @@ -11167,7 +11210,7 @@ snapshots: '@aws-sdk/credential-provider-web-identity@3.568.0(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/types': 2.12.0 @@ -11479,7 +11522,7 @@ snapshots: '@aws-sdk/token-providers@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/shared-ini-file-loader': 2.4.0 @@ -12423,6 +12466,8 @@ snapshots: transitivePeerDependencies: - debug + '@nodable/entities@2.1.0': {} + '@nodelib/fs.scandir@2.1.5': dependencies: '@nodelib/fs.stat': 2.0.5 @@ -12927,7 +12972,7 @@ snapshots: '@sendgrid/client@8.1.3': dependencies: '@sendgrid/helpers': 8.0.0 - axios: 1.13.1 + axios: 1.13.5 transitivePeerDependencies: - debug @@ -12958,7 +13003,7 @@ snapshots: '@slack/types': 2.11.0 '@types/is-stream': 1.1.0 '@types/node': 20.12.7 - axios: 1.11.0 + axios: 1.13.5 eventemitter3: 3.1.2 form-data: 2.5.1 is-electron: 2.2.2 @@ -14314,7 +14359,7 @@ snapshots: agent-base@6.0.2: dependencies: - debug: 4.4.0(supports-color@5.5.0) + debug: 4.4.3 transitivePeerDependencies: - supports-color @@ -14550,7 +14595,7 @@ snapshots: axios@0.21.4: dependencies: - follow-redirects: 1.15.6 + follow-redirects: 1.15.11 transitivePeerDependencies: - debug @@ -14571,8 +14616,8 @@ snapshots: axios@1.12.0: dependencies: - follow-redirects: 1.15.6 - form-data: 4.0.4 + follow-redirects: 1.15.11 + form-data: 4.0.5 proxy-from-env: 1.1.0 transitivePeerDependencies: - debug @@ -14593,6 +14638,16 @@ snapshots: transitivePeerDependencies: - debug + axios@1.16.1: + dependencies: + follow-redirects: 1.16.0 + form-data: 4.0.5 + https-proxy-agent: 5.0.1 + proxy-from-env: 2.1.0 + transitivePeerDependencies: + - debug + - supports-color + axios@1.6.8: dependencies: follow-redirects: 1.15.6 @@ -16147,6 +16202,11 @@ snapshots: fast-uri@3.0.6: {} + fast-xml-builder@1.2.0: + dependencies: + path-expression-matcher: 1.5.0 + xml-naming: 0.1.0 + fast-xml-parser@4.2.5: dependencies: strnum: 1.0.5 @@ -16159,6 +16219,14 @@ snapshots: dependencies: strnum: 2.1.2 + fast-xml-parser@5.8.0: + dependencies: + '@nodable/entities': 2.1.0 + fast-xml-builder: 1.2.0 + path-expression-matcher: 1.5.0 + strnum: 2.3.0 + xml-naming: 0.1.0 + fastest-levenshtein@1.0.16: {} fastq@1.17.1: @@ -16260,6 +16328,8 @@ snapshots: follow-redirects@1.15.6: {} + follow-redirects@1.16.0: {} + for-each@0.3.3: dependencies: is-callable: 1.2.7 @@ -16790,7 +16860,7 @@ snapshots: dependencies: '@tootallnate/once': 2.0.0 agent-base: 6.0.2 - debug: 4.4.0(supports-color@5.5.0) + debug: 4.4.3 transitivePeerDependencies: - supports-color @@ -16806,7 +16876,7 @@ snapshots: https-proxy-agent@5.0.1: dependencies: agent-base: 6.0.2 - debug: 4.4.0(supports-color@5.5.0) + debug: 4.4.3 transitivePeerDependencies: - supports-color @@ -18172,6 +18242,8 @@ snapshots: path-exists@5.0.0: {} + path-expression-matcher@1.5.0: {} + path-is-absolute@1.0.1: {} path-key@2.0.1: {} @@ -18209,7 +18281,7 @@ snapshots: peopledatalabs@6.1.5: dependencies: - axios: 1.11.0 + axios: 1.13.5 copy-anything: 3.0.5 transitivePeerDependencies: - debug @@ -18379,6 +18451,8 @@ snapshots: proxy-from-env@1.1.0: {} + proxy-from-env@2.1.0: {} + pseudomap@1.0.2: {} pstree.remy@1.1.8: {} @@ -18614,7 +18688,7 @@ snapshots: retry-request@4.2.2: dependencies: - debug: 4.4.0(supports-color@5.5.0) + debug: 4.4.3 extend: 3.0.2 transitivePeerDependencies: - supports-color @@ -19254,6 +19328,8 @@ snapshots: strnum@2.1.2: {} + strnum@2.3.0: {} + stubs@3.0.0: {} superagent@8.1.2: @@ -20069,6 +20145,8 @@ snapshots: xdg-basedir@4.0.0: {} + xml-naming@0.1.0: {} + xml2js@0.4.19: dependencies: sax: 1.2.1 diff --git a/scripts/builders/packages-worker.env b/scripts/builders/packages-worker.env new file mode 100644 index 0000000000..038dcb5575 --- /dev/null +++ b/scripts/builders/packages-worker.env @@ -0,0 +1,4 @@ +DOCKERFILE="./services/docker/Dockerfile.packages" +CONTEXT="../" +REPO="sjc.ocir.io/axbydjxa5zuh/packages-worker" +SERVICES="maven-worker" diff --git a/scripts/services/maven-worker.yaml b/scripts/services/maven-worker.yaml new file mode 100644 index 0000000000..c0142c22fd --- /dev/null +++ b/scripts/services/maven-worker.yaml @@ -0,0 +1,67 @@ +version: '3.1' + +x-env-args: &env-args + DOCKER_BUILDKIT: 1 + NODE_ENV: docker + SERVICE: maven-worker + CROWD_TEMPORAL_TASKQUEUE: packages-worker + CROWD_TEMPORAL_NAMESPACE: ${CROWD_PACKAGES_TEMPORAL_NAMESPACE} + SHELL: /bin/sh + SUPPRESS_NO_CONFIG_WARNING: 'true' + +services: + maven-worker: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.packages-worker + command: 'pnpm run start:maven-worker' + working_dir: /usr/crowd/app/services/apps/packages_worker + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + restart: always + networks: + - crowd-bridge + + maven-worker-dev: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.packages-worker + command: 'pnpm run dev:maven-worker' + working_dir: /usr/crowd/app/services/apps/packages_worker + # user: '${USER_ID}:${GROUP_ID}' + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + hostname: maven-worker + networks: + - crowd-bridge + volumes: + - ../../services/libs/audit-logs/src:/usr/crowd/app/services/libs/audit-logs/src + - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src + - ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src + - ../../services/libs/data-access-layer/src:/usr/crowd/app/services/libs/data-access-layer/src + - ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src + - ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src + - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src + - ../../services/libs/nango/src:/usr/crowd/app/services/libs/nango/src + - ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src + - ../../services/libs/queue/src:/usr/crowd/app/services/libs/queue/src + - ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src + - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src + - ../../services/libs/telemetry/src:/usr/crowd/app/services/libs/telemetry/src + - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src + - ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src + - ../../services/apps/packages_worker/src:/usr/crowd/app/services/apps/packages_worker/src + +networks: + crowd-bridge: + external: true diff --git a/services/apps/packages_worker/package.json b/services/apps/packages_worker/package.json index 6ad988b6e4..7d5ba86dd0 100644 --- a/services/apps/packages_worker/package.json +++ b/services/apps/packages_worker/package.json @@ -5,18 +5,20 @@ "start:deps-dev-ingest": "CROWD_TEMPORAL_TASKQUEUE=deps-dev-ingest CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=deps-dev-ingest tsx src/bin/deps-dev-ingest.ts", "start:github-repos-enricher": "SERVICE=github-repos-enricher tsx src/bin/github-repos-enricher.ts", "start:packages-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker tsx src/bin/packages-worker.ts", + "start:maven-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=maven-worker tsx src/bin/maven-worker.ts", + "backfill:maven:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=maven LOG_LEVEL=info tsx src/bin/maven-backfill.ts", + "backfill:maven": "SERVICE=maven tsx src/bin/maven-backfill.ts", + "dev:packages-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9233 src/bin/packages-worker.ts", + "dev:maven-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=maven-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/maven-worker.ts", "dev:deps-dev-ingest": "CROWD_TEMPORAL_TASKQUEUE=deps-dev-ingest CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=deps-dev-ingest nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9235 src/bin/deps-dev-ingest.ts", - "dev:deps-dev-ingest:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=deps-dev-ingest CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=deps-dev-ingest nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9235 src/bin/deps-dev-ingest.ts", "dev:github-repos-enricher": "SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts", - "dev:github-repos-enricher:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts", - "dev:packages-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9233 src/bin/packages-worker.ts", "dev:packages-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9233 src/bin/packages-worker.ts", + "dev:maven-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=maven-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/maven-worker.ts", + "dev:deps-dev-ingest:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=deps-dev-ingest CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=deps-dev-ingest nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9235 src/bin/deps-dev-ingest.ts", + "dev:github-repos-enricher:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts", "export-to-bucket": "SERVICE=deps-dev-ingest tsx src/scripts/exportToBucket.ts", "export-to-bucket:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=deps-dev-ingest tsx src/scripts/exportToBucket.ts", - "monitor:osspckgs": "SERVICE=monitor tsx src/scripts/monitorOsspckgs.ts", - "monitor:osspckgs:local": "bash -c 'set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=monitor tsx src/scripts/monitorOsspckgs.ts'", - "trigger-bootstrap": "SERVICE=deps-dev-ingest tsx src/scripts/triggerBootstrap.ts", - "trigger-bootstrap:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=deps-dev-ingest tsx src/scripts/triggerBootstrap.ts", + "monitor:osspckgs:local": "bash -c 'set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && node ../../../scripts/monitor-osspckgs.mjs'", "lint": "npx eslint --ext .ts src --max-warnings=0", "format": "npx prettier --write \"src/**/*.ts\"", "format-check": "npx prettier --check .", @@ -38,6 +40,8 @@ "@temporalio/workflow": "~1.11.8", "jsonwebtoken": "^9.0.0", "semver": "^7.6.0", + "axios": "^1.16.1", + "fast-xml-parser": "^5.8.0", "tsx": "^4.7.1", "typescript": "^5.6.3", "unzipper": "^0.12.3" diff --git a/services/apps/packages_worker/src/activities.ts b/services/apps/packages_worker/src/activities.ts index 89f75f54c3..988f6d5c64 100644 --- a/services/apps/packages_worker/src/activities.ts +++ b/services/apps/packages_worker/src/activities.ts @@ -1,3 +1,4 @@ export * from './deps-dev/activities' export * from './npm/activities' export { osvSyncEcosystem, osvDeriveCriticalFlag } from './osv/activities' +export { processMavenCriticalBatch, processMavenNonCriticalBatch } from './maven/activities' diff --git a/services/apps/packages_worker/src/bin/maven-backfill.ts b/services/apps/packages_worker/src/bin/maven-backfill.ts new file mode 100644 index 0000000000..6550feeefc --- /dev/null +++ b/services/apps/packages_worker/src/bin/maven-backfill.ts @@ -0,0 +1,42 @@ +import { getServiceLogger } from '@crowd/logging' + +import { getMavenConfig } from '../config' +import { getPackagesDb } from '../db' +import { runMavenCriticalBackfill } from '../maven/runMavenEnrichmentLoop' + +const log = getServiceLogger() + +let shuttingDown = false + +// Graceful stop: finish the in-flight batch, then exit. Safe to interrupt — every +// write is an idempotent upsert and the DB state is the cursor, so re-running +// resumes where it left off. +const shutdown = () => { + if (shuttingDown) return + shuttingDown = true + log.info('Shutting down maven backfill (stopping after the current batch)...') +} + +process.on('SIGINT', shutdown) +process.on('SIGTERM', shutdown) + +const main = async () => { + log.info('maven backfill starting (one-shot, full extraction)...') + + const config = getMavenConfig() + log.info(config, 'Config loaded') + + const qx = await getPackagesDb() + await qx.selectOne('SELECT 1') + log.info('Connected to packages-db.') + + const totals = await runMavenCriticalBackfill(qx, config, () => shuttingDown) + + log.info({ ...totals }, 'maven backfill complete') + process.exit(0) +} + +main().catch((err) => { + log.error({ err }, 'maven backfill fatal error') + process.exit(1) +}) diff --git a/services/apps/packages_worker/src/bin/maven-worker.ts b/services/apps/packages_worker/src/bin/maven-worker.ts new file mode 100644 index 0000000000..54a4e7c672 --- /dev/null +++ b/services/apps/packages_worker/src/bin/maven-worker.ts @@ -0,0 +1,12 @@ +import { scheduleMavenCritical } from '../maven/schedule' +import { svc } from '../service' + +// Maven-only worker: runs on the shared `packages-worker` taskqueue (so it picks up +// the same bundled workflows/activities) but registers ONLY the maven-critical +// schedule. Intended for local dev — lets you run Maven in isolation without also +// firing the npm/osv schedules that bin/packages-worker.ts registers. +setImmediate(async () => { + await svc.init() + await scheduleMavenCritical() + await svc.start() +}) diff --git a/services/apps/packages_worker/src/bin/packages-worker.ts b/services/apps/packages_worker/src/bin/packages-worker.ts index 21dee223f4..0752db6a06 100644 --- a/services/apps/packages_worker/src/bin/packages-worker.ts +++ b/services/apps/packages_worker/src/bin/packages-worker.ts @@ -1,3 +1,4 @@ +import { scheduleMavenCritical } from '../maven/schedule' import { scheduleNpmIngest } from '../npm/schedule' import { scheduleOsvSync } from '../osv/schedule' import { svc } from '../service' @@ -6,5 +7,6 @@ setImmediate(async () => { await svc.init() await scheduleNpmIngest() await scheduleOsvSync() + await scheduleMavenCritical() await svc.start() }) diff --git a/services/apps/packages_worker/src/config.ts b/services/apps/packages_worker/src/config.ts index 9c7cc7829b..30d1f66ca0 100644 --- a/services/apps/packages_worker/src/config.ts +++ b/services/apps/packages_worker/src/config.ts @@ -45,3 +45,14 @@ export function getEnricherConfig() { fetchTimeoutMs: parseInt(process.env.ENRICHER_FETCH_TIMEOUT_MS ?? '10000', 10), } } + +export function getMavenConfig() { + return { + batchSize: requireEnvInt('POM_FETCHER_BATCH_SIZE'), + concurrency: requireEnvInt('POM_FETCHER_CONCURRENCY'), + nonCriticalBatchSize: requireEnvInt('POM_FETCHER_NON_CRITICAL_BATCH_SIZE'), + nonCriticalConcurrency: requireEnvInt('POM_FETCHER_NON_CRITICAL_CONCURRENCY'), + refreshDays: requireEnvInt('POM_FETCHER_REFRESH_DAYS'), + groupDelayMs: requireEnvInt('POM_FETCHER_GROUP_DELAY_MS'), + } +} diff --git a/services/apps/packages_worker/src/maven/__tests__/normalize.test.ts b/services/apps/packages_worker/src/maven/__tests__/normalize.test.ts new file mode 100644 index 0000000000..5f34412959 --- /dev/null +++ b/services/apps/packages_worker/src/maven/__tests__/normalize.test.ts @@ -0,0 +1,130 @@ +import { describe, expect, it } from 'vitest' + +import { normalizeScmUrl } from '../extract' +import { isPrerelease, parseRepoUrl } from '../normalize' + +describe('isPrerelease', () => { + it('returns false for a stable version', () => { + expect(isPrerelease('3.12.0')).toBe(false) + }) + + it('detects SNAPSHOT', () => { + expect(isPrerelease('1.0.0-SNAPSHOT')).toBe(true) + }) + + it('detects alpha', () => { + expect(isPrerelease('2.0.0-alpha')).toBe(true) + expect(isPrerelease('2.0.0-ALPHA.1')).toBe(true) + }) + + it('detects beta', () => { + expect(isPrerelease('1.5.0-beta.2')).toBe(true) + }) + + it('detects rc', () => { + expect(isPrerelease('4.0.0-rc1')).toBe(true) + expect(isPrerelease('4.0.0-RC.2')).toBe(true) + }) + + it('detects milestone (m1, m10)', () => { + expect(isPrerelease('5.3.0-m1')).toBe(true) + expect(isPrerelease('5.3.0-M10')).toBe(true) + }) + + it('returns false for versions with numbers that are not milestones', () => { + expect(isPrerelease('1.2.3')).toBe(false) + expect(isPrerelease('10.0.0')).toBe(false) + }) +}) + +describe('parseRepoUrl', () => { + it('identifies github.com', () => { + expect(parseRepoUrl('https://github.com/apache/commons-lang')).toEqual({ + host: 'github', + owner: 'apache', + name: 'commons-lang', + }) + }) + + it('identifies gitlab.com', () => { + expect(parseRepoUrl('https://gitlab.com/owner/repo')).toEqual({ + host: 'gitlab', + owner: 'owner', + name: 'repo', + }) + }) + + it('identifies bitbucket.org', () => { + expect(parseRepoUrl('https://bitbucket.org/owner/repo')).toEqual({ + host: 'bitbucket', + owner: 'owner', + name: 'repo', + }) + }) + + it('returns other for unknown hosts', () => { + const result = parseRepoUrl('https://svn.example.com/repo') + expect(result?.host).toBe('other') + }) + + it('returns null for invalid URLs', () => { + expect(parseRepoUrl('not-a-url')).toBeNull() + }) + + it('handles URLs with no path segments', () => { + const result = parseRepoUrl('https://github.com/') + expect(result).toEqual({ host: 'github', owner: null, name: null }) + }) +}) + +describe('normalizeScmUrl', () => { + it('returns null for null input', () => { + expect(normalizeScmUrl(null)).toBeNull() + }) + + it('strips scm:git: prefix', () => { + expect(normalizeScmUrl('scm:git:https://github.com/apache/commons-lang')).toBe( + 'https://github.com/apache/commons-lang', + ) + }) + + it('converts SSH git@ to https', () => { + expect(normalizeScmUrl('git@github.com:apache/commons-lang.git')).toBe( + 'https://github.com/apache/commons-lang', + ) + }) + + it('converts git:// to https://', () => { + expect(normalizeScmUrl('git://github.com/apache/commons-lang.git')).toBe( + 'https://github.com/apache/commons-lang', + ) + }) + + it('strips trailing .git', () => { + expect(normalizeScmUrl('https://github.com/apache/commons-lang.git')).toBe( + 'https://github.com/apache/commons-lang', + ) + }) + + it('strips /tree/... path suffix', () => { + expect(normalizeScmUrl('https://github.com/apache/commons-lang/tree/master')).toBe( + 'https://github.com/apache/commons-lang', + ) + }) + + it('strips trailing slash', () => { + expect(normalizeScmUrl('https://github.com/apache/commons-lang/')).toBe( + 'https://github.com/apache/commons-lang', + ) + }) + + it('handles combined scm:git: + SSH form', () => { + expect(normalizeScmUrl('scm:git:git@github.com:apache/commons-lang.git')).toBe( + 'https://github.com/apache/commons-lang', + ) + }) + + it('returns null for non-https result', () => { + expect(normalizeScmUrl('svn://svn.apache.org/repos/commons-lang')).toBeNull() + }) +}) diff --git a/services/apps/packages_worker/src/maven/activities.ts b/services/apps/packages_worker/src/maven/activities.ts new file mode 100644 index 0000000000..2380ec7a7d --- /dev/null +++ b/services/apps/packages_worker/src/maven/activities.ts @@ -0,0 +1,35 @@ +import { getServiceChildLogger } from '@crowd/logging' + +import { getMavenConfig } from '../config' +import { getPackagesDb } from '../db' + +import { BatchResult, processBatch } from './runMavenEnrichmentLoop' + +const log = getServiceChildLogger('maven-activity') + +export async function processMavenCriticalBatch(): Promise { + const config = getMavenConfig() + const qx = await getPackagesDb() + + // Universe-polling pass: skip POM extraction when version is unchanged. + const result = await processBatch(qx, config, true, false) + log.info({ ...result }, 'Maven critical batch complete') + return result +} + +export async function processMavenNonCriticalBatch(): Promise { + const config = getMavenConfig() + const qx = await getPackagesDb() + // Non-critical is DB-only (no POM fetch); the flag is unused on this path. + const result = await processBatch(qx, config, false, false) + log.info( + { + processed: result.processed, + skipped: result.skipped, + unchanged: result.unchanged, + error: result.error, + }, + 'Maven non-critical batch complete', + ) + return result +} diff --git a/services/apps/packages_worker/src/maven/extract.ts b/services/apps/packages_worker/src/maven/extract.ts new file mode 100644 index 0000000000..66485b558c --- /dev/null +++ b/services/apps/packages_worker/src/maven/extract.ts @@ -0,0 +1,535 @@ +/** + * Core POM extraction logic — pure functions (no I/O side-effects, no DB calls). + * Callers are responsible for concurrency, retries, and persistence. + */ +import axios from 'axios' +import { XMLParser } from 'fast-xml-parser' + +import { getServiceChildLogger } from '@crowd/logging' + +const log = getServiceChildLogger('maven') + +// ─── Types ──────────────────────────────────────────────────────────────────── + +export interface PomMaintainer { + username: string | null + displayName: string | null + email: string | null + url: string | null + role: 'author' | 'maintainer' +} + +export interface PomExtractionResult { + groupId: string + artifactId: string + version: string + purl: string + description: string | null + licenses: string[] + licensesRaw: string | null + scmUrl: string | null + homepageUrl: string | null + developers: PomMaintainer[] + contributors: PomMaintainer[] + parentHops: number + error: string | null +} + +// ─── Internal POM types ─────────────────────────────────────────────────────── + +interface PomData { + description?: unknown + url?: unknown + licenses?: { license?: unknown } + scm?: { url?: unknown; connection?: unknown } + developers?: { developer?: unknown } + contributors?: { contributor?: unknown } + parent?: { groupId?: unknown; artifactId?: unknown; version?: unknown } +} + +interface PomPerson { + id?: unknown + name?: unknown + email?: unknown + url?: unknown +} + +// ─── Config ─────────────────────────────────────────────────────────────────── + +// Base URL for fetching POMs/metadata. Defaults to canonical Central (repo1, Fastly — +// aggressive per-IP throttling). Point POM_FETCHER_MAVEN_BASE_URL at a high-throughput +// mirror (e.g. the Google GCS mirror) for bulk backfills. +const MAVEN_REPO = process.env.POM_FETCHER_MAVEN_BASE_URL ?? 'https://repo1.maven.org/maven2' +export const MAX_PARENT_HOPS = 8 +const REQUEST_TIMEOUT_MS = 15_000 + +const parser = new XMLParser({ + ignoreAttributes: false, + attributeNamePrefix: '@_', + parseTagValue: false, // keep all values as strings — prevents version "65" becoming number + parseAttributeValue: false, +}) + +// ─── Retry with exponential backoff ────────────────────────────────────────── + +const MAX_RETRIES = 3 +const RETRY_BASE_MS = 2_000 + +async function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)) +} + +async function getWithRetry(url: string): Promise { + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + const res = await axios.get(url, { + responseType: 'text', + timeout: REQUEST_TIMEOUT_MS, + }) + return res.data + } catch (err) { + if (axios.isAxiosError(err)) { + const status = err.response?.status + // 429 = explicit rate limit, 403 = CDN throttle (Maven Central uses both) + if ((status === 429 || status === 403) && attempt < MAX_RETRIES) { + const delay = RETRY_BASE_MS * 2 ** attempt + Math.random() * 500 + await sleep(delay) + continue + } + } + throw err + } + } + throw new Error(`Max retries exceeded for ${url}`) +} + +// ─── POM fetch ──────────────────────────────────────────────────────────────── + +export function buildPomUrl(groupId: string, artifactId: string, version: string): string { + const groupPath = groupId.replace(/\./g, '/') + return `${MAVEN_REPO}/${groupPath}/${artifactId}/${version}/${artifactId}-${version}.pom` +} + +export async function fetchPom( + groupId: string, + artifactId: string, + version: string, + url: string, +): Promise { + try { + const data = await getWithRetry(url) + const parsed = parser.parse(data) + return (parsed?.project as PomData) ?? null + } catch (err) { + if (axios.isAxiosError(err)) { + const status = err.response?.status + if (status === 404) { + log.debug({ groupId, artifactId, version }, `POM not found (404): ${url}`) + return null + } + log.debug( + { groupId, artifactId, version }, + `HTTP ${status ?? 'unknown'} fetching POM: ${url}`, + ) + return null + } + throw err + } +} + +// ─── POM cache ────────────────────────────────────────────────────────────── +// +// Parent POMs are heavily shared across artifacts of the same namespace +// (e.g. org.apache:apache, org.springframework.boot:spring-boot-starter-parent), +// and the critical batch is re-sorted by namespace before processing (see +// runMavenEnrichmentLoop), so those siblings are processed close together. A +// module-level, coordinate-keyed in-process cache +// collapses those repeated parent fetches into a single HTTP request — the single +// biggest lever against Maven Central rate limiting. It also removes the redundant +// second fetch of each artifact's own POM (extractArtifact fetches the leaf, then +// resolveWithInheritance fetches it again at depth 0). +// +// Only *successful* fetches are cached: fetchPom() returns null for both a real 404 +// and a transient failure (throttle/timeout), so caching null would poison the cache +// with transient errors — we never do it. Maven coordinates are immutable, so a cached +// POM never goes stale; the LRU size cap is purely to bound memory. + +const POM_CACHE_MAX_ENTRIES = 5_000 + +const pomCache = new Map() +const inFlight = new Map>() +const pomCacheStats = { hits: 0, coalesced: 0, misses: 0, evictions: 0 } + +function pomCacheKey(groupId: string, artifactId: string, version: string): string { + return `${groupId}:${artifactId}:${version}` +} + +function cacheSet(key: string, pom: PomData): void { + pomCache.delete(key) // re-insert to refresh recency (LRU) + pomCache.set(key, pom) + if (pomCache.size > POM_CACHE_MAX_ENTRIES) { + const oldest = pomCache.keys().next().value + if (oldest !== undefined) { + pomCache.delete(oldest) + pomCacheStats.evictions++ + } + } +} + +/** + * Cached + request-coalescing wrapper around fetchPom(). + * - Cache hit → returns the stored POM, no HTTP. + * - In-flight → a concurrent fetch for the same coordinates is already running; + * await it instead of issuing a duplicate request. + * - Miss → performs the network fetch; caches the result only if non-null. + */ +async function fetchPomCached( + groupId: string, + artifactId: string, + version: string, +): Promise { + const key = pomCacheKey(groupId, artifactId, version) + + const cached = pomCache.get(key) + if (cached !== undefined) { + pomCacheStats.hits++ + pomCache.delete(key) // refresh recency on read (LRU) + pomCache.set(key, cached) + return cached + } + + const pending = inFlight.get(key) + if (pending) { + pomCacheStats.coalesced++ + return pending + } + + pomCacheStats.misses++ + const promise = fetchPom(groupId, artifactId, version, buildPomUrl(groupId, artifactId, version)) + .then((pom) => { + if (pom) cacheSet(key, pom) + return pom + }) + .finally(() => { + inFlight.delete(key) + }) + + inFlight.set(key, promise) + return promise +} + +/** Snapshot of cache effectiveness — logged once per critical batch by the enrichment loop. */ +export function getPomCacheStats(): { + size: number + hits: number + coalesced: number + misses: number + evictions: number + hitRate: number +} { + const lookups = pomCacheStats.hits + pomCacheStats.coalesced + pomCacheStats.misses + const hitRate = + lookups === 0 + ? 0 + : Math.round(((pomCacheStats.hits + pomCacheStats.coalesced) / lookups) * 100) / 100 + return { size: pomCache.size, ...pomCacheStats, hitRate } +} + +/** Clears the cache and counters. Intended for tests. */ +export function resetPomCache(): void { + pomCache.clear() + inFlight.clear() + pomCacheStats.hits = 0 + pomCacheStats.coalesced = 0 + pomCacheStats.misses = 0 + pomCacheStats.evictions = 0 +} + +// ─── Inheritance resolution ─────────────────────────────────────────────────── + +interface ResolvedFields { + description: string | null + licenses: string[] + licensesRaw: string | null + scmUrl: string | null + homepageUrl: string | null + developers: PomMaintainer[] + contributors: PomMaintainer[] + hops: number +} + +async function resolveWithInheritance( + groupId: string, + artifactId: string, + version: string, + depth = 0, +): Promise { + if (depth > MAX_PARENT_HOPS) { + log.debug({ groupId, artifactId, version }, `Max parent hops (${MAX_PARENT_HOPS}) reached`) + return emptyFields(depth) + } + + const pom = await fetchPomCached(groupId, artifactId, version) + if (!pom) return emptyFields(depth) + + const licenses = extractLicenses(pom) + const scmUrl = extractStr(pom.scm?.url ?? pom.scm?.connection) + const developers = extractPersons(pom.developers?.developer, 'author') + const contributors = extractPersons(pom.contributors?.contributor, 'maintainer') + + const missingLicense = licenses.length === 0 + const missingScm = !scmUrl + const parent = extractParent(pom) + + if (parent && (missingLicense || missingScm)) { + log.debug( + { groupId, artifactId, version }, + `[hop ${depth + 1}] ${parent.groupId}:${parent.artifactId}:${parent.version}`, + ) + const parentFields = await resolveWithInheritance( + parent.groupId, + parent.artifactId, + parent.version, + depth + 1, + ) + return { + description: extractStr(pom.description) ?? parentFields.description, + licenses: licenses.length > 0 ? licenses : parentFields.licenses, + licensesRaw: licenses.length > 0 ? licenses.join(', ') : parentFields.licensesRaw, + scmUrl: scmUrl ?? parentFields.scmUrl, + homepageUrl: extractStr(pom.url) ?? parentFields.homepageUrl, + developers: developers.length > 0 ? developers : parentFields.developers, + contributors: contributors.length > 0 ? contributors : parentFields.contributors, + hops: parentFields.hops, + } + } + + return { + description: extractStr(pom.description), + licenses, + licensesRaw: licenses.length > 0 ? licenses.join(', ') : null, + scmUrl, + homepageUrl: extractStr(pom.url), + developers, + contributors, + hops: depth, + } +} + +// ─── Public entry points ────────────────────────────────────────────────────── + +/** + * Fetches only the root POM without following the parent chain — faster than + * extractArtifact, but inherited fields (licenses, SCM) may be missing. + * Currently unused: kept as a lightweight option for high-throughput paths that + * don't need parent inheritance. + */ +export async function extractArtifactDirect( + groupId: string, + artifactId: string, + version: string, +): Promise { + const purl = `pkg:maven/${groupId}/${artifactId}@${version}` + const pomUrl = buildPomUrl(groupId, artifactId, version) + const pom = await fetchPomCached(groupId, artifactId, version) + + if (!pom) { + return { + groupId, + artifactId, + version, + purl, + description: null, + licenses: [], + licensesRaw: null, + scmUrl: null, + homepageUrl: null, + developers: [], + contributors: [], + parentHops: 0, + error: `POM not found: ${pomUrl}`, + } + } + + const licenses = extractLicenses(pom) + const scmUrl = extractStr(pom.scm?.url ?? pom.scm?.connection) + const developers = extractPersons(pom.developers?.developer, 'author') + const contributors = extractPersons(pom.contributors?.contributor, 'maintainer') + + return { + groupId, + artifactId, + version, + purl, + description: extractStr(pom.description), + licenses, + licensesRaw: licenses.length > 0 ? licenses.join(', ') : null, + scmUrl, + homepageUrl: extractStr(pom.url), + developers, + contributors, + parentHops: 0, + error: null, + } +} + +/** + * Fetches and resolves POM metadata for the given Maven artifact, following + * the parent chain to inherit licenses and SCM when not in the direct POM. + * Always returns a result object; errors are captured in `result.error`. + */ +export async function extractArtifact( + groupId: string, + artifactId: string, + version: string, +): Promise { + const purl = `pkg:maven/${groupId}/${artifactId}@${version}` + + const pomUrl = buildPomUrl(groupId, artifactId, version) + const rootPom = await fetchPomCached(groupId, artifactId, version) + if (!rootPom) { + return { + groupId, + artifactId, + version, + purl, + description: null, + licenses: [], + licensesRaw: null, + scmUrl: null, + homepageUrl: null, + developers: [], + contributors: [], + parentHops: 0, + error: `POM not found: ${pomUrl}`, + } + } + + try { + const resolved = await resolveWithInheritance(groupId, artifactId, version) + return { + groupId, + artifactId, + version, + purl, + description: resolved.description, + licenses: resolved.licenses, + licensesRaw: resolved.licensesRaw, + scmUrl: resolved.scmUrl, + homepageUrl: resolved.homepageUrl, + developers: resolved.developers, + contributors: resolved.contributors, + parentHops: resolved.hops, + error: null, + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + log.debug({ groupId, artifactId, version }, `Error resolving POM: ${message}`) + return { + groupId, + artifactId, + version, + purl, + description: null, + licenses: [], + licensesRaw: null, + scmUrl: null, + homepageUrl: null, + developers: [], + contributors: [], + parentHops: 0, + error: message, + } + } +} + +// ─── SCM URL normalisation ─────────────────────────────────────────────────── + +/** + * Converts the raw SCM URL from a POM (declared_repository_url) into a clean + * HTTPS repository URL suitable for storage as repository_url. + * + * Handles common Maven SCM URL forms: + * scm:git:git@github.com:owner/repo.git → https://github.com/owner/repo + * scm:git:https://github.com/owner/repo → https://github.com/owner/repo + * git://github.com/owner/repo.git → https://github.com/owner/repo + * https://github.com/owner/repo/tree/... → https://github.com/owner/repo + */ +export function normalizeScmUrl(raw: string | null): string | null { + if (!raw) return null + let url = raw.trim() + + // Strip scm:git: or scm: prefix + url = url.replace(/^scm:git:/i, '').replace(/^scm:/i, '') + + // Convert SSH git@host:owner/repo → https://host/owner/repo + url = url.replace(/^git@([^:]+):(.+)$/, 'https://$1/$2') + + // Convert git:// → https:// + url = url.replace(/^git:\/\//, 'https://') + + // Strip trailing .git + url = url.replace(/\.git$/, '') + + // Strip /tree/... or /blob/... path suffixes (keep only host + owner + repo) + url = url.replace(/\/(tree|blob)(\/.*)?$/, '') + + if (!url.startsWith('https://')) return null + + return url.replace(/\/$/, '') +} + +// ─── Private helpers ────────────────────────────────────────────────────────── + +function extractStr(value: unknown): string | null { + if (typeof value === 'string' && value.trim()) return value.trim() + return null +} + +function extractLicenses(pom: PomData): string[] { + const raw = pom.licenses?.license + if (!raw) return [] + const list = Array.isArray(raw) ? raw : [raw] + return (list as Array<{ name?: unknown }>) + .map((l) => extractStr(l?.name)) + .filter((n): n is string => n !== null) +} + +function extractPersons(raw: unknown, role: 'author' | 'maintainer'): PomMaintainer[] { + if (!raw) return [] + const list = Array.isArray(raw) ? raw : [raw] + return (list as PomPerson[]) + .filter((p) => p.id || p.name || p.email) + .map((p) => ({ + username: extractStr(p.id), + displayName: extractStr(p.name), + email: extractStr(p.email), + url: extractStr(p.url), + role, + })) +} + +function extractParent( + pom: PomData, +): { groupId: string; artifactId: string; version: string } | null { + const p = pom.parent + if (!p) return null + const groupId = extractStr(p.groupId) + const artifactId = extractStr(p.artifactId) + const version = extractStr(p.version) + if (!groupId || !artifactId || !version) return null + return { groupId, artifactId, version } +} + +function emptyFields(hops: number): ResolvedFields { + return { + description: null, + licenses: [], + licensesRaw: null, + scmUrl: null, + homepageUrl: null, + developers: [], + contributors: [], + hops, + } +} diff --git a/services/apps/packages_worker/src/maven/metadata.ts b/services/apps/packages_worker/src/maven/metadata.ts new file mode 100644 index 0000000000..5043d3b682 --- /dev/null +++ b/services/apps/packages_worker/src/maven/metadata.ts @@ -0,0 +1,123 @@ +/** + * Fetches maven-metadata.xml for a Maven artifact and returns the full version + * list plus the current release version. + * + * URL format: + * https://repo1.maven.org/maven2/{groupPath}/{artifactId}/maven-metadata.xml + * + * Returns null when the artifact is not found (404) or the metadata is + * malformed. + */ +import axios from 'axios' +import { XMLParser } from 'fast-xml-parser' + +const MAVEN_REPO = process.env.POM_FETCHER_MAVEN_BASE_URL ?? 'https://repo1.maven.org/maven2' +const REQUEST_TIMEOUT_MS = 10_000 +const MAX_RETRIES = 3 +const RETRY_BASE_MS = 2_000 + +const parser = new XMLParser({ + ignoreAttributes: false, + attributeNamePrefix: '@_', + parseTagValue: false, + parseAttributeValue: false, +}) + +export interface MavenVersionsMetadata { + versions: string[] + releaseVersion: string | null + lastUpdated: Date | null +} + +// maven-metadata.xml carries yyyyMMddHHmmss in UTC — +// the timestamp of the most recent publish. Parse it into a Date for +// packages.latest_release_at; return null on anything malformed. +function parseMavenLastUpdated(raw: unknown): Date | null { + if (typeof raw !== 'string') { + // fast-xml-parser may coerce the all-digits value to a number + raw = typeof raw === 'number' ? String(raw) : null + } + const m = + typeof raw === 'string' + ? raw.trim().match(/^(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})$/) + : null + if (!m) return null + const [, y, mo, d, h, mi, s] = m + const ts = Date.UTC(+y, +mo - 1, +d, +h, +mi, +s) + return Number.isNaN(ts) ? null : new Date(ts) +} + +export type MavenFetchError = + | { kind: 'NOT_FOUND' } + | { kind: 'RATE_LIMIT'; status: number } + | { kind: 'TRANSIENT'; message: string } + +export function isMavenFetchError(v: unknown): v is MavenFetchError { + return typeof v === 'object' && v !== null && 'kind' in v +} + +async function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)) +} + +export async function resolveVersionsList( + groupId: string, + artifactId: string, +): Promise { + const groupPath = groupId.replace(/\./g, '/') + const url = `${MAVEN_REPO}/${groupPath}/${artifactId}/maven-metadata.xml` + + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + const res = await axios.get(url, { + responseType: 'text', + timeout: REQUEST_TIMEOUT_MS, + }) + const parsed = parser.parse(res.data) + + // Prefer over — release excludes snapshots/alphas + const versioning = parsed?.metadata?.versioning + const release = typeof versioning?.release === 'string' ? versioning.release.trim() : null + const latest = typeof versioning?.latest === 'string' ? versioning.latest.trim() : null + + const rawVersions = versioning?.versions?.version + let versions: string[] = [] + if (Array.isArray(rawVersions)) { + versions = rawVersions.map((v: unknown) => String(v).trim()).filter(Boolean) + } else if (typeof rawVersions === 'string' && rawVersions.trim()) { + versions = [rawVersions.trim()] + } + + return { + versions, + releaseVersion: release || latest || null, + lastUpdated: parseMavenLastUpdated(versioning?.lastUpdated), + } + } catch (err) { + if (axios.isAxiosError(err)) { + const status = err.response?.status + if (status === 404) return { kind: 'NOT_FOUND' } + // 429 = explicit rate limit, 403 = CDN throttle (Maven Central uses both) + if ((status === 429 || status === 403) && attempt < MAX_RETRIES) { + const delay = RETRY_BASE_MS * 2 ** attempt + Math.random() * 500 + await sleep(delay) + continue + } + if (status === 429 || status === 403) return { kind: 'RATE_LIMIT', status } + } + const message = err instanceof Error ? err.message : String(err) + return { kind: 'TRANSIENT', message } + } + } + + return { kind: 'RATE_LIMIT', status: 429 } +} + +export async function resolveLatestVersion( + groupId: string, + artifactId: string, +): Promise { + const meta = await resolveVersionsList(groupId, artifactId) + if (isMavenFetchError(meta)) return null + return meta.releaseVersion +} diff --git a/services/apps/packages_worker/src/maven/normalize.ts b/services/apps/packages_worker/src/maven/normalize.ts new file mode 100644 index 0000000000..00df7986aa --- /dev/null +++ b/services/apps/packages_worker/src/maven/normalize.ts @@ -0,0 +1,21 @@ +export function isPrerelease(version: string): boolean { + return /-(SNAPSHOT|alpha|beta|rc|m\d+)/i.test(version) +} + +export function parseRepoUrl( + url: string, +): { host: string; owner: string | null; name: string | null } | null { + try { + const parsed = new URL(url) + const h = parsed.hostname.toLowerCase() + let host: string + if (h === 'github.com' || h.endsWith('.github.com')) host = 'github' + else if (h === 'gitlab.com' || h.includes('gitlab')) host = 'gitlab' + else if (h === 'bitbucket.org') host = 'bitbucket' + else host = 'other' + const parts = parsed.pathname.split('/').filter(Boolean) + return { host, owner: parts[0] ?? null, name: parts[1] ?? null } + } catch { + return null + } +} diff --git a/services/apps/packages_worker/src/maven/runMavenEnrichmentLoop.ts b/services/apps/packages_worker/src/maven/runMavenEnrichmentLoop.ts new file mode 100644 index 0000000000..d20f1e0cd6 --- /dev/null +++ b/services/apps/packages_worker/src/maven/runMavenEnrichmentLoop.ts @@ -0,0 +1,503 @@ +import crypto from 'crypto' + +import { + MavenPackageToSync, + QueryExecutor, + listMavenPackagesToSync, + logAuditFieldChange, + replacePackageMaintainers, + touchPackageSyncedAt, + upsertMaintainer, + upsertPackage, + upsertPackageRepo, + upsertRepo, + upsertVersionsBatch, +} from '@crowd/data-access-layer' +import { getServiceChildLogger } from '@crowd/logging' + +import { getMavenConfig } from '../config' + +import { MAX_PARENT_HOPS, extractArtifact, getPomCacheStats, normalizeScmUrl } from './extract' +import { isMavenFetchError, resolveVersionsList } from './metadata' +import { isPrerelease, parseRepoUrl } from './normalize' + +const log = getServiceChildLogger('maven') + +// ─── Types ──────────────────────────────────────────────────────────────────── + +export interface BatchResult { + processed: number + skipped: number + error: number + unchanged: number + // critical packages whose parent chain was truncated at MAX_PARENT_HOPS (POM data may be incomplete) + hopLimitReached: number +} + +type CriticalStatus = 'processed' | 'skipped' | 'unchanged' | 'error' + +interface CriticalPackageResult { + status: CriticalStatus + hopLimitReached: boolean +} + +type MavenConfig = ReturnType +type PackageRow = MavenPackageToSync + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function mavenRegistryUrl(groupId: string, artifactId: string): string { + return `https://central.sonatype.com/artifact/${groupId}/${artifactId}` +} + +async function writeRepoLink( + qx: QueryExecutor, + packageId: number, + repositoryUrl: string | null, + changed: Set, +): Promise { + if (!repositoryUrl) return + const parsed = parseRepoUrl(repositoryUrl) + if (!parsed) return + const repoId = await upsertRepo(qx, { url: repositoryUrl, ...parsed }) + const repoChanged = await upsertPackageRepo(qx, { + packageId, + repoId, + source: 'declared', + confidence: 0.8, + }) + repoChanged.forEach((f) => changed.add(f)) +} + +// Postgres deadlock (40P01) is transient: concurrent transactions upserting the same shared +// rows (e.g. maintainer 'hboutemy' across many org.apache packages, or the shared apache repo) +// can form a lock cycle. Re-running the whole transaction resolves it — the upserts are idempotent. +async function withDeadlockRetry(fn: () => Promise, maxAttempts = 4): Promise { + for (let attempt = 1; ; attempt++) { + try { + return await fn() + } catch (err) { + const code = (err as { code?: string }).code + const isDeadlock = + code === '40P01' || /deadlock detected/i.test(String((err as Error)?.message)) + if (isDeadlock && attempt < maxAttempts) { + await new Promise((r) => setTimeout(r, 50 * attempt + Math.random() * 100)) + log.debug({ attempt }, 'Deadlock detected — retrying transaction') + continue + } + throw err + } + } +} + +// ─── Non-critical: copy universe stats into packages ───────────────────────── + +async function processNonCriticalPackage(qx: QueryExecutor, pkg: PackageRow): Promise { + await upsertPackage(qx, { + purl: pkg.purl, + ecosystem: 'maven', + namespace: pkg.namespace, + name: pkg.name, + description: null, + homepage: null, + registryUrl: pkg.namespace ? mavenRegistryUrl(pkg.namespace, pkg.name) : null, + declaredRepositoryUrl: null, + repositoryUrl: null, + licenses: null, + licensesRaw: null, + latestVersion: null, + ingestionSource: 'packages_universe', + criticalityScore: pkg.criticalityScore, + dependentPackagesCount: pkg.dependentPackagesCount, + dependentReposCount: pkg.dependentReposCount, + }) +} + +// ─── Critical: full POM extraction ─────────────────────────────────────────── + +async function processCriticalPackage( + qx: QueryExecutor, + pkg: PackageRow, + forceFullExtraction: boolean, +): Promise { + const groupId = pkg.namespace + const artifactId = pkg.name + + if (!groupId) { + log.warn({ purl: pkg.purl }, 'Skipping: null namespace (groupId)') + return { status: 'skipped', hopLimitReached: false } + } + + // Phase 1: lightweight metadata fetch to get the current upstream version. + const metadata = await resolveVersionsList(groupId, artifactId) + + if (isMavenFetchError(metadata)) { + if (metadata.kind === 'NOT_FOUND') { + await upsertPackage(qx, { + purl: pkg.purl, + ecosystem: 'maven', + namespace: groupId, + name: artifactId, + description: null, + homepage: null, + registryUrl: mavenRegistryUrl(groupId, artifactId), + declaredRepositoryUrl: null, + repositoryUrl: null, + licenses: null, + licensesRaw: null, + latestVersion: pkg.latestVersion ?? null, + ingestionSource: 'maven_not_on_central', + criticalityScore: pkg.criticalityScore, + dependentPackagesCount: pkg.dependentPackagesCount, + dependentReposCount: pkg.dependentReposCount, + }) + log.warn({ groupId, artifactId }, 'Not on Maven Central — writing minimal record') + return { status: 'skipped', hopLimitReached: false } + } + if (metadata.kind === 'RATE_LIMIT') { + log.warn( + { groupId, artifactId, status: metadata.status }, + 'Rate limited — will retry next pass', + ) + return { status: 'error', hopLimitReached: false } + } + throw new Error( + `Transient error fetching metadata for ${groupId}:${artifactId} — ${metadata.message}`, + ) + } + + const version = metadata.releaseVersion + + if (!version) { + await upsertPackage(qx, { + purl: pkg.purl, + ecosystem: 'maven', + namespace: groupId, + name: artifactId, + description: null, + homepage: null, + registryUrl: mavenRegistryUrl(groupId, artifactId), + declaredRepositoryUrl: null, + repositoryUrl: null, + licenses: null, + licensesRaw: null, + latestVersion: null, + ingestionSource: 'maven_no_version', + criticalityScore: pkg.criticalityScore, + dependentPackagesCount: pkg.dependentPackagesCount, + dependentReposCount: pkg.dependentReposCount, + }) + log.warn({ groupId, artifactId }, 'No release version in metadata — writing minimal record') + return { status: 'skipped', hopLimitReached: false } + } + + // Phase 2: skip full POM extraction when upstream version matches what we already have. + if (!forceFullExtraction && version === pkg.latestVersion) { + await touchPackageSyncedAt(qx, pkg.purl, { + criticalityScore: pkg.criticalityScore, + dependentPackagesCount: pkg.dependentPackagesCount, + dependentReposCount: pkg.dependentReposCount, + }) + log.debug({ groupId, artifactId, version }, 'Version unchanged — skipping POM extraction') + return { status: 'unchanged', hopLimitReached: false } + } + + // Phase 3: full POM extraction with parent-chain resolution — wrapped in a + // transaction so partial writes never leave the package in an inconsistent state. + const result = await extractArtifact(groupId, artifactId, version) + + if (result.error) { + log.warn({ groupId, artifactId, version, error: result.error }, 'POM extraction failed') + await upsertPackage(qx, { + purl: pkg.purl, + ecosystem: 'maven', + namespace: groupId, + name: artifactId, + description: null, + homepage: null, + registryUrl: mavenRegistryUrl(groupId, artifactId), + declaredRepositoryUrl: null, + repositoryUrl: null, + licenses: null, + licensesRaw: null, + latestVersion: version, + ingestionSource: 'maven_error', + criticalityScore: pkg.criticalityScore, + dependentPackagesCount: pkg.dependentPackagesCount, + dependentReposCount: pkg.dependentReposCount, + }) + return { status: 'error', hopLimitReached: false } + } + + const hopLimitReached = result.parentHops > MAX_PARENT_HOPS + if (hopLimitReached) { + log.warn( + { + groupId, + artifactId, + parentHops: result.parentHops, + missingLicenses: result.licenses.length === 0, + missingScm: !result.scmUrl, + }, + 'Parent hop limit reached — data may be incomplete', + ) + } + + const repositoryUrl = normalizeScmUrl(result.scmUrl) + + await withDeadlockRetry(() => + qx.tx(async (t) => { + const changed = new Set() + + const { id: packageId, changedFields: pkgChanged } = await upsertPackage(t, { + purl: pkg.purl, + ecosystem: 'maven', + namespace: groupId, + name: artifactId, + description: result.description, + homepage: result.homepageUrl, + registryUrl: mavenRegistryUrl(groupId, artifactId), + declaredRepositoryUrl: result.scmUrl, + repositoryUrl, + licenses: result.licenses.length > 0 ? result.licenses : null, + licensesRaw: result.licensesRaw, + latestVersion: version, + versionsCount: metadata.versions.length > 0 ? metadata.versions.length : null, + latestReleaseAt: metadata.lastUpdated, + ingestionSource: 'maven-registry', + criticalityScore: pkg.criticalityScore, + dependentPackagesCount: pkg.dependentPackagesCount, + dependentReposCount: pkg.dependentReposCount, + }) + pkgChanged.forEach((f) => changed.add(f)) + + const allVersions = metadata.versions.length > 0 ? metadata.versions : [version] + const verChanged = await upsertVersionsBatch( + t, + allVersions.map((v) => ({ + packageId, + ecosystem: 'maven', + namespace: groupId, + name: artifactId, + number: v, + isLatest: v === metadata.releaseVersion, + isPrerelease: isPrerelease(v), + license: result.licenses[0] ?? null, + })), + ) + verChanged.forEach((f) => changed.add(f)) + + const allPeople = [ + ...result.developers.map((d) => ({ ...d, role: 'author' as const })), + ...result.contributors.map((c) => ({ ...c, role: 'maintainer' as const })), + ].sort((a, b) => { + // Stable order on the shared maintainers table so concurrent transactions acquire + // row locks in the same order → no deadlock cycles. + const ka = a.username ?? a.email ?? a.displayName ?? '' + const kb = b.username ?? b.email ?? b.displayName ?? '' + return ka < kb ? -1 : ka > kb ? 1 : 0 + }) + + const maintainerLinks: Array<{ maintainerId: number; role: 'author' | 'maintainer' }> = [] + for (const person of allPeople) { + const username = person.username ?? person.email ?? person.displayName + if (!username) continue + const emailHash = person.email + ? crypto.createHash('sha256').update(person.email.toLowerCase().trim()).digest('hex') + : null + const { id: maintainerId, changedFields: mChanged } = await upsertMaintainer(t, { + ecosystem: 'maven', + username, + displayName: person.displayName, + url: person.url, + emailHash, + }) + mChanged.forEach((f) => changed.add(f)) + maintainerLinks.push({ maintainerId, role: person.role }) + } + + if (maintainerLinks.length > 0) { + const pmChanged = await replacePackageMaintainers(t, packageId, maintainerLinks) + pmChanged.forEach((f) => changed.add(f)) + } + + await writeRepoLink(t, packageId, repositoryUrl, changed) + + await logAuditFieldChange(t, 'maven', pkg.purl, Array.from(changed)) + + log.info( + { + groupId, + artifactId, + version, + parentHops: result.parentHops, + licenses: result.licenses.length, + maintainers: maintainerLinks.length, + versions: allVersions.length, + }, + 'ok', + ) + }), + ) + + return { status: 'processed', hopLimitReached } +} + +// ─── Batch processing ───────────────────────────────────────────────────────── + +export async function processBatch( + qx: QueryExecutor, + config: MavenConfig, + isCritical: boolean, + forceFullExtraction: boolean, +): Promise { + const batchSize = isCritical ? config.batchSize : config.nonCriticalBatchSize + const refreshDays = config.refreshDays + + const packages = await listMavenPackagesToSync(qx, { limit: batchSize, refreshDays, isCritical }) + + return processPackages(qx, config, packages, isCritical, forceFullExtraction) +} + +// Runs a concrete list of packages through the enrichment pipeline. +async function processPackages( + qx: QueryExecutor, + config: MavenConfig, + packages: PackageRow[], + isCritical: boolean, + forceFullExtraction: boolean, +): Promise { + const concurrency = isCritical ? config.concurrency : config.nonCriticalConcurrency + + if (packages.length === 0) + return { processed: 0, skipped: 0, error: 0, unchanged: 0, hopLimitReached: 0 } + + // Cluster the batch by namespace so artifacts sharing a parent POM are processed + // adjacently — this is what makes the parent-POM cache effective. The criticality + // ordering only decides *which* packages are in the batch (via the SQL LIMIT); + // it does not group same-namespace siblings, so we reorder here. Only matters on + // the critical path (the non-critical path issues no POM/parent HTTP). + if (isCritical) { + packages.sort( + (a, b) => + (a.namespace ?? '').localeCompare(b.namespace ?? '') || a.name.localeCompare(b.name), + ) + } + + log.info({ count: packages.length, isCritical }, 'Batch started') + + const counts = { processed: 0, skipped: 0, error: 0, unchanged: 0, hopLimitReached: 0 } + + for (let batchStart = 0; batchStart < packages.length; batchStart += concurrency) { + const group = packages.slice(batchStart, batchStart + concurrency) + + if (isCritical && config.groupDelayMs > 0 && batchStart > 0) { + await new Promise((r) => setTimeout(r, config.groupDelayMs)) + } + + await Promise.all( + group.map(async (pkg) => { + try { + if (!isCritical) { + await processNonCriticalPackage(qx, pkg) + counts.processed++ + return + } + + const res = await processCriticalPackage(qx, pkg, forceFullExtraction) + counts[res.status]++ + if (res.hopLimitReached) counts.hopLimitReached++ + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + log.error({ purl: pkg.purl, error: message }, 'Unexpected error processing package') + counts.error++ + } + }), + ) + + const done = batchStart + group.length + if (done % 25 === 0 || done === packages.length) { + log.debug({ done, total: packages.length, ...counts }, 'Progress') + } + } + + if (isCritical) { + // POM cache only fills on the critical path (parent-chain resolution). + log.info(getPomCacheStats(), 'POM cache') + } + + return counts +} + +// ─── Phase runner ───────────────────────────────────────────────────────────── + +async function runPhase( + qx: QueryExecutor, + config: MavenConfig, + isCritical: boolean, + isShuttingDown: () => boolean, +): Promise { + const label = isCritical ? 'critical' : 'non-critical' + const total: BatchResult = { + processed: 0, + skipped: 0, + error: 0, + unchanged: 0, + hopLimitReached: 0, + } + let batchNum = 0 + const phaseStartedAt = Date.now() + + log.info({ phase: label }, 'Phase started') + + while (!isShuttingDown()) { + // The standalone loop is the backfill entry point → always full extraction. + const result = await processBatch(qx, config, isCritical, true) + + if (result.processed + result.skipped + result.error + result.unchanged === 0) { + const durationSec = Math.round((Date.now() - phaseStartedAt) / 1000) + log.info({ phase: label, ...total, durationSec }, 'Phase complete') + return total + } + + batchNum++ + total.processed += result.processed + total.skipped += result.skipped + total.error += result.error + total.unchanged += result.unchanged + total.hopLimitReached += result.hopLimitReached + + log.info( + { + phase: label, + batch: batchNum, + totalProcessed: total.processed, + totalSkipped: total.skipped, + totalUnchanged: total.unchanged, + totalErrors: total.error, + totalHopLimitReached: total.hopLimitReached, + elapsedSec: Math.round((Date.now() - phaseStartedAt) / 1000), + }, + 'Batch done', + ) + } + + return total +} + +// ─── One-shot backfill ────────────────────────────────────────────────────────── + +/** + * Drains the Tier 2 critical queue once, with full POM extraction, and returns + * the totals. It does NOT idle-loop — it runs until a batch comes back empty (or + * shutdown is requested) and then returns, so the caller can exit. Meant to be + * triggered manually (e.g. `pnpm backfill:maven` execed into the packages-worker + * container). + */ +export async function runMavenCriticalBackfill( + qx: QueryExecutor, + config: MavenConfig, + isShuttingDown: () => boolean, +): Promise { + return runPhase(qx, config, true, isShuttingDown) +} diff --git a/services/apps/packages_worker/src/maven/schedule.ts b/services/apps/packages_worker/src/maven/schedule.ts new file mode 100644 index 0000000000..c8c004842d --- /dev/null +++ b/services/apps/packages_worker/src/maven/schedule.ts @@ -0,0 +1,82 @@ +import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client' + +import { svc } from '../service' +import { mavenCriticalWorkflow } from '../workflows' + +export async function scheduleMavenCritical(): Promise { + const { temporal } = svc + if (!temporal) throw new Error('Temporal client not initialized') + + const scheduleOptions: Parameters[0] = { + scheduleId: 'maven-critical', + spec: { + cronExpressions: ['*/1 * * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.SKIP, + catchupWindow: '1 hour', + }, + action: { + type: 'startWorkflow', + workflowType: mavenCriticalWorkflow, + taskQueue: 'packages-worker', + workflowExecutionTimeout: '15 minutes', + retry: { + initialInterval: '30 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [], + }, + } + + try { + await temporal.schedule.create(scheduleOptions) + } catch (err) { + if (err instanceof ScheduleAlreadyRunning) { + // Schedule exists → delete and recreate so cron/spec changes take effect on + // restart (schedule.create is a no-op when the id exists → it would keep the old cron). + await temporal.schedule.getHandle('maven-critical').delete() + await temporal.schedule.create(scheduleOptions) + svc.log.info('Schedule maven-critical recreated (cron synced).') + } else { + throw err + } + } +} + +// export async function scheduleMavenNonCritical(): Promise { +// const { temporal } = svc +// if (!temporal) throw new Error('Temporal client not initialized') + +// try { +// await temporal.schedule.create({ +// scheduleId: 'maven-non-critical', +// spec: { +// cronExpressions: ['*/10 * * * *'], +// }, +// policies: { +// overlap: ScheduleOverlapPolicy.SKIP, +// catchupWindow: '1 hour', +// }, +// action: { +// type: 'startWorkflow', +// workflowType: mavenNonCriticalWorkflow, +// taskQueue: 'packages-worker', +// workflowExecutionTimeout: '5 minutes', +// retry: { +// initialInterval: '30 seconds', +// backoffCoefficient: 2, +// maximumAttempts: 3, +// }, +// args: [], +// }, +// }) +// } catch (err) { +// if (err instanceof ScheduleAlreadyRunning) { +// svc.log.info('Schedule maven-non-critical already registered.') +// } else { +// throw err +// } +// } +// } diff --git a/services/apps/packages_worker/src/maven/workflows.ts b/services/apps/packages_worker/src/maven/workflows.ts new file mode 100644 index 0000000000..223cee6701 --- /dev/null +++ b/services/apps/packages_worker/src/maven/workflows.ts @@ -0,0 +1,19 @@ +import { proxyActivities } from '@temporalio/workflow' + +import type * as activities from './activities' + +const { processMavenCriticalBatch } = proxyActivities({ + startToCloseTimeout: '15 minutes', +}) + +const { processMavenNonCriticalBatch } = proxyActivities({ + startToCloseTimeout: '5 minutes', +}) + +export async function mavenCriticalWorkflow(): Promise { + await processMavenCriticalBatch() +} + +export async function mavenNonCriticalWorkflow(): Promise { + await processMavenNonCriticalBatch() +} diff --git a/services/apps/packages_worker/src/workflows/index.ts b/services/apps/packages_worker/src/workflows/index.ts index 5dc712e8df..88471a3f28 100644 --- a/services/apps/packages_worker/src/workflows/index.ts +++ b/services/apps/packages_worker/src/workflows/index.ts @@ -10,3 +10,4 @@ export { } from '../deps-dev/workflows' export { npmHello } from '../npm/workflows' export { osvSync } from '../osv/workflows' +export { mavenCriticalWorkflow, mavenNonCriticalWorkflow } from '../maven/workflows' diff --git a/services/libs/data-access-layer/src/index.ts b/services/libs/data-access-layer/src/index.ts index 3c319ac5e0..ab7c60db59 100644 --- a/services/libs/data-access-layer/src/index.ts +++ b/services/libs/data-access-layer/src/index.ts @@ -17,5 +17,7 @@ export * from './maintainers' export * from './packages' export * from './project-catalog' export * from './osspckgs/ingestJobs' +export * from './osspckgs/maintainers' export * from './osspckgs/packages' export * from './osspckgs/repos' +export * from './osspckgs/versions' diff --git a/services/libs/data-access-layer/src/osspckgs/index.ts b/services/libs/data-access-layer/src/osspckgs/index.ts new file mode 100644 index 0000000000..d235aa9713 --- /dev/null +++ b/services/libs/data-access-layer/src/osspckgs/index.ts @@ -0,0 +1,5 @@ +export * from './types' +export * from './packages' +export * from './maintainers' +export * from './versions' +export * from './repos' diff --git a/services/libs/data-access-layer/src/osspckgs/maintainers.ts b/services/libs/data-access-layer/src/osspckgs/maintainers.ts new file mode 100644 index 0000000000..ab8d7fa953 --- /dev/null +++ b/services/libs/data-access-layer/src/osspckgs/maintainers.ts @@ -0,0 +1,80 @@ +import { QueryExecutor } from '../queryExecutor' + +import { IDbMaintainerUpsert, IDbPackageMaintainerUpsert } from './types' + +/** + * Inserts or updates a maintainer row. + * Returns the maintainer id and the list of fields that actually changed. + */ +export async function upsertMaintainer( + qx: QueryExecutor, + item: IDbMaintainerUpsert, +): Promise<{ id: number; changedFields: string[] }> { + const row = await qx.selectOne( + ` + WITH old AS ( + SELECT display_name, url, email_hash + FROM maintainers WHERE ecosystem = $(ecosystem) AND username = $(username) + ), + ins AS ( + INSERT INTO maintainers (ecosystem, username, display_name, url, email_hash) + VALUES ($(ecosystem), $(username), $(displayName), $(url), $(emailHash)) + ON CONFLICT (ecosystem, username) DO UPDATE SET + display_name = COALESCE(EXCLUDED.display_name, maintainers.display_name), + url = COALESCE(EXCLUDED.url, maintainers.url), + email_hash = COALESCE(EXCLUDED.email_hash, maintainers.email_hash) + RETURNING id, display_name, url, email_hash + ) + SELECT ins.id, + array_remove(ARRAY[ + CASE WHEN o.display_name IS DISTINCT FROM ins.display_name THEN 'maintainers.display_name' END, + CASE WHEN o.url IS DISTINCT FROM ins.url THEN 'maintainers.url' END, + CASE WHEN o.email_hash IS DISTINCT FROM ins.email_hash THEN 'maintainers.email_hash' END + ], NULL) AS changed_fields + FROM ins LEFT JOIN old o ON true + `, + item, + ) + return { id: row.id as number, changedFields: row.changed_fields as string[] } +} + +/** + * Replaces all maintainer links for a package with the given list. + * Deletes links that are no longer present and inserts/updates new ones. + * Returns the list of fields that changed (additions, removals, role changes). + */ +export async function replacePackageMaintainers( + qx: QueryExecutor, + packageId: number, + links: Array>, +): Promise { + const before: Array<{ maintainer_id: number; role: string | null }> = await qx.select( + `SELECT maintainer_id, role FROM package_maintainers WHERE package_id = $(packageId)`, + { packageId }, + ) + const beforeMap = new Map(before.map((r) => [r.maintainer_id, r.role])) + + await qx.result(`DELETE FROM package_maintainers WHERE package_id = $(packageId)`, { packageId }) + + const afterMap = new Map() + for (const { maintainerId, role } of links) { + await qx.result( + `INSERT INTO package_maintainers (package_id, maintainer_id, role) + VALUES ($(packageId), $(maintainerId), $(role)) + ON CONFLICT (package_id, maintainer_id) DO UPDATE SET role = EXCLUDED.role`, + { packageId, maintainerId, role }, + ) + afterMap.set(maintainerId, role) + } + + const changed = new Set() + for (const id of beforeMap.keys()) { + if (!afterMap.has(id)) changed.add('package_maintainers.maintainer_id') + } + for (const [id, role] of afterMap) { + if (!beforeMap.has(id)) changed.add('package_maintainers.maintainer_id') + else if (beforeMap.get(id) !== role) changed.add('package_maintainers.role') + } + + return Array.from(changed) +} diff --git a/services/libs/data-access-layer/src/osspckgs/packages.ts b/services/libs/data-access-layer/src/osspckgs/packages.ts index 124ef74fcc..344917b79e 100644 --- a/services/libs/data-access-layer/src/osspckgs/packages.ts +++ b/services/libs/data-access-layer/src/osspckgs/packages.ts @@ -1,5 +1,7 @@ import { QueryExecutor } from '../queryExecutor' +import { IDbPackageUniverse, IDbPackageUpsert } from './types' + export async function findPackageIdsByPurl( qx: QueryExecutor, purls: string[], @@ -10,3 +12,244 @@ export async function findPackageIdsByPurl( }) return new Map(rows.map((r: { purl: string; id: number }) => [r.purl, r.id])) } +// ─── packages_universe ──────────────────────────────────────────────────────── + +/** + * Carries everything the Maven enrichment path needs to sync a package. + * For the Tier 2 (critical) path these fields come from `packages`; the disabled + * non-critical path reads the same shape from `packages_universe`. + */ +export type MavenPackageToSync = Pick< + IDbPackageUniverse, + | 'id' + | 'namespace' + | 'name' + | 'criticalityScore' + | 'dependentPackagesCount' + | 'dependentReposCount' +> & { + purl: string + latestVersion: string | null +} + +// ingestion_source values this worker writes once it has attempted a package +// (success or a terminal outcome). A `packages` row carrying any other value — +// e.g. the marker the criticality worker sets when it promotes a package to +// Tier 2 — has never been POM-enriched, so we pick it up immediately instead of +// waiting for the staleness window. Errors/skips re-run only once stale, so a +// broken package isn't retried every pass. +const MAVEN_WORKER_OUTCOMES = [ + 'maven-registry', + 'maven_error', + 'maven_not_on_central', + 'maven_no_version', +] + +/** + * Returns a page of Maven packages that need syncing via POM extraction. + * + * isCritical=true → Tier 2: reads from `packages` (populated by the criticality + * worker, which writes ingestion_source + last_synced_at). + * A row is due when it hasn't been POM-enriched yet, or is + * stale by refreshDays. Ordered by criticality_score. + * isCritical=false → disabled non-critical path: reads from `packages_universe`. + * Kept for reference only — the universe→packages copy is owned + * by the criticality worker and this path is not scheduled. + */ +export async function listMavenPackagesToSync( + qx: QueryExecutor, + options: { limit: number; refreshDays: number; isCritical: boolean }, +): Promise { + const { limit, refreshDays, isCritical } = options + + if (isCritical) { + return qx.select( + ` + SELECT + p.id, + p.purl, + p.namespace, + p.name, + p.criticality_score AS "criticalityScore", + p.dependent_count AS "dependentPackagesCount", + p.dependent_repos_count AS "dependentReposCount", + p.latest_version AS "latestVersion" + FROM packages p + WHERE + p.ecosystem = 'maven' + AND p.is_critical + AND p.namespace IS NOT NULL + AND ( + p.ingestion_source IS NULL + OR p.ingestion_source <> ALL($(workerOutcomes)::text[]) + OR p.last_synced_at < NOW() - ($(refreshDays) || ' days')::interval + ) + ORDER BY + p.criticality_score DESC NULLS LAST, + p.id ASC + LIMIT $(limit) + `, + { limit, refreshDays, workerOutcomes: MAVEN_WORKER_OUTCOMES }, + ) + } + + // Disabled non-critical path — reads from packages_universe (not scheduled). + return qx.select( + ` + SELECT + pu.id, + pu.purl, + pu.namespace, + pu.name, + pu.criticality_score AS "criticalityScore", + pu.dependent_count AS "dependentPackagesCount", + pu.dependent_repos_count AS "dependentReposCount", + p.latest_version AS "latestVersion" + FROM packages_universe pu + LEFT JOIN packages p ON p.purl = pu.purl + WHERE + pu.ecosystem = 'maven' + AND pu.is_critical = false + AND pu.purl IS NOT NULL + AND pu.namespace IS NOT NULL + AND ( + p.purl IS NULL + OR p.last_synced_at < NOW() - ($(refreshDays) || ' days')::interval + ) + ORDER BY + pu.rank_in_ecosystem ASC NULLS LAST, + pu.id ASC + LIMIT $(limit) + `, + { limit, refreshDays }, + ) +} + +// ─── packages touch ─────────────────────────────────────────────────────────── + +/** + * Bumps last_synced_at without re-fetching POM data. + * Used when the upstream version is unchanged — avoids a full extraction pass + * while keeping the staleness timer fresh and syncing latest universe metrics. + */ +export async function touchPackageSyncedAt( + qx: QueryExecutor, + purl: string, + metrics: { + criticalityScore: number | null | undefined + dependentPackagesCount: number | null | undefined + dependentReposCount: number | null | undefined + }, +): Promise { + await qx.result( + ` + UPDATE packages SET + last_synced_at = NOW(), + criticality_score = COALESCE($(criticalityScore), criticality_score), + dependent_count = COALESCE($(dependentPackagesCount), dependent_count), + dependent_repos_count = COALESCE($(dependentReposCount), dependent_repos_count) + WHERE purl = $(purl) + `, + { + purl, + criticalityScore: metrics.criticalityScore ?? null, + dependentPackagesCount: metrics.dependentPackagesCount ?? null, + dependentReposCount: metrics.dependentReposCount ?? null, + }, + ) +} + +// ─── audit ──────────────────────────────────────────────────────────────────── + +export async function logAuditFieldChange( + qx: QueryExecutor, + worker: string, + purl: string, + changedFields: string[], +): Promise { + if (changedFields.length === 0) return + await qx.result( + `INSERT INTO audit_field_changes (worker, purl, changed_fields) VALUES ($(worker), $(purl), $(changedFields)::text[])`, + { worker, purl, changedFields }, + ) +} + +// ─── packages upsert ────────────────────────────────────────────────────────── + +/** + * Inserts or updates a row in `packages`. + * Returns the id and the list of fields that actually changed value. + */ +export async function upsertPackage( + qx: QueryExecutor, + item: IDbPackageUpsert, +): Promise<{ id: number; changedFields: string[] }> { + const row = await qx.selectOne( + ` + WITH old AS ( + SELECT description, homepage, registry_url, declared_repository_url, repository_url, + licenses, licenses_raw, latest_version, versions_count, latest_release_at, ingestion_source + FROM packages WHERE purl = $(purl) + ), + ins AS ( + INSERT INTO packages ( + purl, ecosystem, namespace, name, + description, homepage, registry_url, declared_repository_url, repository_url, + licenses, licenses_raw, latest_version, versions_count, latest_release_at, + criticality_score, dependent_count, dependent_repos_count, + ingestion_source, last_synced_at + ) VALUES ( + $(purl), $(ecosystem), $(namespace), $(name), + $(description), $(homepage), $(registryUrl), $(declaredRepositoryUrl), $(repositoryUrl), + $(licenses)::text[], $(licensesRaw), $(latestVersion), $(versionsCount), $(latestReleaseAt), + $(criticalityScore), $(dependentPackagesCount), $(dependentReposCount), + $(ingestionSource), NOW() + ) + ON CONFLICT (purl) DO UPDATE SET + description = COALESCE(EXCLUDED.description, packages.description), + homepage = COALESCE(EXCLUDED.homepage, packages.homepage), + registry_url = COALESCE(EXCLUDED.registry_url, packages.registry_url), + declared_repository_url = COALESCE(EXCLUDED.declared_repository_url, packages.declared_repository_url), + repository_url = COALESCE(EXCLUDED.repository_url, packages.repository_url), + licenses = COALESCE(EXCLUDED.licenses, packages.licenses), + licenses_raw = COALESCE(EXCLUDED.licenses_raw, packages.licenses_raw), + latest_version = COALESCE(EXCLUDED.latest_version, packages.latest_version), + versions_count = COALESCE(EXCLUDED.versions_count, packages.versions_count), + latest_release_at = COALESCE(EXCLUDED.latest_release_at, packages.latest_release_at), + criticality_score = COALESCE(EXCLUDED.criticality_score, packages.criticality_score), + dependent_count = COALESCE(EXCLUDED.dependent_count, packages.dependent_count), + dependent_repos_count = COALESCE(EXCLUDED.dependent_repos_count, packages.dependent_repos_count), + ingestion_source = EXCLUDED.ingestion_source, + last_synced_at = NOW() + RETURNING id, description, homepage, registry_url, declared_repository_url, repository_url, + licenses, licenses_raw, latest_version, versions_count, latest_release_at, ingestion_source + ) + SELECT ins.id, + array_remove(ARRAY[ + CASE WHEN o.description IS DISTINCT FROM ins.description THEN 'packages.description' END, + CASE WHEN o.homepage IS DISTINCT FROM ins.homepage THEN 'packages.homepage' END, + CASE WHEN o.registry_url IS DISTINCT FROM ins.registry_url THEN 'packages.registry_url' END, + CASE WHEN o.declared_repository_url IS DISTINCT FROM ins.declared_repository_url THEN 'packages.declared_repository_url' END, + CASE WHEN o.repository_url IS DISTINCT FROM ins.repository_url THEN 'packages.repository_url' END, + CASE WHEN o.licenses IS DISTINCT FROM ins.licenses THEN 'packages.licenses' END, + CASE WHEN o.licenses_raw IS DISTINCT FROM ins.licenses_raw THEN 'packages.licenses_raw' END, + CASE WHEN o.latest_version IS DISTINCT FROM ins.latest_version THEN 'packages.latest_version' END, + CASE WHEN o.versions_count IS DISTINCT FROM ins.versions_count THEN 'packages.versions_count' END, + CASE WHEN o.latest_release_at IS DISTINCT FROM ins.latest_release_at THEN 'packages.latest_release_at' END, + CASE WHEN o.ingestion_source IS DISTINCT FROM ins.ingestion_source THEN 'packages.ingestion_source' END + ], NULL) AS changed_fields + FROM ins LEFT JOIN old o ON true + `, + { + ...item, + registryUrl: item.registryUrl ?? null, + repositoryUrl: item.repositoryUrl ?? null, + versionsCount: item.versionsCount ?? null, + latestReleaseAt: item.latestReleaseAt ?? null, + criticalityScore: item.criticalityScore ?? null, + dependentPackagesCount: item.dependentPackagesCount ?? null, + dependentReposCount: item.dependentReposCount ?? null, + }, + ) + return { id: row.id as number, changedFields: row.changed_fields as string[] } +} diff --git a/services/libs/data-access-layer/src/osspckgs/repos.ts b/services/libs/data-access-layer/src/osspckgs/repos.ts index c06bfe5216..a85d521e3f 100644 --- a/services/libs/data-access-layer/src/osspckgs/repos.ts +++ b/services/libs/data-access-layer/src/osspckgs/repos.ts @@ -1,5 +1,7 @@ import { QueryExecutor } from '../queryExecutor' +import { IDbPackageRepoUpsert, IDbRepoUpsert } from './types' + export async function findRepoIdsByUrl( qx: QueryExecutor, urls: string[], @@ -8,3 +10,62 @@ export async function findRepoIdsByUrl( const rows = await qx.select(`SELECT id, url FROM repos WHERE url = ANY($(urls))`, { urls }) return new Map(rows.map((r: { url: string; id: number }) => [r.url, r.id])) } + +/** + * Inserts or updates a repo row keyed on url. + * Uses COALESCE so richer data from other enrichers (GitHub, deps.dev) is never + * overwritten with nulls from a partial write. + * Returns the repo id. + */ +export async function upsertRepo(qx: QueryExecutor, item: IDbRepoUpsert): Promise { + const row = await qx.selectOne( + ` + INSERT INTO repos (url, host, owner, name, last_synced_at) + VALUES ($(url), $(host), $(owner), $(name), NOW()) + ON CONFLICT (url) DO UPDATE SET + host = COALESCE(EXCLUDED.host, repos.host), + owner = COALESCE(EXCLUDED.owner, repos.owner), + name = COALESCE(EXCLUDED.name, repos.name), + last_synced_at = NOW() + RETURNING id + `, + item, + ) + return row.id as number +} + +/** + * Links a package to a repo with provenance metadata. + * On conflict keeps the higher confidence value and refreshes verified_at. + * Returns the list of fields that actually changed. + */ +export async function upsertPackageRepo( + qx: QueryExecutor, + item: IDbPackageRepoUpsert, +): Promise { + const row: { changed_fields: string[] } = await qx.selectOne( + ` + WITH old AS ( + SELECT source, confidence FROM package_repos + WHERE package_id = $(packageId) AND repo_id = $(repoId) + ), + ins AS ( + INSERT INTO package_repos (package_id, repo_id, source, confidence, verified_at) + VALUES ($(packageId), $(repoId), $(source), $(confidence), NOW()) + ON CONFLICT (package_id, repo_id) DO UPDATE SET + confidence = GREATEST(EXCLUDED.confidence, package_repos.confidence), + verified_at = NOW() + RETURNING source, confidence + ) + SELECT array_remove(ARRAY[ + CASE WHEN o.source IS NULL THEN 'package_repos.repo_id' END, + CASE WHEN o.source IS NULL THEN 'package_repos.source' END, + CASE WHEN o.source IS NULL + OR o.confidence IS DISTINCT FROM ins.confidence THEN 'package_repos.confidence' END + ], NULL) AS changed_fields + FROM ins LEFT JOIN old o ON true + `, + item, + ) + return row.changed_fields +} diff --git a/services/libs/data-access-layer/src/osspckgs/types.ts b/services/libs/data-access-layer/src/osspckgs/types.ts new file mode 100644 index 0000000000..2a2545f697 --- /dev/null +++ b/services/libs/data-access-layer/src/osspckgs/types.ts @@ -0,0 +1,87 @@ +// ─── packages_universe ──────────────────────────────────────────────────────── + +export interface IDbPackageUniverse { + id: string + purl: string | null + ecosystem: string + namespace: string | null + name: string + rankInEcosystem: number | null + isCritical: boolean + criticalityScore: number | null + dependentPackagesCount: number | null + dependentReposCount: number | null + downloads30d: bigint | null +} + +// ─── packages ───────────────────────────────────────────────────────────────── + +export type IDbPackageUpsert = { + purl: string + ecosystem: string + namespace: string | null + name: string + description: string | null + homepage: string | null + declaredRepositoryUrl: string | null + licenses: string[] | null + licensesRaw: string | null + latestVersion: string | null + versionsCount?: number | null + latestReleaseAt?: Date | null + ingestionSource: string + criticalityScore?: number | null + dependentPackagesCount?: number | null + dependentReposCount?: number | null + registryUrl?: string | null + repositoryUrl?: string | null +} + +// ─── maintainers ────────────────────────────────────────────────────────────── + +export type IDbMaintainerUpsert = { + ecosystem: string + username: string + displayName: string | null + url: string | null + emailHash: string | null +} + +// ─── package_maintainers ────────────────────────────────────────────────────── + +export type IDbPackageMaintainerUpsert = { + packageId: number + maintainerId: number + role: 'author' | 'maintainer' | null +} + +// ─── versions ───────────────────────────────────────────────────────────────── + +export type IDbVersionUpsert = { + packageId: number + ecosystem: string + namespace: string | null + name: string + number: string + isLatest: boolean + isPrerelease: boolean + license: string | null +} + +// ─── repos ──────────────────────────────────────────────────────────────────── + +export type IDbRepoUpsert = { + url: string + host: string | null + owner: string | null + name: string | null +} + +// ─── package_repos ──────────────────────────────────────────────────────────── + +export type IDbPackageRepoUpsert = { + packageId: number + repoId: number + source: 'declared' | 'deps_dev' | 'heuristic' | 'manual' + confidence: number +} diff --git a/services/libs/data-access-layer/src/osspckgs/versions.ts b/services/libs/data-access-layer/src/osspckgs/versions.ts new file mode 100644 index 0000000000..6ea6e46f51 --- /dev/null +++ b/services/libs/data-access-layer/src/osspckgs/versions.ts @@ -0,0 +1,89 @@ +import { QueryExecutor } from '../queryExecutor' + +import { IDbVersionUpsert } from './types' + +/** + * Bulk-upserts a list of versions for a single package. All elements must share + * the same packageId — throws otherwise (the change-detection logic assumes it). + * Uses UNNEST arrays to avoid N individual round-trips. + * On conflict (package_id, number) updates is_latest, is_prerelease, and + * licenses (never overwrites an existing licenses array with NULL). + * The per-version `license` input is stored as a single-element text[] in the + * `licenses` column (the schema is an array to match packages.licenses). + * Returns the list of fields that actually changed across all versions. + */ +export async function upsertVersionsBatch( + qx: QueryExecutor, + versions: IDbVersionUpsert[], +): Promise { + if (versions.length === 0) return [] + + // This function operates on a single package: `old` reads by a scalar packageId and + // the changed-fields join keys on `number` alone, so mixing packageIds would silently + // corrupt the result. Enforce the invariant rather than rely on the caller. + const packageId = versions[0].packageId + if (versions.some((v) => v.packageId !== packageId)) { + throw new Error('upsertVersionsBatch: all versions must belong to the same package') + } + + // maven-metadata.xml sometimes contains duplicate version strings — deduplicate + // by number before inserting to avoid "ON CONFLICT DO UPDATE command cannot affect + // row a second time" from PostgreSQL + const seen = new Set() + versions = versions.filter((v) => { + if (seen.has(v.number)) return false + seen.add(v.number) + return true + }) + + const row: { changed_fields: string[] } = await qx.selectOne( + ` + WITH old AS ( + SELECT number, is_latest, is_prerelease, licenses + FROM versions + WHERE package_id = $(packageId)::bigint AND number = ANY($(numbers)::text[]) + ), + ins AS ( + INSERT INTO versions (package_id, ecosystem, namespace, name, number, is_latest, is_prerelease, licenses, last_synced_at) + SELECT + $(packageId)::bigint, t.ecosystem, t.namespace, t.name, t.number, t.is_latest, t.is_prerelease, + CASE WHEN t.license IS NULL THEN NULL ELSE ARRAY[t.license] END, + NOW() + FROM UNNEST( + $(ecosystems)::text[], + $(namespaces)::text[], + $(names)::text[], + $(numbers)::text[], + $(isLatests)::bool[], + $(isPreleases)::bool[], + $(licenses)::text[] + ) AS t(ecosystem, namespace, name, number, is_latest, is_prerelease, license) + ON CONFLICT (package_id, number) DO UPDATE SET + namespace = COALESCE(EXCLUDED.namespace, versions.namespace), + is_latest = EXCLUDED.is_latest, + is_prerelease = EXCLUDED.is_prerelease, + licenses = COALESCE(EXCLUDED.licenses, versions.licenses), + last_synced_at = NOW() + RETURNING number, is_latest, is_prerelease, licenses + ) + SELECT array_remove(ARRAY[ + CASE WHEN bool_or(o.number IS NULL) THEN 'versions.number' END, + CASE WHEN bool_or(o.is_latest IS DISTINCT FROM ins.is_latest) THEN 'versions.is_latest' END, + CASE WHEN bool_or(o.is_prerelease IS DISTINCT FROM ins.is_prerelease) THEN 'versions.is_prerelease' END, + CASE WHEN bool_or(o.licenses IS DISTINCT FROM ins.licenses) THEN 'versions.licenses' END + ], NULL) AS changed_fields + FROM ins LEFT JOIN old o ON o.number = ins.number +`, + { + packageId, + ecosystems: versions.map((v) => v.ecosystem), + namespaces: versions.map((v) => v.namespace), + names: versions.map((v) => v.name), + numbers: versions.map((v) => v.number), + isLatests: versions.map((v) => v.isLatest), + isPreleases: versions.map((v) => v.isPrerelease), + licenses: versions.map((v) => v.license), + }, + ) + return row.changed_fields +}