`
songsong
  • 浏览: 11290 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

阻塞Map BlockingMap的实现

阅读更多
做 socket 应用时用到了 BlockingQueue 接口,它适用于生产者-消费者模式:多个线程阻塞等待队列中的数据到来。但如果某个线程需要等待某个特定的数据,该如何处理呢?为此我自己写了一个 BlockingMap。
/**
 * A keyed hand-off point: producers publish a value under an {@code Integer} key and
 * consumers block until a value for the key they ask for is available.
 *
 * @param <V> type of the values exchanged
 */
public interface BlockingMap<V> {

	/**
	 * Publishes {@code o} under {@code key}.
	 *
	 * @param key key the value is published under
	 * @param o   value to publish
	 * @throws InterruptedException if the calling thread is interrupted
	 */
	void put(Integer key, V o) throws InterruptedException;

	/**
	 * Retrieves the value published under {@code key}, waiting for it if necessary.
	 *
	 * @param key key to wait on
	 * @return the value published under {@code key}
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	V take(Integer key) throws InterruptedException;

	/**
	 * Timed variant of {@link #take(Integer)}.
	 *
	 * @param key     key to wait on
	 * @param timeout how long to wait; the {@code HashBlockingMap} implementation
	 *                interprets this as milliseconds
	 * @return the value published under {@code key}; {@code HashBlockingMap} returns
	 *         {@code null} when the timeout elapses first
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	V poll(Integer key, long timeout) throws InterruptedException;

}

/**
 * Hash-based {@link BlockingMap}: a consumer blocks until a value for one specific key
 * has been published.
 *
 * <p>Every key in use owns a slot ({@link Item}) holding at most one pending value.
 * {@code put} stores the value in the key's slot (overwriting a value nobody claimed yet)
 * and wakes the threads waiting on that key; {@code take}/{@code poll} wait on the slot and
 * hand the value to exactly one caller. A slot is dropped from the map as soon as it holds
 * no value and no thread waits on it, so timed-out or interrupted waits leave nothing
 * behind. A value that is published but never claimed stays in the map.
 *
 * <p>Thread-safety: the map and all slots are guarded by one lock; each slot waits on its
 * own condition of that lock, so a {@code put} only wakes waiters of its own key.
 *
 * @param <V> type of the values exchanged
 */
public class HashBlockingMap<V> implements BlockingMap<V> {

	/** Guards {@link #map} and the state of every slot stored in it. */
	private final ReentrantLock lock = new ReentrantLock();

	/** Key to slot; every access happens while holding {@link #lock}. */
	private final ConcurrentMap<Integer, Item<V>> map;

	public HashBlockingMap() {
		map = new ConcurrentHashMap<Integer, Item<V>>();
	}

	/**
	 * Publishes {@code o} under {@code key} and wakes the threads waiting on that key.
	 *
	 * @param key key the value is published under
	 * @param o   value to publish, must not be {@code null}
	 * @throws NullPointerException if {@code key} or {@code o} is {@code null}
	 * @throws InterruptedException if the thread is interrupted while acquiring the lock
	 */
	public void put(Integer key, V o) throws InterruptedException {
		// Reject null before touching the map so a bad call cannot leave an empty slot behind.
		if (o == null)
			throw new NullPointerException();
		final ReentrantLock lock = this.lock;
		lock.lockInterruptibly();
		try {
			Item<V> item = slotFor(key);
			item.obj = o;
			item.cond.signalAll();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Waits, without time limit, for a value published under {@code key} and claims it.
	 *
	 * @param key key to wait on
	 * @return the claimed value, never {@code null}
	 * @throws InterruptedException if the thread is interrupted while waiting
	 */
	public V take(Integer key) throws InterruptedException {
		return awaitValue(key, false, 0L);
	}

	/**
	 * Waits up to {@code timeout} milliseconds for a value published under {@code key}
	 * and claims it.
	 *
	 * @param key     key to wait on
	 * @param timeout maximum time to wait, in milliseconds; values {@code <= 0} mean
	 *                "do not wait"
	 * @return the claimed value, or {@code null} if none arrived in time
	 * @throws InterruptedException if the thread is interrupted while waiting
	 */
	public V poll(Integer key, long timeout) throws InterruptedException {
		return awaitValue(key, true, TimeUnit.MILLISECONDS.toNanos(timeout));
	}

	/** Shared wait loop of {@code take} (untimed) and {@code poll} (timed, {@code nanos} budget). */
	private V awaitValue(Integer key, boolean timed, long nanos) throws InterruptedException {
		final ReentrantLock lock = this.lock;
		lock.lockInterruptibly();
		try {
			Item<V> item = slotFor(key);
			item.waiters++;
			try {
				while (item.obj == null) {
					if (!timed) {
						item.cond.await();
					} else if (nanos > 0) {
						nanos = item.cond.awaitNanos(nanos);
					} else {
						return null;
					}
				}
				// Clear the slot so the value is delivered to exactly one caller.
				V x = item.obj;
				item.obj = null;
				return x;
			} finally {
				// Runs with the lock held on every exit path (value, timeout, interrupt):
				// drop the slot only when it is empty and no other thread still waits on it.
				item.waiters--;
				if (item.waiters == 0 && item.obj == null) {
					map.remove(key, item);
				}
			}
		} finally {
			lock.unlock();
		}
	}

	/** Returns the slot for {@code key}, creating it if absent. Caller must hold {@link #lock}. */
	private Item<V> slotFor(Integer key) {
		Item<V> item = map.get(key);
		if (item == null) {
			item = new Item<V>(lock.newCondition());
			map.put(key, item);
		}
		return item;
	}

	/** Per-key slot; all fields are guarded by the enclosing map's lock. */
	private static class Item<E> {

		/** Signalled whenever a value is stored in this slot. */
		private final Condition cond;

		/** Pending value, or {@code null} when nothing is available. */
		private E obj = null;

		/** Number of threads currently inside a take/poll on this slot. */
		private int waiters = 0;

		Item(Condition cond) {
			this.cond = cond;
		}

	}

}

// Consumer: wait (up to timeout) for the object published under its own sequence number
Response response = blockingMap.poll(sequence, timeout);
// Producer: publish the response under the sequence number it carries
blockingMap.put(response.getSequence(), response);

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics