11import { and , eq } from 'drizzle-orm'
22import { type NextRequest , NextResponse } from 'next/server'
33import { createLogger } from '@/lib/logs/console/logger'
4- import { getOAuthToken } from '@/app/api/auth/oauth/utils'
4+ import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
55import { db } from '@/db'
6- import { webhook } from '@/db/schema'
6+ import { account , webhook } from '@/db/schema'
77
88const logger = createLogger ( 'WebhookUtils' )
99
@@ -860,6 +860,31 @@ export async function fetchAndProcessAirtablePayloads(
860860 return // Exit early
861861 }
862862
863+ // Require credentialId
864+ const credentialId : string | undefined = localProviderConfig . credentialId
865+ if ( ! credentialId ) {
866+ logger . error (
867+ `[${ requestId } ] Missing credentialId in providerConfig for Airtable webhook ${ webhookData . id } .`
868+ )
869+ return
870+ }
871+
872+ // Resolve owner and access token strictly via credentialId (no fallback)
873+ let ownerUserId : string | null = null
874+ try {
875+ const rows = await db . select ( ) . from ( account ) . where ( eq ( account . id , credentialId ) ) . limit ( 1 )
876+ ownerUserId = rows . length ? rows [ 0 ] . userId : null
877+ } catch ( _e ) {
878+ ownerUserId = null
879+ }
880+
881+ if ( ! ownerUserId ) {
882+ logger . error (
883+ `[${ requestId } ] Could not resolve owner for Airtable credential ${ credentialId } on webhook ${ webhookData . id } `
884+ )
885+ return
886+ }
887+
863888 // --- Retrieve Stored Cursor from localProviderConfig ---
864889 const storedCursor = localProviderConfig . externalWebhookCursor
865890
@@ -908,26 +933,25 @@ export async function fetchAndProcessAirtablePayloads(
908933 )
909934 }
910935
911- // --- Get OAuth Token ---
936+ // --- Get OAuth Token (strict via credentialId) ---
912937 let accessToken : string | null = null
913938 try {
914- accessToken = await getOAuthToken ( workflowData . userId , 'airtable' )
939+ accessToken = await refreshAccessTokenIfNeeded ( credentialId , ownerUserId , requestId )
915940 if ( ! accessToken ) {
916941 logger . error (
917- `[${ requestId } ] Failed to obtain valid Airtable access token. Cannot proceed.` ,
918- { userId : workflowData . userId }
942+ `[${ requestId } ] Failed to obtain valid Airtable access token via credential ${ credentialId } .`
919943 )
920944 throw new Error ( 'Airtable access token not found.' )
921945 }
922946
923947 logger . info ( `[${ requestId } ] Successfully obtained Airtable access token` )
924948 } catch ( tokenError : any ) {
925949 logger . error (
926- `[${ requestId } ] Failed to get Airtable OAuth token for user ${ workflowData . userId } ` ,
950+ `[${ requestId } ] Failed to get Airtable OAuth token for credential ${ credentialId } ` ,
927951 {
928952 error : tokenError . message ,
929953 stack : tokenError . stack ,
930- userId : workflowData . userId ,
954+ credentialId ,
931955 }
932956 )
933957 // Error logging handled by logging session
@@ -1102,7 +1126,7 @@ export async function fetchAndProcessAirtablePayloads(
11021126
11031127 // DEBUG: Log totals for this batch
11041128 logger . debug (
1105- `[${ requestId } ] TRACE: Processed ${ changeCount } changes in API call ${ apiCallCount } ` ,
1129+ `[${ requestId } ] TRACE: Processed ${ changeCount } changes in API call ${ apiCallCount } ) ` ,
11061130 {
11071131 currentMapSize : consolidatedChangesMap . size ,
11081132 }
@@ -1257,13 +1281,33 @@ export async function configureGmailPolling(
12571281 logger . info ( `[${ requestId } ] Setting up Gmail polling for webhook ${ webhookData . id } ` )
12581282
12591283 try {
1260- const accessToken = await getOAuthToken ( userId , 'google-email' )
1261- if ( ! accessToken ) {
1262- logger . error ( `[${ requestId } ] Failed to retrieve Gmail access token for user ${ userId } ` )
1284+ const providerConfig = ( webhookData . providerConfig as Record < string , any > ) || { }
1285+
1286+ const credentialId : string | undefined = providerConfig . credentialId
1287+ if ( ! credentialId ) {
1288+ logger . error (
1289+ `[${ requestId } ] Missing credentialId for Gmail webhook ${ webhookData . id } . Refusing to proceed.`
1290+ )
12631291 return false
12641292 }
12651293
1266- const providerConfig = ( webhookData . providerConfig as Record < string , any > ) || { }
1294+ // Resolve owner user ID from the credential and refresh token if needed
1295+ const rows = await db . select ( ) . from ( account ) . where ( eq ( account . id , credentialId ) ) . limit ( 1 )
1296+ if ( rows . length === 0 ) {
1297+ logger . error (
1298+ `[${ requestId } ] Credential ${ credentialId } not found for Gmail webhook ${ webhookData . id } `
1299+ )
1300+ return false
1301+ }
1302+ const ownerUserId = rows [ 0 ] . userId
1303+
1304+ const accessToken = await refreshAccessTokenIfNeeded ( credentialId , ownerUserId , requestId )
1305+ if ( ! accessToken ) {
1306+ logger . error (
1307+ `[${ requestId } ] Failed to refresh/access Gmail token for credential ${ credentialId } `
1308+ )
1309+ return false
1310+ }
12671311
12681312 const maxEmailsPerPoll =
12691313 typeof providerConfig . maxEmailsPerPoll === 'string'
@@ -1282,7 +1326,8 @@ export async function configureGmailPolling(
12821326 . set ( {
12831327 providerConfig : {
12841328 ...providerConfig ,
1285- userId, // Store user ID for OAuth access during polling
1329+ userId : ownerUserId ,
1330+ credentialId,
12861331 maxEmailsPerPoll,
12871332 pollingInterval,
12881333 markAsRead : providerConfig . markAsRead || false ,
@@ -1323,13 +1368,33 @@ export async function configureOutlookPolling(
13231368 logger . info ( `[${ requestId } ] Setting up Outlook polling for webhook ${ webhookData . id } ` )
13241369
13251370 try {
1326- const accessToken = await getOAuthToken ( userId , 'outlook' )
1327- if ( ! accessToken ) {
1328- logger . error ( `[${ requestId } ] Failed to retrieve Outlook access token for user ${ userId } ` )
1371+ const providerConfig = ( webhookData . providerConfig as Record < string , any > ) || { }
1372+
1373+ const credentialId : string | undefined = providerConfig . credentialId
1374+ if ( ! credentialId ) {
1375+ logger . error (
1376+ `[${ requestId } ] Missing credentialId for Outlook webhook ${ webhookData . id } . Refusing to proceed.`
1377+ )
13291378 return false
13301379 }
13311380
1332- const providerConfig = ( webhookData . providerConfig as Record < string , any > ) || { }
1381+ // Resolve owner user ID from the credential and refresh token if needed
1382+ const rows = await db . select ( ) . from ( account ) . where ( eq ( account . id , credentialId ) ) . limit ( 1 )
1383+ if ( rows . length === 0 ) {
1384+ logger . error (
1385+ `[${ requestId } ] Credential ${ credentialId } not found for Outlook webhook ${ webhookData . id } `
1386+ )
1387+ return false
1388+ }
1389+ const ownerUserId = rows [ 0 ] . userId
1390+
1391+ const accessToken = await refreshAccessTokenIfNeeded ( credentialId , ownerUserId , requestId )
1392+ if ( ! accessToken ) {
1393+ logger . error (
1394+ `[${ requestId } ] Failed to refresh/access Outlook token for credential ${ credentialId } `
1395+ )
1396+ return false
1397+ }
13331398
13341399 const maxEmailsPerPoll =
13351400 typeof providerConfig . maxEmailsPerPoll === 'string'
@@ -1348,7 +1413,8 @@ export async function configureOutlookPolling(
13481413 . set ( {
13491414 providerConfig : {
13501415 ...providerConfig ,
1351- userId, // Store user ID for OAuth access during polling
1416+ userId : ownerUserId ,
1417+ credentialId,
13521418 maxEmailsPerPoll,
13531419 pollingInterval,
13541420 markAsRead : providerConfig . markAsRead || false ,
0 commit comments