0

Improve GPU unexpected pass finder performance

Improves the GPU unexpected pass finder's performance when working with
large amounts of data by properly parallelizing the querying and
processing code instead of only using a thread pool (which isn't
actually multithreaded). This results in a ~130% speedup when run
against the WebGL test suites with 100 samples.

Bug: 1175852
Change-Id: I80f6d42977f8a6cc777bb56559bbb8f6f45b3854
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2704952
Reviewed-by: Yuly Novikov <ynovikov@chromium.org>
Reviewed-by: Robbie Iannucci <iannucci@chromium.org>
Commit-Queue: Brian Sheedy <bsheedy@chromium.org>
Auto-Submit: Brian Sheedy <bsheedy@chromium.org>
Cr-Commit-Position: refs/heads/master@{#855556}
This commit is contained in:
Brian Sheedy
2021-02-19 00:52:50 +00:00
committed by Chromium LUCI CQ
parent 1ad2aff10c
commit 616110d140
7 changed files with 320 additions and 32 deletions

@ -389,3 +389,26 @@ wheel: <
platform: "win_amd64"
>
>
# Used by:
# //content/test/gpu
wheel: <
name: "infra/python/wheels/pathos/${vpython_platform}"
version: "version:0.2.7.chromium.3"
not_match_tag <
abi: "cp27mu"
platform: "manylinux1_i686"
>
not_match_tag <
abi: "cp27mu"
platform: "linux_mips64"
>
not_match_tag <
abi: "cp27mu"
platform: "linux_armv6l"
>
not_match_tag <
abi: "cp27mu"
platform: "linux_armv7l"
>
>

@ -6,10 +6,11 @@
import fnmatch
import json
import logging
import multiprocessing
import os
import subprocess
from unexpected_passes import multiprocessing_utils
TESTING_BUILDBOT_DIR = os.path.realpath(
os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'testing',
'buildbot'))
@ -154,7 +155,7 @@ def GetTryBuilders(ci_builders):
mirrored_builders = set()
no_output_builders = set()
pool = _GetPool()
pool = multiprocessing_utils.GetProcessPool()
results = pool.map(_GetMirroredBuildersForCiBuilder, ci_builders)
for (builders, found_mirror) in results:
if found_mirror:
@ -216,12 +217,6 @@ def _GetMirroredBuildersForCiBuilder(ci_builder):
return mirrored_builders, True
def _GetPool():
# Split out for testing - multiprocessing very much dislikes mocking/monkey
# patching functions that it's trying to use.
return multiprocessing.Pool()
def _GetBuildbucketOutputForCiBuilder(ci_builder):
# Ensure the user is logged in to bb.
if not _GetBuildbucketOutputForCiBuilder.authenticated:

@ -11,6 +11,8 @@ import mock
from pyfakefs import fake_filesystem_unittest
from unexpected_passes import builders
from unexpected_passes import multiprocessing_utils
from unexpected_passes import unittest_utils
class GetCiBuildersUnittest(fake_filesystem_unittest.TestCase):
@ -163,21 +165,15 @@ class GetMirroredBuildersForCiBuilderUnittest(unittest.TestCase):
class GetTryBuildersUnittest(unittest.TestCase):
class FakePool(object):
def map(self, f, inputs):
retval = []
for i in inputs:
retval.append(f(i))
return retval
def setUp(self):
self._get_patcher = mock.patch.object(builders,
'_GetMirroredBuildersForCiBuilder')
self._get_mock = self._get_patcher.start()
self.addCleanup(self._get_patcher.stop)
self._pool_patcher = mock.patch.object(builders, '_GetPool')
self._pool_patcher = mock.patch.object(multiprocessing_utils,
'GetProcessPool')
self._pool_mock = self._pool_patcher.start()
self._pool_mock.return_value = GetTryBuildersUnittest.FakePool()
self._pool_mock.return_value = unittest_utils.FakePool()
self.addCleanup(self._pool_patcher.stop)
def testNoOutputCausesFailure(self):

