做socket应用用到了BlockingQueue接口,可用于生产者消费者模式,多个线程阻塞着等待queue的数据到来,但是如果是该线程需要等待某个特定的数据该如何处理呢,自己写了个BlockingMap
public interface BlockingMap<V> {
public void put(Integer key, V o) throws InterruptedException;
public V take(Integer key) throws InterruptedException;
public V poll(Integer key, long timeout) throws InterruptedException;
}
public class HashBlockingMap<V> implements BlockingMap<V> {
private ConcurrentMap<Integer, Item<V>> map;
private final ReentrantLock lock = new ReentrantLock();
public HashBlockingMap() {
map = new ConcurrentHashMap<Integer, Item<V>>();
}
public void put(Integer key, V o) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (map.containsKey(key)) {
Item<V> item = map.get(key);
item.put(o);
} else {
Item<V> item = new Item<V>();
map.put(key, item);
item.put(o);
}
} finally {
lock.unlock();
}
}
public V take(Integer key) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (!map.containsKey(key)) {
map.put(key, new Item<V>());
}
} finally {
lock.unlock();
}
Item<V> item = map.get(key);
V x = item.take();
map.remove(key);
return x;
}
public V poll(Integer key, long timeout) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (!map.containsKey(key)) {
map.put(key, new Item<V>());
}
} finally {
lock.unlock();
}
Item<V> item = map.get(key);
V x = item.poll(timeout);
map.remove(key);
return x;
}
private static class Item<E> {
private final ReentrantLock lock = new ReentrantLock();
private final Condition cond = lock.newCondition();
private E obj = null;
private void put(E o) throws InterruptedException {
if (o == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
obj = o;
cond.signal();
} finally {
lock.unlock();
}
}
E take() throws InterruptedException {
E x;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (obj == null) {
cond.await();
}
} catch (InterruptedException ie) {
cond.signal();
throw ie;
}
x = obj;
} finally {
lock.unlock();
}
return x;
}
private E poll(long timeout) throws InterruptedException {
timeout = TimeUnit.MILLISECONDS.toNanos(timeout);
E x;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (obj != null) {
x = obj;
break;
}
if (timeout <= 0) {
return null;
}
try {
timeout = cond.awaitNanos(timeout);
} catch (InterruptedException ie) {
cond.signal();
throw ie;
}
}
} finally {
lock.unlock();
}
return x;
}
}
}
// 消费者根据sequence取得自己想要的对象
Response response = blockingMap.poll(sequence, timeout);
// 生产者
blockingMap.put(response.getSequence(), response);
分享到:
相关推荐
设备驱动中阻塞与非阻塞及实现:在Linux驱动程序中,我们可以使用等待队列(wait queue)来实现阻塞操作。wait queue很早就作为一个基本的功能单位出现在Linux内核里了,它以队列为基础数据结构,与进程调度机制紧密...
httplib库由阻塞式改为非阻塞式,通过线程池管理
电子-linux底层驱动中阻塞IO的实现.zip,单片机/嵌入式STM32-F0/F1/F2
非阻塞select 实现 socket服务器使用select 实现 非阻塞的方式 来实现 服务端
Verilog HDL非阻塞赋值工程实现
Verilog HDL阻塞赋值实现
非阻塞模式实现tcp通讯 服务器端代码 java实现
Java语言中非阻塞算法的实现
主要介绍了Python实现socket非阻塞通讯功能,结合实例形式分析了Python使用socket模块进行非阻塞通讯的原理、多线程及客户端、服务器端相关实现技巧,需要的朋友可以参考下
Java语言中非阻塞算法的实现.pdf
阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池...
用Java实现非阻塞通信 java.nio包提供了支持非阻塞通信的类,主要包括: ● ServerSocketChannel:ServerSocket的替代类,支持阻塞通信与非阻塞通信。 ● SocketChannel:Socket的替代类,支持阻塞通信与非阻塞通信...
使用MFC实现非阻塞套接字通信,可以多个客户端和一个服务器任意通信,通信协议采用protobuf,代码可直接运行exe文件,平台为vs2013。
阻塞图是另外支持在检索元素时等待键可用的操作的图。 阻塞映射在并发环境中充当生产者和消费者之间的同步器。 注意:该项目现在托管在GitHub(https://github.com/sarveswaran-m/blockingMap/wiki)
跟着野火学FreeRTOS:第一段(空闲任务与阻塞延时的实现)
用c++编写的程序 模拟实现进程管理 可以实现就绪、运行、阻塞三态之间的转变。
使用非阻塞方式实现串口打印,stm32f103rct6
Java实现简单的阻塞队列2种方式,1使用wait(),notify();2使用countdownlatch实现
基于JavaNIO的非阻塞通信的研究与实现
写了一个模拟程序对map中的一项进行读或者写,后台一直运行的协程阻塞的接受读写信号,并对map进行操作,但是读操作的时候没想好怎么返回这个值。 后来想到用传引用的方式,定义结构体,第一个参数是读写的标志,第...