Why I needed to throttle the number of Tasks running simultaneously

In the past few months I have repeatedly come across a scenario where I wanted to run a whole bunch of Tasks (potentially thousands), but didn’t necessarily want to run all (or even a lot) of them in parallel at the same time. In my scenarios the Tasks were making web requests, not doing CPU-heavy work; otherwise I would have opted for Parallel.ForEach.

The first time I encountered this problem, it was because my application would be running on the cheapest VM that I could get from AWS; this meant a server with 1 slow CPU and less than 1GB of RAM. Telling that server to spin up 100 threads simultaneously likely would not end very well. I realize that the OS determines how many threads run at a time, so likely not all 100 threads would run concurrently, but being able to specify a lower maximum than the OS would use gives us more control over bulkheading our application, so that it plays nice and does not consume too many server resources.

The second time, I needed to request information from one of our company’s own web services. The web service used pagination for retrieving a list of users. There was no endpoint that would give me all users in one shot; instead I had to request the users on page 1, page 2, page 3, etc. until I reached the last page. In this case, my concern was about DoSing (Denial of Service) our own web service. If I created 500 web request Tasks to retrieve the users from 500 pages and made all of the requests simultaneously, I risked putting a lot of stress on the web service, and potentially on my network as well.

In both of these cases I was looking for a solution that would still complete all of the Tasks I created, but would let me specify that a maximum of, say, 5 should run at the same time.

What the code to run a bunch of Tasks typically looks like

Let’s say you have a function that you want to run a whole bunch of times concurrently in separate Tasks:

public void DoSomething(int taskNumber)
{
    Thread.Sleep(TimeSpan.FromSeconds(1));
    Console.WriteLine("Task Number: " + taskNumber);
}

Here is how you typically might start up 100 Tasks to do something:

public void DoSomethingALotWithTasks()
{
    var listOfTasks = new List<Task>();
    for (int i = 0; i < 100; i++)
    {
        var count = i;
        // Note that we start the Task here too.
        listOfTasks.Add(Task.Run(() => DoSomething(count)));
    }
    Task.WaitAll(listOfTasks.ToArray());
}

What the code to run a bunch of Tasks and throttle how many run concurrently looks like

Here is how you would run those same tasks using the throttling function I provide further down, limiting it to running at most 3 Tasks simultaneously.

public void DoSomethingALotWithTasksThrottled()
{
    var listOfTasks = new List<Task>();
    for (int i = 0; i < 100; i++)
    {
        var count = i;
        // Note that we create the Task here, but do not start it.
        listOfTasks.Add(new Task(() => DoSomething(count)));
    }
    Tasks.StartAndWaitAllThrottled(listOfTasks, 3);
}

Gimme the code to limit my concurrent Tasks!

Because I needed this solution in different projects, I created a nice generic, reusable function for it. I’m presenting the functions here, and they can also be found in my own personal open-source utility library here.

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxActionsToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxActionsToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
    StartAndWaitAllThrottled(tasksToRun, maxActionsToRunInParallel, -1, cancellationToken);
}

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxActionsToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxActionsToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
    // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
    var tasks = tasksToRun.ToList();

    using (var throttler = new SemaphoreSlim(maxActionsToRunInParallel))
    {
        var postTaskTasks = new List<Task>();

        // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
        tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

        // Start running each task.
        foreach (var task in tasks)
        {
            // Increment the number of tasks currently running and wait if too many are running.
            throttler.Wait(timeoutInMilliseconds, cancellationToken);

            cancellationToken.ThrowIfCancellationRequested();
            task.Start();
        }

        // Wait for all of the provided tasks to complete.
        // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
        Task.WaitAll(postTaskTasks.ToArray(), cancellationToken);
    }
}

Above I have them defined as static functions on my own Tasks class, but you can define them however you like. Notice that the functions also Start the Tasks, so you should not start them before passing them into these functions, otherwise an exception will be thrown when it tries to restart a Task. The last thing to note is you will need to include the System.Threading and System.Threading.Tasks namespaces.
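
One way to convince yourself that the limit is respected is to count how many task bodies are executing at the same moment. The following is a minimal, self-contained sketch of such a check. It assumes a simplified copy of the synchronous function (no timeout or cancellation), and the ThrottleCheck class and MeasurePeakConcurrency helper are names made up purely for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleCheck
{
    // Simplified copy of the synchronous throttling function (no timeout, no cancellation),
    // included only so that this sketch compiles on its own.
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel)
    {
        var tasks = tasksToRun.ToList();
        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            // Each continuation frees a slot once its task has finished.
            var postTaskTasks = new List<Task>();
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            foreach (var task in tasks)
            {
                throttler.Wait(); // Blocks until a slot is available.
                task.Start();
            }

            // Wait on the continuations so the semaphore is not disposed while still in use.
            Task.WaitAll(postTaskTasks.ToArray());
        }
    }

    // Hypothetical helper: runs taskCount short tasks through the throttle and
    // returns the highest number of task bodies observed running at once.
    public static int MeasurePeakConcurrency(int taskCount, int maxInParallel)
    {
        int running = 0;
        int peak = 0;
        var gate = new object();

        var tasks = new List<Task>();
        for (int i = 0; i < taskCount; i++)
        {
            tasks.Add(new Task(() =>
            {
                lock (gate) { running++; if (running > peak) peak = running; }
                Thread.Sleep(20); // Stand-in for real work.
                lock (gate) { running--; }
            }));
        }

        StartAndWaitAllThrottled(tasks, maxInParallel);
        return peak;
    }
}
```

Calling ThrottleCheck.MeasurePeakConcurrency(100, 3) should never return more than 3; how close it gets to 3 depends on how quickly the thread pool schedules the tasks.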

Here are the async equivalents of the above functions, to make it easy to not block the UI thread while waiting for your tasks to complete:

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
    await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
    // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
    var tasks = tasksToRun.ToList();

    using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
    {
        var postTaskTasks = new List<Task>();

        // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
        tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

        // Start running each task.
        foreach (var task in tasks)
        {
            // Increment the number of tasks currently running and wait if too many are running.
            await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

            cancellationToken.ThrowIfCancellationRequested();
            task.Start();
        }

        // Wait for all of the provided tasks to complete.
        // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
        await Task.WhenAll(postTaskTasks.ToArray());
    }
}

And if you don’t believe me that this works, you can take a look at this sample project and run the code for yourself.
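
One caveat worth knowing: these functions take unstarted Task objects created with new Task(...). If the delegate handed to that constructor is an async lambda, the Task is considered complete as soon as the lambda reaches its first await, not when the asynchronous work finishes, so the throttler frees the slot too early. For work that is itself asynchronous (web requests, for example), a variation that accepts Func<Task> delegates may be a better fit. What follows is only a minimal sketch of that idea under stated assumptions, not the code from the utility library; the AsyncThrottle class and RunAllThrottledAsync method are hypothetical names, and cancellation and timeouts are left out to keep it short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncThrottle
{
    // Hypothetical helper: runs every async operation, allowing at most
    // maxInParallel of them to be in flight at the same time.
    public static async Task RunAllThrottledAsync(IEnumerable<Func<Task>> operations, int maxInParallel)
    {
        using (var throttler = new SemaphoreSlim(maxInParallel))
        {
            var running = new List<Task>();
            foreach (var operation in operations)
            {
                // Asynchronously wait for a free slot before starting the next operation.
                await throttler.WaitAsync();
                running.Add(RunAndReleaseAsync(operation, throttler));
            }

            // Every Release happens inside these tasks, so the semaphore
            // is not disposed until all of them have finished.
            await Task.WhenAll(running);
        }
    }

    private static async Task RunAndReleaseAsync(Func<Task> operation, SemaphoreSlim throttler)
    {
        try
        {
            // The slot stays taken until the whole async operation finishes,
            // not just until its first await.
            await operation();
        }
        finally
        {
            throttler.Release();
        }
    }
}
```

Because the operations are passed as Func<Task> delegates, nothing starts running until the helper invokes it, which plays the same role as passing in unstarted Tasks does for the functions above.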

Update: 2016-04-29

Shortly after publishing this post I discovered Parallel.Invoke, which can also throttle the number of threads, but takes Actions as input instead of Tasks. Here’s an example of how to limit the number of threads using Parallel.Invoke:

public static void DoSomethingALotWithActionsThrottled()
{
    var listOfActions = new List<Action>();
    for (int i = 0; i < 100; i++)
    {
        var count = i;
        // Note that we create the Action here, but do not start it.
        listOfActions.Add(() => DoSomething(count));
    }

    var options = new ParallelOptions {MaxDegreeOfParallelism = 3};
    Parallel.Invoke(options, listOfActions.ToArray());
}

Notice that you set the maximum number of threads to run simultaneously through the ParallelOptions class’s MaxDegreeOfParallelism property; ParallelOptions also has a CancellationToken property if you need one. This method is nice because it doesn’t require any additional custom code; it’s all built into .NET. However, it does mean dealing with Actions instead of Tasks, which isn’t a bad thing at all, but you may have a personal preference for one or the other. Also, Tasks offer additional functionality, such as ContinueWith(), and Parallel.Invoke does not provide an asynchronous version, whereas my functions do. According to this MSDN page, Parallel.Invoke uses Task.WaitAll() under the hood, so the two approaches should be equivalent performance-wise, and beyond the differences above I don’t expect either one to be preferable on performance grounds. This other MSDN page goes into detail about Tasks, and also mentions using Parallel.Invoke near the start.
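
To illustrate the CancellationToken side of ParallelOptions, here is a small sketch that sets both properties before calling Parallel.Invoke. The ParallelInvokeExample class and RunActionsThrottled method are hypothetical names chosen for this example only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelInvokeExample
{
    // Hypothetical wrapper: runs the actions with a cap on parallelism and support for cancellation.
    public static void RunActionsThrottled(IEnumerable<Action> actions, int maxInParallel, CancellationToken cancellationToken)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxInParallel,
            CancellationToken = cancellationToken
        };

        // Blocks until every action has finished. If the token is cancelled,
        // an OperationCanceledException is thrown instead.
        Parallel.Invoke(options, actions.ToArray());
    }
}
```

A caller would typically create a CancellationTokenSource, pass its Token in, and catch OperationCanceledException around the call.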

I hope you find this information useful. Happy coding!

Comments

deadlydog

@Chris Wolf Good eye. I actually do use async/await in my real code. For the example here I chose to use synchronous code simply to keep the code example simpler. To use async/await here you would just have to declare the “DoSomething” function as:

public async Task DoSomething(int taskNumber)
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    Console.WriteLine("Task Number: " + taskNumber);
}

and then change the “() => DoSomething(count)” action to:

async () => await DoSomething(count)

Cyber Sinh

Your code doesn’t seem to work as expected when tasks contain async code. Awaiting the StartAndWaitAllThrottledAsync method doesn’t suspend the execution of the caller method.

You will find here a sample repro: http://pastebin.com/iE1cc5d5

Do you know what is wrong? Thanks.

Sean

Hi Chris,

Interesting example and project. Question…

I don’t see any actual task(s). For example, lets say I need to build up 5,000 url(s) to scrape those pages using webrequest or httpclient, but only allow so many concurrently, how do I make that into a task list and then throw it at your template?

Also, I get data out of those pages, and process those page strings as they come in, so does any call from inside your template that is NOT ASync become automatically Async because it is called from an Async process? In this regard, would the Await task.whenAny() work in your template as well, without breaking it?

Awesome stuff thanks! -S

Ritash Koul

Question:

I have a windows service which is consuming a messaging system(Kafka) to fetch messages. I have to use an existing callback mechanism with the help of Timer class which helps me to check the message after some fixed time to fetch and process. Previously, the service is processing the message one by one. But I want after the message arrives the processing mechanism to execute in parallel. So if the first message arrived it should go for processing on one task and even if the processing is not finished for the first message still after the interval time configured using the callback method (callback is working now) next message should be picked and processed on a different task. In my case I dont have to use WaitAll because the messages will be independent of each other. Till now I am using the below code:

private static void Main()
{
    try
    {
        int taskCount = 5;
        Task.Factory.StartNewAsync(() =>
        {
            Subscriber consumer = new Subscriber() { Interval = 1000 };
            consumer.CallBack(Process, msg => msg != null);
        }, taskCount);
        Console.ReadLine();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
}

public static void StartNewAsync(this TaskFactory target, Action action, int taskCount)
{
    var tasks = new Task[taskCount];
    for (int i = 0; i < taskCount; i++)
    {
        tasks[i] = target.StartNew(action);
    }
}

public static void Process(Message message)
{
    if (message != null)
    {
    }
    else
    {
    }
}

But my problem is I want to know which task id picks the message after a message is received in the method Process(CDC message)?
