侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 285 篇文章
  • 累计创建 125 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

java并发编程之DelayQueue延时队列原理及实战

孔子说JAVA
2022-08-01 / 0 评论 / 0 点赞 / 117 阅读 / 21,153 字 / 正在检测是否收录...

DelayQueue延时队列是一种阻塞队列,是由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列则基于数组的扩容实现。入队的对象必须要实现Delayed接口(Delayed继承自Comparable接口),出队的对象必须在规定的到期时间之后才能取走。该队列是有序的,即在创建元素时,可以指定多久可以从队列中获取到当前元素。只有在延时期满才能从队列中获取到当前元素。

image-1659318931009

1、DelayQueue介绍

1.1 简介

DelayQueue 是⼀个没有边界的 BlockingQueue 实现,加⼊其中的元素必需实现Delayed接⼝。当⽣产者线程调⽤put之类的⽅法加⼊元素时,会触发Delayed 接⼝中的 compareTo ⽅法进⾏排序,也就是说队列中元素的顺序是按到期时间排序的,⽽不是它们进⼊队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间越晚。

  • 消费者线程查看队列头部的元素,注意是查看不是取出。然后调⽤元素的getDelay⽅法,如果此⽅法返回的值⼩0或者等于0,则消费者线程会从队列中取出此元素,并进⾏处理。如果getDelay⽅法返回的值⼤于0,则消费者线程wait返回的时间值后,再从队列头部取出元素,此时元素应该已经到期。

  • Delayed接口是一种混合风格的接口,是一个具有过期时间的元素,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。

  • PriorityQueue是一个根据队列里元素某些属性排列先后的顺序队列。

  • DelayQueue其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素。

  • DelayQueue是Leader-Followr模式的变种,消费者线程处于等待状态时,总是等待最先到期的元素,⽽不是长时间的等待。消费者线程尽量把时间花在处理任务上,最⼩化空等的时间,以提⾼线程的利⽤效率。

  • 不能将null元素放置到 DelayQueue 队列中。

1.2 DelayQueue的运行过程

下面通过队列及消费者线程状态变化描述一下DelayQueue的运行过程。

DelayQueue运⾏的初始状态

20221191445138471

因为队列是没有边界的,向队列中添加元素的线程不会阻塞,添加操作相对简单。假设现在共有三个消费者线程。

  1. 队列中的元素按到期时间排序,队列头部的元素2s以后到期。消费者线程1查看了头部元素以后,发现还需要2s才到期,于是它进⼊等待状态,2s以后醒来,等待头部元素到期的线程称为 Leader 线程。

  2. 消费者线程2与消费者线程3处于待命状态,它们不等待队列中的⾮头部元素。当消费者线程1拿到对象5以后,会向它们发送signal。这个时候两个中的⼀个会结束待命状态⽽进⼊等待状态。

DelayQueue运⾏2S以后

20221191445138472

  1. 消费者线程1已经拿到了对象5,从等待状态进入处理状态,处理它取到的对象5,同时向消费者线程2与消费者线程3发送signal。

  2. 消费者线程2与消费者线程3会争抢领导权,图示是消费者线程2进入等待状态,成为 Leader 线程,等待2s以后对象4到期。而消费者线程3则继续处于待命状态。

  3. 此时队列中加入了一个新元素对象6,它10s后到期,排在队尾。

DelayQueue再运⾏2S以后

20221191445138473

  1. 先看线程1,如果它已经结束了对象5的处理,则进入待命状态。如果还没有结束,则它继续处理对象5。

  2. 消费线程2取到对象4以后,也进入处理状态,同时给处于待命状态的消费线程3发送信号,消费线程3进入等待状态,成为新的Leader。现在头部元素是新插入的对象7,因为它1s以后就过期,要早于其它所有元素,所以排到了队列头部。

DelayQueue再运⾏1S以后

此时运行状态分为2种情况:

情况一(不好的结果):

