File Processing Using Concurrency With GoLang | by Yair Fernando | May, 2022

Master concurrency in Go

GoLang has incredible support for concurrent programs and in this article, we’ll see how we can optimize a program that processes a CSV file to send SMS notifications to its users.

If you’re new to using GoLang and want to get a better understanding of how concurrency works, I’d recommend reading this first article: Concurrency in GoLang, Goroutines, and Channels Explained.

In this article, we’ll work with a CSV file whose content represents a list of 3,000 users. The purpose of the program is to read the file and process its data.

The program should read this file and send a notification to each user and their friends as well.

You can find the source code in this GitHub repository for reference.

All right, let’s get started. We’ll first create a Go module, the main.go file, and a package called csv. This package will have a function called ProcessFile, which will be called from the main package. Let’s create the Go module first.

go mod init github.com/GithubHandle/fileProcessing

The initial folder structure looks as follows:

Folder Structure

As you can see, there is also a students.csv file, which you can grab from the repository. The main.go and csv.go files look like this:

// main.go
package main

import "github.com/YairFernando67/fileProcessing/csv"

func main() {
	csv.ProcessFile()
}

// csv.go
package csv

func ProcessFile() {}

Now, let’s open the file and read its content:

In the above code, we’re first opening the csv file using the os package. Then the file is passed to another function called scanFile. This function uses the bufio package to initialize a new scanner and scan each line of the file. For each line, we extract the information about the user and append it to a slice of users. Once the scan finishes, the function returns the slice of users to the caller.
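The steps just described can be sketched roughly as follows. This is a minimal sketch rather than the repository’s exact code: the column layout (id, name, last name, email, phone, then friend ids separated by spaces) is an assumption, and sampleUsers is a hypothetical helper that parses an in-memory sample so the example runs without students.csv. With the real file, the value returned by os.Open would be passed to scanFile in the same way.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// User mirrors the struct defined in csv/user.go.
type User struct {
	Id, Name, LastName, Email, Phone string
	FriendIds                        []string
}

// scanFile reads r line by line and builds one User per line.
// Assumed layout: id,name,lastName,email,phone,friend ids separated by spaces.
func scanFile(r io.Reader) ([]User, error) {
	var users []User
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		fields := strings.Split(scanner.Text(), ",")
		if len(fields) < 6 {
			continue // skip blank or malformed lines
		}
		users = append(users, User{
			Id:        fields[0],
			Name:      fields[1],
			LastName:  fields[2],
			Email:     fields[3],
			Phone:     fields[4],
			FriendIds: strings.Fields(fields[5]),
		})
	}
	return users, scanner.Err()
}

// sampleUsers is a hypothetical helper that parses a two-line sample.
func sampleUsers() []User {
	const sample = "1,Ana,Lopez,ana@example.com,555-0001,2\n" +
		"2,Ben,Diaz,ben@example.com,555-0002,1\n"
	users, err := scanFile(strings.NewReader(sample))
	if err != nil {
		panic(err)
	}
	return users
}

func main() {
	for _, u := range sampleUsers() {
		fmt.Println(u.Id, u.Name, u.LastName, u.FriendIds)
	}
}
```

Taking an io.Reader instead of a file name keeps the scanning logic easy to test, since any reader (a file, a string, a network stream) can be passed in.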

The User struct is defined in csv/user.go and looks like this:

package csv

type User struct {
	Id, Name, LastName, Email, Phone string
	FriendIds                        []string
}

If you run the program, you’ll see the slice of users being printed out to the console.

Sweet! We now have the content of the file represented as a slice of users, which we can use to send an SMS notification to each of these users and their friends.

For that, we’ll first see how to do this sequentially, without using concurrency. Then we’ll modify the program to make it faster.

For the sequential processing, we have created a function and passed the users into it. This function ranges over the users and, for each user, checks whether it has already been visited. If it has not, the function marks the user as visited and sends the SMS notification. It then ranges over the user’s friend ids, finds each friend, and performs the same steps: checking whether they have been visited, marking them as visited, and sending the notification.

In the sendSmsNotification function, we are using the time.Sleep function to simulate some latency in sending the notification.
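A minimal sketch of this sequential flow might look like the code below. The names sequentialProcessing, findUser, and demoUsers, the trimmed User struct, and the 1ms delay are assumptions made for illustration; only sendSmsNotification and the use of time.Sleep come from the description above.

```go
package main

import (
	"fmt"
	"time"
)

// User is trimmed to the fields this sketch needs.
type User struct {
	Id        string
	FriendIds []string
}

// sendSmsNotification simulates latency; the 1ms delay is an assumed value.
func sendSmsNotification(u User) {
	time.Sleep(time.Millisecond)
}

// findUser does a linear search for the user with the given id.
func findUser(users []User, id string) (User, bool) {
	for _, u := range users {
		if u.Id == id {
			return u, true
		}
	}
	return User{}, false
}

// sequentialProcessing notifies each user and each friend exactly once
// and returns the number of notifications sent.
func sequentialProcessing(users []User) int {
	visited := make(map[string]bool)
	sent := 0
	notify := func(u User) {
		if visited[u.Id] {
			return // already notified
		}
		visited[u.Id] = true
		sendSmsNotification(u)
		sent++
	}
	for _, u := range users {
		notify(u)
		for _, id := range u.FriendIds {
			if friend, ok := findUser(users, id); ok {
				notify(friend)
			}
		}
	}
	return sent
}

// demoUsers is a hypothetical three-user data set.
func demoUsers() []User {
	return []User{
		{Id: "1", FriendIds: []string{"2", "3"}},
		{Id: "2", FriendIds: []string{"1"}},
		{Id: "3"},
	}
}

func main() {
	fmt.Println("notifications sent:", sequentialProcessing(demoUsers()))
}
```

Because every notification waits for the previous one to finish, the total running time grows linearly with the number of users, which is what the concurrent version later tries to avoid.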

Let’s run a benchmark test on this version of the program and see how fast it is. Here’s the code:

Let’s run go test -bench=. github.com/YairFernando67/fileProcessing/csv -benchtime=5x
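For readers who have not written a Go benchmark before, here is a minimal sketch of the shape such a function takes. It is not the repository’s benchmark: processFile is a hypothetical stand-in for csv.ProcessFile, and its 2ms delay is an assumed value. In a real _test.go file the function would be exported as BenchmarkProcessFile so that go test -bench can discover it; the sketch calls testing.Benchmark directly only so that it can run as a standalone program.

```go
package main

import (
	"fmt"
	"testing"
	"time"
)

// processFile is a hypothetical stand-in for csv.ProcessFile.
func processFile() {
	time.Sleep(2 * time.Millisecond)
}

// benchmarkProcessFile has the signature that go test -bench expects.
// The testing package chooses b.N and times the whole loop.
func benchmarkProcessFile(b *testing.B) {
	for i := 0; i < b.N; i++ {
		processFile()
	}
}

// runBenchmark runs the benchmark without the go test command.
func runBenchmark() testing.BenchmarkResult {
	return testing.Benchmark(benchmarkProcessFile)
}

func main() {
	res := runBenchmark()
	fmt.Println(res.N, "iterations,", res.NsPerOp(), "ns/op")
}
```

The -benchtime=5x flag in the command above asks go test to run the loop exactly five times instead of letting it pick b.N based on elapsed time.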

Sequential processing benchmark

It took 199.723 seconds to process all users. Let’s see how we can improve the performance of the program using concurrency. For that, we’ll add another function to process the users concurrently.

For this concurrent implementation, we have created two channels: usersCh will hold lists of users, starting with the initial list, and unvisitedUsers will hold individual unvisited users.

In line 35, we feed the first channel with the initial list of users that the function received as a parameter. This runs in a separate goroutine because we don’t want the main goroutine to be blocked. This is a concept that I talked about in this article; you can check it out if you’re still confused about the blocking concepts.
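To see why the separate goroutine matters, consider this small sketch. It assumes usersCh is unbuffered, which the article does not state, and feed is a hypothetical helper name. A send on an unbuffered channel blocks until another goroutine receives, so sending from the main goroutine before any receiver exists would block forever.

```go
package main

import "fmt"

// feed sends the initial list on a new unbuffered channel from a
// separate goroutine, so the caller is free to start receiving.
func feed(users []string) <-chan []string {
	usersCh := make(chan []string)
	go func() {
		usersCh <- users // blocks here until someone receives
	}()
	return usersCh
}

func main() {
	usersCh := feed([]string{"1", "2", "3"})
	batch := <-usersCh // the send above completes at this point
	fmt.Println("received", len(batch), "users")
}
```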

Then we call initializeWorkers. This function starts N goroutines, where N is determined by the constant MAX_GOROUTINES; in this case, we’ll start with 10. Each worker is a function that listens on the unvisitedUsers channel. For each user that it receives, it sends the SMS notification to the user and also processes the user’s friend ids by finding each friend in the list and sending them to the usersCh channel. The processing of the friend ids runs in a separate goroutine, since we don’t want to block the worker goroutine either.

This function will allow the program to have 10 goroutines waiting for users to be sent to the unvisitedUsers channel. And each of these goroutines will run concurrently, thus improving the performance of the program.
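The worker pool on its own can be sketched as below. This is a simplified illustration, not the article’s code: users are plain integer ids, friend processing is left out, and the sent counter, the sync.WaitGroup, notifyAll, and the 1ms delay are assumptions added so the example can finish cleanly and report what it did.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const maxGoroutines = 10 // plays the role of MAX_GOROUTINES

// sendSmsNotification simulates latency; the 1ms delay is an assumed value.
func sendSmsNotification(userID int) {
	time.Sleep(time.Millisecond)
}

// initializeWorkers starts maxGoroutines workers that all receive from
// the same channel. Each value is delivered to exactly one worker.
func initializeWorkers(unvisitedUsers <-chan int, sent *int64) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < maxGoroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range unvisitedUsers { // ends when the channel is closed
				sendSmsNotification(id)
				atomic.AddInt64(sent, 1)
			}
		}()
	}
	return wg
}

// notifyAll pushes n user ids through the pool and returns how many
// notifications were sent.
func notifyAll(n int) int64 {
	unvisitedUsers := make(chan int)
	var sent int64
	wg := initializeWorkers(unvisitedUsers, &sent)
	for id := 1; id <= n; id++ {
		unvisitedUsers <- id
	}
	close(unvisitedUsers) // no more work
	wg.Wait()             // wait for in-flight notifications
	return atomic.LoadInt64(&sent)
}

func main() {
	fmt.Println("notifications sent:", notifyAll(50))
}
```

With 10 workers, up to 10 simulated notifications are waiting on their latency at the same time instead of one after another.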

Another important thing to note is that in line 57 we check whether the usersCh channel is still open before sending, which prevents us from sending data to a closed channel. This matters in this example because we’ll be closing the channel, and we don’t want other goroutines to attempt to send data to it afterwards.
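Sending on a closed channel panics in Go, which is why this kind of guard is needed. The sketch below shows one common way to guard a send, and it is a different technique from the check used in the article’s code: instead of closing usersCh, a separate done channel is closed to signal shutdown, and senders use select to give up once that happens. The names trySend, done, and demo are hypothetical.

```go
package main

import "fmt"

// trySend delivers batch on usersCh unless done has been closed first.
// It reports whether the batch was delivered.
func trySend(usersCh chan<- []string, done <-chan struct{}, batch []string) bool {
	select {
	case usersCh <- batch:
		return true
	case <-done: // shutdown was signalled; drop the batch
		return false
	}
}

// demo sends once while a receiver is waiting, then once after shutdown.
func demo() (delivered, afterShutdown bool) {
	usersCh := make(chan []string)
	done := make(chan struct{})

	go func() { <-usersCh }() // a single receiver
	delivered = trySend(usersCh, done, []string{"42"})

	close(done) // usersCh itself is never closed, so sends cannot panic
	afterShutdown = trySend(usersCh, done, []string{"43"})
	return delivered, afterShutdown
}

func main() {
	delivered, afterShutdown := demo()
	fmt.Println("delivered before shutdown:", delivered)
	fmt.Println("delivered after shutdown:", afterShutdown)
}
```

The design choice here is that only the done channel is ever closed, so no goroutine can race with a close on the channel it is sending to.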

Then in line 37, we have the processUsers function, which ranges over the usersCh channel. For each list of users that it receives, it checks whether each user has already been visited; if not, it marks the user as visited and sends it to the unvisitedUsers channel. This function also keeps track of how many users have been processed with the count variable. That way, we can check in line 75 whether we have reached the total number of users and close the usersCh channel, which will terminate the program.
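Putting the pieces together, the whole flow might be sketched as follows. This is an illustration under stated assumptions rather than the repository’s implementation: shutdown is signalled by closing a separate done channel instead of closing usersCh, friends are looked up in a map, a WaitGroup waits for in-flight notifications, and the names concurrentProcessing, byID, send, and demoUsers are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const maxGoroutines = 10 // plays the role of MAX_GOROUTINES

// User is trimmed to the fields this sketch needs.
type User struct {
	Id        string
	FriendIds []string
}

// sendSmsNotification simulates latency; the 1ms delay is an assumed value.
func sendSmsNotification(u User) { time.Sleep(time.Millisecond) }

// concurrentProcessing notifies every user exactly once and returns
// how many notifications were sent.
func concurrentProcessing(users []User) int {
	byID := make(map[string]User, len(users))
	for _, u := range users {
		byID[u.Id] = u
	}
	usersCh := make(chan []User)
	unvisitedUsers := make(chan User)
	done := make(chan struct{}) // closed to signal shutdown

	// send delivers a batch unless shutdown has started.
	send := func(batch []User) {
		select {
		case usersCh <- batch:
		case <-done:
		}
	}
	go send(users) // feed the initial list without blocking the caller

	var sent int64
	var wg sync.WaitGroup
	for i := 0; i < maxGoroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for u := range unvisitedUsers {
				sendSmsNotification(u)
				atomic.AddInt64(&sent, 1)
				go func(u User) { // queue friends without stalling the worker
					for _, id := range u.FriendIds {
						if friend, ok := byID[id]; ok {
							send([]User{friend})
						}
					}
				}(u)
			}
		}()
	}

	// The processUsers step: dedupe, count, hand off unvisited users.
	visited := make(map[string]bool, len(byID))
	for count := 0; count < len(byID); {
		for _, u := range <-usersCh {
			if !visited[u.Id] {
				visited[u.Id] = true
				count++
				unvisitedUsers <- u
			}
		}
	}
	close(done)           // release goroutines still trying to send friends
	close(unvisitedUsers) // let the workers' range loops end
	wg.Wait()
	return int(atomic.LoadInt64(&sent))
}

// demoUsers is a hypothetical three-user data set.
func demoUsers() []User {
	return []User{
		{Id: "1", FriendIds: []string{"2", "3"}},
		{Id: "2", FriendIds: []string{"1"}},
		{Id: "3"},
	}
}

func main() {
	fmt.Println("notifications sent:", concurrentProcessing(demoUsers()))
}
```

Only the loop that owns the visited map ever reads or writes it, so the map needs no lock; the workers share nothing but the channels and the atomic counter.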

Let’s run the benchmark test for this version of the program and see how much it improved.

Concurrent processing benchmark

As you can see, it only took 19.936 seconds to finish, roughly ten times faster than the sequential version! This is a huge performance improvement. We can also control how many active goroutines/workers we want the program to have by increasing or decreasing the MAX_GOROUTINES constant.

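Because each worker spends most of its time waiting on the simulated latency, increasing the number of workers lets more notifications be in flight at once and makes the program run faster, although the gains eventually level off.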

This is a good example of how to improve the performance of a program using channels and goroutines. Channels are a great way to communicate between goroutines, and goroutines allow you to run code concurrently.
