<返回更多

使用 PHP 处理十亿行数据,如何极致提升处理速度?

2024-04-23  51CTO  
加入收藏

译者 | 刘汪洋

审校 | 重楼

如果在阅读这篇文章之前,你还不了解“十亿行挑战”( The One Billion Row Challenge,1brc ),我推荐你访问 Gunnar Morling 的 1brc Github 代码仓库了解更多详情。

我有两位同事已经参与这项挑战并成功上榜,因此我也选择加入。

虽然 php 的执行速度并不出名,但我正开发一个 PHP 分析器,因此我想亲自测试一下 PHP的处理速度。

第一种尝试:简单直接的方法

我首先克隆了挑战的代码仓库,并生成了一个包含十亿行数据的文件measurements.txt。接下来,我开始尝试第一个解决方案:

<?php

$stations = [];

$fp = fopen('measurements.txt', 'r');

while ($data = fgetcsv($fp, null, ';')) {
    if (!isset($stations[$data[0]])) {
        $stations[$data[0]] = [
            $data[1],
            $data[1],
            $data[1],
            1
        ];
    } else {
        $stations[$data[0]][3]++;
        $stations[$data[0]][2] += $data[1];
        if ($data[1] < $stations[$data[0]][0]) {
            $stations[$data[0]][0] = $data[1];
        }
        if ($data[1] > $stations[$data[0]][1]) {
            $stations[$data[0]][1] = $data[1];
        }
    }
}

ksort($stations);

echo '{';
foreach ($stations as $k => &$station) {
    $station[2] = $station[2] / $station[3];
    echo $k, '=', $station[0], '/', $station[2], '/', $station[1], ', ';
}
echo '}';

这段代码逻辑简单明了:打开文件并通过 fgetcsv()读取数据。若之前未记录过该站点,则创建一个新条目;否则,进行计数器增加、温度累加,并检查当前温度是否刷新了最低或最高记录,如是,则进行更新。

处理完所有数据后,我使用ksort()对数组$stations进行排序,并输出每个站点的最低温度、平均温度(总温度/记录数)和最高温度。

令我惊讶的是,在我的笔记本电脑上运行这段简单脚本竟然耗时达到了25分钟。

很明显,我需要对这段代码进行优化,并对其进行性能分析:

通过可视化的时间线,我们可以分析出脚本运行明显受到 CPU 限制,脚本开始时的文件编译时间可以忽略不计,且几乎没有垃圾收集事件发生。

火焰图清晰地显示出,fgetcsv()函数占据了约 46% 的 CPU 时间。

使用 fgets() 替代 fgetcsv()

为了提升性能,我决定用fgets()替换fgetcsv()函数来逐行读取数据,并手动按;字符进行分割。