20221191445138474

  1. 消费线程3一定正在处理对象7。消费线程1与消费线程2还没有处理完它们各自取得的对象,无法进入待命状态,也更加进入不了等待状态。此时对象3马上要到期,那么如果它到期时没有消费者线程空下来,则它的处理一定会延期。可以想见,如果元素进入队列的速度很快,元素之间的到期时间相对集中,而处理每个到期元素的速度又比较慢的话,则队列会越来越大,队列后边的元素延期处理的时间会越来越长。

情况二(好的结果):

20221191445138475

  1. 消费线程1与消费线程2很快的完成对取出对象的处理,及时返回重新等待队列中的到期元素。一个处于等待状态(Leader),对象3一到期就立刻处理。另一个则处于待命状态。这样,每一个对象都能在到期时被及时处理,不会发生明显的延期。所以,消费者线程的数量要够,处理任务的速度要快。否则,队列中的到期元素无法被及时取出并处理,造成任务延期、队列元素堆积等情况。

2、DelayQueue源码分析

2.1 DelayQueue类定义

DelayQueue是一个阻塞队列,其实现了BlockingQueue接口。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
}

2.2 队列元素

DelayQueue<E extends Delayed> 的队列元素需要实现Delayed接口,即添加到延迟队列中的元素E必须实现Delayed接口,接口定义如下:

public interface Delayed extends Comparable<Delayed> {
 
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

该接口继承自Comparable,由Delayed定义可以得知,队列元素需要实现getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法, getDelay定义了剩余到期时间,compareTo方法定义了元素排序规则。注意,元素的排序规则影响了元素的获取顺序。

// 获取元素的延误时间
long getDelay(TimeUnit unit);

// 元素比较
public int compareTo(T o);

  • 为了将最早过期的元素放置在队列头部,DelayQueue基于PriorityQueue优先队列来维护队列中的元素;
  • DelayQueue基于ReentrantLock的Condition实现了阻塞。

image

2.3 内部存储结构

DelayQueue的元素存储交由优先级队列存放。DelayQueue的优先级队列使用的排序方式是队列元素的compareTo方法,优先级队列存放顺序是从小到大的,所以队列元素的compareTo方法影响了队列的出队顺序。

  • 若compareTo方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
 
    //锁
    private final transient ReentrantLock lock = new ReentrantLock();
    //优先级队列 执行时间最早的排在第一个
    private final PriorityQueue<E> q = new PriorityQueue<E>(); // 存放元素
    
    //是否有线程在等待任务到执行时间
    private Thread leader;
    //条件唤醒
    private final Condition available = lock.newCondition();

入队列时的步骤:

  1. 可重入锁。
  2. 用于根据delay时间排序的优先级队列。
  3. 用于优化阻塞通知的线程元素leader。
  4. 用于实现阻塞和通知的Condition对象。

2.4 入队方法

offer方法将元素插入到队列中,并返回是否成功插入。具体过程如下:

  1. 执行加锁操作。
  2. 把元素添加到优先级队列中。
  3. 查看元素是否为队首。
  4. 如果是队首的话,设置leader为空,唤醒所有等待的队列。
  5. 释放锁。
public boolean offer(E e) {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		// 执行优先队列的offer方法,将元素插入
		q.offer(e);
		// 如果优先队列的首元素为该元素
		if (q.peek() == e) {
			// 将leader置为null,代表可以获取该元素
			leader = null;
			// 通知工作线程来获取该元素
			available.signal();
		}
		// 返回true
		return true;
	} finally {
		lock.unlock();
	}
}

public boolean add(E e) {
   return offer(e);
}

put方法为阻塞队列(BlockingQueue)的方法,内部直接调用offer方法:

public void put(E e) {
	offer(e);
}

offer(E e, long timeout, TimeUnit unit)也属于阻塞队列(BlockingQueue)的方法,入参需要额外指定超时时间timeout,但仍然直接调用的offer方法,timeout未发挥作用。