@ -0,0 +1,17 @@
# Copyright 2021 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from pathos import pools
def GetProcessPool():
  """Creates a process pool backed by pathos.

  Kept as a standalone helper so unittests can easily swap it out, since
  pathos can still hit pickling problems when handed MagicMock objects.

  Returns:
    A pathos.pools.ProcessPool instance.
  """
  process_pool = pools.ProcessPool()
  return process_pool

@ -3,6 +3,7 @@
# found in the LICENSE file.
"""Methods related to querying the ResultDB BigQuery tables."""
import copy
import json
import logging
import multiprocessing.pool
@ -12,6 +13,7 @@ import subprocess
from typ import expectations_parser
from unexpected_passes import builders as builders_module
from unexpected_passes import data_types
from unexpected_passes import multiprocessing_utils
DEFAULT_NUM_SAMPLES = 100
MAX_ROWS = (2**31) - 1
@ -125,26 +127,89 @@ def _FillExpectationMapForBuilders(expectation_map, builders, builder_type,
}
"""
all_unmatched_results = {}
# This is not officially documented, but comes up frequently when looking for
# information on Python thread pools. It has the same API as
# multiprocessing.Pool(), but uses threads instead of subprocesses. This
# results in us being able to avoid jumping through some hoops with regards to
# pickling that come with multiple processes. Since each thread is going to be
# I/O bound on the query, there shouldn't be a noticeable performance loss
# despite the default CPython interpreter not being truly multithreaded.
pool = multiprocessing.pool.ThreadPool()
curried_query = lambda b: QueryBuilder(b, builder_type, suite, project,
num_samples)
results_list = pool.map(curried_query, builders)
for (builder_name, results) in results_list:
prefixed_builder_name = '%s:%s' % (builder_type, builder_name)
unmatched_results = _AddResultListToMap(expectation_map,
prefixed_builder_name, results)
# We use two separate pools since each is better for a different task. Adding
# retrieved results to the expectation map is computationally intensive, so
# properly parallelizing it results in large speedup. Python's default
# interpreter does not support true multithreading, and the multiprocessing
# module throws a fit when using custom data types due to pickling, so use
# pathos' ProcessPool for this, which is like multiprocessing but handles all
# the pickling automatically.
#
# However, ProcessPool appears to add a non-trivial amount of overhead when
# passing data back and forth, so use a thread pool for triggering BigQuery
# queries. Each query is already started in its own subprocess, so the lack
# of multithreading is not an issue. multiprocessing.pool.ThreadPool() is not
# officially documented, but comes up frequently when looking for information
# on Python thread pools and is used in other places in the Chromium code
# base.
result_pool = multiprocessing_utils.GetProcessPool()
query_pool = multiprocessing.pool.ThreadPool()
arguments = [(b, builder_type, suite, project, num_samples) for b in builders]
results_list = query_pool.map(_QueryBuilderMultiprocess, arguments)
arguments = [(expectation_map, builder_type, bn, r)
for (bn, r) in results_list]
add_results = result_pool.map(_AddResultToMapMultiprocessing, arguments)
tmp_expectation_map = {}
for (unmatched_results, prefixed_builder_name, merge_map) in add_results:
_MergeExpectationMaps(tmp_expectation_map, merge_map, expectation_map)
if unmatched_results:
all_unmatched_results[prefixed_builder_name] = unmatched_results
expectation_map.clear()
expectation_map.update(tmp_expectation_map)
return all_unmatched_results
def _MergeExpectationMaps(base_map, merge_map, reference_map=None):
"""Merges |merge_map| into |base_map|.
Args:
base_map: A dict to be updated with the contents of |merge_map|. Will be
modified in place.
merge_map: A dict in the format returned by
expectations.CreateTestExpectationMap() whose contents will be merged
into |base_map|.
reference_map: A dict containing the information that was originally in
|base_map|. Used for ensuring that a single expectation/builder/step
combination is only ever updated once. If None, a copy of |base_map|
will be used.
"""
# We should only ever encounter a single updated BuildStats for an
# expectation/builder/step combination. Use the reference map to determine
# if a particular BuildStats has already been updated or not.
reference_map = reference_map or copy.deepcopy(base_map)
for key, value in merge_map.iteritems():
if key not in base_map:
base_map[key] = value
else:
if isinstance(value, dict):
_MergeExpectationMaps(base_map[key], value, reference_map.get(key, {}))
else:
assert isinstance(value, data_types.BuildStats)
# Ensure we haven't updated this BuildStats already. If the reference
# map doesn't have a corresponding BuildStats, then base_map shouldn't
# have initially either, and thus it would have been added before
# reaching this point. Otherwise, the two values must match, meaning
# that base_map's BuildStats hasn't been updated yet.
reference_stats = reference_map.get(key, None)
assert reference_stats is not None
assert reference_stats == base_map[key]
base_map[key] = value
def _AddResultToMapMultiprocessing(inputs):
  """Adapter around _AddResultListToMap for use with pool.map().

  Args:
    inputs: A 4-tuple of (expectation_map, builder_type, builder_name,
        results) arguments, packed so the whole call can be dispatched through
        a process pool's map().

  Returns:
    A 3-tuple (unmatched_results, prefixed_builder_name, expectation_map).
  """
  expectation_map = inputs[0]
  builder_type = inputs[1]
  builder_name = inputs[2]
  results = inputs[3]
  prefixed_builder_name = '%s:%s' % (builder_type, builder_name)
  unmatched = _AddResultListToMap(expectation_map, prefixed_builder_name,
                                  results)
  return unmatched, prefixed_builder_name, expectation_map
def _AddResultListToMap(expectation_map, builder, results):
"""Adds |results| to |expectation_map|.
@ -217,6 +282,11 @@ def _AddResultToMap(result, builder, expectation_map):
return found_matching_expectation
def _QueryBuilderMultiprocess(inputs):
  """Adapter around QueryBuilder for use with pool.map().

  Args:
    inputs: A 5-tuple of (builder, builder_type, suite, project, num_samples)
        arguments, packed so the call can be dispatched through a thread
        pool's map().

  Returns:
    Whatever QueryBuilder returns for the unpacked arguments.
  """
  return QueryBuilder(*inputs)
