Saturday, February 1, 2025

Hello World Plus - Part 3: Async, Batch, Threads and Serverless Infrastructure

In this article we will go through async programming: making requests to a third-party API, aggregating the responses, and sending the result back to the service. In the example below, the API call is executed around 200 times per client request, so it is important to see how the client can still get its response within about 5 seconds.

One way is to spawn 200 threads and get the responses quickly that way. Let us look at the other options that are memory- and time-optimized, and at how this kind of batch processing can be hosted using the latest available technologies.

 private async Task<List<APIResponse>> GetData(List<string> names)
 {
     var data = new List<APIResponse>();

     try
     {
         foreach (string name in names)
         {
             string param = "name=" + name;
             HttpResponseMessage response =
                 await client.GetAsync("https://api.genderize.io?" + param +
                                       "&apikey=<APIKEY>");
             var content = await response.Content.ReadAsStringAsync();
             var value = JsonConvert.DeserializeObject<APIResponse>(content);
             data.Add(value);
         }
     }
     catch (Exception ex)
     {
         // log and return the partial result rather than failing the whole batch
         Console.WriteLine(ex.Message);
     }

     return data;
 }

A few important design principles for the above problem could be:

 

1. The function above should be executed in parallel, not as a blocking synchronous call; async should be used.

2. The number of threads given above is 200. It is not good design to loop N times and create N threads: most operating systems limit how many resources (threads, file handles) can be spawned. So the parallel execution above should be divided into batches, and the result of each batch combined into the response.
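As a sketch of this batching idea, an asyncio semaphore can cap how many of the 200 calls are in flight at once. The batch size of 20 and the `fake_call` stand-in below are illustrative assumptions, not part of the article:

```python
import asyncio

async def fake_call(i):
    # stand-in for one third-party API request
    await asyncio.sleep(0.01)
    return i * 2

async def bounded(sem, i):
    # the semaphore caps how many calls run concurrently
    async with sem:
        return await fake_call(i)

async def run_in_batches(n, batch_size):
    sem = asyncio.Semaphore(batch_size)
    tasks = [asyncio.create_task(bounded(sem, i)) for i in range(n)]
    # gather combines the result of every batch into one response
    return await asyncio.gather(*tasks)

results = asyncio.run(run_in_batches(200, 20))
print(len(results))  # → 200
```

Only 20 tasks are ever awaiting the fake API at once, but `gather` still returns all 200 results in order.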

3. The complete processing can be done on independent infrastructure such as serverless apps or AWS Lambda, since it does not use any data from the application. This makes it possible to scale up the corresponding hardware if required.
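As a rough sketch of this serverless idea, the fan-out could live behind an AWS Lambda handler. The event shape (a "names" key) and the `lookup` stand-in below are assumptions for illustration, not a definitive design:

```python
import asyncio

async def lookup(name):
    # stand-in for the third-party API call
    await asyncio.sleep(0)
    return {"name": name, "gender": "unknown"}

async def gather_names(names):
    # fan out all lookups concurrently and collect the results
    return await asyncio.gather(*(lookup(n) for n in names))

def handler(event, context):
    # Lambda entry point: the event carries the client's list of names
    names = event.get("names", [])
    return asyncio.run(gather_names(names))

print(handler({"names": ["alice", "bob"]}, None))
```

Because the handler holds no application state, the platform can scale out instances per request.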

 Design Patterns


Worker pool/Job queue pattern: the worker pool pattern is the simplest and most widely used concurrency pattern for distributing multiple jobs to multiple workers.



In this pattern, jobs are stored in a data structure (a job queue) and a pool of worker threads picks up jobs via a scheduler. With access to multiple cores, the jobs can be processed in parallel, as in Go.
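A minimal worker pool can be sketched in Python with a job queue and a few threads; the squaring job is just a placeholder for real work:

```python
import queue
import threading

def worker(jobs, results):
    # each worker pulls jobs from the shared queue until it is drained
    while True:
        try:
            job = jobs.get_nowait()
        except queue.Empty:
            return
        results.append(job * job)
        jobs.task_done()

jobs = queue.Queue()
for n in range(10):
    jobs.put(n)

results = []
pool = [threading.Thread(target=worker, args=(jobs, results)) for _ in range(3)]
for t in pool:
    t.start()
for t in pool:
    t.join()

print(sorted(results))  # → [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The three workers share one queue, so jobs are balanced automatically across whichever worker is free.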

Monitor pattern: N threads wait on some condition to become true; while it is false, those threads sleep in a wait queue, and they are notified when the condition becomes true.
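In Python the monitor pattern maps onto `threading.Condition`; a minimal sketch:

```python
import threading

cond = threading.Condition()
ready = False
seen = []

def waiter(i):
    with cond:
        # threads sleep in the condition's wait queue until notified
        while not ready:
            cond.wait()
        seen.append(i)

threads = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

with cond:
    ready = True        # make the condition true...
    cond.notify_all()   # ...then wake every waiting thread

for t in threads:
    t.join()
print(sorted(seen))  # → [0, 1, 2]
```

The `while not ready` loop (rather than a plain `if`) guards against spurious wakeups, which is the standard monitor idiom.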

Double-checked locking: for safely creating shared concurrent objects (e.g., the singleton pattern).
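A minimal double-checked locking sketch in Python (the `Config` class here is illustrative):

```python
import threading

class Config:
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def instance(cls):
        # first check without the lock keeps the hot path cheap
        if cls._instance is None:
            with cls._lock:
                # second check: another thread may have created it
                # between our first check and acquiring the lock
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

a = Config.instance()
b = Config.instance()
print(a is b)  # → True
```

The second check inside the lock is what makes the pattern safe; without it two racing threads could each construct an instance.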

Barrier pattern: all concurrently executing threads must wait for the others at a synchronization point called a barrier.
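`threading.Barrier` gives a minimal sketch of the barrier pattern:

```python
import threading

barrier = threading.Barrier(3)
order = []

def phase(name):
    order.append(f"{name}:before")
    barrier.wait()  # no thread passes until all three have arrived
    order.append(f"{name}:after")

threads = [threading.Thread(target=phase, args=(str(i),)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# every "before" entry is guaranteed to precede every "after" entry
print(all(x.endswith("before") for x in order[:3]))  # → True
```

Whatever order the threads are scheduled in, the barrier guarantees all three finish the first phase before any starts the second.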

Reactor pattern: in an event-driven system, a service handler accepts events from multiple incoming requests and demultiplexes them to the respective non-blocking handlers.
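A minimal reactor sketch using Python's `selectors` module as the demultiplexer; the `socketpair` below stands in for a real incoming connection:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
handled = []

def on_readable(conn):
    # non-blocking handler invoked when the demultiplexer sees data
    handled.append(conn.recv(1024))

a, b = socket.socketpair()
b.setblocking(False)
sel.register(b, selectors.EVENT_READ, on_readable)

a.sendall(b"ping")  # an incoming event

# one turn of the event loop: wait for readiness, then dispatch
# each ready event to its registered handler
for key, _ in sel.select(timeout=1):
    key.data(key.fileobj)

print(handled)  # → [b'ping']
a.close()
b.close()
sel.close()
```

The single `select` loop is the service handler; per-connection callbacks registered with the selector are the non-blocking handlers it dispatches to.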

Let us look at a few solutions to execute the above function and get a response.

 

var queryTask = new List<Task>();
for (int i = 0; i < 150; i++)
{
    queryTask.Add(da.ExecuteSPAsync("Async" + i.ToString()));
}
Task.WhenAll(queryTask).Wait();

Parallel.For(0, 150, new ParallelOptions { MaxDegreeOfParallelism = 5 },
             x => da.ExecuteSP("PPWith5Threads" + x.ToString()));

Here are code samples using the parallel programming support in the C# libraries: Threads vs Tasks | C# Online Compiler | .NET Fiddle


This is a basic solution that creates threads, and C# provides a mechanism to control how many threads can be created at a time, which is fair enough: we can tune MaxDegreeOfParallelism according to the resources and the response time required. This concept is thread pooling, and it is available in Spring Batch settings and in other programming languages as well.
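The same MaxDegreeOfParallelism idea can be sketched in Python with a thread pool capped at 5 workers; `execute_sp` here is just a stand-in for the stored-procedure call in the C# sample:

```python
from concurrent.futures import ThreadPoolExecutor

def execute_sp(i):
    # stand-in for the stored-procedure call
    return f"PPWith5Threads{i}"

# at most 5 of the 150 jobs run at any one time
with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(execute_sp, range(150)))

print(len(results))  # → 150
```

`max_workers` plays the same role as MaxDegreeOfParallelism or Spring Batch's pool size: it bounds resource usage while still draining all 150 jobs.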

Here is one configuration used in a Spring Batch job:

    core-pool-size: 20
    max-pool-size: 20
    throttle-limit: 10

Here is an example from Python doing a similar task, i.e., making multiple requests simultaneously using asyncio.gather:

import asyncio
import aiohttp

async def fetch_multiple():
    urls = [
        "https://api.github.com/users/github",
        "https://api.github.com/users/python",
        "https://api.github.com/users/django"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            # fetch_data is assumed to be defined elsewhere: it issues
            # the GET for one URL and returns the parsed response
            tasks.append(asyncio.create_task(fetch_data(url)))
        results = await asyncio.gather(*tasks)
        return results

How to measure the improvement in processing

Tasks and the event loop

Consider this example: Grandmaster Judith Polgar is at a chess convention, playing against 24 amateur chess players. She takes 5 seconds to make a move; her opponents need 55 seconds. A game ends at roughly 60 moves, or 30 from each side. (Source: https://github.com/fillwithjoy1/async_io_for_beginners)

Synchronous version

import time

def play_game(player_name):
    for move in range(30):
        time.sleep(5) # the champion takes 5 seconds to make a move
        print(f"{player_name} made move {move+1}")
        time.sleep(55) # the opponent takes 55 seconds to make a move

if __name__ == "__main__":
    players = [f'Amateur{i+1}' for i in range(24)]  # Judith's 24 opponents, one game each
    for player in players:
        play_game(player)

Asynchronous version

import asyncio

async def play_game(player_name):
    for move in range(30):
        await asyncio.sleep(5) # the champion takes 5 seconds to make a move
        print(f"{player_name} made move {move+1}")
        await asyncio.sleep(55) # the opponent takes 55 seconds to make a move

async def play_all_games(players):
    tasks = [asyncio.create_task(play_game(player)) for player in players]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    players = [f'Amateur{i+1}' for i in range(24)]  # Judith's 24 opponents, one game each
    asyncio.run(play_all_games(players))

In the synchronous version, the program runs sequentially, playing one game after another. Each game takes 30 * (5 + 55) = 1,800 seconds, so the 24 games take 24 * 1,800 = 43,200 seconds (12 hours) to complete.

In the asynchronous version, the games run concurrently, so all of them complete in roughly the time of a single game: 30 * (5 + 55) = 1,800 seconds (30 minutes), assuming there are enough resources available to handle all the concurrent games.
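One way to measure the improvement is to time both styles with `time.perf_counter`; the sleeps and move counts below are scaled down so the comparison runs quickly:

```python
import asyncio
import time

async def game(moves=3, delay=0.01):
    # scaled-down stand-in for one chess game
    for _ in range(moves):
        await asyncio.sleep(delay)

async def all_games(n):
    # concurrent version: every game runs at once
    await asyncio.gather(*(game() for _ in range(n)))

start = time.perf_counter()
asyncio.run(all_games(24))
concurrent_time = time.perf_counter() - start

start = time.perf_counter()
for _ in range(24):          # sequential version: one game after another
    asyncio.run(game())
sequential_time = time.perf_counter() - start

# the concurrent run should finish well under the sequential total
print(concurrent_time < sequential_time)  # → True
```

The sequential loop pays the full sleep of every game, while the concurrent run overlaps them, which is exactly the 12-hours-versus-30-minutes difference in the chess example.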

 

