Java 多线程:锁(二)

Java 多线程:锁(二)

作者:Grey

原文地址:

博客园:Java 多线程:锁(二)

CSDN:Java 多线程:锁(二)

AtomicLong VS LongAddr VS Synchronized

需要实际测试一下。

示例代码见:

package git.snippets.juc;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * 对比AddByAdder, AddByAtomic, AddBySync几个程序,在数据量比较大的情况下,AddByAdder的效率最高
 */
public class AddWays {
    public static void main(String[] args) {
        addBySync();
        addByAtomicLong();
        addByLongAdder();
    }

    // 使用AtomicLong
    public static void addByAtomicLong() {
        AtomicLong count = new AtomicLong(0);
        Thread[] all = new Thread[1000];
        AddWays t = new AddWays();
        for (int i = 0; i < all.length; i++) {
            all[i] = new Thread(() -> {
                for (int j = 0; j < 1000000; j++) {
                    count.incrementAndGet();
                }
            });
        }
        long start = System.currentTimeMillis();
        for (Thread thread : all) {
            thread.start();
        }
        for (Thread thread : all) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("result is " + count.get() + " time is " + (end - start) + "ms (by AtomicLong)");

    }

    // 使用LongAdder
    public static void addByLongAdder() {
        Thread[] all = new Thread[1000];
        LongAdder count = new LongAdder();
        for (int i = 0; i < all.length; i++) {
            all[i] = new Thread(() -> {
                for (int j = 0; j < 1000000; j++) {
                    count.increment();
                }
            });
        }
        long start = System.currentTimeMillis();
        for (Thread thread : all) {
            thread.start();
        }
        for (Thread thread : all) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("result is " + count + " time is " + (end - start) + "ms (by LongAdder)");

    }

    static long count = 0;

    public static void addBySync() {


        Thread[] all = new Thread[1000];
        Object o = new Object();
        for (int i = 0; i < all.length; i++) {
            all[i] = new Thread(() -> {
                for (int j = 0; j < 1000000; j++) {
                    synchronized (o) {
                        count++;
                    }
                }
            });
        }
        long start = System.currentTimeMillis();
        for (Thread thread : all) {
            thread.start();
        }
        for (Thread thread : all) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("result is " + count + " time is " + (end - start) + "ms (by synchronized)");

    }
}

Java SE 11 下,运行得到的执行结果是:

result is 1000000000 time is 10035ms (by synchronized)
result is 1000000000 time is 15818ms (by AtomicLong)
result is 1000000000 time is 963ms (by LongAdder)

可以看到,在大数据量的情况下,LongAdder 的效率最高。关于 LongAdder 的一些说明,参考如下两篇博客:

ReentrantLock

其中“ReentrantReadWriteLock”,“读锁的插队策略”,”锁的升降级” 部分参考了如下文档中的内容

Java中的共享锁和排他锁(以读写锁ReentrantReadWriteLock为例)

ReentrantLock vs sychronized

ReentrantLock是可重入锁,可以替代sychronizedReentrantLocksychronized的区别在于:

ReentrantLock可以tryLock,尝试若干时间片内获取锁。

代码如下:

package git.snippets.juc;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTryLock {
    ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        ReentrantLockTryLock t = new ReentrantLockTryLock();
        new Thread(t::m).start();
        try {
            SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 由于前一个线程先执行m1,锁定this,所以只能等前一个线程执行完毕后才能执行下面线程的操作
        new Thread(t::m2).start();

    }

    void m() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                try {
                    SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
                if (i == 2) {
                    m2();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void m2() {
        boolean locked = false;
        try {
            // 在1s内尝试获取锁
            locked = lock.tryLock(1, SECONDS);
            if (locked) {
                System.out.println("get lock");
                System.out.println("start m2");
            } else {
                System.out.println("not get m2");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (locked) {
                lock.unlock();
            }
        }

    }
}


ReentrantLock可以用lockInterruptibly,在lock的时候可以被打断,一旦被打断,可以作出响应,而sychronized一旦wait后,必须得让别人notify,才能醒来。

代码如下:

package git.snippets.juc;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockInterrupt {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("a thread started and sleep forever");
                SECONDS.sleep(Integer.MAX_VALUE);
                System.out.println("a thread stopped");
            } catch (InterruptedException e) {
                System.out.println("the thread has been interrupted");
            } finally {
                lock.unlock();
            }
        });
        t1.start();
        Thread t2 = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                System.out.println("if lock thread is interrupted, it will run");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                System.out.println("interrupted");
            } finally {
                lock.unlock();
            }
        });
        t2.start();
        try {
            SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t2.interrupt();

    }
}

ReentrantLock可以设置公平与否,公平的概念是,每个线程来了以后会检查等待队列里面会不会有等待的线程,如果有,则进入队列等待。

代码如下

package git.snippets.juc;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockFair extends Thread {
    static ReentrantLock lock = new ReentrantLock(true/*false*/);

    public static void main(String[] args) {
        ReentrantLockFair tl = new ReentrantLockFair();
        Thread t1 = new Thread(tl);
        Thread t2 = new Thread(tl);
        t1.start();
        t2.start();
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            lock.lock();
            try {
                System.out.println("current thread :" + Thread.currentThread().getName() + " get the lock");
                SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

        }
    }
}

