<返回更多

并发编程常见方案

2023-09-18    洞窝技术
加入收藏

今天,我将通过一个例子向大家介绍几种常见的并发编程方案。

我们通过一个程序模拟统计一批文档的字数。

首先我们先看无并发情况下的DEMO:

// 用Doc代表文档
public class Doc {

    public final String content;

    public Doc(String content) {
        this.content = content;
    }
}

// 模拟目录和文件
public class Dir {
    public final static List<Doc> docs = new ArrayList<>();
    static {
        docs.add( new Doc("""
        《三国演义》是综合民间传说和戏曲、话本,结合陈寿的《三国志》、范晔《后汉书》、
        元代《三国志平话》、和裴松之注的史料,以及作者个人对社会人生的体悟写成。现所见刊本
        以明嘉靖本最早,分24卷,240则。清初毛宗岗父子又做了一些修改,并成为现在最常见的120回本。""") );

        docs.add( new Doc("""
        《水浒传》是中国历史上第一部用古白话文写成的歌颂农民起义的长篇章回体版块结构小说,
        以宋江领导的起义军为主要题材,通过一系列梁山英雄反抗压迫、英勇斗争的生动故事,暴露
        了北宋末年统治阶级的腐朽和残暴,揭露了当时尖锐对立的社会矛盾和“官逼民反的残酷现实。""") );

        docs.add( new Doc("""
        《西游记》前七回叙述孙悟空出世,有大闹天宫等故事。此后写孙悟空随唐僧西天取经,
        沿途除妖降魔、战胜困难的故事。书中唐僧、孙悟空、猪八戒、沙僧等形象刻画生动,规模宏大,
        结构完整,并且《西游记》富有浓厚的中国佛教色彩,其隐含意义非常深远,众说纷纭,见仁见智。
        可以从佛、道、俗等多个角度欣赏,是中国古典小说中伟大的浪漫主义文学作品。""") );

        docs.add( new Doc("""
        《红楼梦》讲述的是发生在一个虚构朝代的封建大家庭中的人事物,其中以贾宝玉、
        林黛玉、薛宝钗三个人之间的感情纠葛为主线通过对一些日常事件的描述体现了在贾府
        的大观园中以金陵十二钗为主体的众女子的爱恨情愁。""") );


    }

}

public class wordCount {

    int countDoc( Doc doc ) {
        return doc.content.length();
    }

    void count() {

        int c = 0;

        for ( Doc doc : Dir.docs ) {
            c += countDoc(doc);
        }

        System.out.println( "所有文档字数总计: " + c + "个" );
    }

    public static void mAIn(String[] args) {
        new WordCount().count();
    }

}

执行结果:

1.共享内存与锁

这是一种非常熟悉和常见的技术,JAVA在这方面的实现非常出色。使用它通常会经历三个阶段:初学时感觉复杂和可怕,掌握后变得非常好和强大,进一步深入学习后又变得复杂和具有一定危险性。

共享内存允许多个进程同时读写一块或多块常规内存区域。有时候,进程需要在这些内存区域上执行一系列具有原子性的操作,其他进程在这些操作完成之前不能访问这些区域。为了解决这个问题,我们可以使用锁,这是一种只允许一个进程访问某种资源的机制。

这个方案存在多个缺点:

  1. 即使冲突的概率很低,锁的开销仍然无法忽略。

  2. 这些锁也是内存系统中的竞争热点。

  3. 如果出现错误的进程不处理锁,可能会导致正在加锁的锁被丢弃。

  4. 当锁出现问题时,调试极为困难。

此外,当使用锁来同步两三个进程时,可能没有太大问题。然而,随着进程数量的增加,情况会变得越来越难以控制。最终,这可能导致复杂的死锁问题,即使是最经验丰富的开发者也无法预见。

这个方案大家都比较熟熟悉,通过多个线共同处理文章:

// 计数器通过同步保障多线计数
public class Counter1 {
    private int c = 0;
    public synchronized void inc( int n ) {
        this.c += n;
    }
    public synchronized int totalNumber() {
        return c;
    }

}

// 文档字数计算
public class DocProc1 implements Runnable {

    private final Counter1 counter;

    public final Doc doc;

    public DocProc1(Counter1 counter, Doc doc) {
        this.counter = counter;
        this.doc = doc;
    }

