基于Pipeline模式的架构设计

Pipeline 模式也称为流水线模式。将一个任务处理分解为若干个处理阶段,其中每个处理阶段的输出作为下一个处理阶段的输入,并且各个处理阶段都有相应的工作者线程去执行相应的计算。

hw5 需求分析

模拟多线程实时电梯系统。系统从标准输入中输入请求信息,程序进行接收和处理,模拟电梯运行,将必要的运行信息通过输出接口进行输出。 具体而言,本次作业电梯系统具有的功能为:上下行,开关门,以及模拟乘客的进出。

输入输出定义:

​ 输入:[时间戳]ID-FROM-x-TO-y:乘客从x层到y层

​ 输出:[时间戳]电梯ID-动作

​ 电梯动作:开门、关门、到达楼层

​ 乘客动作:进入、离开电梯

电梯:

​ 电梯各项参数静态不可变(运行速度、开关门耗时、限乘人数、楼层数、初始位置)

​ 6部电梯

  1. 线程的设计。系统获取请求,由调度器进行分配,电梯处理请求。考虑请求输入装置、电梯和调度器是线程的Pipeline 模式的架构设计,以及如何实现线程交互。
  2. 电梯调度策略。电梯调度策略是指把请求分配各电梯时采取的策略,需要综合考虑电梯总运行时间、耗电量和最长等待时间等性能优化。
  3. 电梯运行策略。电梯运行策略是指电梯处理自己的请求队列采取的策略。

hw5 整体架构

线程设计

本设计基于 Pipeline 模式,线程有输入装置、调度器、电梯三类。输入线程和调度器线程共享请求队列。调度器线程和电梯线程共享电梯内等待队列。如图:

image-20230415150238302

输入线程:输入线程从标准输入中获取请求,存入请求队列。

调度器线程:调度器从请求队列获取线程,根据调度策略将请求放入电梯内的等待队列。

电梯线程:电梯线程模拟电梯运行,处理电梯内等待队列的请求。

请求队列:内有 requests 队列和 end属性(用于线程终止判断)。

电梯内等待对列:内有 requests 队列和 end属性(用于线程终止判断)。除了存储获取请求外,还有一些配合电梯运行的处理。

线程终止

线程应该在什么时候终止是一个必须要解决的问题,否则线程一直无法终止会引发 RTLE 问题。这一部分内容有学长推荐看《图解Java多线程设计模式》中的 Two Phase Termination 这一章。我主要是通过实验代码理解。当输入结束并且请求队列为空,那么调度器线程结束;调度线程结束并且电梯处理完分配到的请求,那么电梯线程结束。在介绍每个线程的结束条件之前,我们先来看看线程是如何结束的。run 方式是线程的入口,也是线程结束的位置。

public class InputThread extends Thread {
public void run() {
while (true) {
Request request = elevatorInput.nextRequest();
if (request == null) {
waitQueue.setEnd(true);
break;
}
// do something
}
}
}

public class Schedule extends Thread {
public void run() {
while (true) {
if (waitQueue.isEmpty() && waitQueue.getEnd()) {
elevators.setEnd();
break;
}
// do something
}
}
}

public class Elevator extends Thread {
public void run() {
while (true) {
if (curInRequests.isEmpty() && requestQueue.isEmpty() && requestQueue.getEnd()) {
break;
}
// do something
}
}
}

电梯调度策略

电梯调度有很多策略:

(1) 模拟电梯保证局部最优解的策略,在输入线程获得一个请求时,深克隆电梯类进行模拟从而将请求分配各所花费时间最少的电梯。

(2) 较为均衡的分配方式(第 i 位乘客分配给第 (i % 6) 座电梯),当输入请求数量较多的时候,性能上也还不错。

(3) 随机分配方式 random,其实和前者策略差不多。

(4) 自由竞争策略,所有电梯共用输入线程的请求队列,当输入一个请求时,所有电梯都努力去获取这个请求,这种策略在时间性能上占据优势,但是电量消耗会比较高。

我实现了电梯模拟策略,也就是往届所说的影子电梯。其主要思想为:对于每一个待分配的请求,“深克隆”电梯此时的状态,将请求放入电梯的等待队列,进行模拟(将上下行开门的 sleep(400) 改为 time += 400),选择最早结束的电梯接收这个请求。和周围同学比较发现这种策略在运行时间和耗电量都比较优。

电梯运行策略