注:不管是公平锁还是非公平锁,一旦没有竞争到锁,都会进行排队,当锁释放时,都是唤醒排在最前面的线程,所以非公平锁只是体现在了线程加锁阶段,而没有体现在线程被唤醒阶段。

synchronized锁的是对象,锁信息保存在对象头中,ReentrantLock通过代码中int类型的state标识来标识锁的状态。

注:在使用 ReentrantLock 的时候一定要记得 unlock,因为如果使用 synchronized 遇到异常,JVM 会自动释放锁,但是用 ReentrantLock 必须手动释放锁,因此经常在finally 中进行锁的释放

代码如下:

package git.snippets.juc;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockAndSynchronized {
    ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        ReentrantLockAndSynchronized t = new ReentrantLockAndSynchronized();
        new Thread(t::m).start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 由于前一个线程先执行m1,锁定this,所以只能等前一个线程执行完毕后才能执行下面线程的操作
        new Thread(t::m2).start();

    }

    void m() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
                if (i == 2) {
                    m2();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void m2() {
        lock.lock();
        try {
            System.out.println("start m2");
            int i = 1 / 0;
        } finally {
            // 如果不加这句unlock,程序会一直卡在这里
            lock.unlock();
        }

    }
}

package git.snippets.juc;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * 程序在执行过程中,如果出现异常,默认情况锁会被释放
 * 所以,在并发处理的过程中,有异常要多加小心,不然可能会发生不一致的情况。
 * 比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,
 * 在第一个线程中抛出异常,其他线程就会进入同步代码区,有可能会访问到异常产生时的数据。
 * 因此要非常小心的处理同步业务逻辑中的异常
 */
public class SynchronizedException implements Runnable {
    int count = 0;

    public static void main(String[] args) throws IOException {
        SynchronizedException myRun = new SynchronizedException();
        Thread thread = new Thread(myRun, "t1");
        Thread thread2 = new Thread(myRun, "t2");
        thread.start();
        thread2.start();
        System.in.read();

    }

    @Override
    public void run() {
        synchronized (this) {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("current thread is " + Thread.currentThread().getName() + " count is " + count);
                if (count == 5) {
                    count++;
                    // 遇到异常,synchronized 会自动释放锁
                    int m = 1 / 0;
                }
                count++;
            }
        }
    }

    synchronized void m1(String content) {
        System.out.println(this);
        System.out.println("m1 get content is " + content);
        m2(content);
    }

    synchronized void m2(String content) {
        System.out.println(this);
        System.out.println("m2 get content is " + content);

    }
}

ReentrantReadWriteLock

在 ReentrantReadWriteLock 中包含读锁和写锁,其中读锁是可以多线程共享的,即共享锁, 而写锁是排他锁,在更改时候不允许其他线程操作。读写锁其实是一把锁,所以会有同一时刻不允许读写锁共存的规定。之所以要细分读锁和写锁也是为了提高效率,将读和写分离,

示例:

package git.snippets.juc;


import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * ReentrantReadWriteLock读写锁示例
 **/
public class ReentrantLockReadAndWrite {

    private static ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private static ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
    private static ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();

    public static void read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "获取读锁,开始执行");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "释放读锁");
        }
    }

    public static void write() {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "获取写锁,开始执行");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName() + "释放写锁");
        }
    }

    public static void main(String[] args) {
        new Thread(() -> read(), "Thread1").start();
        new Thread(() -> read(), "Thread2").start();
        new Thread(() -> write(), "Thread3").start();
        new Thread(() -> write(), "Thread4").start();
    }
}

读锁的插队策略

设想如下场景:

在非公平的ReentrantReadWriteLock锁中,线程2和线程4正在同时读取,线程3想要写入,拿不到锁(同一时刻是不允许读写锁共存的),于是进入等待队列, 线程5不在队列里,现在过来想要读取,

策略1

如果允许读插队,就是说线程5读先于线程3写操作执行,因为读锁是共享锁,不影响后面的线程3的写操作,
这种策略可以提高一定的效率,却可能导致像线程3这样的线程一直在等待中,因为可能线程5读操作之后又来了n个线程也进行读操作,造成线程饥饿;

策略2

不允许插队,即线程5的读操作必须排在线程3的写操作之后,放入队列中,排在线程3之后,这样能避免线程饥饿。
事实上 ReentrantReadWriteLock 在非公平情况下,读锁采用的就是策略2:不允许读锁插队,避免线程饥饿。更加确切的说是:在非公平锁情况下,允许写锁插队,也允许读锁插队,

但是读锁插队的前提是队列中的头节点不能是想获取写锁的线程。

以上是在非公平ReentrantReadWriteLock锁中,

在公平锁中,读写锁都是是不允许插队的,严格按照线程请求获取锁顺序执行。

代码如下

package git.snippets.juc;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * ReentrantLock的读锁插队策略
 */
public class ReentrantLockCut {
    private static final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private static final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
    private static final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();