def QueryBuilder(builder, builder_type, suite, project, num_samples):
"""Queries ResultDB for results from |builder|.

@ -2,6 +2,7 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import copy
import json
import subprocess
import unittest
@ -9,7 +10,9 @@ import unittest
import mock
from unexpected_passes import data_types
from unexpected_passes import multiprocessing_utils
from unexpected_passes import queries
from unexpected_passes import unittest_utils
class AddResultToMapUnittest(unittest.TestCase):
@ -498,6 +501,11 @@ class FillExpectationMapForBuildersUnittest(unittest.TestCase):
self._patcher = mock.patch.object(queries, 'QueryBuilder')
self._query_mock = self._patcher.start()
self.addCleanup(self._patcher.stop)
self._pool_patcher = mock.patch.object(multiprocessing_utils,
'GetProcessPool')
self._pool_mock = self._pool_patcher.start()
self._pool_mock.return_value = unittest_utils.FakePool()
self.addCleanup(self._pool_patcher.stop)
def testValidResults(self):
"""Tests functionality when valid results are returned by the query."""
@ -544,5 +552,169 @@ class FillExpectationMapForBuildersUnittest(unittest.TestCase):
})
class MergeExpectationMapsUnittest(unittest.TestCase):
  """Unittests for queries._MergeExpectationMaps."""

  # Show full diffs on assertion failure since the nested maps can be large.
  maxDiff = None

  def testEmptyBaseMap(self):
    """Tests that a merge with an empty base map copies the merge map."""
    base_map = {}
    merge_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                },
            },
        },
    }
    original_merge_map = copy.deepcopy(merge_map)
    queries._MergeExpectationMaps(base_map, merge_map)
    self.assertEqual(base_map, merge_map)
    # The merge should not modify the merge map itself.
    self.assertEqual(merge_map, original_merge_map)

  def testEmptyMergeMap(self):
    """Tests that a merge with an empty merge map is a no-op."""
    base_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                },
            },
        },
    }
    merge_map = {}
    original_base_map = copy.deepcopy(base_map)
    queries._MergeExpectationMaps(base_map, merge_map)
    self.assertEqual(base_map, original_base_map)
    self.assertEqual(merge_map, {})

  def testMissingKeys(self):
    """Tests that missing keys are properly copied to the base map."""
    base_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                },
            },
        },
    }
    # Covers missing keys at every nesting level: a new step for an existing
    # builder, a new builder for an existing expectation, a new expectation
    # for an existing test, and an entirely new test.
    merge_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step2': data_types.BuildStats(),
                },
                'builder2': {
                    'step': data_types.BuildStats(),
                },
            },
            data_types.Expectation('foo', ['mac'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                }
            }
        },
        'bar': {
            data_types.Expectation('bar', ['win'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                },
            },
        },
    }
    expected_base_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                    'step2': data_types.BuildStats(),
                },
                'builder2': {
                    'step': data_types.BuildStats(),
                },
            },
            data_types.Expectation('foo', ['mac'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                }
            }
        },
        'bar': {
            data_types.Expectation('bar', ['win'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                },
            },
        },
    }
    queries._MergeExpectationMaps(base_map, merge_map)
    self.assertEqual(base_map, expected_base_map)

  def testMergeBuildStats(self):
    """Tests that BuildStats for the same step are merged properly."""
    base_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                },
            },
        },
    }
    # The merged-in stats carry a failed build that the base map's (empty)
    # stats for the same step do not.
    merge_stats = data_types.BuildStats()
    merge_stats.AddFailedBuild('1')
    merge_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step': merge_stats,
                },
            },
        },
    }
    expected_stats = data_types.BuildStats()
    expected_stats.AddFailedBuild('1')
    expected_base_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step': expected_stats,
                },
            },
        },
    }
    queries._MergeExpectationMaps(base_map, merge_map)
    self.assertEqual(base_map, expected_base_map)

  def testInvalidMerge(self):
    """Tests that updating a BuildStats instance twice is an error."""
    base_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step': data_types.BuildStats(),
                },
            },
        },
    }
    merge_stats = data_types.BuildStats()
    merge_stats.AddFailedBuild('1')
    merge_map = {
        'foo': {
            data_types.Expectation('foo', ['win'], 'Failure'): {
                'builder': {
                    'step': merge_stats,
                },
            },
        },
    }
    original_base_map = copy.deepcopy(base_map)
    # The first merge succeeds and updates base_map's BuildStats.
    queries._MergeExpectationMaps(base_map, merge_map, original_base_map)
    # Merging again with the same reference map should trip the assertion
    # that guards against updating the same BuildStats twice.
    with self.assertRaises(AssertionError):
      queries._MergeExpectationMaps(base_map, merge_map, original_base_map)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -13,3 +13,18 @@ def CreateStatsWithPassFails(passes, fails):
for i in xrange(fails):
stats.AddFailedBuild('build_id%d' % i)
return stats
class FakePool(object):
  """Stand-in for a pathos.pools.ProcessPool usable in unittests.

  Real pools don't like being given MagicMocks, so tests patch
  multiprocessing_utils.GetProcessPool() to return this instead. Work is
  simply executed serially on the calling thread.
  """

  def map(self, f, inputs):
    return [f(i) for i in inputs]