Concurrency With Select, Goroutines, and Channels | by Yair Fernando | Jun, 2022

Master concurrency in Go


In this article, we’ll look at how to build concurrent programs by combining select, goroutines, and channels in Go.

I’d recommend reading these two articles first to get familiar with the concepts of concurrency, channels, and goroutines.

From the Go tour documentation:

“The select statement lets a goroutine wait on multiple communication operations.

A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.”

We’re going to investigate how we can use select to take the response from the quickest API call. Let’s dive right into some code to understand select and its powerful features.

The above implementation focuses on highlighting how a select waits until one of its cases can run.

Different parts are important to understand in this example, so let’s look at them one by one.

Before we look at the select logic, let’s examine how the API calls are made.

The Function struct represents a single API call. It has two attributes. The first is a function f that takes a channel of type News; notice how the signature of this function already enforces that the channel will be treated as a send-only channel. The second is a channel of type News: once the API call has executed and the response has been parsed, this channel is used to send the results.

The News struct holds the articles and the source they came from.
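As a rough illustration of the two types described above, here is a minimal sketch. The field names on News and the fakeNews helper are assumptions made for this example, not the article’s original code; the point is only to show how a chan<- News parameter restricts the function to sending.

```go
package main

import "fmt"

// News holds the fetched articles and the name of their source.
// The field names are assumptions for this sketch.
type News struct {
	Source   string
	Articles []string
}

// Function pairs an API call with the channel it reports on.
// f can only send on its channel because the parameter is chan<- News.
type Function struct {
	f       func(chan<- News)
	channel chan News
}

// fakeNews is a hypothetical stand-in for a real API call.
func fakeNews(out chan<- News) {
	out <- News{Source: "fake", Articles: []string{"hello"}}
}

func main() {
	fn := Function{f: fakeNews, channel: make(chan News)}
	go fn.f(fn.channel) // run the call in its own goroutine
	news := <-fn.channel
	fmt.Println(news.Source, news.Articles)
}
```

A bidirectional chan News can be passed wherever a chan<- News is expected, so the caller keeps the ability to receive while the API function can only send.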

In line 43, we initialize a slice of Function with two elements: the first holds the googleNews function and the google channel, and the second holds the freeNews function and the free channel.

Since both API calls fetch news, the two channels have the same type, but each function gets its own channel.

In lines 69 and 102, we have the implementations of these two APIs. Each one makes an HTTP request to its respective URL and parses the response. Once that is done, the news is sent through the function’s own channel.

Let’s now focus on the quickestApiResponse function. Its purpose is to set the article variable to the response from the fastest API. In line 54, each function is executed by calling its Run method, which starts the function in a new goroutine and passes it the channel. These API calls need to run in separate goroutines because we don’t want them to run sequentially.

Then, the select waits for a response on either the google or the free channel. As soon as one of the API calls sends its response through its channel, the select executes the code under that case and ignores the other one. This effectively sets the articles to the response from the fastest API call.

Let’s run the program to see the output:

API server response output

The FreeNewsApi ran faster!

This logic can be applied to many other use cases, allowing the program to run multiple goroutines, use channels to communicate, and use select to wait for them.

One more thing we can implement in this example is a timeout: if the API calls take longer than the limit, we’ll just leave the articles empty. The code below achieves this by adding one more case to the select.

time.After returns a receive-only channel of time.Time values (<-chan time.Time) and sends the current time on it once the specified duration has elapsed. Notice that we don’t assign the received value to a variable: we don’t care about the data the channel sends, only about receiving the signal. If we sleep for three seconds in both API functions, we’ll see the time-out case execute while the other two cases are ignored.

API server response timeout

Let’s see how we can use select to run a recurring process. For this program, we’ll have the following scenario:

The program needs to let us pass any function as the recurring process, the time at which that process should start running, and the interval between runs.

Below is the initial code. Let’s take a look:

The above code reflects the task that we want to run. We have two main functions: collectNewUsersNotifications and handlePendingUsersNotifications. The first one is meant to collect all new user notifications. Ideally, this function would look for unread notifications in a database, but for the sake of this example, we simulate getting random notifications for certain users.

The notifications are created using the Notification struct, which has only two fields: one for the content and one for the user id.

The collect function uses the PendingUserNotifications type to store the notifications. This type is a map where the key is an integer representing the user id and the value is a slice of Notification.

After we collect all the notifications, we use the handlePendingUsersNotifications function to iterate over them and run a handler function on each user’s batch. After a user’s notifications are processed, they are deleted from the map. The handler we use in this case is sendUserBatchNotificationsEmail. Its purpose is to send the user an email with all pending notifications so that they can take a look.
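The data shapes and the handle-then-delete step can be sketched as follows. The field names, the handlePending function, and the printing handler are assumptions for this example; the article’s real handler sends an email instead.

```go
package main

import "fmt"

// Notification carries the content and the id of the user it belongs to.
type Notification struct {
	Content string
	UserID  int
}

// PendingUserNotifications maps a user id to that user's notifications.
type PendingUserNotifications map[int][]Notification

// handlePending runs handler once per user and then removes that user's
// entry. Deleting from a map while ranging over it is safe in Go.
func handlePending(pending PendingUserNotifications, handler func(int, []Notification)) {
	for userID, notifications := range pending {
		handler(userID, notifications)
		delete(pending, userID)
	}
}

func main() {
	pending := PendingUserNotifications{
		1: {{Content: "New follower", UserID: 1}},
		2: {{Content: "New comment", UserID: 2}, {Content: "New like", UserID: 2}},
	}
	// Stand-in for an email-sending handler: it only prints.
	handlePending(pending, func(userID int, ns []Notification) {
		fmt.Printf("emailing user %d about %d notification(s)\n", userID, len(ns))
	})
	fmt.Println("remaining:", len(pending))
}
```

Map iteration order is not specified in Go, so the users may be handled in any order, but the map is always empty once the loop finishes.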

Let’s now focus on how to run this task on a recurring basis using select. As mentioned earlier, we have to consider the following:

  • Allow passing an interval time
  • Allow passing the start time of the process
  • Allow the caller to cancel/stop the recurring process when they want

The code below shows how to achieve this:

We introduced a new struct, RecurringProcess, to represent a recurring process. This struct contains the following fields:

  • name — The name of the process
  • interval — The interval time between each run
  • startTime — The time when the process will start
  • handler — A handler function to call on each run
  • stop — A channel to stop the process

In the pendingNotificationsProcess function, we initialize a new recurring process and the notifications in lines 30 and 31, respectively. The handler we use is a function that calls both collectNewUsersNotifications and handlePendingUsersNotifications. Notice that we pass the process to handlePendingUsersNotifications because it is needed to stop the process.

We also specified the interval and start time.

Then we call createRecurringProcess, which creates the recurring process and also starts it. Let’s focus on line 88, where we use a goroutine to start the process.

In line 40, we block the main goroutine by reading from the stop channel, which means that the main goroutine will be blocked until a message is sent to this channel.

Let’s take a look at the Start function in line 93, which contains all the logic to run the recurring process.

This function uses the startTicker variable to start the recurring process at the start time. If the start time is in the past, the process starts immediately.

The timer returned by time.NewTimer sends the current time on its channel once the specified duration has elapsed, and this is what lets us start the process. This is why the first case of the select waits to receive from that channel.

In line 95, we also have a ticker variable, which is a time.Ticker. A ticker in Go sends ticks on its channel at the specified interval. Once the startTicker.C channel delivers its signal, we assign a new ticker with the interval to the ticker variable in line 106 and call the handler function.

After this, the second select case starts receiving ticks from the ticker, and each time it receives one, the handler function is called again.

The last case of the select waits for a signal on the stop channel and, when it arrives, stops the process by simply returning.

Notice how the select is inside an infinite for loop. This is because we want to keep looping until one of the cases explicitly exits the loop. Each time we receive a tick, the second case is executed, and then the loop comes back around to the select, which again waits for one of its cases to be ready.

To stop the process, we added some logic in line 55: we count the number of notifications, and if at any point there are no pending notifications, the program cancels the process. The Cancel function closes the stop channel. A receive on a closed channel returns immediately, so both the Start loop and the blocked main goroutine are released, and the program finishes.
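Closing a channel works as a stop signal because every goroutine waiting on it is released at once. The sketch below illustrates this with a hypothetical releaseAll helper; it is separate from the article’s Cancel function and only demonstrates the channel behavior it relies on.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseAll starts n goroutines that block on the same stop channel,
// closes the channel once, and returns how many goroutines were released.
func releaseAll(n int) int {
	stop := make(chan struct{})
	released := make(chan int, n) // buffered so no sender blocks
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stop // blocks until stop is closed
			released <- id
		}(i)
	}

	close(stop) // a single close unblocks every receiver
	wg.Wait()
	return len(released)
}

func main() {
	fmt.Println("released:", releaseAll(3))

	// A receive on a closed channel returns immediately with ok == false.
	done := make(chan struct{})
	close(done)
	_, ok := <-done
	fmt.Println("channel open:", ok)
}
```

Sending a value on the channel instead would wake only one receiver, which is why closing is the usual choice when several goroutines need the same stop signal.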

Let’s run the program to see how it works:

Program output

Great, the program works as expected. This is just an example of how to run a recurring process. This can be the base code to implement something more complex. You can build complex programs with select.

Building concurrent programs can be challenging at first, especially if you struggle to understand how goroutines, channels, and select work.

I hope this article leaves you less confused and has shown you some use cases where you can use select.
