-
Notifications
You must be signed in to change notification settings - Fork 172
fix: instantiate grpc client once per process in benchmarks #1725
Changes from all commits
558d13d
2c0dc3a
dcc7932
2be3c4b
98f85ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -300,44 +300,46 @@ def target_wrapper(*args, **kwargs): | |
| ) | ||
|
|
||
|
|
||
| def _download_files_worker(files_to_download, other_params, chunks, bucket_type): | ||
| # For regional buckets, a new client must be created for each process. | ||
| # For zonal, the same is done for consistency. | ||
| # --- Global Variables for Worker Process --- | ||
| worker_loop = None | ||
| worker_client = None | ||
| worker_json_client = None | ||
|
|
||
|
|
||
| def _worker_init(bucket_type): | ||
| """Initializes a persistent event loop and client for each worker process.""" | ||
| global worker_loop, worker_client, worker_json_client | ||
| if bucket_type == "zonal": | ||
| loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(loop) | ||
| client = loop.run_until_complete(create_client()) | ||
| try: | ||
| # download_files_using_mrd_multi_coro returns max latency of coros | ||
| result = download_files_using_mrd_multi_coro( | ||
| loop, client, files_to_download, other_params, chunks | ||
| ) | ||
| finally: | ||
| tasks = asyncio.all_tasks(loop=loop) | ||
| for task in tasks: | ||
| task.cancel() | ||
| loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) | ||
| loop.close() | ||
| return result | ||
| worker_loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(worker_loop) | ||
| worker_client = worker_loop.run_until_complete(create_client()) | ||
| else: # regional | ||
| from google.cloud import storage | ||
|
|
||
| json_client = storage.Client() | ||
| worker_json_client = storage.Client() | ||
|
Comment on lines
+309
to
+319
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The resources created in this initializer (the asyncio event loop and the storage clients) are never explicitly closed. While terminating worker processes will cause the OS to reclaim these resources, it's better to perform a graceful shutdown. You can use the `atexit` module to register cleanup handlers that release each worker's resources on exit. I suggest updating `_worker_init` as follows: def _worker_init(bucket_type):
"""Initializes a persistent event loop and client for each worker process."""
global worker_loop, worker_client, worker_json_client
import atexit
if bucket_type == "zonal":
worker_loop = asyncio.new_event_loop()
asyncio.set_event_loop(worker_loop)
worker_client = worker_loop.run_until_complete(create_client())
def _cleanup_zonal():
# Ensure resources are cleaned up when the worker process exits.
if worker_client and worker_loop and not worker_loop.is_closed():
try:
worker_loop.run_until_complete(worker_client.close())
finally:
worker_loop.close()
atexit.register(_cleanup_zonal)
else: # regional
from google.cloud import storage
worker_json_client = storage.Client()
def _cleanup_regional():
# Ensure resources are cleaned up when the worker process exits.
if worker_json_client:
worker_json_client.close()
atexit.register(_cleanup_regional)
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `json_client` doesn't have a `close()` method.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. My bad, the JSON client does have a `close()` method.
||
|
|
||
|
|
||
| def _download_files_worker(files_to_download, other_params, chunks, bucket_type): | ||
| if bucket_type == "zonal": | ||
| # The loop and client are already initialized in _worker_init. | ||
| # download_files_using_mrd_multi_coro returns max latency of coros | ||
| return download_files_using_mrd_multi_coro( | ||
| worker_loop, worker_client, files_to_download, other_params, chunks | ||
| ) | ||
| else: # regional | ||
| # download_files_using_json_multi_threaded returns max latency of threads | ||
| return download_files_using_json_multi_threaded( | ||
| None, json_client, files_to_download, other_params, chunks | ||
| None, worker_json_client, files_to_download, other_params, chunks | ||
| ) | ||
|
|
||
|
|
||
| def download_files_mp_mc_wrapper(files_names, params, chunks, bucket_type): | ||
| num_processes = params.num_processes | ||
| def download_files_mp_mc_wrapper(pool, files_names, params, chunks, bucket_type): | ||
| num_coros = params.num_coros # This is n, number of files per process | ||
|
|
||
| # Distribute filenames to processes | ||
| filenames_per_process = [ | ||
| files_names[i : i + num_coros] for i in range(0, len(files_names), num_coros) | ||
| ] | ||
|
|
||
| args = [ | ||
| ( | ||
| filenames, | ||
|
|
@@ -348,10 +350,7 @@ def download_files_mp_mc_wrapper(files_names, params, chunks, bucket_type): | |
| for filenames in filenames_per_process | ||
| ] | ||
|
|
||
| ctx = multiprocessing.get_context("spawn") | ||
| with ctx.Pool(processes=num_processes) as pool: | ||
| results = pool.starmap(_download_files_worker, args) | ||
|
|
||
| results = pool.starmap(_download_files_worker, args) | ||
| return max(results) | ||
|
|
||
|
|
||
|
|
@@ -386,10 +385,16 @@ def test_downloads_multi_proc_multi_coro( | |
| logging.info("randomizing chunks") | ||
| random.shuffle(chunks) | ||
|
|
||
| ctx = multiprocessing.get_context("spawn") | ||
| pool = ctx.Pool( | ||
| processes=params.num_processes, | ||
| initializer=_worker_init, | ||
| initargs=(params.bucket_type,), | ||
| ) | ||
| output_times = [] | ||
|
|
||
| def target_wrapper(*args, **kwargs): | ||
| result = download_files_mp_mc_wrapper(*args, **kwargs) | ||
| result = download_files_mp_mc_wrapper(pool, *args, **kwargs) | ||
| output_times.append(result) | ||
| return output_times | ||
|
|
||
|
|
@@ -407,6 +412,8 @@ def target_wrapper(*args, **kwargs): | |
| ), | ||
| ) | ||
| finally: | ||
| pool.close() | ||
| pool.join() | ||
| publish_benchmark_extra_info(benchmark, params, true_times=output_times) | ||
| publish_resource_metrics(benchmark, m) | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.