我采用了 look 算法:当电梯在往上走时,如果遇到向上的请求就捎带上,当电梯内没有乘客并且电梯前方没有等待的请求时就换方向。当到达某一楼层时,如果电梯未满并且请求与电梯运行方向一致时,就开门捎带上请求。

String nextState = strategy.getStrategy(floor, direction, fullNum, curInRequests, requestQueue); // strategy 获取电梯下一状态
boolean isEnd = false;
switch (nextState) {
case "open":
openAndClose();
preTime = System.currentTimeMillis();
break;
case "keep":
if (direction) { up(); }
else { down(); }
preTime = System.currentTimeMillis();
break;
case "reverse":
direction = !direction;
break;
case "wait":
direction = true;
requestQueue.waitRequest();
break;
case "end":
isEnd = true;
break;
default:
}

性能优化

往届学长提出了“量子电梯”的概念:

当电梯在等待时,电梯的请求队列获得了请求,电梯开始运动:

[time1] ARRIVE floor1
[time2] begin wait
[time3] end wait
[time4] ARRIVE floor2

按照一般同学们的处理,有 time4=time3+0.4time4 = time3 + 0.4

然而我们可以利用 wait 的时间,因为我们仅需保证 time4 - time1 > 0.4

具体实现就是记录电梯最后一次关门、上下行的时间,然后:

