; * @param capacity 阻塞队列初始容量
* @param processor 回调接口
*/
public WorkThread(String threadName, int queueSizeLimit, int period, int capacity, Processor<T> processor) {
this.threadName = threadName;
this.queueSizeLimit = queueSizeLimit;
this.period = period;
this.lastFlushTime = System.currentTimeMillis();
this.processor = processor;
this.queue = new ArrayBlockingQueue(capacity);
}
/**
* 往阻塞队列中添加元素
* @param item 添加的对象
* @return true:添加成功 false:添加失败
*/
public boolean add(T item) {
// log.info("add result:"+item);
boolean result = this.queue.offer(item);
// log.info("resultP{}",result);
this.checkQueueSize();
return result;
}
/**
* 当前时间与上次任务处理时间差是否超过指定阈值;如果超过触发start方法
*/
public void timeout() {
// log.info("{}====check timeout",currentThread.getName());
if (System.currentTimeMillis() - this.lastFlushTime >= (long)this.period) {
log.info("当前时间距离上次任务处理时间周期={}超出指定阈值={}",System.currentTimeMillis() - this.lastFlushTime ,period);
this.start();
&nb |