0

fast_local_dev_server should spew outputs as soon as they come

There is no point to queueing outputs, we should just print them
when we receive them. Also increase idle timeout to 5 minutes from 30s

Bug: 40763082
Change-Id: I5411f19e4c3c5e26861eff052b651def1aa097f6
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/6021042
Commit-Queue: Andrew Grieve <agrieve@chromium.org>
Auto-Submit: Mohamed Heikal <mheikal@chromium.org>
Reviewed-by: Andrew Grieve <agrieve@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1383252}
This commit is contained in:
Mohamed Heikal
2024-11-14 21:50:31 +00:00
committed by Chromium LUCI CQ
parent ec907c0730
commit ee1abc2690
2 changed files with 17 additions and 61 deletions

@ -26,7 +26,7 @@ from typing import Callable, Dict, List, Optional, Tuple, IO
sys.path.append(os.path.join(os.path.dirname(__file__), 'gyp'))
from util import server_utils
_SOCKET_TIMEOUT = 30 # seconds
_SOCKET_TIMEOUT = 300 # seconds
_LOGFILES = {}
_LOGFILE_NAME = 'buildserver.log'
@ -100,42 +100,6 @@ def create_logfile(build_id, outdir):
return logfile
class QueuedOutputs:
"""Class to store outputs for completed tasks until autoninja asks for them."""
_lock = threading.Lock()
_pending_outputs = collections.defaultdict(list)
_output_pipes = {}
@classmethod
def add_output(cls, task: Task, output_str: str):
with cls._lock:
build_id = task.build_id
cls._pending_outputs[build_id].append(output_str)
# All ttys should be the same.
if task.tty:
cls._output_pipes[build_id] = task.tty
@classmethod
def get_pending_outputs(cls, build_id: str):
with cls._lock:
pending_outputs = cls._pending_outputs[build_id]
cls._pending_outputs[build_id] = []
return pending_outputs
@classmethod
def flush_messages(cls):
with cls._lock:
for build_id, messages in cls._pending_outputs.items():
if messages:
pipe = cls._output_pipes.get(build_id)
if pipe:
pipe.write('\nfast_local_dev_server.py shutting down with queued ' +
'task outputs. Flushing now:\n')
for message in messages:
pipe.write(message + '\n')
cls._pending_outputs = collections.defaultdict(list)
class TaskStats:
"""Class to keep track of aggregate stats for all tasks across threads."""
_num_processes = 0
@ -398,7 +362,8 @@ class Task:
# in the Traceback section obscure the actual error(s).
print('\n' + message)
if self.remote_print:
QueuedOutputs.add_output(self, message)
self.tty.write(message)
self.tty.flush()
log(f'{status_string} {self.name}',
quiet=self.options.quiet,
build_id=self.build_id)
@ -449,14 +414,12 @@ def _handle_add_task(data, tasks: Dict[Tuple[str, str], Task], options):
def _handle_query_build(data, connection: socket.socket):
build_id = data['build_id']
pending_outputs = QueuedOutputs.get_pending_outputs(build_id)
pending_tasks = TaskStats.num_pending_tasks(build_id)
completed_tasks = TaskStats.num_completed_tasks(build_id)
response = {
'build_id': build_id,
'completed_tasks': completed_tasks,
'pending_tasks': pending_tasks,
'pending_outputs': pending_outputs,
}
try:
with connection:
@ -515,7 +478,6 @@ def _process_requests(sock: socket.socket, options):
# Terminate all currently running tasks.
for task in tasks.values():
task.terminate()
QueuedOutputs.flush_messages()
log('STOPPED', end='\n', quiet=options.quiet)
@ -538,9 +500,6 @@ def _wait_for_build(build_id):
while True:
build_info = query_build_info(build_id)
pending_tasks = build_info['pending_tasks']
pending_outputs = build_info['pending_outputs']
for pending_message in pending_outputs:
print('\n' + pending_message)
if pending_tasks == 0:
print(f'\nAll tasks completed for build_id: {build_id}.')

@ -62,12 +62,6 @@ class TasksTest(unittest.TestCase):
def tearDown(self):
if os.path.exists(self._TTY_FILE):
with open(self._TTY_FILE, 'rt') as tty:
contents = tty.read()
# TTY should only be written to if the server crashes which is probably
# unexpected.
if contents:
self.fail('Found non-empty tty:\n' + repr(contents))
os.unlink(self._TTY_FILE)
self._process.terminate()
self._process.wait()
@ -87,23 +81,26 @@ class TasksTest(unittest.TestCase):
'stamp_file': _stamp_file.name,
})
def getTtyContents(self):
if os.path.exists(self._TTY_FILE):
with open(self._TTY_FILE, 'rt') as tty:
return tty.read()
return ''
def getBuildInfo(self):
build_info = server.query_build_info(self.id())
pending_tasks = build_info['pending_tasks']
completed_tasks = build_info['completed_tasks']
pending_outputs = build_info['pending_outputs']
return pending_tasks, completed_tasks, pending_outputs
return pending_tasks, completed_tasks
def waitForTasksDone(self, timeout_seconds=3):
timeout_duration = datetime.timedelta(seconds=timeout_seconds)
start_time = datetime.datetime.now()
all_pending_outputs = []
while True:
pending_tasks, completed_tasks, pending_outputs = self.getBuildInfo()
all_pending_outputs.extend(pending_outputs)
pending_tasks, completed_tasks = self.getBuildInfo()
if completed_tasks > 0 and pending_tasks == 0:
return all_pending_outputs
return
current_time = datetime.datetime.now()
duration = current_time - start_time
@ -113,14 +110,14 @@ class TasksTest(unittest.TestCase):
def testRunsQuietTask(self):
self.sendTask(['true'])
pending_outputs = self.waitForTasksDone()
self.assertEqual(len(pending_outputs), 0)
self.waitForTasksDone()
self.assertEqual(self.getTtyContents(), '')
def testRunsNoisyTask(self):
self.sendTask(['echo', 'some_output'])
pending_outputs = self.waitForTasksDone()
self.assertEqual(len(pending_outputs), 1)
self.assertIn('some_output', pending_outputs[0])
self.waitForTasksDone()
tty_contents = self.getTtyContents()
self.assertIn('some_output', tty_contents)
if __name__ == '__main__':