侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 297 篇文章
  • 累计创建 134 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

java多线程 - 死锁

孔子说JAVA
2019-11-10 / 0 评论 / 0 点赞 / 85 阅读 / 11,292 字 / 正在检测是否收录...

1、前言

尽管锁被持有的时间通常很短,但是作为商业产品的应用程序每天可能要执行数十亿次获取锁->释放锁的操作,只要在这数十亿次操作中只要有一次发生了错误,就可能导致程序中发生死锁,并且即使通过压力测试也不可能找出所有潜在的死锁。

2、死锁(Deadlock)概述

多线程以及多进程改善了系统资源的利用率并提高了系统 的处理能力。然而,并发执行也带来了新的问题——死锁。所谓死锁是指多个线程因竞争资源而造成的一种僵局(互相等待),若无外力作用,这些进程都将无法向前推进。

当一个线程永远地持有一个锁,并且其他线程都尝试去获得这个锁时,那么它们将永远被阻塞,这个我们都知道。如果线程A持有锁L并且想获得锁M,线程B持有锁M并且想获得锁L,那么这两个线程将永远等待下去,这种情况就是最简单的死锁形式。

  • 在数据库系统的设计中考虑了监测死锁以及从死锁中恢复,数据库如果监测到了一组事务发生了死锁时,将选择一个牺牲者并放弃这个事务。Java虚拟机解决死锁问题方面并没有数据库这么强大,当一组Java线程发生死锁时,这两个线程就永远不能再使用了,并且由于两个线程分别持有了两个锁,那么这两段同步代码/代码块也无法再运行了----除非终止并重启应用。

  • 死锁是设计的BUG,问题比较隐晦。不过死锁造成的影响很少会立即显现出来,一个类可能发生死锁,并不意味着每次都会发生死锁,这只是表示有可能。当死锁出现时,往往是在最糟糕的情况----高负载的情况下。

3、死锁产生的原因

产生死锁必须同时满足以下四个条件,只要其中任一条件不成立,死锁就不会发生。

  • 互斥条件
    即某个资源在一段时间内只能由一个进程占有,不能同时被两个或两个以上的进程占有。

  • 不剥夺条件
    进程所获得的资源在未使用完毕之前,资源申请者不能强行地从资源占有者手中夺取资源,而只能由该资源的占有者进程自行释放。

  • 请求和保持条件
    进程至少已经占有一个资源,但又申请新的资源;由于该资源已被另外进程占有,此时该进程阻塞;但是,它在等待新资源之时,仍继续占用已占有的资源。(一个进程因请求被占用资源而发生阻塞时,对已获得的资源保持不放)

  • 环路条件(循环等待条件)
    当发生死锁时,所等待的进程必定会形成一个环路(类似于死循环),造成永久阻塞。

存在一种进程资源的循环等待链,链中每一个进程已获得的资源同时被 链中下一个进程所请求。即存在一个处于等待状态的进程集合{Pl, P2, …, pn},其中Pi等 待的资源被P(i+1)占有(i=0, 1, …, n-1),Pn等待的资源被P0占有,如图2-15所示。

image-1649316463535

造成死锁的原因可以概括成三句话:当前线程拥有其他线程需要的资源,当前线程等待其他线程已拥有的资源,都不放弃自己拥有的资源。

4、死锁分类

4.1 锁顺序死锁(最简单的死锁)

public class LeftRightDeadlock {
    private final Object left = new Object();
    private final Object right = new Object();

    public void leftRight() {
        // 得到left锁
        synchronized (left) {
            Thread.sleep(2000);
            // 得到right锁
            synchronized (right) {
                System.out.println("leftRight end!");
            }
        }
    }

    public void rightLeft() {
        // 得到right锁
        synchronized (right) {
            Thread.sleep(2000);
            // 得到left锁
            synchronized (left) {
                System.out.println("rightLeft end!");
            }
        }
    }
}
// 注意这里一定要有"Thread.sleep(2000)"让线程睡一觉,不然一个线程运行了,另一个线程还没有运行,
// 先运行的线程很有可能就已经连续获得两个锁了。然后写两个线程分别调用它们。

如果我们的线程是交错执行的,那么就很有可能出现以下的情况:

  • 线程A调用leftRight()方法,得到left锁
  • 同时线程B调用rightLeft()方法,得到right锁
  • 线程A和线程B都继续执行,此时线程A需要right锁才能继续往下执行。此时线程B需要left锁才能继续往下执行。
  • 但是:线程A的left锁并没有释放,线程B的right锁也没有释放。
  • 所以他们都只能等待,而这种等待是无期限的–>永久等待–>死锁
