fix: add the error to the global result queue #315
Signed-off-by: Vigith Maurice <[email protected]>
To reproduce this bug we injected a runtime error in the pynumaflow code:

```python
def _process_requests(
    self,
    context: NumaflowServicerContext,
    request_iterator: Iterator[map_pb2.MapRequest],
    result_queue: SyncIterator,
):
    try:
        # read through all incoming requests and submit to the
        # threadpool for invocation
        for request in request_iterator:
            _ = self.executor.submit(self._invoke_map, context, request, result_queue)
            if should_we_panic():  # <--- inject raising error
                raise RuntimeError("random runtime error in MapFn during reading requests!!")
        # wait for all tasks to finish after all requests exhausted
        self.executor.shutdown(wait=True)
        # Indicate to the result queue that no more messages left to process
        result_queue.put(STREAM_EOF)
    except BaseException:
        _LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)
```

**Before**

Without the fix, the container gets stuck on the exception because the main thread is still listening for events from the request_queue.

UDF logs:

Numa logs:

**After**

```python
def _process_requests(
    self,
    context: NumaflowServicerContext,
    request_iterator: Iterator[map_pb2.MapRequest],
    result_queue: SyncIterator,
):
    try:
        # read through all incoming requests and submit to the
        # threadpool for invocation
        for request in request_iterator:
            _ = self.executor.submit(self._invoke_map, context, request, result_queue)
            if should_we_panic():
                raise RuntimeError("random runtime error in MapFn during reading requests!!")
        # wait for all tasks to finish after all requests exhausted
        self.executor.shutdown(wait=True)
        # Indicate to the result queue that no more messages left to process
        result_queue.put(STREAM_EOF)
    except BaseException as e:
        _LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)
        result_queue.put(e)  # <------- fix
```

With the fix in place, the udf is able to restart and the numa container exits gracefully.

container restarts:

udf logs:

numa logs:

**Pending**

Reproduce scenario where we get the following error:
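The hang and the fix can be sketched with a minimal, self-contained producer/consumer (hypothetical names, not the actual pynumaflow API): the consumer blocks on `queue.get()` forever unless the producer puts either the EOF sentinel or the exception itself onto the queue.

```python
import queue
import threading

STREAM_EOF = object()  # sentinel marking normal end of stream


def producer(result_queue: queue.Queue, fail: bool) -> None:
    try:
        for item in range(3):
            result_queue.put(item)
            if fail and item == 1:
                raise RuntimeError("injected error while reading requests")
        result_queue.put(STREAM_EOF)
    except BaseException as e:
        # the fix: hand the exception to the consumer instead of only
        # logging it -- otherwise the consumer blocks on get() forever
        result_queue.put(e)


def consumer(result_queue: queue.Queue) -> list:
    results = []
    while True:
        msg = result_queue.get()  # blocks until the producer puts something
        if msg is STREAM_EOF:
            return results
        if isinstance(msg, BaseException):
            raise msg  # surface the producer's failure instead of hanging
        results.append(msg)


q: queue.Queue = queue.Queue()
threading.Thread(target=producer, args=(q, True), daemon=True).start()
try:
    consumer(q)
except RuntimeError as e:
    print(f"consumer observed: {e}")
```

Without the `result_queue.put(e)` in the producer's `except`, the `consumer` call above would never return.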
Further issue reproduction attempt details. We tried sending SIGTERM to the numa container after the UDF gets stuck with the manually added runtime exception in the
Further issue reproduction attempt details. Simply added a shutdown of the executor followed by submitting another job to it:

```python
for request in request_iterator:
    if should_we_panic() and not hasattr(self, "_shutdown_once"):
        self._shutdown_once = True
        self.executor.shutdown(wait=False)
    _ = self.executor.submit(self._invoke_map, context, request, result_queue)
```

We can see the same error getting raised in the UDF.

udf:

Tried the fix and the error remains the same, but the containers restart:
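The error in that experiment matches documented `concurrent.futures` behaviour: once `shutdown()` has been called, any further `submit()` raises `RuntimeError`. A standalone sketch:

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(pow, 2, 10)  # accepted before shutdown
executor.shutdown(wait=True)
print(future.result())  # 1024

try:
    executor.submit(pow, 2, 10)  # rejected after shutdown
except RuntimeError as e:
    # CPython's message is "cannot schedule new futures after shutdown"
    print(f"submit after shutdown failed: {e}")
```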
I was able to reproduce the error (similar to what @vaibhavtiwari33 did). The only possible scenario I could come up with is that the stream ends normally and gets recreated.

```python
def _process_requests(
    self,
    context: NumaflowServicerContext,
    request_iterator: Iterable[map_pb2.MapRequest],
    result_queue: SyncIterator,
):
    _LOGGER.info("Starting _process_requests")
    try:
        # read through all incoming requests and submit to the
        # threadpool for invocation
        count = 0
        for request in request_iterator:
            count += 1
            _ = self.executor.submit(self._invoke_map, context, request, result_queue)
            if count > 5:
                _LOGGER.info("simulating stream end after %d requests", count)
                break
        # wait for all tasks to finish after all requests exhausted
        self.executor.shutdown(wait=True)
        # Indicate to the result queue that no more messages left to process
        result_queue.put(STREAM_EOF)
    except BaseException as e:
        # handler unchanged from the fixed version above
        _LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)
        result_queue.put(e)
```

Error:
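That scenario can be simulated standalone (a hypothetical sketch, not the pynumaflow code): if the handler shuts down a shared executor when a stream ends normally, a recreated stream that reuses the same executor fails with the same `RuntimeError` seen in the earlier reproduction.

```python
from concurrent.futures import ThreadPoolExecutor
import queue

STREAM_EOF = object()


def process_stream(executor, requests, result_queue):
    # simplified stand-in for _process_requests
    for request in requests:
        executor.submit(result_queue.put, request)
    executor.shutdown(wait=True)  # stream ended normally
    result_queue.put(STREAM_EOF)


executor = ThreadPoolExecutor(max_workers=1)
results: queue.Queue = queue.Queue()
process_stream(executor, ["a", "b"], results)  # first stream completes fine

try:
    # stream gets recreated, but the executor was already shut down
    process_stream(executor, ["c"], results)
except RuntimeError as e:
    print(f"recreated stream failed: {e}")
```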
Tested with the current change. It causes the UDF to restart.
Closes numaproj/numaflow#3216
The top-level error handler for map and source-transformer was missing a push to the global result queue. This means UDF errors were handled, but top-level service errors were not.
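In outline, the fix makes the top-level handler forward any escaping exception to the shared result queue, so the response writer can fail fast instead of blocking forever. A minimal sketch with a hypothetical helper name (`with_error_propagation`, for illustration only):

```python
import queue
from typing import Callable


def with_error_propagation(handler: Callable, result_queue: queue.Queue) -> Callable:
    # hypothetical decorator illustrating the fix: any error escaping the
    # top-level handler is also pushed onto the global result queue
    def wrapped(*args, **kwargs):
        try:
            return handler(*args, **kwargs)
        except BaseException as e:
            result_queue.put(e)
            raise

    return wrapped
```

A consumer draining the queue can then distinguish results from exceptions and re-raise the latter, as the fixed `_process_requests` enables.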