From be0de2b808843afb0adaeb32ada32d3d46d31146 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 6 Apr 2026 09:01:04 +0200 Subject: [PATCH 1/2] selector: dynamically adjust event count When fetching events with epoll/kqueue, we currently use a fixed size array to fetch system events. Taking an idea from [libevent](https://github.com/libevent/libevent/blob/48296514d8fd9c0b3812b11d45ad80b0c002c14e/epoll.c#L568), we can instead dynamically grow the number of events fetched as the system comes under load thus reducing the number of poll calls, allowing more work to be done per batch. The change also has the effect that data that already arrived at the time of the poll call gets processed before timers are fired - this means that timeouts are less likely to be triggered unfairly due to event queueing order and the time it takes to process each event. * increase max events per loop to 4096 (same as libevent) * reduce initial event list allocation to 32 * get rid of `ReadyKey.errorCode` which is unused --- chronos/config.nim | 4 ++-- chronos/internal/asyncengine.nim | 21 ++++++++++++--------- chronos/ioselects/ioselectors_epoll.nim | 23 ++++++++++------------- chronos/ioselects/ioselectors_kqueue.nim | 17 ++++++++--------- chronos/ioselects/ioselectors_poll.nim | 1 - chronos/selectors2.nim | 3 --- 6 files changed, 32 insertions(+), 37 deletions(-) diff --git a/chronos/config.nim b/chronos/config.nim index 26d110f1b..877faa086 100644 --- a/chronos/config.nim +++ b/chronos/config.nim @@ -65,10 +65,10 @@ const ## using `AsyncProcessOption.EvalCommand` and API calls such as ## ``execCommand(command)`` and ``execCommandEx(command)``. - chronosEventsCount* {.intdefine.} = 64 + chronosEventsCount* {.intdefine.} = 4096 ## Number of OS poll events retrieved by syscall (epoll, kqueue, poll). - chronosInitialSize* {.intdefine.} = 64 + chronosInitialSize* {.intdefine.} = 32 ## Initial size of Selector[T]'s array of file descriptors. chronosEventEngine* {.strdefine.}: string = diff --git a/chronos/internal/asyncengine.nim b/chronos/internal/asyncengine.nim index 45197a038..aeb560e87 100644 --- a/chronos/internal/asyncengine.nim +++ b/chronos/internal/asyncengine.nim @@ -752,9 +752,9 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or var res = PDispatcher( selector: selector, timers: initHeapQueue[TimerCallback](), - callbacks: initDeque[AsyncCallback](chronosEventsCount), + callbacks: initDeque[AsyncCallback](chronosInitialSize), idlers: initDeque[AsyncCallback](), - keys: newSeq[ReadyKey](chronosEventsCount), + keys: newSeq[ReadyKey](chronosInitialSize), trackers: initTable[string, TrackerBase](), counters: initTable[string, TrackerCounter]() ) @@ -1026,12 +1026,8 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or loop.processTimersGetTimeout(curTimeout) # Processing IO descriptors and all hardware events. - let count = - block: - let res = loop.selector.selectInto2(curTimeout, loop.keys) - if res.isErr(): - raiseOsDefect(res.error(), "poll(): Unable to get OS events") - res.get() + let count = loop.selector.selectInto2(curTimeout, loop.keys).valueOr: + raiseOsDefect(error, "poll(): Unable to get OS events") for i in 0 ..< count: let fd = loop.keys[i].fd @@ -1057,12 +1053,19 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or if not isNil(adata.reader.function): loop.callbacks.addLast(adata.reader) + if count == loop.keys.len() and loop.keys.len() < chronosEventsCount: + # If we filled the event seq, it's likely that we could have fetched + # more events in a single call - fetching more events means less work + # since we don't have to poll as often under load and we can + # batch more work in a single event loop iteration. + loop.keys.setLen(min(loop.keys.len * 2, chronosEventsCount)) + # Moving expired timers to `loop.callbacks`. loop.processTimers() # We move idle callbacks to `loop.callbacks` only if there no pending # network events. - if count == 0: + if loop.keys.len == 0: loop.processIdlers() # We move tick callbacks to `loop.callbacks` always. diff --git a/chronos/ioselects/ioselectors_epoll.nim b/chronos/ioselects/ioselectors_epoll.nim index 2156a390c..95a09f778 100644 --- a/chronos/ioselects/ioselectors_epoll.nim +++ b/chronos/ioselects/ioselectors_epoll.nim @@ -26,6 +26,7 @@ type virtualId: int32 childrenExited: bool pendingEvents: Deque[ReadyKey] + queueEvents: seq[EpollEvent] Selector*[T] = ref SelectorImpl[T] @@ -97,7 +98,7 @@ proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] = var nmask: Sigset if sigemptyset(nmask) < 0: return err(osLastError()) - let epollFd = epoll_create(chronosEventsCount) + let epollFd = epoll_create(chronosInitialSize) if epollFd < 0: return err(osLastError()) let selector = Selector[T]( @@ -107,7 +108,8 @@ proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] = virtualId: -1'i32, # Should start with -1, because `InvalidIdent` == -1 childrenExited: false, virtualHoles: initDeque[int32](), - pendingEvents: initDeque[ReadyKey]() + pendingEvents: initDeque[ReadyKey](), + queueEvents: newSeq[EpollEvent](chronosInitialSize), ) ok(selector) @@ -518,11 +520,9 @@ proc prepareKey[T](s: Selector[T], event: EpollEvent): Opt[ReadyKey] = if (event.events and EPOLLERR) != 0: rkey.events.incl(Event.Error) - rkey.errorCode = oserrno.ECONNRESET if (event.events and EPOLLHUP) != 0 or (event.events and EPOLLRDHUP) != 0: rkey.events.incl(Event.Error) - rkey.errorCode = oserrno.ECONNRESET if (event.events and EPOLLOUT) != 0: rkey.events.incl(Event.Write) @@ -537,7 +537,6 @@ proc prepareKey[T](s: Selector[T], event: EpollEvent): Opt[ReadyKey] = let res = handleEintr(osdefs.read(fdi32, addr data, sizeof(uint64))) if res != sizeof(uint64): rkey.events.incl(Event.Error) - rkey.errorCode = osLastError() elif Event.Signal in pkey.events: var data: SignalFdInfo @@ -582,7 +581,6 @@ proc prepareKey[T](s: Selector[T], event: EpollEvent): Opt[ReadyKey] = return Opt.none(ReadyKey) else: rkey.events.incl({Event.User, Event.Error}) - rkey.errorCode = errorCode else: rkey.events.incl(Event.User) @@ -590,7 +588,6 @@ proc prepareKey[T](s: Selector[T], event: EpollEvent): Opt[ReadyKey] = if Event.Timer in rkey.events: if epoll_ctl(s.epollFd, EPOLL_CTL_DEL, fdi32, nil) != 0: rkey.events.incl(Event.Error) - rkey.errorCode = osLastError() # we are marking key with `Finished` event, to avoid double decrease. rkey.events.incl(Event.Finished) pkey.events.incl(Event.Finished) @@ -627,20 +624,20 @@ proc selectInto2*[T](s: Selector[T], timeout: int, readyKeys: var openArray[ReadyKey] ): SelectResult[int] = var - queueEvents: array[chronosEventsCount, EpollEvent] k: int = 0 verifySelectParams(timeout, -1, int(high(cint))) let - maxEventsCount = min(len(queueEvents), len(readyKeys)) + maxEventsCount = len(readyKeys) maxPendingEventsCount = min(maxEventsCount, len(s.pendingEvents)) maxNewEventsCount = max(maxEventsCount - maxPendingEventsCount, 0) - - let eventsCount = if maxNewEventsCount > 0: - let res = handleEintr(epoll_wait(s.epollFd, addr(queueEvents[0]), + if maxNewEventsCount > s.queueEvents.len: + s.queueEvents.setLen(maxNewEventsCount) + + let res = handleEintr(epoll_wait(s.epollFd, addr(s.queueEvents[0]), cint(maxNewEventsCount), cint(timeout))) if res < 0: @@ -652,7 +649,7 @@ proc selectInto2*[T](s: Selector[T], timeout: int, s.childrenExited = false for i in 0 ..< eventsCount: - let rkey = s.prepareKey(queueEvents[i]).valueOr: continue + let rkey = s.prepareKey(s.queueEvents[i]).valueOr: continue readyKeys[k] = rkey inc(k) diff --git a/chronos/ioselects/ioselectors_kqueue.nim b/chronos/ioselects/ioselectors_kqueue.nim index e39f96892..9556aa09b 100644 --- a/chronos/ioselects/ioselectors_kqueue.nim +++ b/chronos/ioselects/ioselectors_kqueue.nim @@ -25,6 +25,7 @@ type fds: Table[int32, SelectorKey[T]] virtualHoles: Deque[int32] virtualId: int32 + queueEvents: seq[KEvent] Selector*[T] = ref SelectorImpl[T] @@ -112,7 +113,8 @@ proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] = kqFd: kqFd, fds: initTable[int32, SelectorKey[T]](chronosInitialSize), virtualId: -1'i32, # Should start with -1, because `InvalidIdent` == -1 - virtualHoles: initDeque[int32]() + virtualHoles: initDeque[int32](), + queueEvents: newSeq[KEvent](chronosInitialSize) ) ok(selector) @@ -497,7 +499,6 @@ proc prepareKey[T](s: Selector[T], event: KEvent): Opt[ReadyKey] = of EVFILT_READ: if (event.flags and EV_EOF) != 0: rkey.events.incl(Event.Error) - rkey.errorCode = oserrno.ECONNRESET if Event.User in pkey.events: var data: uint64 = 0 @@ -509,7 +510,6 @@ proc prepareKey[T](s: Selector[T], event: KEvent): Opt[ReadyKey] = return Opt.none(ReadyKey) else: rkey.events.incl(Event.Error) - rkey.errorCode = errorCode rkey.events.incl(Event.User) else: rkey.events.incl(Event.Read) @@ -517,7 +517,6 @@ proc prepareKey[T](s: Selector[T], event: KEvent): Opt[ReadyKey] = of EVFILT_WRITE: if (event.flags and EV_EOF) != 0: rkey.events.incl(Event.Error) - rkey.errorCode = oserrno.ECONNRESET rkey.events.incl(Event.Write) @@ -557,12 +556,12 @@ proc prepareKey[T](s: Selector[T], event: KEvent): Opt[ReadyKey] = proc selectInto2*[T](s: Selector[T], timeout: int, readyKeys: var openArray[ReadyKey] ): SelectResult[int] = - var - tv: Timespec - queueEvents: array[chronosEventsCount, KEvent] + var tv: Timespec verifySelectParams(timeout, -1, high(int)) + if readyKeys.len() > s.queueEvents.len(): + s.queueEvents.setLen(readyKeys.len()) let ptrTimeout = if timeout != -1: @@ -575,12 +574,12 @@ proc selectInto2*[T](s: Selector[T], timeout: int, addr tv else: nil - maxEventsCount = cint(min(chronosEventsCount, len(readyKeys))) + maxEventsCount = cint(len(readyKeys)) eventsCount = block: var res = 0 while true: - res = kevent(s.kqFd, nil, cint(0), addr(queueEvents[0]), + res = kevent(s.kqFd, nil, cint(0), addr(s.queueEvents[0]), maxEventsCount, ptrTimeout) if res < 0: let errorCode = osLastError() diff --git a/chronos/ioselects/ioselectors_poll.nim b/chronos/ioselects/ioselectors_poll.nim index 49e3650fd..44db7772e 100644 --- a/chronos/ioselects/ioselectors_poll.nim +++ b/chronos/ioselects/ioselectors_poll.nim @@ -198,7 +198,6 @@ proc prepareKey[T](s: Selector[T], event: var TPollfd): Opt[ReadyKey] = return Opt.none(ReadyKey) else: rkey.events.incl({Event.User, Event.Error}) - rkey.errorCode = errorCode else: rkey.events.incl(Event.User) else: diff --git a/chronos/selectors2.nim b/chronos/selectors2.nim index db8791a59..48ec4fd07 100644 --- a/chronos/selectors2.nim +++ b/chronos/selectors2.nim @@ -64,8 +64,6 @@ when defined(nimdoc): ## An object which holds result for descriptor fd* : int ## file/socket descriptor events*: set[Event] ## set of events - errorCode*: OSErrorCode ## additional error code information for - ## Error events SelectEvent* = object ## An object which holds user defined event @@ -238,7 +236,6 @@ else: ReadyKey* = object fd* : int events*: set[Event] - errorCode*: OSErrorCode SelectorKey[T] = object ident: int From afcb5e24b5445f7c21e0e397508d3084f6450781 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 6 Apr 2026 11:46:17 +0200 Subject: [PATCH 2/2] oops --- chronos/ioselects/ioselectors_kqueue.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chronos/ioselects/ioselectors_kqueue.nim b/chronos/ioselects/ioselectors_kqueue.nim index 9556aa09b..a10745609 100644 --- a/chronos/ioselects/ioselectors_kqueue.nim +++ b/chronos/ioselects/ioselectors_kqueue.nim @@ -592,7 +592,7 @@ proc selectInto2*[T](s: Selector[T], timeout: int, var k = 0 for i in 0 ..< eventsCount: - let rkey = s.prepareKey(queueEvents[i]).valueOr: continue + let rkey = s.prepareKey(s.queueEvents[i]).valueOr: continue readyKeys[k] = rkey inc(k)