In this article we will go through async programming: calling a third-party API, aggregating the responses, and sending the result back to the calling service. Here is one example where we fetch a response from an API, and this call is executed around 200 times per client request. The challenge is to return a response to the client within 5 seconds. One way is to spawn 200 threads and fire all the requests at once, which would be quick. Let us see what other options are memory- and time-efficient, and how this kind of batch processing can be hosted using the latest available technologies.
private static readonly HttpClient client = new HttpClient(); // assumed shared instance; the original snippet references `client` without declaring it

private async Task<List<APIResponse>> GetData(List<string> names)
{
    List<APIResponse> data = new List<APIResponse>();
    try
    {
        foreach (string name in names)
        {
            // Build the query string for the current name; name[] makes the API return an array.
            string param = "name[]=" + name;
            HttpResponseMessage response = await client.GetAsync(
                "https://api.genderize.io?" + param + "&apikey=<APIKEY>");
            var content = await response.Content.ReadAsStringAsync();
            dynamic viewdata = JsonConvert.DeserializeObject<dynamic>(content);
            foreach (var d in viewdata)
            {
                var value = new APIResponse();
                value.name = d.name;
                value.gender = d.gender;
                data.Add(value);
            }
        }
        return data;
    }
    catch (HttpRequestException)
    {
        // Return whatever was collected before the failure.
        return data;
    }
}
A few important design principles for the above problem:
1. The function above should be executed in parallel, not as a blocking synchronous call; async should be used.
2. The number of calls given above is 200. It is not good design to loop N times and spawn N threads: most operating systems limit how many resources a process can create, whether threads or file handles. So the parallel execution should be divided into batches, and the result of each batch combined into the final response (a sketch follows this list).
3. The complete processing can run on independent infrastructure such as serverless apps or a lambda, since it does not use any data from the application. That makes it possible to scale the underlying hardware independently if required.
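To make the batching in principle 2 concrete, here is a minimal sketch built on the GetData method above. The batch size of 20 and the helper name GetDataInBatches are illustrative assumptions, not part of the original design.

private async Task<List<APIResponse>> GetDataInBatches(List<string> names)
{
    const int BatchSize = 20; // assumed value; tune to API limits and available resources
    var results = new List<APIResponse>();
    for (int i = 0; i < names.Count; i += BatchSize)
    {
        var batch = names.Skip(i).Take(BatchSize).ToList();
        // Fire one request per name in the batch, then await the whole batch together.
        var tasks = batch.Select(n => GetData(new List<string> { n }));
        foreach (var partial in await Task.WhenAll(tasks))
            results.AddRange(partial);
    }
    return results;
}

This keeps at most BatchSize requests in flight, so 200 names are served in 10 waves instead of one thread per name.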
Design Patterns
Worker pool/Job queue Pattern: The worker pool pattern is a simple and widely used concurrency pattern for distributing multiple jobs across multiple workers.
In the image above, jobs are stored in a data structure, say a job queue, and a pool of worker threads picks up jobs based on a scheduler. If multiple cores are available, the jobs can be processed in parallel, as in Golang.
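Here is a minimal C# sketch of the idea, assuming a fixed pool of 5 workers draining a shared queue; the job names and counts are illustrative.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class WorkerPoolDemo
{
    static void Main()
    {
        var jobs = new BlockingCollection<string>();
        for (int i = 0; i < 200; i++) jobs.Add("job-" + i);
        jobs.CompleteAdding(); // signal that no more jobs will arrive

        const int WorkerCount = 5; // assumed pool size
        var workers = new Task[WorkerCount];
        for (int w = 0; w < WorkerCount; w++)
        {
            int id = w;
            workers[w] = Task.Run(() =>
            {
                // Each worker pulls from the shared queue until it is drained.
                foreach (var job in jobs.GetConsumingEnumerable())
                    Console.WriteLine($"worker {id} processed {job}");
            });
        }
        Task.WaitAll(workers);
    }
}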
Monitor Pattern: N threads wait on some condition to be true; while the condition is false, those threads sleep on a wait queue, and they are notified when the condition becomes true.
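A minimal sketch of the monitor pattern in C#, using Monitor.Wait to sleep on the condition and Monitor.Pulse to notify; the flag and timings are illustrative.

using System;
using System.Threading;

class MonitorDemo
{
    static readonly object Gate = new object();
    static bool ready = false;

    static void Main()
    {
        var waiter = new Thread(() =>
        {
            lock (Gate)
            {
                while (!ready)          // re-check: guards against spurious wakeups
                    Monitor.Wait(Gate); // releases the lock and joins the wait queue
                Console.WriteLine("condition is true, proceeding");
            }
        });
        waiter.Start();

        Thread.Sleep(100); // simulate work done before the condition holds
        lock (Gate)
        {
            ready = true;
            Monitor.Pulse(Gate); // wake one waiting thread
        }
        waiter.Join();
    }
}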
Double-Checked Locking: for creating concurrent objects safely (ex: singleton pattern).
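A minimal double-checked locking sketch in C#; the class name is illustrative. The first check avoids taking the lock on the hot path, and the second check prevents two threads from both creating the instance.

public sealed class Singleton
{
    private static volatile Singleton instance; // volatile ensures safe publication
    private static readonly object Padlock = new object();

    private Singleton() { }

    public static Singleton Instance
    {
        get
        {
            if (instance == null)         // first check, lock-free
            {
                lock (Padlock)
                {
                    if (instance == null) // second check, under the lock
                        instance = new Singleton();
                }
            }
            return instance;
        }
    }
}

In modern C#, Lazy<T> achieves the same lazily initialized singleton with less ceremony.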
Barrier Pattern: all concurrently executing threads must wait for the others to arrive at a common point, called a barrier, before any of them continues.
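A minimal sketch using .NET's built-in Barrier class, with three illustrative workers that synchronize at the end of a phase.

using System;
using System.Threading;
using System.Threading.Tasks;

class BarrierDemo
{
    static void Main()
    {
        using (var barrier = new Barrier(3)) // 3 participants, assumed for the demo
        {
            var workers = new Task[3];
            for (int i = 0; i < 3; i++)
            {
                int id = i;
                workers[i] = Task.Run(() =>
                {
                    Console.WriteLine($"worker {id} finished its phase work");
                    barrier.SignalAndWait(); // block until all 3 workers arrive here
                    Console.WriteLine($"worker {id} continues past the barrier");
                });
            }
            Task.WaitAll(workers);
        }
    }
}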
Reactor Pattern: in an event-driven system, a service handler accepts events from multiple incoming requests and demultiplexes them to the respective non-blocking handlers.
Let us look at a few solutions for executing the above function and getting the response.

var queryTask = new List<Task>();
for (int i = 0; i < 150; i++)
{
    queryTask.Add(da.ExecuteSPAsync("Async" + i.ToString()));
}
Task.WhenAll(queryTask).Wait();
Parallel.For(0, 150, new ParallelOptions { MaxDegreeOfParallelism = 5 },
    x => da.ExecuteSP("PPWith5Threads" + x.ToString()));
Here are code samples for parallel programming with the C#-supported libraries: Threads vs Tasks | C# Online Compiler | .NET Fiddle. This is a basic solution that creates threads, and C# gives us a mechanism to control how many can run at a time, which is fair enough: we can fine-tune MaxDegreeOfParallelism according to the resources and response time required. This concept, thread pooling, is available in Spring Batch settings and in other programming languages as well.
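Parallel.For suits CPU-bound work; for I/O-bound calls like the API requests above, a SemaphoreSlim gives similar throttling without tying up threads. A minimal sketch, reusing the assumed ExecuteSPAsync helper and a limit of 5 to mirror MaxDegreeOfParallelism above:

var throttle = new SemaphoreSlim(5); // at most 5 calls in flight
var tasks = new List<Task>();
for (int i = 0; i < 150; i++)
{
    int n = i;
    tasks.Add(Task.Run(async () =>
    {
        await throttle.WaitAsync();      // wait for a free slot
        try { await da.ExecuteSPAsync("Throttled" + n); }
        finally { throttle.Release(); }  // free the slot for the next call
    }));
}
await Task.WhenAll(tasks);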
Here is one configuration used in a Spring Batch job:

core-pool-size: 20
max-pool-size: 20
throttle-limit: 10
Here is an example from Python for doing a similar task, i.e. making multiple requests simultaneously with asyncio.gather. The original snippet referenced fetch_data without defining it, so an assumed implementation is included here for completeness:

import asyncio
import aiohttp

async def fetch_data(session, url):
    # Assumed helper: fetch one URL and return the parsed JSON body.
    async with session.get(url) as response:
        return await response.json()

async def fetch_multiple():
    urls = [
        "https://api.github.com/users/github",
        "https://api.github.com/users/python",
        "https://api.github.com/users/django",
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch_data(session, url)) for url in urls]
        results = await asyncio.gather(*tasks)
    return results
How to Measure improvement in processing
Tasks and the event loop
Consider this example:
Grandmaster Judith Polgar is at a chess convention. She plays against 24 amateur chess players. It takes her 5 seconds to make a move; the opponents need 55 seconds to make theirs. A game ends at roughly 60 moves, or 30 moves from each side. (Source: https://github.com/fillwithjoy1/async_io_for_beginners)
Synchronous version
import time

def play_game(player_name):
    for move in range(30):
        time.sleep(5)   # the champion takes 5 seconds to make a move
        print(f"{player_name} made move {move+1}")
        time.sleep(55)  # the opponent takes 55 seconds to make a move

if __name__ == "__main__":
    players = ['Judith'] + [f'Amateur{i+1}' for i in range(24)]
    for player in players:
        play_game(player)
Asynchronous version
import asyncio

async def play_game(player_name):
    for move in range(30):
        await asyncio.sleep(5)   # the champion takes 5 seconds to make a move
        print(f"{player_name} made move {move+1}")
        await asyncio.sleep(55)  # the opponent takes 55 seconds to make a move

async def play_all_games(players):
    tasks = [asyncio.create_task(play_game(player)) for player in players]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    players = ['Judith'] + [f'Amateur{i+1}' for i in range(24)]
    asyncio.run(play_all_games(players))
In the synchronous version, the program runs sequentially, playing one game after another. Each simulated game takes 30 * (5 + 55) = 1,800 seconds, so the 25 players in the list take about 25 * 1,800 = 45,000 seconds (roughly 12.5 hours) in total.
In the asynchronous version, the games run concurrently: while one game sleeps, the others make progress. Since all the sleeps overlap, the whole set completes in about 1,800 seconds (30 minutes), assuming there are enough resources available to handle all the concurrent games.
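Back in C#, the same improvement can be measured directly with a Stopwatch. A minimal sketch, where the 200-call count matches the problem statement and the 100 ms delay is an illustrative stand-in for the real API latency:

using System;
using System.Diagnostics;
using System.Threading.Tasks;

class MeasureDemo
{
    static Task SimulatedApiCall() => Task.Delay(100); // stand-in for the real HTTP call

    static async Task Main()
    {
        var sw = Stopwatch.StartNew();
        for (int i = 0; i < 200; i++)
            await SimulatedApiCall();          // one call at a time
        Console.WriteLine($"sequential: {sw.ElapsedMilliseconds} ms"); // ~20,000 ms

        sw.Restart();
        var tasks = new Task[200];
        for (int i = 0; i < 200; i++)
            tasks[i] = SimulatedApiCall();     // fire all calls at once
        await Task.WhenAll(tasks);
        Console.WriteLine($"concurrent: {sw.ElapsedMilliseconds} ms"); // ~100 ms
    }
}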