public boolean offer(E e, long timeout, TimeUnit unit) {
	return offer(e);
}

由于底层存储元素的PriorityQueue优先队列相当于是"无界"队列(最多Integer.MAX_VALUE-8个元素),所以添加元素的阻塞与非阻塞实现是一致的。

2.5 出队方法

take()是阻塞方法,若首元素的延迟时间未到,则持续阻塞。出队列时做了如下步骤:

  1. 执行加锁操作
  2. 取出优先级队列元素q的队首
  3. 如果元素q的队首/队列为空,阻塞请求
  4. 如果元素q的队首(first)不为空,获得这个元素的delay时间值
  5. 如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法
  6. 如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露
  7. 判断leader元素是否为空,不为空的话阻塞当前线程
  8. 如果leader元素为空的话,把当前线程赋值给leader元素,然后阻塞delay的时间,即等待队首到达可以出队的时间,在finally块中释放leader元素的引用
  9. 循环执行从1~8的步骤
  10. 如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程
  11. 执行解锁操作
public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	// 添加可中断锁
	lock.lockInterruptibly();
	try {
		for (;;) {
			E first = q.peek();
			// 若优先队列为空,代表无可消费的元素,则消费线程执行awit()方法阻塞等待
			if (first == null)
				available.await();
			// 若优先队列不为空
			else {
				// 获取优先队列首元素的过期时间
				long delay = first.getDelay(NANOSECONDS);
				// 若已过期,则直接返回优先队列的首元素
				if (delay <= 0L)
					return q.poll();
				first = null; // don't retain ref while waiting
				// 若leader不为空,则证明已有线程在拉取首元素,当前线程阻塞
				if (leader != null)
					available.await();
				else {
				// 若leader不为空,则将leader设置为自身线程,然后await delay
				// 延迟时间到达之后,会重新进入到for(;;) 此时获取到first的delay<=0,执行q.poll()返回首元素
					Thread thisThread = Thread.currentThread();
					leader = thisThread;
					try {
						available.awaitNanos(delay);
					} finally {
						if (leader == thisThread)
							leader = null;
					}
				}
			}
		}
	} finally {
		// 如果leader已置为null且优先队列q中首元素不为null,则唤醒等待的线程,并释放锁
		if (leader == null && q.peek() != null)
			available.signal();
		lock.unlock();
	}
}

可以看到。take()阻塞方法是若队列为空,则持续await阻塞挂起,直到被队列中添加元素的线程signal唤醒;

poll()为非阻塞方法,检索并删除此队列的头部,如果此队列首元素的延迟时间未到,则直接返回null。

  • 获取元素时,总是判断PriorityQueue队列的队首元素是否到期,若未到期,返回null,所以compareTo()的方法实现不当的话,会造成队首元素未到期,而队列中有到期元素却获取不到的情况。因此,队列元素的compareTo方法实现需要注意,应该根据过期时间进行排序。
public E poll() {
	final ReentrantLock lock = this.lock;
	// 加锁
	lock.lock();
	try {
		// 获取优先队列的首元素
		E first = q.peek();
		// 如果首元素为null或者首元素的延迟时间>0,则直接返回null
		// 如果首元素不为null,且首元素的延迟时间<=0,则执行q.poll(),返回优先队列的首元素
		return (first == null || first.getDelay(NANOSECONDS) > 0)
			? null
			: q.poll();
	} finally {
		// 释放锁
		lock.unlock();
	}
}

