Non-blocking concurrent collection?

System.Collections.Concurrent has some new collections that work very well in multithreaded environments. However, they are a bit limited: either they block until an item becomes available, or they return immediately without one (the TryXXX methods, which return false and leave default(T) in the out parameter).

I need a collection that is thread safe but, instead of blocking the calling thread, uses a callback to inform me that at least one item is available.

My current solution is to use a BlockingCollection together with the APM (Asynchronous Programming Model). In other words, I create a delegate to a method that calls Take on the collection, and execute that delegate using BeginInvoke to get the next element.

Unfortunately, I have to keep a lot of state within my class in order to accomplish this. Worse, the class is not thread safe; it can only be used by a single thread. I'm skirting the edge of maintainability, which I'd prefer not to do.

I know there are some libraries out there that make what I'm doing here pretty simple (I believe the Reactive Framework is one of these), but I'd like to accomplish my goals without adding any references outside of version 4 of the framework.

Are there any better patterns I can use that don't require outside references that accomplish my goal?



tl;dr:

Are there any patterns that satisfy the requirement:

"I need to signal a collection that I am ready for the next element, and have the collection execute a callback when that next element has arrived, without any threads being blocked."

-------------Problems Reply------------

I think I have two possible solutions. I am not particularly satisfied with either, but they do at least provide a reasonable alternative to the APM approach.

The first does not meet your requirement that no thread be blocked (it dedicates a background thread to waiting), but I think it is rather elegant: you can register callbacks and they will get called in round-robin fashion, yet you still have the ability to call Take or TryTake as you normally would for a BlockingCollection. A callback must be registered each time an item is requested; that registration is the signalling mechanism for the collection. The nice thing about this approach is that calls to Take do not get starved as they do in my second solution.

using System;
using System.Collections.Concurrent;
using System.Threading;

public class NotifyingBlockingCollection<T> : BlockingCollection<T>
{
    private readonly Thread m_Notifier;
    private readonly BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>();

    public NotifyingBlockingCollection()
    {
        // Dedicated background thread that pairs registered callbacks with items.
        m_Notifier = new Thread(Notify);
        m_Notifier.IsBackground = true;
        m_Notifier.Start();
    }

    private void Notify()
    {
        while (true)
        {
            // Block until a callback is registered, then until an item arrives.
            Action<T> callback = m_Callbacks.Take();
            T item = Take();
            callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
        }
    }

    public void RegisterForTake(Action<T> callback)
    {
        m_Callbacks.Add(callback);
    }
}

The second does meet your requirement that no thread be blocked. Notice how it transfers the invocation of the callback to the thread pool. I did this because, if the callback were executed synchronously, the lock would be held longer, which would bottleneck Add and RegisterForTake. I have looked it over closely and I do not think it can get livelocked (both an item and a callback are available, but the callback never gets executed), but you might want to look it over yourself to verify. The only problem here is that a call to Take can be starved, because callbacks always take priority.

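using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class NotifyingBlockingCollection<T>
{
    private readonly BlockingCollection<T> m_Items = new BlockingCollection<T>();
    private readonly Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

    public NotifyingBlockingCollection()
    {
    }

    public void Add(T item)
    {
        lock (m_Callbacks)
        {
            if (m_Callbacks.Count > 0)
            {
                // A callback is already waiting: hand the item straight to it.
                Action<T> callback = m_Callbacks.Dequeue();
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                // Nobody is waiting: store the item for a later Take or RegisterForTake.
                m_Items.Add(item);
            }
        }
    }

    public T Take()
    {
        // Blocking take; only sees items that no waiting callback has claimed.
        return m_Items.Take();
    }

    public void RegisterForTake(Action<T> callback)
    {
        lock (m_Callbacks)
        {
            T item;
            if (m_Items.TryTake(out item))
            {
                // An item is already available: deliver it right away.
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                // No item yet: queue the callback until Add is called.
                m_Callbacks.Enqueue(callback);
            }
        }
    }
}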
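
One caveat on the callback.BeginInvoke calls in both versions above: delegate BeginInvoke is only supported on the .NET Framework (on .NET Core and later it throws PlatformNotSupportedException). If that matters to you, the same hand-off to the thread pool can be done with ThreadPool.QueueUserWorkItem, which is also available in version 4 of the framework. The following is only a minimal sketch; the names CallbackDispatcher and Dispatch are made up for illustration and are not part of any library.

```csharp
using System;
using System.Threading;

// Hypothetical helper; the names here are assumptions for illustration only.
public static class CallbackDispatcher
{
    // Queues callback(item) on the thread pool and returns immediately,
    // so a caller holding a lock can release it without running the callback.
    public static void Dispatch<T>(Action<T> callback, T item)
    {
        ThreadPool.QueueUserWorkItem(_ => callback(item));
    }
}

public static class DispatchDemo
{
    public static void Main()
    {
        using (var done = new ManualResetEventSlim(false))
        {
            CallbackDispatcher.Dispatch<string>(s =>
            {
                Console.WriteLine("Received: " + s);
                done.Set();
            }, "hello");

            // Demo only: wait so the process does not exit before the callback runs.
            done.Wait();
        }
    }
}
```

Be aware that the error behaviour differs. With BeginInvoke and no EndInvoke, an exception thrown by the callback is never observed, whereas an unhandled exception in a QueueUserWorkItem work item takes down the process, so you may want a try/catch inside the lambda.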

How about something like this? (The naming could probably use some work. And note that this is untested.)

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class CallbackCollection<T>
{
    // Synchronization object to prevent race conditions.
    private readonly object _SyncObject = new object();

    // A queue for callbacks that are waiting for items.
    private readonly ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();

    // A queue for items that are waiting for callbacks.
    private readonly ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            // Try to get a callback. If no callback is available,
            // then enqueue the item to wait for the next callback
            // and return.
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item);
                return;
            }
        }

        // Run the callback outside the lock.
        ExecuteCallback(callback, item);
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock (_SyncObject)
        {
            // Try to get an item. If no item is available, then
            // enqueue the callback to wait for the next item
            // and return.
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback);
                return;
            }
        }

        // Run the callback outside the lock.
        ExecuteCallback(callback, item);
    }

    private void ExecuteCallback(Action<T> callback, T item)
    {
        // Use a new Task to execute the callback so that we don't
        // execute it on the current thread.
        Task.Factory.StartNew(() => callback.Invoke(item));
    }
}
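
A related variation, which is not taken from either answer above: instead of accepting a callback, the take operation can return a Task<T> that completes when an item arrives, and the caller attaches the callback with ContinueWith. This is only a sketch under the same .NET 4 constraint (TaskCompletionSource<T> ships with version 4 of the framework). The names TaskQueue and TakeAsync are hypothetical, and it has been reasoned through rather than battle-tested.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical class for illustration; not part of the framework.
public class TaskQueue<T>
{
    private readonly object _sync = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> _waiters =
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T> waiter;
        lock (_sync)
        {
            if (_waiters.Count == 0)
            {
                // Nobody is waiting: keep the item for a later TakeAsync.
                _items.Enqueue(item);
                return;
            }
            waiter = _waiters.Dequeue();
        }

        // Complete the task outside the lock, because continuations
        // may run synchronously on this thread.
        waiter.SetResult(item);
    }

    public Task<T> TakeAsync()
    {
        var source = new TaskCompletionSource<T>();
        lock (_sync)
        {
            if (_items.Count > 0)
            {
                // An item is already available: return a completed task.
                // No continuation can be attached yet, so this is safe in the lock.
                source.SetResult(_items.Dequeue());
            }
            else
            {
                // No item yet: the task completes on a later Add.
                _waiters.Enqueue(source);
            }
        }
        return source.Task;
    }
}

public static class TaskQueueDemo
{
    public static void Main()
    {
        var queue = new TaskQueue<int>();

        // Signal readiness for the next element; nothing blocks here.
        Task printed = queue.TakeAsync()
            .ContinueWith(t => Console.WriteLine("Got " + t.Result));

        queue.Add(42);

        // Demo only: keep the process alive until the continuation has run.
        printed.Wait();
    }
}
```

The trade-off compared with the callback versions is that the caller decides where and how the continuation runs (for example by passing a TaskScheduler to ContinueWith), rather than the collection deciding for it.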

Category:c# Views:0 Time:2010-07-19
