Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions chronos/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ const
## using `AsyncProcessOption.EvalCommand` and API calls such as
## ``execCommand(command)`` and ``execCommandEx(command)``.

chronosEventsCount* {.intdefine.} = 64
chronosEventsCount* {.intdefine.} = 4096
## Number of OS poll events retrieved by syscall (epoll, kqueue, poll).

chronosInitialSize* {.intdefine.} = 64
chronosInitialSize* {.intdefine.} = 32
## Initial size of Selector[T]'s array of file descriptors.

chronosEventEngine* {.strdefine.}: string =
Expand Down
21 changes: 12 additions & 9 deletions chronos/internal/asyncengine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,9 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
var res = PDispatcher(
selector: selector,
timers: initHeapQueue[TimerCallback](),
callbacks: initDeque[AsyncCallback](chronosEventsCount),
callbacks: initDeque[AsyncCallback](chronosInitialSize),
idlers: initDeque[AsyncCallback](),
keys: newSeq[ReadyKey](chronosEventsCount),
keys: newSeq[ReadyKey](chronosInitialSize),
trackers: initTable[string, TrackerBase](),
counters: initTable[string, TrackerCounter]()
)
Expand Down Expand Up @@ -1026,12 +1026,8 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
loop.processTimersGetTimeout(curTimeout)

# Processing IO descriptors and all hardware events.
let count =
block:
let res = loop.selector.selectInto2(curTimeout, loop.keys)
if res.isErr():
raiseOsDefect(res.error(), "poll(): Unable to get OS events")
res.get()
let count = loop.selector.selectInto2(curTimeout, loop.keys).valueOr:
raiseOsDefect(error, "poll(): Unable to get OS events")

for i in 0 ..< count:
let fd = loop.keys[i].fd
Expand All @@ -1057,12 +1053,19 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
if not isNil(adata.reader.function):
loop.callbacks.addLast(adata.reader)

if count == loop.keys.len() and loop.keys.len() < chronosEventsCount:
# If we filled the event seq, it's likely that we could have fetched
# more events in a single call - fetching more events means less work
# since we don't have to poll as often under load and we can
# batch more work in a single event loop iteration.
loop.keys.setLen(min(loop.keys.len * 2, chronosEventsCount))

# Moving expired timers to `loop.callbacks`.
loop.processTimers()

# We move idle callbacks to `loop.callbacks` only if there no pending
# network events.
if count == 0:
if loop.keys.len == 0:
loop.processIdlers()

# We move tick callbacks to `loop.callbacks` always.
Expand Down
23 changes: 10 additions & 13 deletions chronos/ioselects/ioselectors_epoll.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type
virtualId: int32
childrenExited: bool
pendingEvents: Deque[ReadyKey]
queueEvents: seq[EpollEvent]

Selector*[T] = ref SelectorImpl[T]

Expand Down Expand Up @@ -97,7 +98,7 @@ proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] =
var nmask: Sigset
if sigemptyset(nmask) < 0:
return err(osLastError())
let epollFd = epoll_create(chronosEventsCount)
let epollFd = epoll_create(chronosInitialSize)
if epollFd < 0:
return err(osLastError())
let selector = Selector[T](
Expand All @@ -107,7 +108,8 @@ proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] =
virtualId: -1'i32, # Should start with -1, because `InvalidIdent` == -1
childrenExited: false,
virtualHoles: initDeque[int32](),
pendingEvents: initDeque[ReadyKey]()
pendingEvents: initDeque[ReadyKey](),
queueEvents: newSeq[EpollEvent](chronosInitialSize),
)
ok(selector)

Expand Down Expand Up @@ -518,11 +520,9 @@ proc prepareKey[T](s: Selector[T], event: EpollEvent): Opt[ReadyKey] =

if (event.events and EPOLLERR) != 0:
rkey.events.incl(Event.Error)
rkey.errorCode = oserrno.ECONNRESET

if (event.events and EPOLLHUP) != 0 or (event.events and EPOLLRDHUP) != 0:
rkey.events.incl(Event.Error)
rkey.errorCode = oserrno.ECONNRESET

if (event.events and EPOLLOUT) != 0:
rkey.events.incl(Event.Write)
Expand All @@ -537,7 +537,6 @@ proc prepareKey[T](s: Selector[T], event: EpollEvent): Opt[ReadyKey] =
let res = handleEintr(osdefs.read(fdi32, addr data, sizeof(uint64)))
if res != sizeof(uint64):
rkey.events.incl(Event.Error)
rkey.errorCode = osLastError()

elif Event.Signal in pkey.events:
var data: SignalFdInfo
Expand Down Expand Up @@ -582,15 +581,13 @@ proc prepareKey[T](s: Selector[T], event: EpollEvent): Opt[ReadyKey] =
return Opt.none(ReadyKey)
else:
rkey.events.incl({Event.User, Event.Error})
rkey.errorCode = errorCode
else:
rkey.events.incl(Event.User)

if Event.Oneshot in rkey.events:
if Event.Timer in rkey.events:
if epoll_ctl(s.epollFd, EPOLL_CTL_DEL, fdi32, nil) != 0:
rkey.events.incl(Event.Error)
rkey.errorCode = osLastError()
# we are marking key with `Finished` event, to avoid double decrease.
rkey.events.incl(Event.Finished)
pkey.events.incl(Event.Finished)
Expand Down Expand Up @@ -627,20 +624,20 @@ proc selectInto2*[T](s: Selector[T], timeout: int,
readyKeys: var openArray[ReadyKey]
): SelectResult[int] =
var
queueEvents: array[chronosEventsCount, EpollEvent]
k: int = 0