// ...
while ($data = fgets($fp, 999)) {
    $pos = strpos($data, ';');
    $city = substr($data, 0, $pos);
    $temp = substr($data, $pos + 1, -1);
// ...

同时,我还把代码中的$data[0]重命名为$city,$data[1]重命名为$temp,以增强代码的可读性。

这个简单的修改使得脚本运行时间大幅减少到 19 分钟 49 秒,虽然时间仍然较长,但相比之前已经减少了 21%。

通过火焰图的比较,可以看到在替换后 CPU 的时间利用率发生了变化,详细的根帧分析也揭示了具体的性能瓶颈位置:

在脚本的第 18 行和第 23 行花费了大约 38% 的CPU时间。

18 | $stations[$city][3]++;
   | // ...
23 | if ($temp > $stations[$city][1]) {

第 18 行是数组$stations的首次访问和增量操作,而第 23 行进行了一次看似不那么耗时的比较操作。尽管如此,进一步优化有助于揭示这些操作中潜在的性能开销。

尽可能使用引用

为了提高性能,我决定在处理数组时使用引用,以避免每次访问数组时都对$stations数组中的键进行搜索。这相当于为数组中的"当前"站点设置了一个缓存。

代码如下:

$station = &$stations[$city];
$station[3]++;
$station[2] += $temp;
// 替代原有的
$stations[$city][3]++;
$stations[$city][2] += $temp;

这一改变实际上大大减少了执行时间,将其缩短到 17 分钟 48 秒,进一步减少了 **10% **的运行时间。

条件判断优化

在审查代码的过程中,我注意到了以下片段:

if ($temp < $station[0]) {
    $station[0] = $temp;
} elseif ($temp > $station[1]) {
    $station[1] = $temp;
}

考虑到一个温度值如果低于最小值,则不可能同时高于最大值,因此我使用elseif来优化条件判断,这可能会节省一些 CPU 周期。

需要指出的是,由于我不知道measurements.txt中温度值的排列顺序,根据这个顺序,首先检查最小值还是最大值可能会有所不同。

这次优化将时间进一步缩短到 17 分钟 30 秒,节省了大约 2% 的时间,虽然这个提升并不是非常显著。

执行类型转换

PHP是一种动态类型语言,我在编程初期非常欣赏它这一特点,因为它简化了许多问题。然而,另一方面,明确变量类型能帮助解释引擎更高效地执行代码。

$temp = (float)substr($data, $pos + 1, -1);

令人惊讶的是,这个简单的类型转换把脚本执行时间缩短至 13 分钟 32 秒,性能提升达到了惊人的 **21% **!

18 | $station = &$stations[$city];
   | // ...
23 | } elseif ($temp > $station[1]) {

在优化后,第 18 行显示数组访问的 CPU 时间消耗从 11% 减少,这是因为减少了在 PHP 的哈希映射(关联数组的底层数据结构)中搜索键的次数。

第 23 行的 CPU 时间从约 32% 减少到约 15%。这是因为避免了类型转换的开销。在优化之前,$temp、$station[0]和$station[1]是字符串类型,因此 PHP 在每次比较时必须将它们转换为浮点数。

引入 JIT

在优化过程中,我还尝试启用了 PHP 的 JIT(即时编译器),它是 OPCache 的一部分。默认情况下,OPCache 在 CLI(命令行界面)模式下被禁用,因此需通过将opcache.enable_cli 设置为 on来启用。此外,虽然JIT默认为开启状态,但由于缓冲区大小默认设置为0,实际上处于禁用状态。通过将opcache.jit-buffer-size设置为10M,我有效地启用了 JIT。

启用 JIT 后,脚本执行时间惊人地缩减至 7 分钟 19 秒,速度提升了 45.9%。

进一步优化

通过这系列优化,我将脚本的执行时间从最初的 25 分钟大幅降低到了约 7 分钟。在这个过程中,我注意到使用fgets()读取一个 13GB 的文件时,竟然分配了大约 56GiB 每分钟的 RAM,这显然是不合理的。经过调查,我发现省略fgets()的长度参数可以大量减少内存分配:

while ($data = fgets($fp)) {
// 替代之前的
while ($data = fgets($fp, 999)) {

这个简单变化虽然只使性能提高了约 1%,但将内存分配从每分钟 56GiB 降至每分钟 6GiB,显著减少了内存占用。这一改进虽然对执行时间影响不大,但减少内存消耗对于大规模数据处理仍然是一个重要的优化方向。

以上优化展示了在 PHP 性能调优中考虑各种因素的重要性,包括代码逻辑优化、类型明确、JIT编译以及内存管理等,共同作用下可以显著提升应用性能。

还能更快吗?

到目前为止,我使用的单线程方法,与许多PHP程序默认的单线程方式相符,但通过使用parallel 扩展,PHP 实际上能在用户空间内实现多线程操作。

性能分析明确指出,在 PHP 中进行数据读取成为了性能瓶颈。虽然从 fgetcsv() 切换到 fgets() 并手动进行字符串分割有所改进,但这种方式仍旧相对耗时。因此,我们考虑采用多线程的方式来并行地读取和处理数据,并在之后将各个工作线程的中间结果合并起来。

<?php

$file = 'measurements.txt';

$threads_cnt = 16;

/**
 * 计算并返回每个线程应处理的文件块的起始和结束位置。
 * 这些位置将基于 n 字符进行对齐,因为我们使用 `fgets()` 进行读取,
 * 它会读取直到遇到 n 字符为止。
 *
 * @return array<int, array{0: int, 1: int}>
 */
function get_file_chunks(string $file, int $cpu_count): array {
    $size = filesize($file);

    if ($cpu_count == 1) {
        $chunk_size = $size;
    } else {
        $chunk_size = (int) ($size / $cpu_count);
    }

    $fp = fopen($file, 'rb');

    $chunks = [];
    $chunk_start = 0;
    while ($chunk_start < $size) {
        $chunk_end = min($size, $chunk_start + $chunk_size);

        if ($chunk_end < $size) {
            fseek($fp, $chunk_end);
            fgets($fp); // 将文件指针移动到下一个 n 字符
            $chunk_end = ftell($fp);
        }

        $chunks[] = [
            $chunk_start,
            $chunk_end
        ];

        $chunk_start = $chunk_end;
    }

    fclose($fp);
    return $chunks;
}

/**
 * 该函数负责打开指定的 `$file` 文件,并从 `$chunk_start` 开始读取处理数据,
 * 直到达到 `$chunk_end`。
 *
 * 返回的结果数组以城市名作为键,其值为一个数组,包含最低温度(键 0)、最高温度(键 1)、
 * 温度总和(键 2)及温度计数(键 3)。
 *
 * @return array<string, array{0: float, 1: float, 2: float, 3: int}>
 */
$process_chunk = function (string $file, int $chunk_start, int $chunk_end): array {
    $stations = [];
    $fp = fopen($file, 'rb');
    fseek($fp, $chunk_start);
    while ($data = fgets($fp)) {
        $chunk_start += strlen($data);
        if ($chunk_start > $chunk_end) {
            break;
        }
        $pos2 = strpos($data, ';');
        $city = substr($data, 0, $pos2);
        $temp = (float)substr($data, $pos2 + 1, -1);
        if (isset($stations[$city])) {
            $station = &$stations[$city];
            $station[3]++;
            $station[2] += $temp;
            if ($temp < $station[0]) {
                $station[0] = $temp;
            } elseif ($temp > $station[1]) {
                $station[1] = $temp;
            }
        } else {
            $stations[$city] = [
                $temp,
                $temp,
                $temp,
                1
            ];
        }
    }
    return $stations;
};

$chunks = get_file_chunks($file, $threads_cnt);

$futures = [];

for ($i = 0; $i < $threads_cnt; $i++) {
    $runtime = new parallelRuntime();
    $futures[$i] = $runtime->run(
        $process_chunk,
        [
            $file,
            $chunks[$i][0],
            $chunks[$i][1]
        ]
    );
}

$results = [];

for ($i = 0; $i < $threads_cnt; $i++) {
    // 等待线程结果,主线程在此处阻塞直至获取结果
    $chunk_result = $futures[$i]->value();
    foreach ($chunk_result as $city => $measurement) {
        if (isset($results[$city])) {
            $result = &$results[$city];
            $result[2] += $measurement[2];
            $result[3] += $measurement[3];
            if ($measurement[0] < $result[0]) {
                $result[0] = $measurement[0];
            }
            if ($measurement[1] > $result[1]) {
                $result[1] = $measurement[1];
            }
        } else {
            $results[$city] = $measurement;
        }
    }
}

ksort($results);

echo '{', PHP_EOL;
foreach ($results as $k => &$station) {
    echo "t", $k, '=', $station[0], '/', ($station[2] / $station[3]), '/', $station[1], ',', PHP_EOL;
}
echo '}', PHP_EOL;

该段代码主要执行以下操作:首先,它扫描文件并将其分割成以 n 为界的块(利用 fgets() 进行读取)。准备好这些块后,我启动了 $threads_cnt 个工作线程,它们分别打开相同的文件并跳转到分配给它们的块的起始位置,继续读取并处理数据直到块结束,返回中间结果。最后,在主线程中合并、排序并输出这些结果。

利用多线程处理,这个过程只需:

关键词:PHP      点击(8)
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多PHP相关>>>