Solving Threading with the Producer Consumer Model

Introduction

You’ve rented out one of the larger EC2 instances to do some batch processing on, but it can still take for-ev-er for some particular jobs to finish up. You can see that you’re only using one of the virtual cores that come with it; the remaining three or seven are sitting idle. Threading the application seems like a great way to speed up the processing by putting the remaining cores to work. However, how best to split up the work is still an issue. If you assume that every task takes the same amount of time, it’s easy to give each thread an equal number of tasks. In the real world, though, tasks are rarely uniform. For example, say you added 10 new fields to a form after the first 10,000 forms, and another 20 fields after the first 40,000. The thread assigned the first 10,000 or so is going to finish much sooner than the others. The threads could each grab a random form instead, but where does the master list reside? How will they know which thread has taken which form?

This class of problems is known as the Producer-Consumer problem. In the most general terms, N threads wait for data coming from one particular source. In operating systems, that source can be an input listener, the network, etc. For our uses here, we’ll have one producer thread passing information on to N workers.

For this problem, we’ll use an easy example of downloading web pages for a load test. We’ll attempt to download 50 pages per second spread out across 16 threads. Since these downloads are heavy on IO wait, we can run more threads than there are processors on the machine. The producer will update a shared memory location, and each consumer will retrieve one entry at a time from the data structure. In this way, a thread takes on new work only when it has finished its previous task, so the load evens out across all workers.

Implementation

Shared Memory

This is the tie that binds the producer and the consumer together. In short, the shared memory structure gives the producer thread a place to store tasks that workers can grab whenever they need more work to do. Both methods in this class synchronize on the list so that only one thread touches it at a time. LinkedList is not thread-safe, and since threads can preempt other threads, there’s no guarantee that one List.remove() call will finish before another thread calls List.remove() again. The result can be a corrupted data structure, the same URL handed to two workers or, when iterators are involved, the dreaded ConcurrentModificationException.

import java.util.LinkedList;
import java.util.List;

public class SharedDataStructure {
    //Data structure to store the urls
    protected static final List<String> urls;
    static {
        urls = new LinkedList<String>();
    }
    
    //The producer should call this method when it wants to add another url to be downloaded
    public static void push(String url) {
        synchronized(urls) {
            urls.add(url);
        }
    }
    
    // The consumers should call this to get another url to download
    public static String pop() {
        synchronized(urls) {
            if (urls.size() > 0) {
                return urls.remove(0);
            }
            return null;
        }
    }
}
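
As a point of comparison, the same push/pop contract can be sketched on top of java.util.concurrent, which ships queues that do their own locking. This is not the class used in the rest of this post; the name BlockingSharedQueue and the timeout parameter are assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical alternative to SharedDataStructure, backed by a thread-safe queue.
class BlockingSharedQueue {
    // LinkedBlockingQueue handles its own locking, so no synchronized blocks are needed.
    private static final BlockingQueue<String> urls = new LinkedBlockingQueue<String>();

    // Producer side: enqueue a url. This queue is unbounded, so offer() always succeeds.
    static void push(String url) {
        urls.offer(url);
    }

    // Consumer side: wait up to timeoutMillis for a url; returns null if none arrives in time.
    static String pop(long timeoutMillis) {
        try {
            return urls.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can decide to stop.
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

With a timed poll like this, a consumer would not need its own sleep-and-retry branch, since the wait happens inside pop().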

Producer

The producer in this case is just going to push 50 randomly chosen pages each second, picked from a static list of webpages to be included in the test. In an ideal test, these should be weighted according to the access patterns of the website, but we’re going simple here!

import java.util.Random;

public class Producer extends Thread {
    private static final String[] webpages = { "http://google.com", "http://bing.com", "http://yahoo.com", "http://reddit.com" };

    @Override
    public void run() {
        Random random = new Random();
        while (true) {
            for (int i = 0; i < 50; i++) {
                // Pick one page uniformly at random
                String url = webpages[random.nextInt(webpages.length)];
                SharedDataStructure.push(url);
            }
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
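
For the weighted selection mentioned above, here is a minimal sketch of one common approach: draw a number below the sum of the weights and walk a cumulative table. The class name WeightedPicker and the weights themselves are hypothetical; real weights would have to come from your own traffic data.

```java
import java.util.Random;

// Hypothetical helper: picks a page with probability proportional to its weight.
class WeightedPicker {
    private final String[] pages;
    private final int[] cumulative; // cumulative[i] = sum of weights[0..i]
    private final int total;
    private final Random random;

    WeightedPicker(String[] pages, int[] weights, Random random) {
        if (pages.length == 0 || pages.length != weights.length) {
            throw new IllegalArgumentException("need one weight per page");
        }
        this.pages = pages.clone();
        this.cumulative = new int[weights.length];
        int sum = 0;
        for (int i = 0; i < weights.length; i++) {
            if (weights[i] < 0) {
                throw new IllegalArgumentException("weights must not be negative");
            }
            sum += weights[i];
            cumulative[i] = sum;
        }
        if (sum <= 0) {
            throw new IllegalArgumentException("at least one weight must be positive");
        }
        this.total = sum;
        this.random = random;
    }

    // Draw r in [0, total) and return the first page whose cumulative weight exceeds r.
    String next() {
        int r = random.nextInt(total);
        for (int i = 0; i < cumulative.length; i++) {
            if (r < cumulative[i]) {
                return pages[i];
            }
        }
        throw new IllegalStateException("unreachable: r is always below total");
    }
}
```

In the producer, a call to next() on such a picker could stand in for the uniform webpages[random.nextInt(webpages.length)] lookup.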

Consumer

The consumer’s only job is to read the next value off the shared memory structure and fetch that web page. In this example, I’m just going to read the first character of the response to keep the code short. That can be enough for certain types of load tests, though in most cases you’ll want to download the whole page.

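import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;

public class Consumer extends Thread {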
    @Override
    public void run() {
        while (true) {
            String url = SharedDataStructure.pop();
            if (url != null) {
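                try {
                    URLConnection urlConnection = new URL(url).openConnection();
                    // try-with-resources closes the stream even if the read fails
                    try (InputStreamReader reader = new InputStreamReader(urlConnection.getInputStream())) {
                        // Read a single character, enough to confirm the server responded
                        reader.read();
                    }
                } catch (IOException e) {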
                    e.printStackTrace();
                }
            } else {
                try {
                    sleep(10);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop consuming
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
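
If you do want to download the whole page, a minimal sketch is to read the stream until it reports end-of-stream and count the bytes along the way. The helper name PageReader.drain is an assumption for illustration, not part of the code above.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper for consumers that need the full response body.
class PageReader {
    // Reads the stream to the end and returns the number of bytes seen.
    static long drain(InputStream in) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        int n;
        // read() returns -1 once the stream is exhausted
        while ((n = in.read(buffer)) != -1) {
            total += n;
        }
        return total;
    }
}
```

In the consumer, PageReader.drain(urlConnection.getInputStream()) could replace the single reader.read() call; the returned byte count is also handy if the load test reports throughput.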

Starting it up

Now that we’ve got all three parts of the program written, let’s start it up!

public class Main {
    public static void main(String[] args) {
        for (int i = 0; i < 16; i++) {
            new Consumer().start();
        }
        new Producer().start();
    }
}
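
The program above runs until the process is killed. For a batch job with a fixed list of tasks, here is a rough sketch of a bounded run: the workers pull from a thread-safe queue until it is empty, and the caller joins them. It assumes every task is queued before the workers start; the class name BoundedRun and the counter standing in for the real download are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bounded variant: workers exit once the queue is empty.
class BoundedRun {
    // Starts workerCount threads, waits for them to drain the queue,
    // and returns how many tasks were handled in total.
    static int drainWithWorkers(ConcurrentLinkedQueue<String> tasks, int workerCount)
            throws InterruptedException {
        AtomicInteger handled = new AtomicInteger();
        List<Thread> workers = new ArrayList<Thread>();
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(() -> {
                // poll() returns null once the queue is empty, which ends this worker
                while (tasks.poll() != null) {
                    handled.incrementAndGet(); // stand-in for the real download
                }
            });
            workers.add(worker);
            worker.start();
        }
        // Wait for every worker to finish before reporting the total
        for (Thread worker : workers) {
            worker.join();
        }
        return handled.get();
    }
}
```

Because each task leaves the queue exactly once, the total always matches the number of queued tasks, no matter how the work happens to be split between threads.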

Conclusion

As we’ve discovered here, treating a threaded batch process as a producer-consumer problem lets us divide the work fairly among as many threads as we’d like!

If you’d like to read further about this, the following books are a great place to start for beginners and intermediate programmers!