Saturday, December 3, 2022

Processing large data files with Python multithreading

 


We spend a lot of time waiting for data preparation tasks to finish (the destiny of data scientists, you might say). Well, we can speed things up. Here are two techniques that come in handy: memory-mapped files and multithreading.

The data

I recently had to extract terms and term frequencies from the Google Books Ngram corpus and found myself wondering if there are ways to speed up the task. The corpus consists of twenty-six files totalling 24GB of data. Each of the files I was interested in contains a term and other metadata, tab-separated. The brute-force approach of reading these files as pandas data frames was … slow. Since we wanted only the unique terms and their match counts, I thought I would try to make it faster :-)

Memory mapped files

This technique is not new. It has been around for a long time and originated in Unix (before Linux!). Briefly, mmap bypasses the usual I/O buffering by mapping the contents of a file into pages of memory. This works very well on computers with plenty of memory, which is mostly a given with today's desktops and laptops, where 32GB of RAM is no longer esoteric. Python's mmap module mimics most of the Unix functionality and offers a handy readline() function to extract the bytes one line at a time.

import mmap

# map the entire file into memory (fp was opened earlier with mode "r+b")
mm = mmap.mmap(fp.fileno(), 0)
# iterate over the block, one line at a time
for line in iter(mm.readline, b""):
    # convert the bytes to a utf-8 string and split the fields into a list
    term = line.decode("utf-8").split("\t")

Here, fp is a file object that was previously opened with the r+b access mode. There you go: with this simple tweak you have made file reading twice as fast (the exact speedup will depend on many things, such as the disk hardware).

Multithreading

The next technique that always helps in making things faster is adding parallelism. In our case, the task was I/O bound, which makes it a good fit for scaling up, i.e. adding threads. You will find good discussions online about when it is better to scale out instead (multiprocessing).

Python 3 has a great standard library, concurrent.futures, for managing a pool of threads and dynamically assigning tasks to them, all with an incredibly simple API.

from concurrent.futures import ThreadPoolExecutor

# use as many threads as possible; default: min(32, os.cpu_count() + 4)
with ThreadPoolExecutor() as threads:
    t_res = threads.map(process_file, files)

As of Python 3.8, the default value of max_workers for ThreadPoolExecutor is min(32, os.cpu_count() + 4); earlier versions used 5 threads per CPU core. The map() API receives a function to be applied to each member of a list and runs it automatically as threads become available. Wow. That simple. In less than fifty minutes I had converted the 24GB input into a handy 75MB dataset to be analysed with pandas. Voilà.
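Putting the two techniques together, a minimal sketch could look like the following. The file paths and the per-term tallying logic are placeholders of mine; the real code (linked below) extracts the terms and match counts from the ngram fields.

import mmap
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def process_file(path):
    # memory-map one tab-separated file and tally the terms in its first column
    counts = Counter()
    with open(path, "r+b") as fp:
        mm = mmap.mmap(fp.fileno(), 0)
        for line in iter(mm.readline, b""):
            fields = line.decode("utf-8").split("\t")
            counts[fields[0]] += 1
    return counts

files = ["part-000.tsv", "part-001.tsv"]  # placeholder file names
with ThreadPoolExecutor() as threads:
    totals = Counter()
    for partial in threads.map(process_file, files):
        totals.update(partial)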

The complete code is on GitHub. Comments and remarks are always welcome.

PS: I added a progress bar with tqdm for each thread. I really don't know how they manage to avoid scrambling the lines on the screen … It works like a charm.
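For the curious: the trick appears to be tqdm's position argument, which pins each bar to its own line of the terminal. A rough sketch of mine, with a placeholder worker body:

from tqdm import tqdm

def process_file_with_progress(path, position):
    # position gives each thread's bar its own screen line
    with open(path, "rb") as fp, tqdm(desc=path, position=position) as bar:
        for line in fp:
            bar.update(1)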

UPDATE: Two years later, this came up :-)

Thursday, December 1, 2022

Multi-threaded design pattern - part 1

Code details:

1. Read the file.

2. Create threads to process each line.

3. Store the results after processing. Each thread gets some information from YouTube and then uses the genderize API to check user demographic details.

4. Reading lines drives the creation of new threads; once all N threads have been created, the program waits for all of them to complete and stores the data.

5. It then starts creating a new set of threads for the next batch of lines.

6. It is a good mix of async and sync programming applied to parallel programming.


using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;

namespace Recon
{
    class Program
    {
        static HttpClient client = new HttpClient();
        static StreamWriter writer = new StreamWriter("c:\\temp\\result\\artFluck.txt");
        // lock object so multiple threads can write to the same file safely
        static readonly object writeLock = new object();

        static void Main(string[] args)
        {
            int NUMBER_OF_THREADS = 30;
            string[] delimit = { "group" };

            foreach (string fileName in Directory.GetFiles("c:\\temp\\", "*.*"))
            {
                string[] fileLines = File.ReadAllLines(fileName);
                int ThreadNumber = 1;
                List<Task> tasksToWait = new List<Task>();

                foreach (string line in fileLines)
                {
                    var name = line.Replace("https://www.youtube.com/", "");
                    if (delimit.Any(s => name.Contains(s))) continue;

                    var extractName = name.Split('.');
                    if (extractName.Length > 0)
                    {
                        // once a full batch of tasks is in flight, wait for the
                        // batch to complete before starting the next one
                        if (ThreadNumber > NUMBER_OF_THREADS)
                        {
                            Task.WaitAll(tasksToWait.ToArray());
                            tasksToWait.Clear();
                            ThreadNumber = 1;
                        }

                        ThreadNumber++;
                        Task task = new Task(() =>
                        {
                            CreateTask(extractName[0], line);
                        });
                        task.Start();
                        tasksToWait.Add(task);
                    }
                }

                // wait for the final, partial batch of this file
                Task.WaitAll(tasksToWait.ToArray());
            }

            writer.Flush();
        }

        private static void CreateTask(string name, string line)
        {
            var response = GetData(name);
            if (response.Result == true)
            {
                // multiple threads write to the same file, so the write is
                // serialized with a lock to keep it thread safe
                try
                {
                    lock (writeLock)
                    {
                        writer.WriteLine(line);
                    }
                }
                catch (Exception)
                {
                    Console.Write(line);
                }
            }
        }

        private static async Task<bool> GetData(string name)
        {
            HttpResponseMessage response =
                await client.GetAsync("https://api.genderize.io/?name=" + name);

            var content = await response.Content.ReadAsStringAsync();

            if (content.Contains("female")) return true;
            else return false;
        }
    }
}


Friday, July 15, 2022

A look at some of Azure Container App’s new features

 

A while ago, I created a YouTube playlist about Azure Container Apps. The videos were based on the first public preview. At the time, several features were missing or still needed to be improved (as expected with a preview release):

  • An easy way to create a container app, similar to az webapp up
  • Managed Identity support (system and user assigned)
  • Authentication support with identity providers like Microsoft, Google, Twitter
  • An easy way to follow the logs of a container from your terminal (instead of using Log Analytics queries)
  • Getting a shell to your container for troubleshooting purposes

Let’s take a look at some of these features.

az containerapp up

To manage Container Apps, you can use the containerapp Azure CLI extension. Add it with the following command:

az extension add --name containerapp --upgrade

One of the commands of this extension is up. It lets you create a container app from local source files or from GitHub. With your sources in the current folder, the simplest form of this command is:

az containerapp up --name YOURAPPNAME --source .

The command above creates the following resources:

  • a resource group: mine was called geert_baeke_rg_3837
  • a Log Analytics workspace
  • a Container Apps environment: its name is YOURAPPNAME-env
  • an Azure Container Registry: used to build the container image from a Dockerfile in your source folder
  • the container app: its name is YOURAPPNAME

The great thing here is that you do not need Docker on your local machine for this to work. Building and pushing the container image is done by an ACR task. You only need a Dockerfile in your source folder.

When you change your source code, simply run the same command to deploy your changes. A new image build and push will be started by ACR and a revision of your container app will be published.

⚠️TIP: by default, the container app does not enable ingress from the Internet. To do so, include an EXPOSE command in your Dockerfile.

If you want to try az containerapp up, you can use my super-api sample from GitHub: https://github.com/gbaeke/super-api

Use the following commands to clone the source code and create the container app:

git clone https://github.com/gbaeke/super-api.git
cd super-api
az containerapp up --name super-api --source . --ingress external --target-port 8080

Above, we added the --ingress and --target-port parameters to enable ingress. You will get a URL like https://super-api.livelyplant-fa0ceet5.eastus.azurecontainerapps.io to access the app. In your browser, you will just get: Hello from Super API. If you want a different message, you can run this command:

az containerapp up --name super-api --source . --ingress external --target-port 8080 --env-vars WELCOME=YOURMESSAGE

Running the above command will result in a new revision. Use az containerapp revision list -n super-api -g RESOURCEGROUP -o table to see the revisions of your container app.

There is much more you can do with az containerapp up:

  • Deploy directly from a container image in a registry (with the option to supply registry authentication if the registry is private)
  • Deploy to an existing container app environment
  • Deploy to an existing resource group
  • Use a GitHub repo instead of local sources which uses a workflow to deploy changes as you push them

Managed Identity

You can now easily enable managed identity on a container app. Both System assigned and User assigned are supported. Below, system assigned managed identity was enabled on super-api:

System assigned identity on super-api

Next, I granted the managed identity the Reader role on my subscription.

Enabling managed identity is easy enough. In your code, however, you need to obtain a token to do the things you want to do. At a low level, you can use an HTTP call to fetch the token to access a resource like Azure Key Vault. Let’s try that and introduce a new command to get a shell to a container app:

az containerapp exec -n super-api -g geert_baeke_rg_3837 --command sh

The above command gets a shell to the super-api container. If you want to try this, first modify the Dockerfile and remove the USER command. Otherwise, you are not root and will not be able to install curl. You will also need to use an alpine base image in the second stage instead of scratch (the scratch image does not offer a shell).
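As an illustration, a hypothetical multi-stage Dockerfile along those lines could look like this (the image tags and build flags are mine, not taken from the super-api repo):

# build stage
FROM golang:1.18 AS build
WORKDIR /src
COPY . .
RUN CGO_ENABLED=0 go build -o /super-api

# final stage: alpine instead of scratch, so the container has a shell
FROM alpine:3.16
COPY --from=build /super-api /super-api
# EXPOSE also lets az containerapp up detect the ingress target port
EXPOSE 8080
ENTRYPOINT ["/super-api"]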

In the shell, run the following commands:

apk add curl
curl -H "X-IDENTITY-HEADER: $IDENTITY_HEADER" \
  "$IDENTITY_ENDPOINT?resource=https://vault.azure.net&api-version=2019-08-01"

The response to the above curl command will include an access token for the Azure Key Vault resource.
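Abbreviated, the JSON response looks something like this (the token value is made up; the field names follow the App Service managed identity token response, which Container Apps shares):

{
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOi...",
  "expires_on": "1651746557",
  "resource": "https://vault.azure.net",
  "token_type": "Bearer"
}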

A container app with managed identity has several environment variables:

  • IDENTITY_ENDPOINT: http://localhost:42356/msi/token (the endpoint to request the token from)
  • IDENTITY_HEADER: used to protect against server-side request forgery (SSRF) attacks

Instead of using these values to craft raw HTTP requests, you can use an SDK. The documentation provides information for .NET, JavaScript, Python, Java, and PowerShell. To try something different, I used the Azure SDK for Go. Here's a code snippet:

func (s *Server) authHandler(w http.ResponseWriter, r *http.Request) {
    // parse subscription id from request
    subscriptionId := r.URL.Query().Get("subscriptionId")
    if subscriptionId == "" {
        s.logger.Infow("Failed to get subscriptionId from request")
        w.WriteHeader(http.StatusBadRequest)
        return
    }
 
    client := resources.NewGroupsClient(subscriptionId)
    authorizer, err := auth.NewAuthorizerFromEnvironment()
    if err != nil {
        s.logger.Error("Error: ", zap.Error(err))
        return
    }
    client.Authorizer = authorizer

Although the NewAuthorizerFromEnvironment() call above supports managed identity, it seems it does not support the endpoint used in Container Apps and Azure Web Apps. The code above works fine on a virtual machine, and even with pod identity (v1) on AKS.
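Until the SDK supports this endpoint, one workaround is to request the token yourself over HTTP, just like the curl example earlier. A minimal Go sketch of mine (the function name and error handling are assumptions; the endpoint, header, and api-version come from the environment variables listed above):

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
)

// getToken requests a managed identity token for the given resource from the
// identity endpoint that Container Apps injects into the container
func getToken(resource string) (string, error) {
	endpoint := os.Getenv("IDENTITY_ENDPOINT")
	header := os.Getenv("IDENTITY_HEADER")

	req, err := http.NewRequest("GET", fmt.Sprintf(
		"%s?resource=%s&api-version=2019-08-01",
		endpoint, url.QueryEscape(resource)), nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("X-IDENTITY-HEADER", header)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	// the body is JSON that contains the access_token field
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	token, err := getToken("https://vault.azure.net")
	if err != nil {
		fmt.Println("error getting token:", err)
		return
	}
	fmt.Println(token)
}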

We can use another feature of az containerapp to check the logs:

az containerapp logs show -n super-api -g geert_baeke_rg_3837 --follow

"TimeStamp":"2022-05-05T10:49:59.83885","Log":"Connected to Logstream. Revision: super-api--0yp202c, Replica: super-api--0yp202c-64746cc57b-pf8xh, Container: super-api"}
{"TimeStamp":"2022-05-04T22:02:10.4278442+00:00","Log":"to super api"}
{"TimeStamp":"2022-05-04T22:02:10.427863+00:00","Log":""}
{"TimeStamp":"2022-05-04T22:02:10.4279478+00:00","Log":"read config error Config File "config" Not Found in "[/config]""}
{"TimeStamp":"2022-05-04T22:02:10.4280241+00:00","Log":"logger"}"}
{"TimeStamp":"2022-05-04T22:02:10.4282641+00:00","Log":"client initializing for: 127.0.0.1:50001"}
{"TimeStamp":"2022-05-04T22:02:10.4282792+00:00","Log":"values","welcome":"Hello from Super API","port":8080,"log":false,"timeout":15}"}
...

When I try to execute the code that’s supposed to get the token, I get the following error:

{"TimeStamp":"2022-05-05T10:51:58.9469835+00:00","Log":"{error 26 0  MSI not available}","stacktrace":"..."}

As always, it is easy to enable managed identity but tricky to do from code (sometimes 😉). With the new feature that lets you easily grab the logs, it is simpler to check the errors you get back at runtime. Using Log Analytics queries was just not intuitive.

Conclusion

The az containerapp up command makes it extremely simple to deploy a container app from your local machine or from GitHub. It greatly enhances the inner-loop experience before you start deploying your app to other environments.

The tooling now makes it easy to exec into containers and troubleshoot. Checking runtime errors from logs is now much easier as well.

Managed Identity is something we all were looking forward to. As always, it is easy to implement but do check if the SDKs you use support it. When all else fails, you can always use HTTP! 😉