    public void run() {
        int c = countDoc( this.doc );
        counter.inc( c );
    }
    
    int countDoc( Doc doc ) {
        return doc.content.length();
    }

}

public class WordCount1 {

    private final Counter1 counter = new Counter1();

    void count() throws InterruptedException {

        List<Thread> threads = new ArrayList<>();

        // 启动多个线程处理文章
        for ( Doc doc : Dir.docs ) {
            DocProc1 docProc1 = new DocProc1(counter , doc);
            Thread t = new Thread(docProc1);
            threads.add( t );
            t.start();
        }

        for (Thread t : threads ) {
            t.join();
        }

        System.out.println( "所有文档字数总计: " + this.counter.totalNumber() + "个" );
    }

    public static void main(String[] args) throws InterruptedException {
        new WordCount1().count();
    }

}

执行结果:

2.软件事务性内存(STM)

STM(Software Transactional Memory,软件事务性内存)是一种将内存视为传统数据库,并使用事务来确定何时写入什么内容的方法。

通常,这种实现采用乐观的方式来避免锁的使用。它将一组读写访问视为一个单独的操作,如果两个进程同时尝试访问共享区域,则它们各自启动一个事务,最终只有一个事务会成功。另一个进程会意识到事务失败,并在检查共享区域的新内容后进行重试。

该模型直截了当,谁都不需要等待其他进程释放锁。

STM的主要缺点是必须重试失败的事务,甚至可能多次失败。此外,事务系统本身也会带来相当大的开销,并且在确定哪个进程成功之前,需要额外的内存来存储试图写入的数据。理想情况下,系统应该像支持虚拟内存那样为事务性内存提供硬件支持。

对于程序员来说,STM的可控性似乎比锁更好,只要竞争不频繁导致事务重启,就能充分发挥并发的优势。我们认为这种方法本质上是持锁共享内存的一种变体,其在操作系统层面的作用比应用编程层面更为重要。然而,针对这个问题的研究仍然非常活跃,局势可能会发生改变。

Java并没有直接支持STM方案,因此要实现一个通用的库会相对复杂。在这里,我们简单介绍一下它的原理。

public class Counter2 {

    private final AtomicInteger c = new AtomicInteger();

    // 只为表达STM原理
    public boolean inc( int n ) {
        int oldVal = c.get();
        int newVal = oldVal + n;
        return c.compareAndSet(oldVal , newVal);
    }

    public int totalNumber() {
        return c.get();
    }

}

public class DocProc2 implements Runnable {

    private final Counter2 counter;

    public final Doc doc;

    public DocProc2(Counter2 counter, Doc doc) {
        this.counter = counter;
        this.doc = doc;
    }

    public void run() {
        int c = countDoc( this.doc );
        while (!counter.inc( c ));
    }

    int countDoc( Doc doc ) {
        return doc.content.length();
    }

}

public class WordCount2 {

    private final Counter2 counter = new Counter2();

    void count() throws InterruptedException {

        List<Thread> threads = new ArrayList<>();

        for ( Doc doc : Dir.docs ) {
            DocProc2 docProc = new DocProc2(counter , doc);
            Thread t = new Thread(docProc);
            threads.add( t );
            t.start();
        }

        for (Thread t : threads ) {
            t.join();
        }

        System.out.println( "所有文档字数总计: " + this.counter.totalNumber() + "个" );
    }

    public static void main(String[] args) throws InterruptedException {
        new WordCount2().count();
    }

}

执行结果:

3.Future

另一个更现代的手段是采用所谓的future。

该方法的基本思路是,每个future都代表一个计算结果,这个结果被外包给其他进程处理,这个进程可以在其他CPU甚至其他计算机上运行。

Future可以像其他对象一样被传递,但在计算完成之前无法读取结果,必须等待计算完成。虽然这种方法简化了并发系统中的数据传递,但也使得程序在远程进程故障和网络故障的情况下变得脆弱。当计算结果尚未准备好而连接断开时,试图访问值的代码将无法执行。

public class DocProc3 implements Callable<Integer> {

    public final Doc doc;

    public DocProc3(Doc doc) {
        this.doc = doc;
    }

    public Integer call() {
        return countDoc( this.doc );
    }

    int countDoc( Doc doc ) {
        return doc.content.length();
    }

}

public class WordCount3 {

