
fix: add the error to the global result queue#315

Merged
BulkBeing merged 1 commit into main from fix-udf-exit
Feb 11, 2026

Conversation

@vigith
Member

@vigith vigith commented Feb 9, 2026

Closes numaproj/numaflow#3216

The top-level error handlers for map and source-transformer were missing a push to the global result queue. As a result, UDF errors were handled, but top-level service errors were not.

@vaibhavtiwari33
Contributor

vaibhavtiwari33 commented Feb 10, 2026

To reproduce this bug, we injected a runtime error into the pynumaflow code:

    def _process_requests(
        self,
        context: NumaflowServicerContext,
        request_iterator: Iterator[map_pb2.MapRequest],
        result_queue: SyncIterator,
    ):
        try:
            # read through all incoming requests and submit to the
            # threadpool for invocation
            for request in request_iterator:
                _ = self.executor.submit(self._invoke_map, context, request, result_queue)
                if should_we_panic():               # <--- inject raising error
                    raise RuntimeError("random runtime error in MapFn during reading requests!!")
            # wait for all tasks to finish after all requests exhausted
            self.executor.shutdown(wait=True)
            # Indicate to the result queue that no more messages left to process
            result_queue.put(STREAM_EOF)
        except BaseException:
            _LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)

Before

Without the fix, the container gets stuck after the exception, because the main thread is still listening for events from the request_queue that will never arrive.

UDF logs:

2026-02-10 17:08:53 CRITICAL MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/even_odd/.venv/lib/python3.12/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 97, in _process_requests
    raise RuntimeError("random runtime error in MapFn during reading requests!!")
RuntimeError: random runtime error in MapFn during reading requests!!

Numa logs:

2026-02-10T17:09:51.042475Z  INFO numaflow_core::tracker: Processed messages per second processed=0
2026-02-10T17:09:52.042167Z  INFO numaflow_core::tracker: Processed messages per second processed=0
2026-02-10T17:09:53.041661Z  INFO numaflow_core::tracker: Processed messages per second processed=0
2026-02-10T17:09:53.060125Z  INFO numaflow_core::metrics: Pending messages Some(39979), partition: numaflow-system-mp-bug-even-or-odd-0
2026-02-10T17:09:54.041524Z  INFO numaflow_core::tracker: Processed messages per second processed=0
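The stall above can be shown in isolation (the names below are illustrative, not pynumaflow APIs): when the producer thread swallows the exception, neither `STREAM_EOF` nor the error ever reaches the queue, so a consumer blocked on `get()` never wakes up.

```python
import queue
import threading

result_queue: queue.Queue = queue.Queue()

def producer() -> None:
    # Hypothetical producer standing in for _process_requests before
    # the fix: the error is caught and only logged, so nothing is
    # ever enqueued for the consumer.
    try:
        raise RuntimeError("injected error")
    except BaseException:
        pass  # logged in the real code; nothing reaches the queue

t = threading.Thread(target=producer)
t.start()
t.join()

try:
    # Mirrors the main thread waiting for results. With nothing ever
    # enqueued, a plain get() would block forever; the timeout just
    # makes the stall observable here.
    result_queue.get(timeout=1)
except queue.Empty:
    print("consumer is stuck: no sentinel and no error was enqueued")
```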

After

    def _process_requests(
        self,
        context: NumaflowServicerContext,
        request_iterator: Iterator[map_pb2.MapRequest],
        result_queue: SyncIterator,
    ):
        try:
            # read through all incoming requests and submit to the
            # threadpool for invocation
            for request in request_iterator:
                _ = self.executor.submit(self._invoke_map, context, request, result_queue)
                if should_we_panic():
                    raise RuntimeError("random runtime error in MapFn during reading requests!!")
            # wait for all tasks to finish after all requests exhausted
            self.executor.shutdown(wait=True)
            # Indicate to the result queue that no more messages left to process
            result_queue.put(STREAM_EOF)
        except BaseException as e:            
            _LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)
            result_queue.put(e)                      # <------- fix

With the fix in place, the UDF is able to restart, and the numa container exits gracefully:

container restarts:

mp-bug-even-or-odd-0-k94gc             1/3     Completed   8 (28s ago)     2m9s
mp-bug-even-or-odd-1-nsvw9             1/3     Completed   8 (28s ago)     2m9s
mp-bug-even-or-odd-2-cqi2r             1/3     Completed   8 (17s ago)     2m9s
mp-bug-even-or-odd-3-mukgw             1/3     Completed   8 (17s ago)     2m9s
mp-bug-even-or-odd-4-ubv5k             1/3     Completed   8 (29s ago)     2m9s

UDF logs:

2026-02-10 17:46:55 CRITICAL MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/even_odd/.venv/lib/python3.12/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 97, in _process_requests
    raise RuntimeError("random runtime error in MapFn during reading requests!!")
RuntimeError: random runtime error in MapFn during reading requests!!

Numa logs:

2026-02-10T17:46:30.360266Z  INFO numaflow_core::pipeline::isb::reader: ISBReader cleanup on shutdown completed.
2026-02-10T17:46:30.360294Z  INFO numaflow_core::mapper::map: Map input stream ended, waiting for inflight messages to finish
2026-02-10T17:46:30.360299Z  INFO numaflow_core::mapper::map: Map component is completed with status status=Err(Grpc(Status { code: Unknown, message: "h2 protocol error: error reading a body from connection", source: Some(hyper::Error(Body, Error { kind: Io(Kind(ConnectionReset)) })) }))
2026-02-10T17:46:30.360598Z ERROR numaflow_core::pipeline::forwarder::map_forwarder: Error while mapping messages e=Grpc(Status { code: Unknown, message: "h2 protocol error: error reading a body from connection", source: Some(hyper::Error(Body, Error { kind: Io(Kind(ConnectionReset)) })) })
2026-02-10T17:46:30.360629Z  INFO numaflow_core::pipeline::forwarder::map_forwarder: Forwarder task completed result=Err(Grpc(Status { code: Unknown, message: "h2 protocol error: error reading a body from connection", source: Some(hyper::Error(Body, Error { kind: Io(Kind(ConnectionReset)) })) }))
2026-02-10T17:46:30.360648Z  INFO numaflow_core::metrics: Stopped the Lag-Reader Expose tasks
2026-02-10T17:46:30.360659Z ERROR numaflow_core: Pipeline failed because of UDF failure error=Status { code: Unknown, message: "h2 protocol error: error reading a body from connection", source: Some(hyper::Error(Body, Error { kind: Io(Kind(ConnectionReset)) })) }
2026-02-10T17:46:30.360819Z  INFO numaflow_core: Gracefully Exiting...
2026-02-10T17:46:30.360840Z  INFO numaflow: Exited.

Pending

Reproduce scenario where we get the following error:

2026-02-07T01:04:21.366240821+01:00   File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 171, in submit
2026-02-07T01:04:21.366245261+01:00     raise RuntimeError('cannot schedule new futures after shutdown')
2026-02-07T01:04:21.366249651+01:00 RuntimeError: cannot schedule new futures after shutdown

@vaibhavtiwari33
Contributor

Further issue reproduction attempt details.

We tried sending SIGTERM to the numa container after the UDF got stuck on the manually injected runtime exception in the _process_requests loop. This led to the numa container waiting for pending messages, which is likely due to the numa bug described in the PR.
Updating the numa container to the latest image allows it to exit and restart gracefully, but the UDF remains stuck, with the RuntimeError raised every time the numa container restarts:

----- first -----> 2026-02-10 19:08:16 CRITICAL MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/even_odd/.venv/lib/python3.12/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 93, in _process_requests
    raise RuntimeError("random runtime error in MapFn during reading requests!!")
RuntimeError: random runtime error in MapFn during reading requests!!
----- numa restarts -----> 2026-02-10 19:09:27 CRITICAL MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/even_odd/.venv/lib/python3.12/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 93, in _process_requests
    raise RuntimeError("random runtime error in MapFn during reading requests!!")
RuntimeError: random runtime error in MapFn during reading requests!!
----- numa restarts -----> 2026-02-10 19:11:40 CRITICAL MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/even_odd/.venv/lib/python3.12/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 93, in _process_requests
    raise RuntimeError("random runtime error in MapFn during reading requests!!")
RuntimeError: random runtime error in MapFn during reading requests!!

@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as ready for review February 10, 2026 20:42
@vaibhavtiwari33
Contributor

Further issue reproduction attempt details.

We simply added a shutdown of the executor followed by submitting another job to it:

for request in request_iterator:
    if should_we_panic() and not hasattr(self, '_shutdown_once'):
        self._shutdown_once = True
        self.executor.shutdown(wait=False)
    _ = self.executor.submit(self._invoke_map, context, request, result_queue)

We can see the same error raised in the UDF.

udf:

2026-02-10 20:13:17 CRITICAL MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/even_odd/.venv/lib/python3.12/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 95, in _process_requests
    _ = self.executor.submit(self._invoke_map, context, request, result_queue)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 171, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

We tried the fix; the error remains the same, but the containers now restart:

mp-bug-daemon-6b48995f7-94kqw          1/1     Running     0                2m45s
mp-bug-even-or-odd-0-3gl9c             1/3     Completed   8 (53s ago)      2m45s
mp-bug-even-or-odd-1-soh4v             1/3     Completed   8 (60s ago)      2m45s
mp-bug-even-or-odd-2-o2ikt             1/3     Completed   8 (56s ago)      2m45s
mp-bug-even-or-odd-3-qbbtk             1/3     Completed   8 (55s ago)      2m45s
mp-bug-even-or-odd-4-ys7vk             1/3     Completed   8 (60s ago)      2m45s

@BulkBeing
Contributor

I was able to reproduce the error (similar to what @vaibhavtiwari33 did). The only possible scenario I could come up with is that the stream ends normally and gets recreated.

def _process_requests(
    self,
    context: NumaflowServicerContext,
    request_iterator: Iterable[map_pb2.MapRequest],
    result_queue: SyncIterator,
):
    _LOGGER.info("Starting _process_requests")
    try:
        # read through all incoming requests and submit to the
        # threadpool for invocation
        count = 0
        for request in request_iterator:
            count += 1
            _ = self.executor.submit(self._invoke_map, context, request, result_queue)
            if count > 5:
                _LOGGER.info("simulating stream end after %d requests", count)
                break
        # wait for all tasks to finish after all requests exhausted
        self.executor.shutdown(wait=True)
        # Indicate to the result queue that no more messages left to process
        result_queue.put(STREAM_EOF)
    except BaseException:
        _LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)

Error:

INFO:pynumaflow._constants:simulating stream end after 6 requests
2026-02-11 01:21:51 INFO     Mapper is invoked. context=<grpc._server._Context object at 0xffff8c512f50>, time=583.973137938
INFO:pynumaflow._constants:Mapper is invoked. context=<grpc._server._Context object at 0xffff8c512f50>, time=583.973137938
2026-02-11 01:21:51 INFO     Starting _process_requests
INFO:pynumaflow._constants:Starting _process_requests
2026-02-11 01:21:52 CRITICAL MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/flatmap/.venv/lib/python3.10/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 95, in _process_requests
    _ = self.executor.submit(self._invoke_map, context, request, result_queue)
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
CRITICAL:pynumaflow._constants:MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/flatmap/.venv/lib/python3.10/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 95, in _process_requests
    _ = self.executor.submit(self._invoke_map, context, request, result_queue)
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
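The stream-recreated scenario reduces to a two-line property of `concurrent.futures.ThreadPoolExecutor`: once `shutdown()` has been called on the shared executor, any later `submit()` from a recreated stream raises. A minimal sketch (the stream comments are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)

# First stream: requests are handled, the stream ends normally, and
# the handler shuts the executor down, as _process_requests does.
executor.submit(print, "stream 1 request")
executor.shutdown(wait=True)

# Second stream: gRPC recreates the stream, but the servicer (and its
# executor) are reused, so the next submit fails.
try:
    executor.submit(print, "stream 2 request")
except RuntimeError as e:
    print(f"reproduced: {e}")  # cannot schedule new futures after shutdown
```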

@BulkBeing
Contributor

Tested with the current change. It causes the UDF to restart:

INFO:pynumaflow._constants:simulating stream end after 3 requests (once)
INFO:pynumaflow._constants:Starting _process_requests
2026-02-11 05:39:42 INFO     Starting _process_requests
2026-02-11 05:39:43 CRITICAL MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/flatmap/.venv/lib/python3.10/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 93, in _process_requests
    _ = self.executor.submit(self._invoke_map, context, request, result_queue)
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
CRITICAL:pynumaflow._constants:MapFn Error, re-raising the error
Traceback (most recent call last):
  File "/opt/pysetup/examples/map/flatmap/.venv/lib/python3.10/site-packages/pynumaflow/mapper/_servicer/_sync_servicer.py", line 93, in _process_requests
    _ = self.executor.submit(self._invoke_map, context, request, result_queue)
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
2026-02-11 05:39:43 INFO     Killing process: Got exception UDF_EXECUTION_ERROR(udf): RuntimeError('cannot schedule new futures after shutdown')
INFO:pynumaflow._constants:Killing process: Got exception UDF_EXECUTION_ERROR(udf): RuntimeError('cannot schedule new futures after shutdown')
Killed

@BulkBeing BulkBeing merged commit 255d20b into main Feb 11, 2026
2 checks passed
@BulkBeing BulkBeing deleted the fix-udf-exit branch February 11, 2026 09:08

Development

Successfully merging this pull request may close these issues.

Map UDF stuck with error: cannot schedule new futures after shutdown

4 participants