public synchronized void up() {
long currentTime = System.currentTimeMillis();
if (currentTime - preTime < speed) {
try {
sleep(speed - currentTime + preTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
floor++;
TimableOutput.println(String.format("ARRIVE-%d-%d", floor, id));
}

hw6 & hw7 需求分析

题目新增要求

  • 楼层的特殊性(电梯停靠服务数量限制)

  • 电梯的特殊性(电梯能够到达的楼层、电梯动态维护和新增)

  • 调度的特殊性(换乘、捎带、队列平衡、等待时间平衡)

分析:

  • 新增或维护电梯后,考虑旧电梯内原有的乘客请求如何处理。
  • 线程的终止条件发生变化。
  • 考虑如何限制每层楼电梯停靠的服务数量。
  • 由于hw7出现换乘,需要重新规划电梯调度策略。

hw6 & hw7 架构的迭代变化

类图

image-20230415153226797

MainClass:初始化实例和开启线程。

InputThread:获取输入的请求

RequestQueue:总乘客队列。

Schedule:根据电梯调度策略,将乘客请求分配给电梯内部等待队列。

Elevator:模拟电梯运行过程。

Strategy:得到电梯下一状态。

InternalQueue:电梯内等待队列。

Person:乘客请求。

Floor:限制楼层服务电梯数量。

Counter:计数器,判断乘客请求是否都处理完,用于线程终止判断。

电梯的新增和维护

新增 ElevatorQueue 类,管理电梯的新增和维护。

对于维护操作:Elevator 类内新增 maintain 属性。当输入线程获取到维护请求时,将响应电梯的 maintain 属性标记为 true 。当电梯线程访问 maintain 为 true 时,开门放下乘客,将所有未完成的请求返回总队列,被维护的电梯结束线程。

image-20230415153047175

在这里可以进行一些优化:

  • 新增电梯时,电梯队列状态发生改变,之前的调度已经不是最优解。比如说当请求已被分配完毕时,新增一部电梯,该电梯并不会运作。所以可以考虑将电梯内等待队列所有请求返回总队列进行重新调度。
  • 维护电梯时,若电梯线程在开门状态中被通知maintain,直接放人进行维护。

线程的终止条件

在 hw5 中线程终止条件为:hw5:输入线程结束-(请求队列为空)->调度线程结束-(电梯内请求为空)->电梯线程结束。但是由于电梯可能会把请求返回给请求队列,就会产生问题:当某台电梯被维护将请求返回请求队列但电梯线程均已结束时,没有电梯可以响应请求。我们可以进一步约束线程终止条件:当输入线程结束并且所有请求都被处理完成后结束调度和电梯线程。此处判断所有请求是否处理完成可以设置一个单例模式的计数器。以下为主要进行的修改:

public class Schedule extends Thread {
public void run() {
while (true) {
if (waitQueue.isEmpty() && waitQueue.getEnd()) {
try {
sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (Counter.equalZero()) {
elevators.setEnd();
break;
}
}
// do something
}
}
}

楼层停靠服务中电梯数限制

该部分可以参考exp4中的信号量。信号量(Semaphore)本质上就是一个比较特殊的整数,代表了某种可用资源的剩余数量。通过信号量的加减,各线程可以申请和释放可用资源,当没有可用资源可以申请时(此时信号量为0),线程将挂起,直到别的线程释放了该信号量对应的资源。

public synchronized void openAndClose(boolean receive) {
if (receive) { floorHashMap.get(floor).acquireReceive(); } //
else { floorHashMap.get(floor).acquireServe(); } // 开门前获取
TimableOutput.println(String.format("OPEN-%d-%d", floor, id));
putOffRequests();
try {
sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
putOnRequests();
TimableOutput.println(String.format("CLOSE-%d-%d", floor, id));
if (receive) { floorHashMap.get(floor).releaseReceive(); } //
else { floorHashMap.get(floor).releaseServe(); } // 开门后释放
}

电梯调度策略

换乘路径搜索

  • 深度优先搜索,搜到一条可达路径,就更新请求路径,把请求放入相应电梯。
  • 广度优先搜索,搜索满足最少换乘数的可达路径。
  • 加权最短路,考虑换乘数、电梯已有的请求数、经过的楼层数等因素进行合适的权重分配,求解加权最短路。

路径更新

  • 静态路径更新:对于每个新请求,在调度时记录完整路径,仅当路径中电梯被维护时重新规划。
  • 动态路径更新:每次路径规划仅记录 nextToFloor,当请求下电梯但未到达目的地时,就放回总队列重新调度。

架构中的变与不变

image-20230415154802723

在迭代中可以发现,整体架构并没有发生改变,电梯运行也没有发生改变。由于新增电梯操作而新增了电梯队列类并对电梯队列进行维护,线程终止条件也由于请求可能需要返回请求队列重新分配而进一步约束。调度算法发生改变。楼层停靠服务电梯数的限制需要新增 Floor 类进行管理。

如何让频繁变化的设计稳定下来

  1. 提高程序架构的可扩展性

    • 设计能够面向未来扩展
    • eg. 电梯可动态变化,电梯参数可变,设置电梯队列类。
  2. 分层架构

    • 分层架构的核心是隔离,将不同职责的对象划分到不同的层中实现。
    • 输入装置 | 调度器 请求队列 电梯队列 | 电梯 电梯内等待队列
  3. 遵循单一职责原则

    • 每个模块只负责自己的事情,而不是变成万能的。
    • eg. 输入线程获取到维护电梯请求时,仅将电梯维护属性设为 true和从电梯队列中删除,由电梯线程自行维护,而非直接处理电梯请求。
  4. 高内聚低耦合设计

    • 内聚是从功能角度来度量模块内的联系;耦合是软件结构中各模块之间相互连接的一种度量。高内聚低耦合设计可以有效提高代码的可读性和可维护性,降低软件复杂度。

      • 简化接口设计

        • 只对外暴露最小限度的接口。

        • 把简单留给别人,把复杂留给自己。

        • public class Schedule {
              private final ElevatorQueue elevators;
              public void run() {
                  ...
                  Person person = requestQueue.getPerson();
                  Integer elevatorId = getPersonPath(person);
                  elevators.get(elevatorId).addPerson(person);
              }
              private int getPersonPath(Person person) {
                  // 加权最短路求解路径
                  // 更新 person 的 nextToFloor
                  // 返回 person 需乘坐的电梯 ID
              }
          }
          

          - 隐藏实现细节

          - 只给调用者暴露重要的信息,把不重要的细节隐藏起来。

          - ```java
          public class InternalQueue() {
              // 方案一
              public synchronized Vector<Person> getPeoPle(
                  int floor, boolean direction, int resCapacity) {
                  // 返回能上电梯的乘客,并将其从等待队列删除
              }
              // 方案二
              public Vector<Person> getPeoPle() {
                  // 返回整个等待队列
              }
              // 电梯调用该方法获取等待队列,选择能上电梯的乘客,并将其从等待队列删除
          }

线程的互斥与协作

线程安全

线程安全(thread-safe)是关于对象是否能被多个线程安全访问的特性。如果一个对象是线程安全的,则无论多个线程以什么样的交叠次序来访问都不会影响该对象的行为结果。

如何会导致线程不安全:

  • 读写冲突
    • check-then-act
    • read-modify-write
  • 写写冲突(覆盖)

线程安全的设计考虑:

  • 使用不可变对象
  • 使用可变对象
    • 操作的原子性
    • 共享对象要始终处于严密控制之下
    • 设置范围适当的临界区(临界区最小化)
    • 使用合适的监控器

此外,对象共享是产生线程安全问题的根本原因。在思考某一部分代码是否会产生线程安全问题时,可以思考其是否会被多个线程访问,线程访问的读写关系等。

线程互斥

一个实例中的 synchronized 方法每次只能由一个线程运行,对非 synchronized 方法没有影响。

每个实例拥有独立的锁。

只在共享对象中设置同步块,仅使用同步块中的共享对象来守护该同步块。

// synchronized 代码块
synchronized(expr) { // expr 为获取锁的实例
...
}

// synchronized 实例方法和 synchronized 代码块
synchronized void method { // 使用this的锁来执行线程的互斥处理
...
}
void method {
synchronized (this) {
...
}
}

由上文线程安全的内容可知,当对象会被多个线程读写时需要进行互斥操作。在我的设计中,线程互斥主要在请求队列和电梯内的等待队列这两个共享对象实现,主要是通过 synchronized 方法。

线程协作

wait, notify, notifyAll 都是 java.lang.Object 类的方法。

每个实例都拥有一个等待队列,放置在执行实例的 wait 方法后停止操作的线程。

若要执行 wait, notify, notifyAll 操作,线程必须持有锁。如果线程进入等待队列,便会释放其实例的锁。

notifyall 唤醒在该实例中等待的线程时,并不是立即让锁,而是先执行完当前方法中的语句。

obj.wait(); // 线程正在obj上等待
wait; // 线程正在this上等待
this.wait(); // 线程正在this上等待

obj.notifyAll(); // 在obj实例的等待队列中休眠的所有线程都会被唤醒
notifyAll(); // 在this实例的等待队列中休眠的所有线程都会被唤醒
this.notifyAll(); // 在this实例的等待队列中休眠的所有线程都会被唤醒

当某个线程卡在临界区或一直获取不到资源时,会发生轮询,会比较耗费CPU。当你提交时如果遇到 CTLE 可以考虑代码中是否有轮询。比如说当 Schedule 想要获取乘客请求时,waitQueue 为空,此时我们可以让该线程进入 waitQueue 的等待队列,也就是让它 wait 一下。当 waitQueue 非空(即输入线程把乘客请求给了 waitQueue)时,唤醒 waitQueue 的等待队列 (notifyAll)。该部分可以参考 Producer-Consumer 模式或 Guard Suspension 模式。

复杂度分析

class OCavg OCmax WMC
Cell 1.0 1.0 11.0
Compare 2.0 2.0 2.0
Counter 1.0 1.0 3.0
Elevator 2.7142857142857144 13.0 38.0
ElevatorQueue 2.1666666666666665 4.0 13.0
Floor 1.0 1.0 5.0
InputThread 3.5 6.0 7.0
MainClass 1.0 1.0 1.0
Person 1.0 1.0 8.0
RequestQueue 2.090909090909091 5.0 23.0
Schedule 6.666666666666667 14.0 20.0
Strategy 4.333333333333333 8.0 13.0
WaitQueue 1.5 4.0 12.0
Total 156.0
Average 2.0526315789473686 4.6923076923076925 12.0

Elevator 需要向 Strategy 类传入电梯参数从而获得电梯下一刻状态,耦合度比较高,并且电梯的状态更新比较多导致run 方法比较复杂。Schedule 类需要通过 dijkstra 算法求解最短路径并更新乘客请求这里复杂度比较高。

多线程程序的 debug 方式

  • 利用调试功能。可以根据输出大致判断出bug触发地,据此选定断点。比如说电梯门开了之后乘客却没有进来,此时就将断点设置在开门的方法。
  • 利用 printf 输出。可以输出电梯状态改变、请求状态改变(进入等待队列等信息)、线程状态改变以及观察时间戳等判断。
  • 有同学跟我分享了一种方法:设置一个 Debug 类,通过 stderr 抛出异常,这样也不会影响输出检测。

程序中出现的 bug

  • 线程的互斥。电梯队列由输入线程管理,会被调度线程分配请求时访问。一开始没有对其进行互斥处理。

心得体会

线程安全

处理好线程安全的关键时判断清楚对象是否会被多个线程访问,然后进行互斥操作。在本次作业中我主要采用 synchronized 进行互斥,并没有用到读写锁,因为其对性能影响不大,就没有改变原有设计。

层次化设计

层次清晰的架构可以更容易理清每个模块的职责,易于实现高内聚低耦合。那么当需要迭代更新时就可以方便的将要求拆分到具体的层次中处理。