    void count() throws InterruptedException, ExecutionException {

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        List<Future<Integer>> futures = new ArrayList<>();

        for ( Doc doc : Dir.docs ) {
            Future<Integer> future = executorService.submit(new DocProc3(doc));
            futures.add(future);
        }

        int c = 0;
        for (Future<Integer> f : futures ) {
            c += f.get();
        }

        System.out.println( "所有文档字数总计: " + c + "个" );

        executorService.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        new WordCount3().count();
    }

}

执行结果:

4.函数式编程

函数式编程是一种计算机科学中的编程范式,它通过应用和组合函数来构建程序。它是一种声明式编程范式,其中函数定义是将值映射到其他值的表达式树,而不是通过一系列命令式语句来更新程序运行状态。

在函数式编程中,函数被认为是一种重要的元素,它们可以被赋予名称(包括本地标识符),作为参数传递,并且可以像其他数据类型一样从其他函数返回。这种特性使得程序可以以声明性和可组合的方式编写,通过模块化地组合小功能。

篇幅有限,这里粘贴了一段百度百科的内容。函数通过保持不变性和没有副作用,天然具备线程安全性,可以放心地在并发环境中使用。

public class WordCount4 {

    int countDoc( Doc doc ) {
        return doc.content.length();
    }

    void count() {

        long total = Dir.docs.parallelStream().mapToInt(this::countDoc).sum();

        System.out.println( "所有文档字数总计: " + total + "个" );
    }
    
    public static void main(String[] args) {
        new WordCount4().count();
    }

}

执行结果:

5.消息传递

这是一个比较现实的工作场景,团队成员需要协同工作。当某人需要另一个人处理事情时,他会发送一条消息给对方。收到消息后,另一个人会处理事情,并在完成后回复一条消息。

消息传递是一种通信方式,它意味着接收进程实际上获得了一份独立的数据副本,而发送方无法感知接收方对该副本的任何操作。唯一能向发送方回传信息的方式是通过发送另一条消息。因此,不论收发双方是在同一台机器上还是被网络隔离,它们都能以相同的方式进行通信。

消息传递一般可分为同步方式和异步方式。在同步方式下,发送方在消息抵达接收端之前无法进行其他操作;而在异步方式下,一旦消息被投递,发送方就可以立即开始处理其他事务。(在现实世界中,机器之间的同步通信通常要求接收方给发送方发送一个确认消息,以告知一切正常,但这些细节对程序员来说可以是透明的。

java 自身没有实现该模式,AKKA开源的库实现了此模式。

严格说所有线程间通讯都应该用消息,我用个例子简单表达一下原理:

public class Counter5 {

    public Counter5() {
        this.procMail();
    }

    private final BlockingQueue<Integer> box = new ArrayBlockingQueue<>(100);

    private int c;

    public void inc( int n )  {
        try {
            box.put(n);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private volatile boolean stop = false;
    private void procMail() {
        new Thread() {
            @Override
            public void run() {
                while (!stop) {
                    try {
                        Integer n = box.poll(100 , TimeUnit.MILLISECONDS);
                        if ( n != null ) {
                            c += n;
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }.start();
    }

    public int totalNumber() throws InterruptedException {

        while ( ! box.isEmpty() ) {

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        this.stop = true;

        return c;
    }

}

public class DocProc5 implements Runnable {

    private final Counter5 counter;

    public final Doc doc;

    public DocProc5(Counter5 counter, Doc doc) {
        this.counter = counter;
        this.doc = doc;
    }

    public void run() {
        int n = countDoc( this.doc );
        counter.inc(n);
    }


    int countDoc( Doc doc ) {
        return doc.content.length();
    }

}

public class WordCount5 {

    private final Counter5 counter = new Counter5();

    void count() throws InterruptedException {

        List<Thread> threads = new ArrayList<>();

        for ( Doc doc : Dir.docs ) {
            DocProc5 docProc = new DocProc5(counter , doc);
            Thread t = new Thread(docProc);
            threads.add( t );
            t.start();
        }

        for (Thread t : threads ) {
            t.join();
        }

        System.out.println( "所有文档字数总计: " + this.counter.totalNumber() + "个" );
    }

    public static void main(String[] args) throws InterruptedException {
        new WordCount5().count();
    }

}

执行结果

 

关键词:并发编程      点击(5)
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多并发编程相关>>>