0

Switch UPFs to use ThreadPoolExecutor

Switches the unexpected pass finder (UPF) code to use
concurrent.futures.ThreadPoolExecutor instead of the third-party pathos
module. This has a number of benefits while retaining a very similar
API surface:

1. Reduced system memory usage since everything is handled in one
   process and we aren't forking a process that has potentially
   gigabytes of results stored (initial attempts to use spawn instead of
   fork for process creation did not seem to have an effect)
2. Reduced latency and memory usage when merging results since results
   are handled as they become available instead of all at once at the
   end. This benefit could have been achieved via pathos' imap(), as
   well.
3. Less reliance on third-party code. While pathos does a good job of
   improving on Python's multiprocessing module, mostly by
   automagically handling pickling issues that multiprocessing complains
   about, it is still a third-party dependency that we are better off
   dropping if we can use purely built-in modules instead.

This has the potential to be a bit slower than the pathos implementation
if multiple queries finish at the same time due to Python's lack of true
multithreading with the default interpreter, but in practice, that does
not seem to be a significant issue.

Anecdotally, this resulted in a ~20 GB reduction in peak memory usage
locally when running the WebGPU UPF with 100 samples and 4
concurrent jobs, which works out to 75-80% less memory.

Bug: 326277799
Change-Id: I5d75ad16e2a06e8f8d4a094e56c03b694936f813
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5347480
Reviewed-by: Ben Pastene <bpastene@chromium.org>
Reviewed-by: Yuly Novikov <ynovikov@chromium.org>
Commit-Queue: Brian Sheedy <bsheedy@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1269152}
This commit is contained in:
Brian Sheedy
2024-03-06 19:35:01 +00:00
committed by Chromium LUCI CQ
parent d992447fa9
commit 5cf68ce8d5
7 changed files with 18 additions and 129 deletions

@ -470,29 +470,6 @@ wheel: <
version: "version:5.5.chromium.3"
>
# Used by:
# //content/test/gpu
wheel: <
name: "infra/python/wheels/pathos/${vpython_platform}"
version: "version:0.2.7.chromium.5"
not_match_tag <
abi: "cp27mu"
platform: "manylinux1_i686"
>
not_match_tag <
abi: "cp27mu"
platform: "linux_mips64"
>
not_match_tag <
abi: "cp27mu"
platform: "linux_armv6l"
>
not_match_tag <
abi: "cp27mu"
platform: "linux_armv7l"
>
>
wheel: <
name: "infra/python/wheels/websockets-py3"
version: "version:11.0.3"

@ -5,6 +5,7 @@
from __future__ import print_function
import concurrent.futures
import json
import logging
import os
@ -15,7 +16,6 @@ import six
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import multiprocessing_utils
TESTING_BUILDBOT_DIR = os.path.realpath(
os.path.join(os.path.dirname(__file__), '..', 'buildbot'))
@ -157,13 +157,15 @@ class Builders():
mirrored_builders = set()
no_output_builders = set()
with multiprocessing_utils.GetProcessPoolContext() as pool:
results = pool.map(self._GetMirroredBuildersForCiBuilder, ci_builders)
for (builders, found_mirror) in results:
if found_mirror:
mirrored_builders |= builders
else:
no_output_builders |= builders
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count()) as pool:
results_iter = pool.map(self._GetMirroredBuildersForCiBuilder,
ci_builders)
for (builders, found_mirror) in results_iter:
if found_mirror:
mirrored_builders |= builders
else:
no_output_builders |= builders
if no_output_builders:
raise RuntimeError(

@ -21,7 +21,6 @@ from pyfakefs import fake_filesystem_unittest
from unexpected_passes_common import builders
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import multiprocessing_utils
from unexpected_passes_common import unittest_utils
@ -395,11 +394,6 @@ class GetTryBuildersUnittest(FakeFilesystemTestCaseWithFileCreation):
'_BuilderRunsTestOfInterest')
self._runs_test_mock = self._runs_test_patcher.start()
self.addCleanup(self._runs_test_patcher.stop)
self._pool_patcher = mock.patch.object(multiprocessing_utils,
'GetProcessPool')
self._pool_mock = self._pool_patcher.start()
self._pool_mock.return_value = unittest_utils.FakePool()
self.addCleanup(self._pool_patcher.stop)
self.setUpPyfakefs()
# Make sure the directory exists.