verifySelectParams(timeout, -1, int(high(cint)))

let
maxEventsCount = min(len(queueEvents), len(readyKeys))
maxEventsCount = len(readyKeys)
maxPendingEventsCount = min(maxEventsCount, len(s.pendingEvents))
maxNewEventsCount = max(maxEventsCount - maxPendingEventsCount, 0)

let
eventsCount =
if maxNewEventsCount > 0:
let res = handleEintr(epoll_wait(s.epollFd, addr(queueEvents[0]),
if maxNewEventsCount > s.queueEvents.len:
s.queueEvents.setLen(maxNewEventsCount)

let res = handleEintr(epoll_wait(s.epollFd, addr(s.queueEvents[0]),
cint(maxNewEventsCount),
cint(timeout)))
if res < 0:
Expand All @@ -652,7 +649,7 @@ proc selectInto2*[T](s: Selector[T], timeout: int,
s.childrenExited = false

for i in 0 ..< eventsCount:
let rkey = s.prepareKey(queueEvents[i]).valueOr: continue
let rkey = s.prepareKey(s.queueEvents[i]).valueOr: continue
readyKeys[k] = rkey
inc(k)

Expand Down
19 changes: 9 additions & 10 deletions chronos/ioselects/ioselectors_kqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type
fds: Table[int32, SelectorKey[T]]
virtualHoles: Deque[int32]
virtualId: int32
queueEvents: seq[KEvent]

Selector*[T] = ref SelectorImpl[T]

Expand Down Expand Up @@ -112,7 +113,8 @@ proc new*(t: typedesc[Selector], T: typedesc): SelectResult[Selector[T]] =
kqFd: kqFd,
fds: initTable[int32, SelectorKey[T]](chronosInitialSize),
virtualId: -1'i32, # Should start with -1, because `InvalidIdent` == -1
virtualHoles: initDeque[int32]()
virtualHoles: initDeque[int32](),
queueEvents: newSeq[KEvent](chronosInitialSize)
)
ok(selector)

Expand Down Expand Up @@ -497,7 +499,6 @@ proc prepareKey[T](s: Selector[T], event: KEvent): Opt[ReadyKey] =
of EVFILT_READ:
if (event.flags and EV_EOF) != 0:
rkey.events.incl(Event.Error)
rkey.errorCode = oserrno.ECONNRESET

if Event.User in pkey.events:
var data: uint64 = 0
Expand All @@ -509,15 +510,13 @@ proc prepareKey[T](s: Selector[T], event: KEvent): Opt[ReadyKey] =
return Opt.none(ReadyKey)
else:
rkey.events.incl(Event.Error)
rkey.errorCode = errorCode
rkey.events.incl(Event.User)
else:
rkey.events.incl(Event.Read)

of EVFILT_WRITE:
if (event.flags and EV_EOF) != 0:
rkey.events.incl(Event.Error)
rkey.errorCode = oserrno.ECONNRESET

rkey.events.incl(Event.Write)

Expand Down Expand Up @@ -557,12 +556,12 @@ proc prepareKey[T](s: Selector[T], event: KEvent): Opt[ReadyKey] =
proc selectInto2*[T](s: Selector[T], timeout: int,
readyKeys: var openArray[ReadyKey]
): SelectResult[int] =
var
tv: Timespec
queueEvents: array[chronosEventsCount, KEvent]
var tv: Timespec

verifySelectParams(timeout, -1, high(int))

if readyKeys.len() > s.queueEvents.len():
s.queueEvents.setLen(readyKeys.len())
let
ptrTimeout =
if timeout != -1:
Expand All @@ -575,12 +574,12 @@ proc selectInto2*[T](s: Selector[T], timeout: int,
addr tv
else:
nil
maxEventsCount = cint(min(chronosEventsCount, len(readyKeys)))
maxEventsCount = cint(len(readyKeys))
eventsCount =
block:
var res = 0
while true:
res = kevent(s.kqFd, nil, cint(0), addr(queueEvents[0]),
res = kevent(s.kqFd, nil, cint(0), addr(s.queueEvents[0]),
maxEventsCount, ptrTimeout)
if res < 0:
let errorCode = osLastError()
Expand All @@ -593,7 +592,7 @@ proc selectInto2*[T](s: Selector[T], timeout: int,

var k = 0
for i in 0 ..< eventsCount:
let rkey = s.prepareKey(queueEvents[i]).valueOr: continue
let rkey = s.prepareKey(s.queueEvents[i]).valueOr: continue
readyKeys[k] = rkey
inc(k)

Expand Down
1 change: 0 additions & 1 deletion chronos/ioselects/ioselectors_poll.nim
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ proc prepareKey[T](s: Selector[T], event: var TPollfd): Opt[ReadyKey] =
return Opt.none(ReadyKey)
else:
rkey.events.incl({Event.User, Event.Error})
rkey.errorCode = errorCode
else:
rkey.events.incl(Event.User)
else:
Expand Down
3 changes: 0 additions & 3 deletions chronos/selectors2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ when defined(nimdoc):
## An object which holds result for descriptor
fd* : int ## file/socket descriptor
events*: set[Event] ## set of events
errorCode*: OSErrorCode ## additional error code information for
## Error events

SelectEvent* = object
## An object which holds user defined event
Expand Down Expand Up @@ -238,7 +236,6 @@ else:
ReadyKey* = object
fd* : int
events*: set[Event]
errorCode*: OSErrorCode

SelectorKey[T] = object
ident: int
Expand Down
Loading