44"""
55
66from __future__ import annotations
7- import asyncio
87import logging
98
9+ from threading import Timer
1010from typing import Optional
1111
1212from microsoft_agents .hosting .core import TurnContext
@@ -20,60 +20,36 @@ class TypingIndicator:
2020 Encapsulates the logic for sending "typing" activity to the user.
2121 """
2222
23- def __init__ (self , intervalSeconds = 1 ) -> None :
24- self ._intervalSeconds = intervalSeconds
25- self ._task : Optional [asyncio .Task ] = None
26- self ._running : bool = False
27- self ._lock = asyncio .Lock ()
23+ _interval : int
24+ _timer : Optional [Timer ] = None
2825
29- async def start (self , context : TurnContext ) -> None :
30- async with self ._lock :
31- if self ._running :
32- return
26+ def __init__ (self , interval = 1000 ) -> None :
27+ self ._interval = interval
3328
34- logger .debug (
35- f"Starting typing indicator with interval: { self ._intervalSeconds } seconds"
36- )
37- self ._running = True
38- self ._task = asyncio .create_task (self ._typing_loop (context ))
29+ async def start (self , context : TurnContext ) -> None :
30+ if self ._timer is not None :
31+ return
3932
40- async def stop (self ) -> None :
41- async with self ._lock :
42- if not self ._running :
43- return
33+ logger .debug (f"Starting typing indicator with interval: { self ._interval } ms" )
34+ func = self ._on_timer (context )
35+ self ._timer = Timer (self ._interval , func )
36+ self ._timer .start ()
37+ await func ()
4438
39+ def stop (self ) -> None :
40+ if self ._timer :
4541 logger .debug ("Stopping typing indicator" )
46- self ._running = False
47- task = self ._task
48- self ._task = None
42+ self ._timer .cancel ()
43+ self ._timer = None
4944
50- # Cancel outside the lock to avoid blocking
51- if task and not task .done ():
52- task .cancel ()
45+ def _on_timer (self , context : TurnContext ):
46+ async def __call__ ():
5347 try :
54- await task
55- except asyncio .CancelledError :
56- pass
57-
58- async def _typing_loop (self , context : TurnContext ):
59- """Continuously send typing indicators at the specified interval."""
60- try :
61- while True :
62- # Check running status under lock
63- async with self ._lock :
64- if not self ._running :
65- break
66-
67- try :
68- logger .debug ("Sending typing activity" )
69- await context .send_activity (Activity (type = ActivityTypes .typing ))
70- except Exception as e :
71- logger .error (f"Error sending typing activity: { e } " )
72- async with self ._lock :
73- self ._running = False
74- break
75-
76- await asyncio .sleep (self ._intervalSeconds )
77- except asyncio .CancelledError :
78- logger .debug ("Typing indicator loop cancelled" )
79- raise
48+ logger .debug ("Sending typing activity" )
49+ await context .send_activity (Activity (type = ActivityTypes .typing ))
50+ except Exception as e :
51+ # TODO: Improve when adding logging
52+ logger .error (f"Error sending typing activity: { e } " )
53+ self .stop ()
54+
55+ return __call__
0 commit comments