Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy {
}

#weightedShuffle(weightedItems: WeightedEnv[]): string[] {
const totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0);
let totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0);
const result: string[] = [];
const items = [...weightedItems];

Expand All @@ -224,8 +224,11 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy {
}
index = Math.max(0, index - 1);

// Add selected item to result and remove from items
// Add selected item to result and remove from items. Decrement totalWeight
// so the next draw is scaled to the remaining items; otherwise random
// routinely overshoots the shrinking set and the tail item is over-picked.
result.push(items[index].envId);
totalWeight -= items[index].weight;
Comment thread
matt-aitken marked this conversation as resolved.
items.splice(index, 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,90 @@ describe("FairDequeuingStrategy", () => {
expect(queuesByEnv["env-1"]).toBeDefined();
expect(queuesByEnv["env-1"].length).toBe(2);
});

redisTest(
"weighted env shuffle stays uniform across every position for equal-weight envs",
async ({ redisOptions: redis }) => {
const keyProducer = new RunQueueFullKeyProducer();
// Biases must be non-zero so env ordering goes through the weighted shuffle
// path rather than the plain shuffle short-circuit.
const strategy = new FairQueueSelectionStrategy({
redis,
keys: keyProducer,
defaultEnvConcurrencyLimit: 10,
parentQueueLimit: 100,
seed: "weighted-shuffle-seed",
biases: {
concurrencyLimitBias: 0.75,
availableCapacityBias: 0.3,
queueAgeRandomization: 0,
},
});

const now = Date.now();

// Four envs, each its own org, one queue each. Identical concurrency limit
// and current usage means identical weights, so a correct weighted shuffle
// should land each env in each position equally often. Insertion order is
// alphabetical by org, which is the order the tail-overshoot bug skews by.
const envIds = ["env-1", "env-2", "env-3", "env-4"];
for (let i = 0; i < envIds.length; i++) {
const orgId = `org-${i + 1}`;
const projectId = `proj-${i + 1}`;
const envId = envIds[i];

await setupQueue({
redis,
keyProducer,
parentQueue: "parent-queue",
score: now - 1000,
queueId: `queue-${envId}`,
orgId,
projectId,
envId,
});

await setupConcurrency({
redis,
keyProducer,
env: { envId, projectId, orgId, currentConcurrency: 5, limit: 10 },
});
}

const iterations = 2000;
// positionCounts[position][envId] = times envId landed in that position
const positionCounts: Array<Record<string, number>> = envIds.map(() =>
Object.fromEntries(envIds.map((envId) => [envId, 0]))
);

for (let i = 0; i < iterations; i++) {
const envResult = await strategy.distributeFairQueuesFromParentQueue(
"parent-queue",
`consumer-${i % 3}`
);
const result = flattenResults(envResult);
expect(result).toHaveLength(envIds.length);

result.forEach((queueId, position) => {
const envId = keyProducer.envIdFromQueue(queueId);
positionCounts[position][envId]++;
});
}

// For equal-weight envs the share at every position should be ~1/N. The
// tail-overshoot bug leaves position 0 fair but skews later positions hard
// (one env far below, the tail env far above). Assert each share stays
// within 40% of uniform at every position, which the bug violates.
const expectedShare = 100 / envIds.length;
for (let position = 0; position < envIds.length; position++) {
for (const envId of envIds) {
const share = (positionCounts[position][envId] / iterations) * 100;
expect(share).toBeGreaterThan(expectedShare * 0.6);
expect(share).toBeLessThan(expectedShare * 1.4);
}
}
}
);
});

// Helper function to flatten results for counting
Expand Down