    public static void read() {
        System.out.println(Thread.currentThread().getName() + "开始尝试获取读锁");
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "获取读锁,开始执行");
            Thread.sleep(20);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "释放读锁");
        }
    }

    public static void write() {
        System.out.println(Thread.currentThread().getName() + "开始尝试获取写锁");
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "获取写锁,开始执行");
            Thread.sleep(40);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + "释放写锁");
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        new Thread(ReentrantLockCut::write, "Thread1").start();
        new Thread(ReentrantLockCut::read, "Thread2").start();
        new Thread(ReentrantLockCut::read, "Thread3").start();
        new Thread(ReentrantLockCut::write, "Thread4").start();
        new Thread(ReentrantLockCut::read, "Thread5").start();
        new Thread(() -> {
            Thread[] threads = new Thread[1000];
            for (int i = 0; i < 1000; i++) {
                threads[i] = new Thread(ReentrantLockCut::read, "子线程创建的Thread" + i);
            }
            for (int i = 0; i < 1000; i++) {
                threads[i].start();
            }
        }).start();
    }

}

锁的升降级

ReentrantReadWriteLock读写锁中,只支持写锁降级为读锁,而不支持读锁升级为写锁,

之所以ReentrantReadWriteLock不支持锁的升级(其它锁可以支持),主要是避免死锁,

例如两个线程A和B都在读, A升级要求B释放读锁,B升级要求A释放读锁,互相等待形成死循环。

如果能严格保证每次都只有一个线程升级那也是可以的。

代码如下

package git.snippets.juc;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 读锁无法升级为写锁
 * 写锁可以降级成读锁
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/21
 * @since
 */
public class ReentrantReadWriteLockUpAndDown {
    private static final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private static final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
    private static final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();

    public static void read() {
        System.out.println(Thread.currentThread().getName() + "开始尝试获取读锁");
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "获取读锁,开始执行");
            Thread.sleep(20);
            System.out.println(Thread.currentThread().getName() + "尝试升级读锁为写锁");
            //读锁升级为写锁(失败)
            writeLock.lock();
            System.out.println(Thread.currentThread().getName() + "读锁升级为写锁成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "释放读锁");
        }
    }

    public static void write() {
        System.out.println(Thread.currentThread().getName() + "开始尝试获取写锁");
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "获取写锁,开始执行");
            Thread.sleep(40);
            System.out.println(Thread.currentThread().getName() + "尝试降级写锁为读锁");
            //写锁降级为读锁(成功)
            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "写锁降级为读锁成功");
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + "释放写锁");
            writeLock.unlock();
            readLock.unlock();
        }
    }

    public static void main(String[] args) {
        new Thread(ReentrantReadWriteLockUpAndDown::write, "Thread1").start();
        new Thread(ReentrantReadWriteLockUpAndDown::read, "Thread2").start();
    }
}

CAS,Synchronized,Lock的使用情景

对于资源竞争较少(线程冲突较轻)的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源;而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能。

对于资源竞争严重(线程冲突严重)的情况,CAS 自旋的概率会比较大,从而浪费更多的 CPU 资源,效率低于synchronized

注: synchronized在jdk1.6之后,已经改进优化。synchronized的底层实现主要依靠 Lock-Free 的队列,基本思路是自旋后阻塞,竞争切换后继续竞争锁,稍微牺牲了公平性,但获得了高吞吐量。在线程冲突较少的情况下,可以获得和 CAS 类似的性能;而线程冲突严重的情况下,性能远高于 CAS。

synchronized作为悲观锁,比较适合写入操作比较频繁的场景,如果出现大量的读取操作,每次读取的时候都会进行加锁,这样会增加大量的锁的开销,降低了系统的吞吐量。

在资源竞争不是很激烈的情况下,偶尔会有同步的情形下,synchronized是很合适的。原因在于,编译程序通常会尽可能的进行优化synchronized,另外可读性非常好,不管用没用过5.0多线程包的程序员都能理解。默认是非公平锁:后等待的线程可以先获得锁。

ReentrantLock比较适合读取操作比较频繁的场景,如果出现大量的写入操作,数据发生冲突的可能性就会增大,为了保证数据的一致性,应用层需要不断的重新获取数据,这样会增加大量的查询操作,降低了系统的吞吐量。

Atomic和上面的类似,不激烈情况下,性能比synchronized略逊,而激烈的时候,也能维持常态。激烈的时候,Atomic的性能会优于ReentrantLock一倍左右。但是其有一个缺点,就是只能同步一个值,一段代码中只能出现一个Atomic的变量,多于一个同步无效。因为他不能在多个Atomic之间同步。

说明

本文涉及到的所有代码和图例

图例

代码

更多内容见:Java 多线程

参考资料

实战Java高并发程序设计(第2版)

深入浅出Java多线程

多线程与高并发-马士兵

Java并发编程实战

设计模式学习笔记

从LONGADDER看更高效的无锁实现

Java 8 Performance Improvements: LongAdder vs AtomicLong

Java中的共享锁和排他锁(以读写锁ReentrantReadWriteLock为例)

【并发编程】面试官:有没有比读写锁更快的锁?

图解Java多线程设计模式