@ -1,36 +0,0 @@
# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import print_function
import contextlib
from typing import Generator, Optional
from pathos import pools
def GetProcessPool(nodes: Optional[int] = None) -> pools.ProcessPool:
"""Returns a pathos.pools.ProcessPool instance.
Split out for ease of unittesting since pathos can still run into pickling
issues with MagicMocks used in tests.
Args:
nodes: How many processes to spawn in the process pool.
Returns:
A pathos.pools.ProcessPool instance.
"""
return pools.ProcessPool(nodes=nodes)
@contextlib.contextmanager
def GetProcessPoolContext(
nodes: Optional[int] = None) -> Generator[pools.ProcessPool, None, None]:
try:
pool = GetProcessPool(nodes)
yield pool
finally:
pool.close()
pool.join()

@ -3,6 +3,7 @@
# found in the LICENSE file.
"""Methods related to querying the ResultDB BigQuery tables."""
import concurrent.futures
import json
import logging
import math
@ -21,7 +22,6 @@ from unexpected_passes_common import builders as builders_module
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import expectations
from unexpected_passes_common import multiprocessing_utils
DEFAULT_NUM_SAMPLES = 100
MAX_ROWS = (2**31) - 1
@ -148,19 +148,15 @@ class BigQueryQuerier(object):
num_jobs = self._num_jobs or len(builders)
args = [(b, expectation_map) for b in builders]
with multiprocessing_utils.GetProcessPoolContext(num_jobs) as pool:
results = pool.map(self._QueryAddCombined, args)
tmp_expectation_map = data_types.TestExpectationMap()
all_unmatched_results = {}
# results is potentially rather large, so reduce the list as we iterate over
# it to avoid using excessive memory.
while results:
(unmatched_results, prefixed_builder_name, merge_map) = results.pop()
tmp_expectation_map.Merge(merge_map, expectation_map)
if unmatched_results:
all_unmatched_results[prefixed_builder_name] = unmatched_results
with concurrent.futures.ThreadPoolExecutor(max_workers=num_jobs) as pool:
for result in pool.map(self._QueryAddCombined, args):
unmatched_results, prefixed_builder_name, merge_map = result
tmp_expectation_map.Merge(merge_map, expectation_map)
if unmatched_results:
all_unmatched_results[prefixed_builder_name] = unmatched_results
expectation_map.clear()
expectation_map.update(tmp_expectation_map)

@ -16,7 +16,6 @@ from unexpected_passes_common import builders
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import expectations
from unexpected_passes_common import multiprocessing_utils
from unexpected_passes_common import queries
from unexpected_passes_common import unittest_utils
@ -329,11 +328,6 @@ class FillExpectationMapForBuildersUnittest(unittest.TestCase):
self._query_patcher = mock.patch.object(self._querier, 'QueryBuilder')
self._query_mock = self._query_patcher.start()
self.addCleanup(self._query_patcher.stop)
self._pool_patcher = mock.patch.object(multiprocessing_utils,
'GetProcessPool')
self._pool_mock = self._pool_patcher.start()
self._pool_mock.return_value = unittest_utils.FakePool()
self.addCleanup(self._pool_patcher.stop)
self._filter_patcher = mock.patch.object(self._querier,
'_FilterOutInactiveBuilders')
self._filter_mock = self._filter_patcher.start()

@ -5,7 +5,7 @@
from __future__ import print_function
from typing import Any, Callable, Iterable, List, Optional, Set, Tuple, Type
from typing import Iterable, List, Optional, Set, Tuple, Type
import unittest.mock as mock
from unexpected_passes_common import builders
@ -97,44 +97,6 @@ def GetArgsForMockCall(call_args_list: List[tuple],
return args, kwargs
class FakePool():
"""A fake pathos.pools.ProcessPool instance.
Real pools don't like being given MagicMocks, so this allows testing of
code that uses pathos.pools.ProcessPool by returning this from
multiprocessing_utils.GetProcessPool().
"""
def map(self, f: Callable[[Any], Any], inputs: Iterable[Any]) -> List[Any]:
retval = []
for i in inputs:
retval.append(f(i))
return retval
def apipe(self, f: Callable[[Any], Any],
inputs: Iterable[Any]) -> 'FakeAsyncResult':
return FakeAsyncResult(f(inputs))
def close(self) -> None:
pass
def join(self) -> None:
pass
class FakeAsyncResult():
"""A fake AsyncResult like the one from multiprocessing or pathos."""
def __init__(self, result: Any):
self._result = result
def ready(self) -> bool:
return True
def get(self) -> Any:
return self._result
class FakeProcess():
"""A fake subprocess Process object."""