我们常见的一种场景是合并上报,指定时间还没达到批量的阈值,有多少条报多少条,现在基于BlockingQueue
实现了一种简单的方式去实习这个特性。
- Add 方法
queue.offer(logItem, 10, TimeUnit.MILLISECONDS) // 如果队列已满,需要超时等待一段时间,使用此方法 |
- 批量获取方法
BlockingQueue
的批量取方法drainTo()
不支持超时特性,但是注意到poll()
支持,结合这两者的特性,我们做了如下的改动
public static <E> int batchGet(BlockingQueue<E> q,Collection<? super E> buffer, int numElements, long timeout, TimeUnit unit) throws InterruptedException { |
完整源代码如下:
private static final int SIZE = 5000; |