How to Write Dynamic Integrations to Handle Extreme Volumes

Good integrations work correctly in extreme conditions. Making an integration dynamic allows it to handle extreme volumes.
Written by
Liz Yoder, Software Engineer
Published on
November 26, 2024

In simple terms, any integration flow usually needs to accomplish the following at a minimum:

  • Fetch records from the first API.
  • Transform each record in some way.
  • Send the transformed record to the second API.

Of course there are many ways it can get more complex. Information from the second API may need to be fetched to correctly transform the records. Sometimes the original records in the first API also need to be updated. A client’s individual use cases may add any number of details to your logic, but those three goals are the basic outline a flow often starts with.

Here at Pandium we limit our run times to 10 minutes to make a failed run less likely and to improve resiliency. So what happens when an installed integration suddenly needs to process more records than it can handle in one 10 minute run?

Let’s say you’ve written an integration flow that imports orders from OrderPlacementPavillion to FulfillmentsRUs. In load testing you learned your code can handle up to 1,000 orders in a single run.  

The integration goes live, and you typically see 100 orders that need to be synced into FulfillmentsRUs during each 10 minute run. That’s well within the 1,000 order capacity, so there are no problems.  

Then we get to Black Friday and suddenly there are 5,000 orders that need to be synced! Depending on how you’ve written the code a variety of things could happen:

  • Worst Code: The integration tries to fetch all the orders from OrderPlacementPavillion before processing any of them, and it spends the entire run fetching page after page of orders without processing a single one.
  • Bad Code:  The integration processes orders as it fetches them, so it’s able to process the first 1,000 orders into FulfillmentsRUs. Unfortunately the next run isn’t given information about the other 4,000 orders. They fall through the cracks and never get imported.
  • Best Code: This flow is dynamic so it processes the first 1,000 orders on this run, and then it spends the next four runs working through the backlog of orders until it is caught up.  Runs after that go back to processing the typical volumes.

So how do you make your flow dynamic?

You can avoid writing what we’ve dubbed the “worst code” by assuming that you won’t even be able to finish fetching all the records from the first API. This means you should process each page of records before fetching any more records. 

Here are two ways to go about this.

A.) Processing records in batches.

There is a temptation to accomplish the first goal: Fetch Records from the first API by writing a function `fetchAllRecords` that will fetch page after page of records from the first API until there are no more. Then it will return one big array with all those records which you will loop over to accomplish the second and third goals. This strategy produces temptingly tidy code, but it leads to the ‘worst code’ situation above.  

It’s better to accomplish the first goal by writing a function `fetchRecordsPage` which takes a page number and returns an array with just one page of records. Then take care of the second and third goals for all the records in that page before fetching the next page. Here’s a sketch of what code for that could look like this

const processRecord =  async(record) {
 const transformedRecord = transformRecord(record)
  await sendRecordToSecondApi(transformedRecord)
  }
  
let page = 1
while (true)  {
  const recordsBatch = await fetchRecordsPage(page)

  if  (recordsBatch.length === 0) break
  
  for (record of recordsBatch) {
    await processRecord(record)
  }
  page ++
} 

B.) Using an async generator and feeding its results into p-map.  

The async generator makes for cleaner code because it eliminates the need for the while loop that iterates over the pages of results. 

P-map is a great way to make your code more efficient when the second API doesn’t have a bulk endpoint for the record you’re trying to process. If you’re able to use p-map version 5.5 or higher you can pass the results of an async generator directly into p-map’s call back function. This gives you the best of both worlds because you can write an async generator function called `generateRecords` which will fetch pages and add each page’s records to one big array of all records. At the same time the callback function you pass to p-map’s can start processing records which have already been added to that one big array:

const processRecord =  async(record) {
	const transformedRecord = transformRecord(record)
	await sendRecordToSecondApi(transformedRecord)
}

const allRecords = generateAllRecords()
await pMap (allRecords, processRecord)

Either of these methods are a great improvement on the “worst code”, but if we left off here we’d just be stuck with the results of the “bad code,” which totally misses 4,000 orders!

When an integration flow is truly dynamic we assume it will not always be able to process all the records we fetch from the first API. That means we need to make sure the next run has a way to know which records have already been successfully processed. That way the next run will only attempt to fetch and work through unprocessed records; if the last run only gets half way through its work, the next run will be able to pick up where the last one left off.

Here are two ways to do this:

A.) After the second API has successfully received each transformed record, update the original record in the first API to indicate it has been synced to the second API. Then adjust `generateAllRecords` or `fetchRecordsPage` in either of the code sketches above to use a filter so that it only fetches unprocessed records from the first API. 


The `processRecord` function would also need to invoke a new function:

const processRecord =  async(record) {
	const transformedRecord = transformRecord(record)
	const response = await 
sendRecordToSecondApi(transformedRecord)
	await markRecordProcessedInFirstApi(record)
}

When this method is available, it is quite reliable, but there are several reasons it isn’t always an option:

It depends on the first API having a field that can be used to track whether a record has been processed.  

It requires the first API to provide a means of filtering the record fetch based on the value of that field.  

If efficiency is a driving factor, this may not be the best strategy because it will make the integration flow slower; processing each record with this method requires a call to both the first and second API, rather than just the second.

B.) Passing state (which we call context) from one run to another. At Pandium the last string console logged in a given run will be saved as the standard out for that integration instance. The next time the integration is run for a tenant the standard out from the last run can be accessed as an environmental variable.

Here are the key steps to use context to make an integration flow dynamic:

Always process records in an ascending order. Created date or id often work well to determine record order.

Use a `timeoutRecord` object to keep track of which records you have gotten to and which ones you have not gotten to.  

At the end of a run use an `exitHandler` function to console log that information. Do this regardless of whether the run is timing out or whether all records were processed.

At the start of the next run consult the context provided by the last run. Only fetch records that have not been processed.

Here’s a rough sketch of how these four steps can be used with a record’s id field to make an integration flow dynamic.

// Step iii: Use an exitHandler function to log the timeoutRecord at the 
end of the run.  This will give the next run the highest record ID 
processed in this run.
const exitHandler = (timeoutRecord) {
	console.log(timeoutRecord)
}

const syncRecords = async () => {
	// Step ii: Use a timeoutRecord object to keep track of the highest record ID   
which was processed in this run.
	const timeoutRecord = {
		idStart = 0
	 }
     
 // Step iii- The exitHandler will print the timeoutRecord to the 
 standardOut even if the run times out before processing all records. 
 Since a run will time out at 10 minutes, we will use setTimeout 
 to trigger a graceful exit at 9 minutes.
setTimeout(() => exitHandler(timeoutRecord), 90 * 60 * 1000)

// Step iv: Consult the context provided by the last run to determine 
the highest record ID which it processed. 
	timeoutRecord.idStart = 
process.env.PAN_CTX_LAST_SUCCESSFUL_RUN_STD_OUT.?idStart?? 0
	const processRecord =  async(record) {
		const transformedRecord = transformRecord(record)
		const response = await sendRecordToSecondApi(transformedRecord)
        
// Step ii: Always keep track of the record ID most recently processed. 
So long as you follow step i and process the records in the order 
of ascending id this will always be the highest record ID processed.
	timeoutRecord.idStart = record.id
}

	let page = 1
    while (true)  {
    
	// Step i: The code for fetchRecordsPage is not shown here, but 
    we are assuming its results are in ascending order by record id.
    Many API endpoints will accept a parameter to sort the results.
	// Step iv: fetchRecordsPage now takes the idStart as a parameter so 
    that it only fetches records with an ID higher than the highest ID 
    processed in the last run.
    const recordsBatch = await fetchRecordsPage(page, timeoutRecord.idStart)

	if  (recordsBatch.length === 0) break

	for (record of recordsBatch) {
    	await processRecord(record)
	}
    page ++
	}
    
	// Step iii- the exitHandler will print the timeoutRecord to the standardOut 
    when we have finished processing all records without timing out.
	exitHandler(timeoutRecord)
}

syncRecords().then(
	() => {},
    () => {
    	process.exitCode = 1
	}
)

That’s how you make sure that all 5,000 orders get imported from OrderPlacementPavillion to FulfillmentsRUs even when your integration can only process 1,000 orders in each 10 minute run.

Latest

From the Blog

Check out our latest content on technology partnerships, integration and APIs. Access research, resources, and advice from industry experts.

A Guide to Integrating with Klaviyo's API

Get expert insights on integrating with the Klaviyo API. Understand authentication, customizations and triggers.

How to Write an Integration Specification Document [Template Included]

A clear integration specification document aligns teams, ensures smooth data flow, and delivers successful integrations. This blog breaks down what to include to keep your project on track.