PreprocessIterator<KT, T>

Thursday, September 03, 2009 / Posted by Luke Puplett /

To get myself in the spirit of coding again after my holiday, I thought I’d implement an idea I’d had scribbled down for a few weeks. The concept goes something like this. An iterator usually dishes stuff up and then work is done upon that stuff before it is called again and more stuff is dished up. While the work is being done, would it not be prudent to prepare the next dish?

So I put my programming cape on and went to work. I created a simple class that takes and executes a delegate as the fetch or processing logic – this could either get something from a database, or process a bitmap or something – and then returns the result(s) to the caller in an iterator.

Using the class may look like this:

List<string> words = new List<string>();
words.Add("apple");
words.Add("pear");
words.Add("binoculars");

Func<string, string> processor = delegate(string s)
{
    string initial = s.Substring(0, 1).ToUpper();
    // Optional sleep.
    return String.Format("{0}{1}", initial, s.Substring(1));
};

PreprocessIterator<string, string> ppi
    = new PreprocessIterator<string, string>(processor, words);

foreach (var word in ppi)
{
    // Optional sleep.
    if (word == "Binoculars")
        Console.WriteLine(word);
}

I’ll jump right to the results and put the code of the actual class at the bottom.

The following shows the average time, in ticks, that a single test takes to run. The tests are run many times to get an average and to minimise ‘interference’ from other processes on my PC. I tested the code above as well as an equivalent version that doesn’t use the PreprocessIterator.

Because the work being carried out above is not very time-consuming, I also added a pretend cost, in the shape of a Sleep(ms) call, to the processor work delegate and to the code that consumes the data from the iterator.
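
I haven’t reproduced my full test rig here, but a minimal sketch of the sort of harness I mean might look like this, timing with Stopwatch and averaging TimeSpan (100 ns) ticks; the IteratorBenchmark and Average names, and the hard-coded 100:100ms costs, are just for illustration. The serial loop stands in for the equivalent non-PreprocessIterator version mentioned above.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

class IteratorBenchmark
{
    static void Main()
    {
        List<string> words = new List<string>();
        words.Add("apple");
        words.Add("pear");
        words.Add("binoculars");

        Func<string, string> processor = delegate(string s)
        {
            Thread.Sleep(100);  // Faux production cost.
            return String.Format("{0}{1}", s.Substring(0, 1).ToUpper(), s.Substring(1));
        };

        double parallel = Average(10, delegate
        {
            var ppi = new PreprocessIterator<string, string>(processor, words);
            foreach (var word in ppi)
            {
                Thread.Sleep(100);  // Faux consumption cost.
                if (word == "Binoculars")
                    Console.WriteLine(word);
            }
        });

        double serial = Average(10, delegate
        {
            foreach (var key in words)
            {
                var word = processor(key);
                Thread.Sleep(100);  // Faux consumption cost.
                if (word == "Binoculars")
                    Console.WriteLine(word);
            }
        });

        Console.WriteLine("{0:N0}   // parallel", parallel);
        Console.WriteLine("{0:N0}   // serial", serial);
    }

    // Runs the test the given number of times and returns the mean elapsed
    // time in TimeSpan ticks (100 ns each).
    static double Average(int runs, Action test)
    {
        double totalTicks = 0;
        for (int i = 0; i < runs; i++)
        {
            Stopwatch watch = Stopwatch.StartNew();
            test();
            watch.Stop();
            totalTicks += watch.Elapsed.Ticks;
        }
        return totalTicks / runs;
    }
}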

10,000 tests without faux cost:

8,770      // parallel <-- actually performs much worse.
4,496      // serial

10 tests with a 100:100ms faux cost (100ms in the processor, 100ms in the consumer):

4,141,667   // parallel
6,062,455   // serial

100 tests with 20:20ms faux cost:

   899,104    // parallel
1,347,777    // serial

 

It’s clear from the first test that the overhead of queuing a task on the threadpool far outweighs any performance gain I stand to make from having that task performed at the same time as the work of consuming the last lot. This was as expected.

However, when consuming each item costs more than this overhead, a speedup can be had. The 100:100ms numbers are roughly what you would expect if production overlaps consumption: assuming those are 100 ns ticks, the serial version takes about 600 ms per test (three items, each 100 ms to produce plus 100 ms to consume), while the pipelined version takes about 400 ms, because every item after the first is produced while the previous one is being consumed.

Using this class may not be as beneficial as other approaches, especially with the new CLR 4.0 ThreadPool. If the main thread (the one consuming the data) has to wait for the producer, then that wait is dead time.

Partitioning the data and having several threads work through their own partitions concurrently, each partition processed serially, may yield better results, as any thread that completes early will go on to steal work from those still running.
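
I haven’t measured that approach, but a minimal sketch of what I mean, using the Parallel.ForEach and Partitioner types arriving in .NET 4.0 (the class name and the Sleep costs below are just for illustration), might look like this:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class PartitionedVersion
{
    static void Main()
    {
        List<string> words = new List<string>();
        words.Add("apple");
        words.Add("pear");
        words.Add("binoculars");

        // Ask for a load-balancing partitioner so that a thread which finishes
        // its share early picks up items that would otherwise sit waiting.
        var partitioner = Partitioner.Create(words, true);

        Parallel.ForEach(partitioner, delegate(string s)
        {
            Thread.Sleep(100);  // Faux production cost.
            string word = String.Format("{0}{1}", s.Substring(0, 1).ToUpper(), s.Substring(1));

            Thread.Sleep(100);  // Faux consumption cost.
            if (word == "Binoculars")
                Console.WriteLine(word);
        });
    }
}

Note that this moves the consumption onto the worker threads and gives up the ordering that the iterator preserves, so it isn’t a drop-in replacement for the pattern above.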

Where the processing involves invoking work on another computer, it is surely always better to kick that work off sooner rather than later, since the CPU cycles it burns are not cycles that could have been put to better use locally.

Here’s the code for the whole class:

    using System;
    using System.Threading;
    using System.Collections.Generic;

    public class PreprocessIterator<KT, T>
    {
        Exception _exception;
        AutoResetEvent _waitHandle = new AutoResetEvent(false);
        T _preprocessed;

        public PreprocessIterator(Func<KT, T> processor, IList<KT> keys)
        {
            this.Keys = new List<KT>(keys);
            this.ProcessAction = processor;
        }

        private Func<KT, T> ProcessAction { get; set; }
        private IList<KT> Keys { get; set; }

        public IEnumerator<T> GetEnumerator()
        {
            if (this.Keys.Count == 0)
                yield break;            // Nothing to process.

            int upperBound = this.Keys.Count - 1;
            int i = 0;

            // The first item has to be processed synchronously.
            T processed = this.ProcessAction.Invoke(this.Keys[i]);

            while (i <= upperBound)
            {
                if (i < upperBound) // Preprocess the next item (if there is one) on the threadpool.
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.StartWork), this.Keys[i + 1]);

                yield return processed;     // Return execution to the caller.

                if (i == upperBound)
                    break;

                processed = this.EndWork(); // Wait for the preprocessed item.
                i++;
            }
        }

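        // Runs the processing delegate against the supplied key and signals
        // the wait handle when it has finished, capturing any exception.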
        private void StartWork(object obj)
        {            
            try
            {
                _preprocessed = this.ProcessAction.Invoke((KT)obj);
            }
            catch (Exception e)
            {
                _exception = e;
            }
            finally
            {
                _waitHandle.Set();
            }
        }

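        // Blocks until the background work signals completion, then rethrows
        // any captured exception or returns the preprocessed result.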
        private T EndWork()
        {
            _waitHandle.WaitOne();
            if (_exception != null)
                throw _exception;

            return _preprocessed;
        }
    }
