|
11 | 11 | import org.junit.jupiter.api.Test; |
12 | 12 | import software.amazon.lambda.durable.config.CompletionConfig; |
13 | 13 | import software.amazon.lambda.durable.config.MapConfig; |
| 14 | +import software.amazon.lambda.durable.config.WaitForConditionConfig; |
14 | 15 | import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus; |
15 | 16 | import software.amazon.lambda.durable.model.ExecutionStatus; |
16 | 17 | import software.amazon.lambda.durable.model.MapResult; |
| 18 | +import software.amazon.lambda.durable.model.WaitForConditionResult; |
| 19 | +import software.amazon.lambda.durable.retry.WaitStrategies; |
17 | 20 | import software.amazon.lambda.durable.testing.LocalDurableTestRunner; |
18 | 21 |
|
19 | 22 | class MapIntegrationTest { |
@@ -890,6 +893,362 @@ void testMapWithNullResults() { |
890 | 893 | assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); |
891 | 894 | } |
892 | 895 |
|
| 896 | + // ---- 50-item map tests with waitForCallback ---- |
| 897 | + |
| 898 | + @Test |
| 899 | + void testMap50ItemsWithWaitForCallback() { |
| 900 | + var itemCount = 50; |
| 901 | + var items = new ArrayList<Integer>(); |
| 902 | + for (int i = 0; i < itemCount; i++) { |
| 903 | + items.add(i); |
| 904 | + } |
| 905 | + |
| 906 | + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { |
| 907 | + var result = context.map("50-callbacks", items, String.class, (item, index, ctx) -> { |
| 908 | + return ctx.waitForCallback("approval-" + index, String.class, (callbackId, stepCtx) -> {}); |
| 909 | + }); |
| 910 | + |
| 911 | + assertTrue(result.allSucceeded()); |
| 912 | + assertEquals(itemCount, result.size()); |
| 913 | + return String.valueOf(result.succeeded().size()); |
| 914 | + }); |
| 915 | + |
| 916 | + // First run — all items create callbacks and suspend |
| 917 | + var result = runner.run("test"); |
| 918 | + assertEquals(ExecutionStatus.PENDING, result.getStatus()); |
| 919 | + |
| 920 | + // Complete all 50 callbacks |
| 921 | + for (int i = 0; i < itemCount; i++) { |
| 922 | + var callbackId = runner.getCallbackId("approval-" + i + "-callback"); |
| 923 | + assertNotNull(callbackId, "Callback ID should exist for approval-" + i); |
| 924 | + runner.completeCallback(callbackId, "\"result-" + i + "\""); |
| 925 | + } |
| 926 | + |
| 927 | + // Re-run — all callbacks resolved, execution completes |
| 928 | + result = runner.runUntilComplete("test"); |
| 929 | + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); |
| 930 | + assertEquals("50", result.getResult(String.class)); |
| 931 | + } |
| 932 | + |
| 933 | + @Test |
| 934 | + void testMap50ItemsWithWaitForCallback_maxConcurrency5() { |
| 935 | + var itemCount = 50; |
| 936 | + var items = new ArrayList<Integer>(); |
| 937 | + for (int i = 0; i < itemCount; i++) { |
| 938 | + items.add(i); |
| 939 | + } |
| 940 | + |
| 941 | + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { |
| 942 | + var config = MapConfig.builder().maxConcurrency(5).build(); |
| 943 | + var result = context.map( |
| 944 | + "50-callbacks-limited", |
| 945 | + items, |
| 946 | + String.class, |
| 947 | + (item, index, ctx) -> { |
| 948 | + return ctx.waitForCallback("cb-" + index, String.class, (callbackId, stepCtx) -> {}); |
| 949 | + }, |
| 950 | + config); |
| 951 | + |
| 952 | + assertTrue(result.allSucceeded()); |
| 953 | + return String.valueOf(result.succeeded().size()); |
| 954 | + }); |
| 955 | + |
| 956 | + // First run — suspends on callbacks |
| 957 | + var result = runner.run("test"); |
| 958 | + assertEquals(ExecutionStatus.PENDING, result.getStatus()); |
| 959 | + |
| 960 | + // Complete callbacks in batches, re-running between batches to let concurrency-limited items start |
| 961 | + for (int batch = 0; batch < 10; batch++) { |
| 962 | + var completed = false; |
| 963 | + for (int i = batch * 5; i < (batch + 1) * 5; i++) { |
| 964 | + var callbackId = runner.getCallbackId("cb-" + i + "-callback"); |
| 965 | + if (callbackId != null) { |
| 966 | + runner.completeCallback(callbackId, "\"ok-" + i + "\""); |
| 967 | + completed = true; |
| 968 | + } |
| 969 | + } |
| 970 | + if (completed) { |
| 971 | + result = runner.run("test"); |
| 972 | + if (result.getStatus() == ExecutionStatus.SUCCEEDED) break; |
| 973 | + } |
| 974 | + } |
| 975 | + |
| 976 | + result = runner.runUntilComplete("test"); |
| 977 | + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); |
| 978 | + assertEquals("50", result.getResult(String.class)); |
| 979 | + } |
| 980 | + |
| 981 | + @Test |
| 982 | + void testMap50ItemsWithWaitForCallback_partialFailure() { |
| 983 | + var itemCount = 50; |
| 984 | + var items = new ArrayList<Integer>(); |
| 985 | + for (int i = 0; i < itemCount; i++) { |
| 986 | + items.add(i); |
| 987 | + } |
| 988 | + |
| 989 | + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { |
| 990 | + var result = context.map("50-callbacks-partial-fail", items, String.class, (item, index, ctx) -> { |
| 991 | + return ctx.waitForCallback("approval-" + index, String.class, (callbackId, stepCtx) -> {}); |
| 992 | + }); |
| 993 | + |
| 994 | + assertEquals(itemCount, result.size()); |
| 995 | + assertEquals(25, result.succeeded().size()); |
| 996 | + assertEquals(25, result.failed().size()); |
| 997 | + assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason()); |
| 998 | + |
| 999 | + return result.succeeded().size() + "/" + result.failed().size(); |
| 1000 | + }); |
| 1001 | + |
| 1002 | + // First run — all items create callbacks and suspend |
| 1003 | + var result = runner.run("test"); |
| 1004 | + assertEquals(ExecutionStatus.PENDING, result.getStatus()); |
| 1005 | + |
| 1006 | + // Complete even-indexed callbacks, fail odd-indexed ones |
| 1007 | + for (int i = 0; i < itemCount; i++) { |
| 1008 | + var callbackId = runner.getCallbackId("approval-" + i + "-callback"); |
| 1009 | + assertNotNull(callbackId, "Callback ID should exist for approval-" + i); |
| 1010 | + if (i % 2 == 0) { |
| 1011 | + runner.completeCallback(callbackId, "\"ok-" + i + "\""); |
| 1012 | + } else { |
| 1013 | + runner.failCallback( |
| 1014 | + callbackId, |
| 1015 | + software.amazon.awssdk.services.lambda.model.ErrorObject.builder() |
| 1016 | + .errorType("Rejected") |
| 1017 | + .errorMessage("Item " + i + " rejected") |
| 1018 | + .build()); |
| 1019 | + } |
| 1020 | + } |
| 1021 | + |
| 1022 | + result = runner.runUntilComplete("test"); |
| 1023 | + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); |
| 1024 | + assertEquals("25/25", result.getResult(String.class)); |
| 1025 | + } |
| 1026 | + |
| 1027 | + @Test |
| 1028 | + void testMap50ItemsWithWaitForCallback_stepsBeforeAndAfterCallback() { |
| 1029 | + var itemCount = 50; |
| 1030 | + var items = new ArrayList<Integer>(); |
| 1031 | + for (int i = 0; i < itemCount; i++) { |
| 1032 | + items.add(i); |
| 1033 | + } |
| 1034 | + |
| 1035 | + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { |
| 1036 | + var result = context.map("50-callbacks-with-steps", items, String.class, (item, index, ctx) -> { |
| 1037 | + var before = ctx.step("prepare-" + index, String.class, stepCtx -> "prepared-" + index); |
| 1038 | + var approval = ctx.waitForCallback("approval-" + index, String.class, (callbackId, stepCtx) -> {}); |
| 1039 | + return ctx.step("finalize-" + index, String.class, stepCtx -> before + ":" + approval + ":done"); |
| 1040 | + }); |
| 1041 | + |
| 1042 | + assertTrue(result.allSucceeded()); |
| 1043 | + return String.valueOf(result.succeeded().size()); |
| 1044 | + }); |
| 1045 | + |
| 1046 | + // First run — items execute prepare step, create callbacks, suspend |
| 1047 | + var result = runner.run("test"); |
| 1048 | + assertEquals(ExecutionStatus.PENDING, result.getStatus()); |
| 1049 | + |
| 1050 | + // Complete all callbacks |
| 1051 | + for (int i = 0; i < itemCount; i++) { |
| 1052 | + var callbackId = runner.getCallbackId("approval-" + i + "-callback"); |
| 1053 | + assertNotNull(callbackId, "Callback ID should exist for approval-" + i); |
| 1054 | + runner.completeCallback(callbackId, "\"approved-" + i + "\""); |
| 1055 | + } |
| 1056 | + |
| 1057 | + // Re-run — callbacks resolved, finalize steps execute |
| 1058 | + result = runner.runUntilComplete("test"); |
| 1059 | + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); |
| 1060 | + assertEquals("50", result.getResult(String.class)); |
| 1061 | + } |
| 1062 | + |
| 1063 | + // ---- 50-item map tests with waitForCondition ---- |
| 1064 | + |
| 1065 | + @Test |
| 1066 | + void testMap50ItemsWithWaitForCondition() { |
| 1067 | + var itemCount = 50; |
| 1068 | + var items = new ArrayList<Integer>(); |
| 1069 | + for (int i = 0; i < itemCount; i++) { |
| 1070 | + items.add(i); |
| 1071 | + } |
| 1072 | + var checkCounts = new AtomicInteger(0); |
| 1073 | + |
| 1074 | + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { |
| 1075 | + var result = context.map("50-conditions", items, Integer.class, (item, index, ctx) -> { |
| 1076 | + var targetChecks = (index % 3) + 1; // 1, 2, or 3 checks to complete |
| 1077 | + var strategy = WaitStrategies.<Integer>fixedDelay(10, Duration.ofSeconds(1)); |
| 1078 | + var wfcConfig = WaitForConditionConfig.<Integer>builder() |
| 1079 | + .waitStrategy(strategy) |
| 1080 | + .build(); |
| 1081 | + |
| 1082 | + return ctx.waitForCondition( |
| 1083 | + "poll-" + index, |
| 1084 | + Integer.class, |
| 1085 | + (state, stepCtx) -> { |
| 1086 | + checkCounts.incrementAndGet(); |
| 1087 | + var next = (state == null ? 0 : state) + 1; |
| 1088 | + return next >= targetChecks |
| 1089 | + ? WaitForConditionResult.stopPolling(next) |
| 1090 | + : WaitForConditionResult.continuePolling(next); |
| 1091 | + }, |
| 1092 | + wfcConfig); |
| 1093 | + }); |
| 1094 | + |
| 1095 | + assertTrue(result.allSucceeded()); |
| 1096 | + assertEquals(itemCount, result.size()); |
| 1097 | + |
| 1098 | + var sum = 0; |
| 1099 | + for (int i = 0; i < result.size(); i++) { |
| 1100 | + sum += result.getResult(i); |
| 1101 | + } |
| 1102 | + return String.valueOf(sum); |
| 1103 | + }); |
| 1104 | + |
| 1105 | + var result = runner.runUntilComplete("test"); |
| 1106 | + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); |
| 1107 | + |
| 1108 | + // Each item completes after (index%3)+1 checks: 17 items need 1, 17 need 2, 16 need 3 |
| 1109 | + // Sum of results: 17*1 + 17*2 + 16*3 = 17 + 34 + 48 = 99 |
| 1110 | + assertEquals("99", result.getResult(String.class)); |
| 1111 | + assertTrue(checkCounts.get() >= itemCount, "Should have at least " + itemCount + " checks"); |
| 1112 | + } |
| 1113 | + |
| 1114 | + @Test |
| 1115 | + void testMap50ItemsWithWaitForCondition_someExceedMaxAttempts() { |
| 1116 | + var itemCount = 50; |
| 1117 | + var items = new ArrayList<Integer>(); |
| 1118 | + for (int i = 0; i < itemCount; i++) { |
| 1119 | + items.add(i); |
| 1120 | + } |
| 1121 | + |
| 1122 | + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { |
| 1123 | + var result = context.map("50-conditions-some-fail", items, Integer.class, (item, index, ctx) -> { |
| 1124 | + // Odd items: maxAttempts=1 but need 2 checks → will fail |
| 1125 | + // Even items: maxAttempts=5, need 2 checks → will succeed |
| 1126 | + var maxAttempts = (index % 2 == 0) ? 5 : 1; |
| 1127 | + var strategy = WaitStrategies.<Integer>fixedDelay(maxAttempts, Duration.ofSeconds(1)); |
| 1128 | + var wfcConfig = WaitForConditionConfig.<Integer>builder() |
| 1129 | + .waitStrategy(strategy) |
| 1130 | + .build(); |
| 1131 | + |
| 1132 | + return ctx.waitForCondition( |
| 1133 | + "poll-" + index, |
| 1134 | + Integer.class, |
| 1135 | + (state, stepCtx) -> { |
| 1136 | + var next = (state == null ? 0 : state) + 1; |
| 1137 | + return next >= 2 |
| 1138 | + ? WaitForConditionResult.stopPolling(next) |
| 1139 | + : WaitForConditionResult.continuePolling(next); |
| 1140 | + }, |
| 1141 | + wfcConfig); |
| 1142 | + }); |
| 1143 | + |
| 1144 | + assertEquals(itemCount, result.size()); |
| 1145 | + assertEquals(25, result.succeeded().size()); |
| 1146 | + assertEquals(25, result.failed().size()); |
| 1147 | + assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason()); |
| 1148 | + |
| 1149 | + return result.succeeded().size() + "/" + result.failed().size(); |
| 1150 | + }); |
| 1151 | + |
| 1152 | + var result = runner.runUntilComplete("test"); |
| 1153 | + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); |
| 1154 | + assertEquals("25/25", result.getResult(String.class)); |
| 1155 | + } |
| 1156 | + |
| 1157 | + @Test |
| 1158 | + void testMap50ItemsWithWaitForCondition_replay() { |
| 1159 | + var itemCount = 50; |
| 1160 | + var items = new ArrayList<Integer>(); |
| 1161 | + for (int i = 0; i < itemCount; i++) { |
| 1162 | + items.add(i); |
| 1163 | + } |
| 1164 | + var checkCounts = new AtomicInteger(0); |
| 1165 | + |
| 1166 | + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { |
| 1167 | + var result = context.map("50-conditions-replay", items, String.class, (item, index, ctx) -> { |
| 1168 | + var strategy = WaitStrategies.<Integer>fixedDelay(5, Duration.ofSeconds(1)); |
| 1169 | + var wfcConfig = WaitForConditionConfig.<Integer>builder() |
| 1170 | + .waitStrategy(strategy) |
| 1171 | + .build(); |
| 1172 | + |
| 1173 | + var polled = ctx.waitForCondition( |
| 1174 | + "poll-" + index, |
| 1175 | + Integer.class, |
| 1176 | + (state, stepCtx) -> { |
| 1177 | + checkCounts.incrementAndGet(); |
| 1178 | + return WaitForConditionResult.stopPolling(1); |
| 1179 | + }, |
| 1180 | + wfcConfig); |
| 1181 | + |
| 1182 | + return String.valueOf(polled); |
| 1183 | + }); |
| 1184 | + |
| 1185 | + assertTrue(result.allSucceeded()); |
| 1186 | + return "done"; |
| 1187 | + }); |
| 1188 | + |
| 1189 | + var result1 = runner.runUntilComplete("test"); |
| 1190 | + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); |
| 1191 | + var firstRunChecks = checkCounts.get(); |
| 1192 | + assertEquals(itemCount, firstRunChecks); |
| 1193 | + |
| 1194 | + // Replay — check functions should not re-execute |
| 1195 | + var result2 = runner.run("test"); |
| 1196 | + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); |
| 1197 | + assertEquals(firstRunChecks, checkCounts.get(), "Check functions should not re-execute on replay"); |
| 1198 | + } |
| 1199 | + |
| 1200 | + // ---- 50-item map tests mixing waitForCallback and waitForCondition ---- |
| 1201 | + |
| 1202 | + @Test |
| 1203 | + void testMap50ItemsMixed_callbackAndCondition() { |
| 1204 | + var itemCount = 50; |
| 1205 | + var items = new ArrayList<Integer>(); |
| 1206 | + for (int i = 0; i < itemCount; i++) { |
| 1207 | + items.add(i); |
| 1208 | + } |
| 1209 | + |
| 1210 | + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { |
| 1211 | + var result = context.map("50-mixed", items, String.class, (item, index, ctx) -> { |
| 1212 | + if (index % 2 == 0) { |
| 1213 | + // Even items: waitForCallback |
| 1214 | + return ctx.waitForCallback("cb-" + index, String.class, (callbackId, stepCtx) -> {}); |
| 1215 | + } else { |
| 1216 | + // Odd items: waitForCondition |
| 1217 | + var strategy = WaitStrategies.<Integer>fixedDelay(5, Duration.ofSeconds(1)); |
| 1218 | + var wfcConfig = WaitForConditionConfig.<Integer>builder() |
| 1219 | + .waitStrategy(strategy) |
| 1220 | + .build(); |
| 1221 | + |
| 1222 | + var polled = ctx.waitForCondition( |
| 1223 | + "poll-" + index, |
| 1224 | + Integer.class, |
| 1225 | + (state, stepCtx) -> WaitForConditionResult.stopPolling(index), |
| 1226 | + wfcConfig); |
| 1227 | + |
| 1228 | + return "polled-" + polled; |
| 1229 | + } |
| 1230 | + }); |
| 1231 | + |
| 1232 | + assertEquals(itemCount, result.size()); |
| 1233 | + return String.valueOf(result.succeeded().size()); |
| 1234 | + }); |
| 1235 | + |
| 1236 | + // First run — callback items suspend, condition items may complete |
| 1237 | + var result = runner.run("test"); |
| 1238 | + |
| 1239 | + // Complete all callback items (even-indexed) |
| 1240 | + for (int i = 0; i < itemCount; i += 2) { |
| 1241 | + var callbackId = runner.getCallbackId("cb-" + i + "-callback"); |
| 1242 | + if (callbackId != null) { |
| 1243 | + runner.completeCallback(callbackId, "\"callback-" + i + "\""); |
| 1244 | + } |
| 1245 | + } |
| 1246 | + |
| 1247 | + result = runner.runUntilComplete("test"); |
| 1248 | + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); |
| 1249 | + assertEquals("50", result.getResult(String.class)); |
| 1250 | + } |
| 1251 | + |
893 | 1252 | @Test |
894 | 1253 | void testMultipleMapAsyncInParallel() { |
895 | 1254 | var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { |
|
0 commit comments