而poll(long timeout, TimeUnit unit)则是阻塞方法,timeout为超时时间,其会在timeout时间内反复尝试获取优先队列的首元素。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	long nanos = unit.toNanos(timeout);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		for (;;) {
			E first = q.peek();
			// 若优先队列首元素为null
			if (first == null) {
				// 超时时间nanos<=0,代表已超时,直接返回null
				if (nanos <= 0L)
					return null;
				// 否则阻塞等待nanos纳秒
				else
					nanos = available.awaitNanos(nanos);
			// 若预先队列首元素不为null
			} else {
				// 获取首元素的延迟时间
				long delay = first.getDelay(NANOSECONDS);
				// 如果延迟时间已到,代表任务已到期,则不管timeout是否已经<=0,均直接执行q.poll()
				if (delay <= 0L)
					return q.poll();
				// 任务未到期,但timeout已经<=0,直接返回null
				if (nanos <= 0L)
					return null;
				first = null; // don't retain ref while waiting
				// 若nanos<delay或者leader不为null,阻塞等待nanos纳秒
				if (nanos < delay || leader != null)
					nanos = available.awaitNanos(nanos);
					// 阻塞唤醒之后,继续进入到for(;;)循环,此时nanos<=0,然后会返回null
				else {
				// 若nanos>=delay且leader为null
				// 则将当前线程设置为leader,并阻塞等待delay纳秒
					Thread thisThread = Thread.currentThread();
					leader = thisThread;
					try {
						long timeLeft = available.awaitNanos(delay);
						// 被唤醒之后,更新nanos为nanos-(delay - timeLeft),继续执行新一轮的for(;;)
						nanos -= delay - timeLeft;
					} finally {
						if (leader == thisThread)
							leader = null;
					}
				}
			}
		}
	} finally {
		// 如果leader已置为null且优先队列q中首元素不为null,则唤醒等待的线程,并释放锁
		if (leader == null && q.peek() != null)
			available.signal();
		lock.unlock();
	}
}

3、应用场景

在大多数业务场景中,我们会利用中间件提供的延时消息的功能。比如利用redis zset实现 ,kafka rabbit mq 的延时队列。DelayQueue应用场景如下:

  • 订单到期(超时未支付取消)、限时支付等。
  • 调用其他系统时失败间隔重试(如调用第三方接口时,过段时间异步获取结果)。
  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期。然后用一个线程循环的查询DelayQueue队列,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  • 定时任务调度:使用DelayQueue队列保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。比如Java中的TimerQueue就是使用DelayQueue实现的。

4、应用示例

4.1 定时任务示例

DelayQueue的一个应用场景是定时任务调度。本例中先让主线程向DelayQueue添加 10 个任务,任务之间的启动间隔在1~2s之间,每个任务的执行时间固定为2s,代码如下:

DelayTask实现了Delayed接口

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DelayTask implements Delayed {
    private static long currentTime = System.currentTimeMillis();
    protected final String taskName;
    protected final int timeCost;
    protected final long scheduleTime;

    protected static final AtomicInteger taskCount = new AtomicInteger(0);

    // 定时任务之间的启动时间间隔在1~2s之间,timeCost表示处理此任务需要的时间,本示例中为2s
    public DelayTask(String taskName, int timeCost) {
        this.taskName = taskName;
        this.timeCost = timeCost;
        taskCount.incrementAndGet();
        currentTime += 1000 + (long) (Math.random() * 1000);
        scheduleTime = currentTime;
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.scheduleTime - ((DelayTask) o).scheduleTime);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long expirationTime = scheduleTime - System.currentTimeMillis();
        return unit.convert(expirationTime, TimeUnit.MILLISECONDS);
    }

    public void execTask() {
        long startTime = System.currentTimeMillis();
        System.out.println("Task " + taskName + ": schedule_start_time=" + scheduleTime + ",real start time="
                + startTime + ",delay=" + (startTime - scheduleTime));
        try {
            Thread.sleep(timeCost);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者线程

import java.util.concurrent.BlockingQueue;

class DelayTaskComsumer extends Thread {
    private final BlockingQueue<DelayTask> queue;

    public DelayTaskComsumer(BlockingQueue<DelayTask> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        DelayTask task = null;
        try {
            while (true) {
                task = queue.take();
                task.execTask();
                DelayTask.taskCount.decrementAndGet();
            }
        } catch (InterruptedException e) {
            System.out.println(getName() + " finished");
        }
    }
}

DelayQueue测试用例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

public class DelayQueueExample {

    public static void main(String[] args) {

        BlockingQueue<DelayTask> queue = new DelayQueue<DelayTask>();

        for (int i = 0; i < 10; i++) {
            try {
                queue.put(new DelayTask("work " + i, 2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        ThreadGroup g = new ThreadGroup("Consumers");
			// 消费者线程个数
        int threadNums = 1;
        for (int i = 0; i < threadNums; i++) {
            new Thread(g, new DelayTaskComsumer(queue)).start();
        }

        while (DelayTask.taskCount.get() > 0) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        g.interrupt();
        System.out.println("Main thread finished");
    }
}

执行结果:

首先启动一个消费者线程(测试代码中 threadNums = 1),因为消费者线程处单个任务的时间为2s,而任务的调度间隔为1~2s。这种情况下,每当消费者线程处理完一个任务,回头再从队列中新取任务时,新任务肯定延期了,无法按给定的时间调度任务。而且越往后情况越严重。运行代码看一下输出:

Task work 0: schedule_start_time=1659317183227,real start time=1659317183239,delay=12
Task work 1: schedule_start_time=1659317184977,real start time=1659317185250,delay=273
Task work 2: schedule_start_time=1659317186630,real start time=1659317187260,delay=630
Task work 3: schedule_start_time=1659317187652,real start time=1659317189270,delay=1618
Task work 4: schedule_start_time=1659317189312,real start time=1659317191281,delay=1969
Task work 5: schedule_start_time=1659317190463,real start time=1659317193285,delay=2822
Task work 6: schedule_start_time=1659317192370,real start time=1659317195290,delay=2920
Task work 7: schedule_start_time=1659317193723,real start time=1659317197291,delay=3568
Task work 8: schedule_start_time=1659317195316,real start time=1659317199304,delay=3988
Task work 9: schedule_start_time=1659317197275,real start time=1659317201309,delay=4034
Main thread finished
Thread-0 finished

最后一个任务的延迟时间已经超过4s了。将消费者线程的个数调整为2后再次测试(测试代码中 threadNums = 2),这时任务应该都能够按时启动,延迟应该很小,运行程序看一下结果:

Task work 0: schedule_start_time=1659317338970,real start time=1659317338979,delay=9
Task work 1: schedule_start_time=1659317340334,real start time=1659317340343,delay=9
Task work 2: schedule_start_time=1659317342242,real start time=1659317342252,delay=10
Task work 3: schedule_start_time=1659317344104,real start time=1659317344117,delay=13
Task work 4: schedule_start_time=1659317345629,real start time=1659317345642,delay=13
Task work 5: schedule_start_time=1659317346918,real start time=1659317346932,delay=14
Task work 6: schedule_start_time=1659317348539,real start time=1659317348548,delay=9
Task work 7: schedule_start_time=1659317350520,real start time=1659317350521,delay=1
Task work 8: schedule_start_time=1659317352445,real start time=1659317352460,delay=15
Task work 9: schedule_start_time=1659317354353,real start time=1659317354356,delay=3
Main thread finished
Thread-0 finished
Thread-2 finished

基本上按时启动,最大延迟为15毫秒,最小延迟为1毫秒。将消费者线程个数调整为3后再次测试(测试代码中 threadNums = 3),运行程序看一下结果:

Task work 0: schedule_start_time=1659317835018,real start time=1659317835032,delay=14
Task work 1: schedule_start_time=1659317836823,real start time=1659317836825,delay=2
Task work 2: schedule_start_time=1659317838262,real start time=1659317838277,delay=15
Task work 3: schedule_start_time=1659317839540,real start time=1659317839553,delay=13
Task work 4: schedule_start_time=1659317840707,real start time=1659317840723,delay=16
Task work 5: schedule_start_time=1659317841864,real start time=1659317841878,delay=14
Task work 6: schedule_start_time=1659317843419,real start time=1659317843435,delay=16
Task work 7: schedule_start_time=1659317845374,real start time=1659317845376,delay=2
Task work 8: schedule_start_time=1659317846480,real start time=1659317846480,delay=0
Task work 9: schedule_start_time=1659317848384,real start time=1659317848399,delay=15
Main thread finished
Thread-0 finished
Thread-2 finished
Thread-4 finished

最大延迟为15毫秒,最小延迟为0毫秒,情况好像还不如2个线程的情况。将消费者线程数调整成5,运行看一下结果:

Task work 0: schedule_start_time=1659318007363,real start time=1659318007372,delay=9
Task work 1: schedule_start_time=1659318009064,real start time=1659318009074,delay=10
Task work 2: schedule_start_time=1659318010762,real start time=1659318010767,delay=5
Task work 3: schedule_start_time=1659318011866,real start time=1659318011868,delay=2
Task work 4: schedule_start_time=1659318012925,real start time=1659318012926,delay=1
Task work 5: schedule_start_time=1659318014505,real start time=1659318014520,delay=15
Task work 6: schedule_start_time=1659318015702,real start time=1659318015717,delay=15
Task work 7: schedule_start_time=1659318016876,real start time=1659318016880,delay=4
Task work 8: schedule_start_time=1659318018143,real start time=1659318018157,delay=14
Task work 9: schedule_start_time=1659318019985,real start time=1659318019997,delay=12
Main thread finished
Thread-6 finished
Thread-0 finished
Thread-8 finished
Thread-2 finished
Thread-4 finished

与3个消费者线程的情况差不多。

结论:

  • 最优的消费者线程的个数与任务启动的时间间隔存在这样的关系:单个任务处理时间的最大值/相邻任务的启动时间最小间隔=最优线程数,如果最优线程数是小数,则取整数后加1,比如1.3的话,那么最优线程数应该是2。如果消费者线程数小于此值,则来不及处理到期的任务。如果大于此值,线程太多,在调度、同步上花更多的时间,无益改善性能。

4.2 具有过期时间的缓存

向缓存添加内容时,给每一个key设定过期时间,系统自动将超过过期时间的key清除。该场景中有几个需要注意的地方:

  1. 当向缓存中添加key-value对时,如果这个key在缓存中存在并且还没有过期,需要用这个key对应的新过期时间。
  2. 为了能够让DelayQueue将其已保存的key删除,需要重写实现Delayed接口添加到DelayQueue的DelayedItem的hashCode函数和equals函数。
  3. 当缓存关闭,监控程序也应关闭,因而监控线程应当用守护线程。
package com.kz.example.common.queue;

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 具有过期时间的缓存
 */
public class Cache<K, V> {
    public ConcurrentHashMap<K, V> map = new ConcurrentHashMap<K, V>();
    public DelayQueue<DelayedItem<K>> queue = new DelayQueue<DelayedItem<K>>();

    public void put(K k, V v, long liveTime) {
        V v2 = map.put(k, v);
        DelayedItem<K> tmpItem = new DelayedItem<K>(k, liveTime);
        if (v2 != null) {
            queue.remove(tmpItem);
        }
        queue.put(tmpItem);
    }

    public Cache() {
        Thread t = new Thread() {
            @Override
            public void run() {
                dameonCheckOverdueKey();
            }
        };
        t.setDaemon(true);
        t.start();
    }

    public void dameonCheckOverdueKey() {
        while (true) {
            DelayedItem<K> delayedItem = queue.poll();
            if (delayedItem != null) {
                map.remove(delayedItem.getT());
                System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache");
            }
            try {
                Thread.sleep(300);
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }

    /**
     * 测试
     */
    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        int cacheNumber = 10;
        int liveTime = 0;
        Cache<String, Integer> cache = new Cache<String, Integer>();
        for (int i = 0; i < cacheNumber; i++) {
            liveTime = random.nextInt(3000);
            System.out.println(i + "  " + liveTime);
            cache.put(i + "", i, random.nextInt(liveTime));
            if (random.nextInt(cacheNumber) > 7) {
                liveTime = random.nextInt(3000);
                System.out.println(i + "  " + liveTime);
                cache.put(i + "", i, random.nextInt(liveTime));
            }
        }
        Thread.sleep(3000);
        System.out.println();
    }
}

class DelayedItem<T> implements Delayed {
    private T t;
    private long liveTime;
    private long removeTime;

    public DelayedItem(T t, long liveTime) {
        this.setT(t);
        this.liveTime = liveTime;
        this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime();
    }

    @Override
    public int compareTo(Delayed o) {
        if (o == null) return 1;
        if (o == this) return 0;
        if (o instanceof DelayedItem) {
            DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o;
            if (liveTime > tmpDelayedItem.liveTime) {
                return 1;
            } else if (liveTime == tmpDelayedItem.liveTime) {
                return 0;
            } else {
                return -1;
            }
        }
        long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        return diff > 0 ? 1 : diff == 0 ? 0 : -1;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(removeTime - System.nanoTime(), unit);
    }

    public T getT() {
        return t;
    }

    public void setT(T t) {
        this.t = t;
    }

    @Override
    public int hashCode() {
        return t.hashCode();
    }

    @Override
    public boolean equals(Object object) {
        if (object instanceof DelayedItem) {
            return object.hashCode() == hashCode() ? true : false;
        }
        return false;
    }
}

程序执行结果:

0  153
1  429
2  2939
3  2956
4  938
5  402
6  2630
6  704
7  1442
8  2426
9  2020
681925572172900 remove 5 from cache
681925885295400 remove 0 from cache
681926196063000 remove 7 from cache
681926508786900 remove 2 from cache
681926819140900 remove 1 from cache
681927126473500 remove 4 from cache
681927434597400 remove 6 from cache
681927746837200 remove 9 from cache
681928057644200 remove 8 from cache

4.3 多考生考试

模拟一个考试的日子,考试时间为120分钟,30分钟后才可交卷,当时间到了,或学生都交完卷了考试结束。这个场景中有几个点需要注意:

  • 考试时间为120分钟,30分钟后才可交卷,初始化考生完成试卷时间最小应为30分钟
  • 对于能够在120分钟内交卷的考生,如何实现这些考生交卷
  • 对于120分钟内没有完成考试的考生,在120分钟考试时间到后需要让他们强制交卷
  • 在所有的考生都交完卷后,需要将控制线程关闭

实现思想:用DelayQueue存储考生(Student类),每一个考生都有自己的名字和完成试卷的时间,Teacher线程对DelayQueue进行监控,收取完成试卷小于120分钟的学生的试卷。当考试时间120分钟到时,先关闭Teacher线程,然后强制DelayQueue中还存在的考生交卷。每一个考生交卷都会进行一次countDownLatch.countDown(),当countDownLatch.await()不再阻塞说明所有考生都交完卷了,而后结束考试。

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * 多考生考试
 */
public class Exam {
    /**
     *
     *2014-1-10 下午9:43:48 by 孙振超
     *
     *@param args
     *void
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub
        int studentNumber = 20;
        CountDownLatch countDownLatch = new CountDownLatch(studentNumber+1);
        DelayQueue< Student> students = new DelayQueue<Student>();
        Random random = new Random();
        for (int i = 0; i < studentNumber; i++) {
            students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch));
        }
        Thread teacherThread =new Thread(new Teacher(students));
        students.put(new EndExam(students, 120,countDownLatch,teacherThread));
        teacherThread.start();
        countDownLatch.await();
        System.out.println(" 考试时间到,全部交卷!");
    }
}
class Student implements Runnable,Delayed{
    private String name;
    private long workTime;
    private long submitTime;
    private boolean isForce = false;
    private CountDownLatch countDownLatch;
    public Student(){}
    public Student(String name,long workTime,CountDownLatch countDownLatch){
        this.name = name;
        this.workTime = workTime;
        this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime();
        this.countDownLatch = countDownLatch;
    }
    @Override
    public int compareTo(Delayed o) {
        // TODO Auto-generated method stub
        if(o == null || ! (o instanceof Student)) return 1;
        if(o == this) return 0;
        Student s = (Student)o;
        if (this.workTime > s.workTime) {
            return 1;
        }else if (this.workTime == s.workTime) {
            return 0;
        }else {
            return -1;
        }
    }
    @Override
    public long getDelay(TimeUnit unit) {
        // TODO Auto-generated method stub
        return unit.convert(submitTime - System.nanoTime(),  TimeUnit.NANOSECONDS);
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
        if (isForce) {
            System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 120分钟" );
        }else {
            System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 "+workTime +" 分钟");
        }
        countDownLatch.countDown();
    }
    public boolean isForce() {
        return isForce;
    }
    public void setForce(boolean isForce) {
        this.isForce = isForce;
    }
}
class EndExam extends Student{
    private DelayQueue<Student> students;
    private CountDownLatch countDownLatch;
    private Thread teacherThread;
    public EndExam(DelayQueue<Student> students, long workTime, CountDownLatch countDownLatch,Thread teacherThread) {
        super("强制收卷", workTime,countDownLatch);
        this.students = students;
        this.countDownLatch = countDownLatch;
        this.teacherThread = teacherThread;
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
        teacherThread.interrupt();
        Student tmpStudent;
        for (Iterator<Student> iterator2 = students.iterator(); iterator2.hasNext();) {
            tmpStudent = iterator2.next();
            tmpStudent.setForce(true);
            tmpStudent.run();
        }
        countDownLatch.countDown();
    }
}
class Teacher implements Runnable{
    private DelayQueue<Student> students;
    public Teacher(DelayQueue<Student> students){
        this.students = students;
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            System.out.println(" test start");
            while(!Thread.interrupted()){
                students.take().run();
            }
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
}

程序执行结果:

student14 交卷, 希望用时34分钟 ,实际用时 34 分钟
student1 交卷, 希望用时44分钟 ,实际用时 44 分钟
student19 交卷, 希望用时49分钟 ,实际用时 49 分钟
student16 交卷, 希望用时50分钟 ,实际用时 50 分钟
student2 交卷, 希望用时61分钟 ,实际用时 61 分钟
student9 交卷, 希望用时77分钟 ,实际用时 77 分钟
student4 交卷, 希望用时80分钟 ,实际用时 80 分钟
student5 交卷, 希望用时83分钟 ,实际用时 83 分钟
student13 交卷, 希望用时83分钟 ,实际用时 83 分钟
student7 交卷, 希望用时94分钟 ,实际用时 94 分钟
student18 交卷, 希望用时99分钟 ,实际用时 99 分钟
student8 交卷, 希望用时100分钟 ,实际用时 100 分钟
student20 交卷, 希望用时107分钟 ,实际用时 107 分钟
student17 交卷, 希望用时112分钟 ,实际用时 112 分钟
student10 交卷, 希望用时123分钟 ,实际用时 120分钟
student11 交卷, 希望用时130分钟 ,实际用时 120分钟
student6 交卷, 希望用时130分钟 ,实际用时 120分钟
student3 交卷, 希望用时146分钟 ,实际用时 120分钟
student12 交卷, 希望用时146分钟 ,实际用时 120分钟
student15 交卷, 希望用时141分钟 ,实际用时 120分钟
 考试时间到,全部交卷!
0

评论区