In simple terms, any integration flow usually needs to accomplish the following at a minimum:
1. Fetch records from the first API.
2. Transform those records into the format the second API expects.
3. Send the transformed records to the second API.
Of course there are many ways it can get more complex. Information from the second API may need to be fetched to correctly transform the records. Sometimes the original records in the first API also need to be updated. A client’s individual use cases may add any number of details to your logic, but those three goals are the basic outline a flow often starts with.
Here at Pandium we limit our run times to 10 minutes to make a failed run less likely and to improve resiliency. So what happens when an installed integration suddenly needs to process more records than it can handle in one 10-minute run?
Let’s say you’ve written an integration flow that imports orders from OrderPlacementPavillion to FulfillmentsRUs. In load testing you learned your code can handle up to 1,000 orders in a single run.
The integration goes live, and you typically see 100 orders that need to be synced into FulfillmentsRUs during each 10-minute run. That’s well within the 1,000 order capacity, so there are no problems.
Then we get to Black Friday and suddenly there are 5,000 orders that need to be synced! Depending on how you’ve written the code, a variety of things could happen:
At worst, the run spends all 10 minutes trying to fetch every order, times out before processing any of them, and syncs nothing at all. We call this the “worst code.”
Slightly better, the run fetches and processes the first 1,000 orders and then times out, silently missing the other 4,000. We call this the “bad code.”
At best, the flow is dynamic: it processes as many orders as it can, keeps track of how far it got, and lets the next run pick up where it left off.
So how do you make your flow dynamic?
You can avoid writing what we’ve dubbed the “worst code” by assuming that you won’t even be able to finish fetching all the records from the first API. This means you should process each page of records before fetching any more records.
Here are two ways to go about this.
A.) Processing records in batches.
There is a temptation to accomplish the first goal (fetching records from the first API) by writing a function `fetchAllRecords` that fetches page after page of records until there are no more, then returns one big array with all of them, which you loop over to accomplish the second and third goals. This strategy produces temptingly tidy code, but it leads to the “worst code” situation above.
It’s better to accomplish the first goal by writing a function `fetchRecordsPage` which takes a page number and returns an array containing just one page of records. Then take care of the second and third goals for all the records in that page before fetching the next page. Here’s a sketch of what that code could look like:
const processRecord = async (record) => {
  const transformedRecord = transformRecord(record)
  await sendRecordToSecondApi(transformedRecord)
}

let page = 1
while (true) {
  const recordsBatch = await fetchRecordsPage(page)
  if (recordsBatch.length === 0) break
  for (const record of recordsBatch) {
    await processRecord(record)
  }
  page++
}
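For reference, `fetchRecordsPage` is left undefined above. A minimal sketch might look like the following, using the built-in `fetch` available in Node 18+; the endpoint URL, auth token variable, and `page`/`limit` parameters are hypothetical stand-ins for whatever the first API actually provides:

// Hypothetical sketch: fetch one page of records from the first API.
const fetchRecordsPage = async (page) => {
  const response = await fetch(
    `https://api.orderplacementpavillion.example/orders?page=${page}&limit=100`,
    { headers: { Authorization: `Bearer ${process.env.FIRST_API_TOKEN}` } }
  )
  if (!response.ok) {
    throw new Error(`Failed to fetch page ${page}: ${response.status}`)
  }
  const body = await response.json()
  // Assumes the API responds with { orders: [...] }; an empty array means no more pages.
  return body.orders ?? []
}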
B.) Using an async generator and feeding its results into p-map.
The async generator makes for cleaner code because it eliminates the need for the while loop that iterates over the pages of results.
P-map is a great way to make your code more efficient when the second API doesn’t have a bulk endpoint for the record you’re trying to process. If you’re able to use p-map version 5.5 or higher, you can pass an async iterable, such as the output of an async generator, directly into p-map as its input. This gives you the best of both worlds: you can write an async generator function called `generateAllRecords` which fetches pages and yields each page’s records, while the callback function you pass to p-map starts processing the records that have already been yielded, without waiting for every page to be fetched:
import pMap from 'p-map'

const processRecord = async (record) => {
  const transformedRecord = transformRecord(record)
  await sendRecordToSecondApi(transformedRecord)
}

const allRecords = generateAllRecords()
// p-map 5.5+ accepts an async iterable as input; concurrency caps parallel calls to the second API
await pMap(allRecords, processRecord, { concurrency: 10 })
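The `generateAllRecords` generator itself isn’t shown above; here is a minimal sketch, reusing the `fetchRecordsPage` helper from option A:

// Async generator that yields records one at a time, fetching the next page only when needed.
async function* generateAllRecords() {
  let page = 1
  while (true) {
    const recordsBatch = await fetchRecordsPage(page)
    if (recordsBatch.length === 0) return
    yield* recordsBatch
    page++
  }
}

Because p-map pulls from the iterable as it maps, the paging loop moves inside the generator and the top-level code no longer needs it.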
Either of these methods is a great improvement on the “worst code,” but if we stopped here we’d just be stuck with the results of the “bad code,” which still misses 4,000 orders!
When an integration flow is truly dynamic, we assume it will not always be able to process all the records we fetch from the first API. That means we need to make sure the next run has a way to know which records have already been successfully processed. That way the next run will only attempt to fetch and work through unprocessed records; if the last run only got halfway through its work, the next run will be able to pick up where it left off.
Here are two ways to do this:
A.) After the second API has successfully received each transformed record, update the original record in the first API to indicate it has been synced to the second API. Then adjust `generateAllRecords` or `fetchRecordsPage` in either of the code sketches above to use a filter so that it only fetches unprocessed records from the first API.
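For example, if the first API supported filtering on a hypothetical `synced` field, the adjusted `fetchRecordsPage` might look something like this sketch (the endpoint and query parameters are assumptions):

// Hypothetical sketch: only fetch records that have not yet been marked as synced.
const fetchRecordsPage = async (page) => {
  const url = `https://api.orderplacementpavillion.example/orders?page=${page}&limit=100&synced=false`
  const response = await fetch(url, {
    headers: { Authorization: `Bearer ${process.env.FIRST_API_TOKEN}` }
  })
  if (!response.ok) {
    throw new Error(`Failed to fetch page ${page}: ${response.status}`)
  }
  const body = await response.json()
  return body.orders ?? []
}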
The `processRecord` function would also need to invoke a new function:
const processRecord = async (record) => {
  const transformedRecord = transformRecord(record)
  await sendRecordToSecondApi(transformedRecord)
  // Only mark the record as processed after the second API has accepted it
  await markRecordProcessedInFirstApi(record)
}
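What `markRecordProcessedInFirstApi` does will depend entirely on the first API; as a rough sketch, it might be a simple update call like this (the endpoint and `synced` field are hypothetical):

// Hypothetical sketch: flag the original record as synced in the first API.
const markRecordProcessedInFirstApi = async (record) => {
  const response = await fetch(
    `https://api.orderplacementpavillion.example/orders/${record.id}`,
    {
      method: 'PATCH',
      headers: {
        Authorization: `Bearer ${process.env.FIRST_API_TOKEN}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ synced: true })
    }
  )
  if (!response.ok) {
    throw new Error(`Failed to mark record ${record.id} as processed: ${response.status}`)
  }
}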
When this method is available, it is quite reliable, but there are several reasons it isn’t always an option:
It depends on the first API having a field that can be used to track whether a record has been processed.
It requires the first API to provide a means of filtering the record fetch based on the value of that field.
If efficiency is a driving factor, this may not be the best strategy because it will make the integration flow slower; processing each record with this method requires a call to both the first and second API, rather than just the second.
B.) Passing state (which we call context) from one run to another. At Pandium, the last string logged to the console in a given run is saved as the standard out for that integration instance. The next time the integration runs for a tenant, the standard out from the last run can be accessed as an environment variable.
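In other words, a run can persist a small piece of state simply by making it the last thing it logs, and the next run can read that state back. Here’s a minimal sketch of the round trip, assuming the context is written as JSON (the `idStart` value is just a placeholder):

// Read the context left by the last run, if any; assumes it was logged as JSON.
const lastContext = JSON.parse(process.env.PAN_CTX_LAST_SUCCESSFUL_RUN_STD_OUT || '{}')

// ... do this run's work, starting from lastContext.idStart ...

// Persist context for the next run by making it the last string logged.
console.log(JSON.stringify({ idStart: 1234 }))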
Here are the key steps to use context to make an integration flow dynamic:
i.) Always process records in ascending order. Created date or id often work well to determine record order.
ii.) Use a `timeoutRecord` object to keep track of which records you have and have not gotten to.
iii.) At the end of a run use an `exitHandler` function to log that information to the console. Do this regardless of whether the run is timing out or whether all records were processed.
iv.) At the start of the next run consult the context provided by the last run. Only fetch records that have not been processed.
Here’s a rough sketch of how these four steps can be used with a record’s id field to make an integration flow dynamic.
// Step iii: Use an exitHandler function to log the timeoutRecord at the
// end of the run. This will give the next run the highest record ID
// processed in this run.
const exitHandler = (timeoutRecord) => {
  console.log(JSON.stringify(timeoutRecord))
}

const syncRecords = async () => {
  // Step ii: Use a timeoutRecord object to keep track of the highest record ID
  // which was processed in this run.
  const timeoutRecord = {
    idStart: 0
  }

  // Step iii: The exitHandler will print the timeoutRecord to the
  // standard out even if the run times out before processing all records.
  // Since a run will time out at 10 minutes, we use setTimeout
  // to trigger a graceful exit at 9 minutes.
  const timeout = setTimeout(() => {
    exitHandler(timeoutRecord)
    process.exit(0)
  }, 9 * 60 * 1000)

  // Step iv: Consult the context provided by the last run to determine
  // the highest record ID which it processed.
  timeoutRecord.idStart =
    JSON.parse(process.env.PAN_CTX_LAST_SUCCESSFUL_RUN_STD_OUT || '{}').idStart ?? 0
  // Freeze the last run's highest ID for this run's fetches, since
  // timeoutRecord.idStart will be updated as each record is processed.
  const lastRunIdStart = timeoutRecord.idStart

  const processRecord = async (record) => {
    const transformedRecord = transformRecord(record)
    await sendRecordToSecondApi(transformedRecord)
    // Step ii: Always keep track of the record ID most recently processed.
    // So long as you follow step i and process the records in order of
    // ascending id, this will always be the highest record ID processed.
    timeoutRecord.idStart = record.id
  }

  let page = 1
  while (true) {
    // Step i: The code for fetchRecordsPage is not shown here, but
    // we are assuming its results are in ascending order by record id.
    // Many API endpoints will accept a parameter to sort the results.
    // Step iv: fetchRecordsPage now takes lastRunIdStart as a parameter so
    // that it only fetches records with an ID higher than the highest ID
    // processed in the last run.
    const recordsBatch = await fetchRecordsPage(page, lastRunIdStart)
    if (recordsBatch.length === 0) break
    for (const record of recordsBatch) {
      await processRecord(record)
    }
    page++
  }

  // Step iii: The exitHandler will print the timeoutRecord to the standard out
  // when we have finished processing all records without timing out. Clear the
  // 9 minute timer first so the process can exit right away.
  clearTimeout(timeout)
  exitHandler(timeoutRecord)
}
syncRecords().then(
  () => {},
  () => {
    process.exitCode = 1
  }
)
That’s how you make sure that all 5,000 orders get imported from OrderPlacementPavillion to FulfillmentsRUs even when your integration can only process 1,000 orders in each 10-minute run.