// 线程1
public class Thread0 extends Thread{
    private LeftRightDeadlock dl;
   
    public Thread0(LeftRightDeadlock dl){
        this.dl = dl;
    }
    
    public void run(){
        try{
            dl.leftRight();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// 线程2
public class Thread1 extends Thread{
    private LeftRightDeadlock dl;
   
    public Thread0(LeftRightDeadlock dl){
        this.dl = dl;
    }
    
    public void run(){
        try{
            dl.rightLeft();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// 写个main函数调用一下
public static void main(String[] args) {
    LeftRightDeadlock dl = new LeftRightDeadlock ();
    Thread0 t0 = new Thread0(dl);
    Thread1 t1 = new Thread1(dl);
    t0.start();
    t1.start();

    while(true);   
}

至于结果,没有结果,什么语句都不会打印,因为死锁了。

4.2 动态锁顺序死锁

// 转账
public static void transferMoney(Account fromAccount,Account toAccount,
          DollarAmount amount) throws InsufficientFundsException {
   // 锁定汇账账户
   synchronized (fromAccount) {
        // 锁定来账账户
        synchronized (toAccount) {
           // 判余额是否大于0
           if (fromAccount.getBalance().compareTo(amount) < 0) {
               throw new InsufficientFundsException();
           } else {
               // 汇账账户减钱
               fromAccount.debit(amount);
               // 来账账户增钱
               toAccount.credit(amount);
           }
        }
    }
}

上面的代码看起来是没有问题的:锁定两个账户来判断余额是否充足才进行转账!但是,同样有可能会发生死锁:

  • 如果两个线程同时调用transferMoney()
  • 线程A从X账户向Y账户转账
  • 线程B从账户Y向账户X转账
  • 那么就会发生死锁。
A:transferMoney(myAccount,yourAccount,10);

B:transferMoney(yourAccount,myAccount,20);

4.3 协作对象之间发生死锁

public class CooperatingDeadlock {
    // Warning: deadlock-prone!
    class Taxi {
        @GuardedBy("this") private Point location, destination;
        private final Dispatcher dispatcher;

        public Taxi(Dispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        public synchronized Point getLocation() {
            return location;
        }

        // setLocation 需要Taxi内置锁
        public synchronized void setLocation(Point location) {
            this.location = location;
            if (location.equals(destination))
                // 调用notifyAvailable()需要Dispatcher内置锁
                dispatcher.notifyAvailable(this);
        }

        public synchronized Point getDestination() {
            return destination;
        }

        public synchronized void setDestination(Point destination) {
            this.destination = destination;
        }
    }

    class Dispatcher {
        @GuardedBy("this") private final Set<Taxi> taxis;
        @GuardedBy("this") private final Set<Taxi> availableTaxis;

        public Dispatcher() {
            taxis = new HashSet<Taxi>();
            availableTaxis = new HashSet<Taxi>();
        }

        public synchronized void notifyAvailable(Taxi taxi) {
            availableTaxis.add(taxi);
        }

        // 调用getImage()需要Dispatcher内置锁
        public synchronized Image getImage() {
            Image image = new Image();
            for (Taxi t : taxis)
                // 调用getLocation()需要Taxi内置锁
                image.drawMarker(t.getLocation());
            return image;
        }
    }

    class Image {
        public void drawMarker(Point p) {
        }
    }
}
  • 上面的getImage()和setLocation(Point location)都需要获取两个锁的, 并且在操作途中是没有释放锁的。
  • 这就是 隐式获取两个锁 (对象之间协作)…这种方式也很容易就造成死锁。

5、处理死锁

死锁预防、死锁避免、死锁检测、死锁恢复。

5.1 死锁预防

死锁的预防是保证系统不进入死锁状态的一种策略。它的基本思想是要求进程申请资源时遵循某种协议,从而打破产生死锁的四个必要条件中的一个或几个,保证系统不会进入死锁状态。

  • 破坏“请求和保持”条件
    即允许进程同时访问某些资源。但是,有的资源是不允许被同时访问的,像打印机等等,这是由资源本身的属性所决定的。所以,这种办法并无实用价值。

  • 破坏“不剥夺”条件
    即允许进程强行从占有者那里夺取某些资源。

  • 破坏“环路等待”条件
    可以实行资源预先分配策略。

5.2 死锁避免

1)加锁顺序:

当多个线程需要相同的一些锁,但是按照不同的顺序加锁,死锁就很容易发生。如果能确保所有的线程都是按照相同的顺序获得锁,那么死锁就不会发生。当然这种方式需要你事先知道所有可能会用到的锁,然而总有些时候是无法预知的。

2)加锁时限:

加上一个超时时间,若一个线程没有在给定的时限内成功获得所有需要的锁,则会进行回退并释放所有已经获得的锁,然后等待一段随机的时间再重试。但是如果有非常多的线程同一时间去竞争同一批资源,就算有超时和回退机制,还是可能会导致这些线程重复地尝试但却始终得不到锁。

3)死锁检测:

死锁检测即每当一个线程获得了锁,会在线程和锁相关的数据结构中(map、graph等等)将其记下。除此之外,每当有线程请求锁,也需要记录在这个数据结构中。死锁检测是一个更好的死锁预防机制,它主要是针对那些不可能实现按序加锁并且锁超时也不可行的场景。

5.2.1 固定锁顺序避免死锁

上面transferMoney()发生死锁的原因是因为 加锁顺序不一致 而出现的,上面的例子我们就可以改造成这样子:

public class InduceLockOrder {

    // 额外的锁、避免两个对象hash值相等的情况(即使很少)
    private static final Object tieLock = new Object();

    public void transferMoney(final Account fromAcct,
                              final Account toAcct,
                              final DollarAmount amount)
            throws InsufficientFundsException {
        class Helper {
            public void transfer() throws InsufficientFundsException {
                if (fromAcct.getBalance().compareTo(amount) < 0)
                    throw new InsufficientFundsException();
                else {
                    fromAcct.debit(amount);
                    toAcct.credit(amount);
                }
            }
        }
        // 得到锁的hash值
        int fromHash = System.identityHashCode(fromAcct);
        int toHash = System.identityHashCode(toAcct);

        // 根据hash值来上锁
        if (fromHash < toHash) {
            synchronized (fromAcct) {
                synchronized (toAcct) {
                    new Helper().transfer();
                }
            }

        } else if (fromHash > toHash) {// 根据hash值来上锁
            synchronized (toAcct) {
                synchronized (fromAcct) {
                    new Helper().transfer();
                }
            }
        } else {// 额外的锁、避免两个对象hash值相等的情况(即使很少)
            synchronized (tieLock) {
                synchronized (fromAcct) {
                    synchronized (toAcct) {
                        new Helper().transfer();
                    }
                }
            }
        }
    }
}

得到对应的hash值来固定加锁的顺序,这样我们就不会发生死锁的问题了!

5.2.2 开放调用避免死锁

在协作对象之间发生死锁的例子中,主要是因为在调用某个方法时就需要持有锁,并且在方法内部也调用了其他带锁的方法!

  • 如果在调用某个方法时不需要持有锁,那么这种调用被称为开放调用!
  • 同步代码块最好仅被用于保护那些涉及共享状态的操作!

class CooperatingNoDeadlock {
@ThreadSafe
class Taxi {
@GuardedBy(“this”) private Point location, destination;
private final Dispatcher dispatcher;

    public Taxi(Dispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    public synchronized Point getLocation() {
        return location;
    }

    public synchronized void setLocation(Point location) {
        boolean reachedDestination;

        // 加Taxi内置锁
        synchronized (this) {
            this.location = location;
            reachedDestination = location.equals(destination);
        }
        // 执行同步代码块后完毕,释放锁

        if (reachedDestination)
            // 加Dispatcher内置锁
            dispatcher.notifyAvailable(this);
    }

    public synchronized Point getDestination() {
        return destination;
    }

    public synchronized void setDestination(Point destination) {
        this.destination = destination;
    }
}

@ThreadSafe
class Dispatcher {
    @GuardedBy("this") private final Set<Taxi> taxis;
    @GuardedBy("this") private final Set<Taxi> availableTaxis;

    public Dispatcher() {
        taxis = new HashSet<Taxi>();
        availableTaxis = new HashSet<Taxi>();
    }

    public synchronized void notifyAvailable(Taxi taxi) {
        availableTaxis.add(taxi);
    }

    public Image getImage() {
        Set<Taxi> copy;

        // Dispatcher内置锁
        synchronized (this) {
            copy = new HashSet<Taxi>(taxis);
        }
        // 执行同步代码块后完毕,释放锁

        Image image = new Image();
        for (Taxi t : copy)
            // 加Taix内置锁
            image.drawMarker(t.getLocation());
        return image;
    }
}

class Image {
    public void drawMarker(Point p) {
    }
}

}

5.2.3 使用定时锁

使用显式Lock锁,在获取锁时使用tryLock()方法。当等待超过时限的时候,tryLock()不会一直等待,而是返回错误信息。使用tryLock()能够有效避免死锁问题。

5.3 死锁检测

死锁检测是一个更好的死锁预防机制,它主要是针对那些不可能实现按序加锁并且锁超时也不可行的场景。

  • 每当一个线程获得了锁,会在线程和锁相关的数据结构中(map、graph等等)将其记下。除此之外,每当有线程请求锁,也需要记录在这个数据结构中。

  • 当一个线程请求锁失败时,这个线程可以遍历锁的关系图看看是否有死锁发生。例如,线程A请求锁7,但是锁7这个时候被线程B持有,这时线程A就可以检查一下线程B是否已经请求了线程A当前所持有的锁。如果线程B确实有这样的请求,那么就是发生了死锁(线程A拥有锁1,请求锁7;线程B拥有锁7,请求锁1)。

  • 当然,死锁一般要比两个线程互相持有对方的锁这种情况要复杂的多。线程A等待线程B,线程B等待线程C,线程C等待线程D,线程D又在等待线程A。线程A为了检测死锁,它需要递进地检测所有被B请求的锁。从线程B所请求的锁开始,线程A找到了线程C,然后又找到了线程D,发现线程D请求的锁被线程A自己持有着。这是它就知道发生了死锁。

下面是一幅关于四个线程(A,B,C和D)之间锁占有和请求的关系图。像这样的数据结构就可以被用来检测死锁。

image-1649316981388

那么当检测出死锁时,这些线程该做些什么呢?

  • 一个可行的做法是释放所有锁,回退,并且等待一段随机的时间后重试。这个和简单的加锁超时类似,不一样的是只有死锁已经发生了才回退,而不会是因为加锁的请求超时了。虽然有回退和等待,但是如果有大量的线程竞争同一批锁,它们还是会重复地死锁(编者注:原因同超时类似,不能从根本上减轻竞争)。

  • 一个更好的方案是给这些线程设置优先级,让一个(或几个)线程回退,剩下的线程就像没发生死锁一样继续保持着它们需要的锁。如果赋予这些线程的优先级是固定不变的,同一批线程总是会拥有更高的优先级。为避免这个问题,可以在死锁发生的时候设置随机的优先级。

5.4 死锁定位

虽然造成死锁的原因是因为我们设计得不够好,但是可能写代码的时候不知道哪里发生了死锁。JDK提供了两种方式来给我们检测:

5.4.1 Jconsole

JDK自带的图形化界面工具,可以通过打开cmd然后输入jconsole打开。

image-1649317022256

连接到需要查看的进程。

image-1649317029622

打开线程选项卡,然后点击左下角的“检测死锁” 。

image-1649317036251

jconsole就会给我们检测出该线程中造成死锁的线程,点击选中即可查看详情:

image-1649317051587

image-1649317059161

从上图中我们可以看出:

  • 在线程Thread-1中,从状态可以看出,它想申请java.lang.Object@35b4e829这个资源,但是这个资源已经被Thread-0拥有了,所以就堵塞了。

  • 在线程Thread-0中,从状态可以看出,它想申请java.lang.Object@2db8dc9这个资源,但是这个资源已经被Thread-1拥有了,所以就堵塞了。

  • Thread-1一直等待java.lang.Object@35b4e829资源,而Thread–0一直等待java.lang.Object@2db8dc9资源,于是这两个线程就这么僵持了下去,造成了死锁。

5.4.2 Jstack

Jstack是JDK自带的命令行工具,主要用于线程Dump分析。

-----(1)windows下分析过程-----

1、我们先用jps获得当前Java虚拟机进程的pid(或者Linux的ps命令)

image-1649317134712

2、jstack打印堆栈。jstack打印内容的最后其实已经报告发现了一个死锁,但因为我们是分析死锁产生的原因,而不是直接得到这里有一个死锁的结论,所以别管它,就看前面的部分。

image-1649317142444

先说明介绍一下每一部分的意思,以"Thread-1"为例:

  • (1)"Thread-1"表示线程名称

  • (2)"prio=6"表示线程优先级

  • (3)"tid=00000000497cec00"表示线程Id

  • (4)nid=0x219c

    • 线程对应的本地线程Id,这个重点说明下。因为Java线程是依附于Java虚拟机中的本地线程来运行的,实际上是本地线程在执行Java线程代码,只有本地线程才是真正的线程实体。Java代码中创建一个thread,虚拟机在运行期就会创建一个对应的本地线程,而这个本地线程才是真正的线程实体。Linux环境下可以使用"top -H -p JVM进程Id"来查看JVM进程下的本地线程(也被称作LWP)信息,注意这个本地线程是用十进制表示的,nid是用16进制表示的,转换一下就好了,0x219c对应的本地线程Id应该是8604。
  • (5)"[0x000000004a3bf000…0x000000004a3bf790]"表示线程占用的内存地址

  • (6)"java.lang.Thread.State:BLOCKED"表示线程的状态

解释完了每一部分的意思,看下Thread-1处于BLOCKED状态,Thread-0处于BLOCKED状态。对这两个线程分析一下:

  • (1)Thread-1获得了锁0x000000003416a4e8,在等待锁0x000000003416a4d8
  • (2)Thread-0获得了锁0x000000003416a4d8,在等待锁0x000000003416a4e8

由于两个线程都在等待获取对方持有的锁,所以就这么永久等待下去了。

3、注意一下使用Eclipse/MyEclipse, 这段程序如果不点击控制台上面的红色方框去Terminate掉它,而是右键->Run As->1 Java Application的话,这个进程会一直存在的,这时候可以利用taskkill命令去终止没有被Terminate的进程:

image-1649317263961

-----(2)linux下分析过程-----

1.我们先用ps来查看java进程pid

image-1649317373953

2.看一下jstack的使用

image-1649317385774

3.jstack输出线程dump信息到文件

image-1649317394313

  • 用比较工具查看带-l和不带-l的区别如下:

image-1649317410035

4.查看dump文件,然后进行分析

image-1649317417701

其中有一行是at DeadThread.run(DeadThread.java:37),说明Thread-1实在DeadThread类的37行处发生死锁,其中at DeadThread.run(DeadThread.java:21),说明Thread-0是在DeadThread类的21行处发生死锁。详细的jstack dump文件分析请参看:jstack和线程dump分析

public class DeadThread implements Runnable {

    public String username;
    public Object lock1 = new Object();
    public Object lock2 = new Object();

    @Override
    public void run() {
        // TODO Auto-generated method stub  
        if (username.equals("a")) {
            synchronized (lock1) {
                try {
                    System.out.println("username = " + username);
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(3000);
                } catch (Exception e) {
                    // TODO: handle exception  
                    e.printStackTrace();
                }
                synchronized (lock2) {
                    System.out.println("按lock1->lock2的顺序执行代码");
                }
            }
        }
        if (username.equals("b")) {
            synchronized (lock2) {
                try {
                    System.out.println("username = " + username);
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(3000);

                } catch (Exception e) {
                    // TODO: handle exception  
                    e.printStackTrace();
                }
                synchronized (lock1) {
                    System.out.println("按lock2->lock1顺序执行代码");
                }
            }

        }
    }

    public void setFlag(String username) {
        this.username = username;
    }
    
    public static void main(String[] args) {

        DeadThread dt1 = new DeadThread();
        dt1.setFlag("a");
        Thread t1 = new Thread(dt1);
        t1.start();

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        dt1.setFlag("b");
        Thread t2 = new Thread(dt1);

        t2.start();
    }
}
  • 从而定位到死锁发生的原因,及具体位置:Thread-0获得了锁lock1,接下来期望获得锁lock2,(第20行),但是此时Thread-1获得了锁lock2,接下来期望获得锁lock2,(第37行),因而发生了死锁。

6、总结

发生死锁的原因主要由于:

  • 线程之间交错执行
    • 解决:以固定的顺序加锁。
  • 执行某方法时就需要持有锁,且不释放
    • 解决:缩减同步代码块范围,最好仅操作共享变量时才加锁。
  • 永久等待
    • 解决:使用tryLock()定时锁,超过时限则返回错误信息。
0

评论区