Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions chronos/asyncsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
{.push raises: [].}

import std/[sequtils, math, deques, tables, typetraits]
import ./asyncloop
import ./asyncloop, ./shutdown
export asyncloop

type
Expand Down Expand Up @@ -247,12 +247,14 @@ proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} =
(len(aq.queue) == 0)

proc addFirstImpl[T](aq: AsyncQueue[T], item: T) =
aq.queue.addFirst(item)
aq.getters.wakeupNext()
if not isShutdownInProgress():
aq.queue.addFirst(item)
aq.getters.wakeupNext()

proc addLastImpl[T](aq: AsyncQueue[T], item: T) =
aq.queue.addLast(item)
aq.getters.wakeupNext()
if not isShutdownInProgress():
aq.queue.addLast(item)
aq.getters.wakeupNext()

proc popFirstImpl[T](aq: AsyncQueue[T]): T =
let res = aq.queue.popFirst()
Expand Down
5 changes: 4 additions & 1 deletion chronos/handles.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

{.push raises: [].}

import "."/[asyncloop, osdefs, osutils]
import "."/[asyncloop, osdefs, osutils, shutdown]
import results
from nativesockets import Domain, Protocol, SockType, toInt
export Domain, Protocol, SockType, results
Expand Down Expand Up @@ -129,6 +129,9 @@ proc createAsyncSocket2*(domain: Domain, sockType: SockType,
protocol: Protocol,
inherit = true): Result[AsyncFD, OSErrorCode] =
## Creates new asynchronous socket.

checkShutdownInProgress()

when defined(windows):
let flags =
if inherit:
Expand Down
71 changes: 62 additions & 9 deletions chronos/internal/asyncengine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from nativesockets import Port
import std/[tables, heapqueue, deques]
import results
import ".."/[config, futures, osdefs, oserrno, osutils, timer]
import ".."/[config, futures, osdefs, oserrno, osutils, timer, shutdown]

import ./[asyncmacro, errors]

Expand Down Expand Up @@ -64,6 +64,7 @@ type
ticks*: Deque[AsyncCallback]
trackers*: Table[string, TrackerBase]
counters*: Table[string, TrackerCounter]
networkEventsCount*: int

proc sentinelCallbackImpl(arg: pointer) {.gcsafe, noreturn.} =
raiseAssert "Sentinel callback MUST not be scheduled"
Expand Down Expand Up @@ -185,7 +186,7 @@ when defined(nimdoc):
## Perform single asynchronous step, processing timers and completing
## tasks. Blocks until at least one event has completed.
##
## Exceptions raised during `async` task exection are stored as outcome
## Exceptions raised during `async` task exception are stored as outcome
## in the corresponding `Future` - `poll` itself does not raise.

proc register2*(fd: AsyncFD): Result[void, OSErrorCode] = discard
Expand Down Expand Up @@ -327,6 +328,9 @@ elif defined(windows):
if port == osdefs.INVALID_HANDLE_VALUE:
raiseOsDefect(osLastError(), "newDispatcher(): Unable to create " &
"IOCP port")

initShutdownInProgressToFalse() ## defined in shutdown.nim

var res = PDispatcher(
ioPort: port,
handles: initHashSet[AsyncFD](),
Expand All @@ -353,6 +357,8 @@ elif defined(windows):

proc register2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Register file descriptor ``fd`` in thread's dispatcher.
checkShutdownInProgress()

let loop = getThreadDispatcher()
if createIoCompletionPort(HANDLE(fd), loop.ioPort, cast[CompletionKey](fd),
1) == osdefs.INVALID_HANDLE_VALUE:
Expand Down Expand Up @@ -404,6 +410,9 @@ elif defined(windows):
##
## NOTE: This is private procedure, not supposed to be publicly available,
## please use ``waitForSingleObject()``.

checkShutdownInProgress()

let loop = getThreadDispatcher()
var ovl = RefCustomOverlapped(data: CompletionData(cb: cb))

Expand Down Expand Up @@ -457,7 +466,11 @@ elif defined(windows):
## Registers callback ``cb`` to be called when process with process
## identifier ``pid`` exited. Returns process identifier, which can be
## used to clear process callback via ``removeProcess``.

checkShutdownInProgress()

doAssert(pid > 0, "Process identifier must be positive integer")

let
hProcess = openProcess(SYNCHRONIZE, WINBOOL(0), DWORD(pid))
flags = WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE
Expand Down Expand Up @@ -533,6 +546,9 @@ elif defined(windows):
##
## NOTE: On Windows only subset of signals are supported: SIGINT, SIGTERM,
## SIGQUIT

checkShutdownInProgress()

const supportedSignals = [SIGINT, SIGTERM, SIGQUIT]
doAssert(cint(signal) in supportedSignals, "Signal is not supported")
let loop = getThreadDispatcher()
Expand Down Expand Up @@ -598,7 +614,7 @@ elif defined(windows):
# Moving expired timers to `loop.callbacks` and calculate timeout
loop.processTimersGetTimeout(curTimeout)

let networkEventsCount =
loop.networkEventsCount =
if isNil(loop.getQueuedCompletionStatusEx):
let res = getQueuedCompletionStatus(
loop.ioPort,
Expand Down Expand Up @@ -635,7 +651,7 @@ elif defined(windows):
else:
int(eventsReceived)

for i in 0 ..< networkEventsCount:
for i in 0 ..< loop.networkEventsCount:
var customOverlapped = PtrCustomOverlapped(events[i].lpOverlapped)
customOverlapped.data.errCode =
block:
Expand All @@ -654,7 +670,7 @@ elif defined(windows):

# We move idle callbacks to `loop.callbacks` only if there no pending
# network events.
if networkEventsCount == 0:
if loop.networkEventsCount == 0:
loop.processIdlers()

# We move tick callbacks to `loop.callbacks` always.
Expand Down Expand Up @@ -697,6 +713,19 @@ elif defined(windows):
if not(isNil(aftercb)):
loop.callbacks.addLast(AsyncCallback(function: aftercb, udata: param))

proc safeCloseHandle(h: HANDLE): Result[void, string] =
let res = closeHandle(h)
if res == 0: # WINBOOL FALSE
return err("Failed to close handle error code: " & osErrorMsg(osLastError()))
ok()

proc closeDispatcher*(loop: PDispatcher): Result[void, string] =
? safeCloseHandle(loop.ioPort)
for i in loop.handles.items:
closeHandle(i)
loop.handles.clear()
ok()

proc unregisterAndCloseFd*(fd: AsyncFD): Result[void, OSErrorCode] =
## Unregister from system queue and close asynchronous socket.
##
Expand Down Expand Up @@ -749,6 +778,8 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
"Could not initialize selector")
res.get()

initShutdownInProgressToFalse() ## defined in shutdown.nim

var res = PDispatcher(
selector: selector,
timers: initHeapQueue[TimerCallback](),
Expand Down Expand Up @@ -777,6 +808,7 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or

proc register2*(fd: AsyncFD): Result[void, OSErrorCode] =
## Register file descriptor ``fd`` in thread's dispatcher.
checkShutdownInProgress()
var data: SelectorData
getThreadDispatcher().selector.registerHandle2(cint(fd), {}, data)

Expand All @@ -788,6 +820,8 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
udata: pointer = nil): Result[void, OSErrorCode] =
## Start watching the file descriptor ``fd`` for read availability and then
## call the callback ``cb`` with specified argument ``udata``.
checkShutdownInProgress()

let loop = getThreadDispatcher()
var newEvents = {Event.Read}
withData(loop.selector, cint(fd), adata) do:
Expand Down Expand Up @@ -816,6 +850,7 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
udata: pointer = nil): Result[void, OSErrorCode] =
## Start watching the file descriptor ``fd`` for write availability and then
## call the callback ``cb`` with specified argument ``udata``.
checkShutdownInProgress()
let loop = getThreadDispatcher()
var newEvents = {Event.Write}
withData(loop.selector, cint(fd), adata) do:
Expand Down Expand Up @@ -936,6 +971,14 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
## You can execute ``aftercb`` before actual socket close operation.
closeSocket(fd, aftercb)

proc closeDispatcher*(loop: PDispatcher): Result[void, string] =
## Close selector associated with current thread's dispatcher.
try:
loop.selector.close()
except IOSelectorsException as e:
return err("Exception in closeDispatcher: " & e.msg)
ok()

when chronosEventEngine in ["epoll", "kqueue"]:
type
ProcessHandle* = distinct int
Expand All @@ -950,6 +993,8 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
## callback ``cb`` with specified argument ``udata``. Returns signal
## identifier code, which can be used to remove signal callback
## via ``removeSignal``.
checkShutdownInProgress()

let loop = getThreadDispatcher()
var data: SelectorData
let sigfd = ? loop.selector.registerSignal(signal, data)
Expand All @@ -967,6 +1012,8 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
## Registers callback ``cb`` to be called when process with process
## identifier ``pid`` exited. Returns process' descriptor, which can be
## used to clear process callback via ``removeProcess``.
checkShutdownInProgress()

let loop = getThreadDispatcher()
var data: SelectorData
let procfd = ? loop.selector.registerProcess(pid, data)
Expand Down Expand Up @@ -1026,14 +1073,14 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
loop.processTimersGetTimeout(curTimeout)

# Processing IO descriptors and all hardware events.
let count =
loop.networkEventsCount =
block:
let res = loop.selector.selectInto2(curTimeout, loop.keys)
if res.isErr():
raiseOsDefect(res.error(), "poll(): Unable to get OS events")
res.get()

for i in 0 ..< count:
for i in 0 ..< loop.networkEventsCount:
let fd = loop.keys[i].fd
let events = loop.keys[i].events

Expand Down Expand Up @@ -1062,7 +1109,7 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or

# We move idle callbacks to `loop.callbacks` only if there no pending
# network events.
if count == 0:
if loop.networkEventsCount == 0:
loop.processIdlers()

# We move tick callbacks to `loop.callbacks` always.
Expand Down Expand Up @@ -1104,8 +1151,13 @@ proc setTimer*(at: Moment, cb: CallbackFunc,
udata: pointer = nil): TimerCallback =
## Arrange for the callback ``cb`` to be called at the given absolute
## timestamp ``at``. You can also pass ``udata`` to callback.
let finishAt = if isShutdownInProgress():
## Schedule timer to now so it will be executed ASAP.
Moment.now()
else:
at
let loop = getThreadDispatcher()
result = TimerCallback(finishAt: at,
result = TimerCallback(finishAt: finishAt,
function: AsyncCallback(function: cb, udata: udata))
loop.timers.push(result)

Expand Down Expand Up @@ -1206,6 +1258,7 @@ proc internalCallTick*(cbproc: CallbackFunc) =
proc runForever*() =
## Begins a never ending global dispatcher poll loop.
## Raises different exceptions depending on the platform.
## It is expected to run for the entire lifetime of the process.
while true:
poll()

Expand Down
21 changes: 19 additions & 2 deletions chronos/internal/asyncfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import std/[sequtils, macros]
import stew/base10

import ./[asyncengine, raisesfutures]
import ../[config, futures]
import ../[config, futures, shutdown]

export
raisesfutures.Raising, raisesfutures.InternalRaisesFuture,
Expand Down Expand Up @@ -629,7 +629,10 @@ proc pollFor[F: Future | InternalRaisesFuture](fut: F): F {.raises: [].} =
#
# Must not be called recursively (from inside `async` procedures).
#
# See alse `awaitne`.
# The process may eventually start another thread after this loop's completion.
# Therefore, the dispatcher's resources are cleaned up in the end.
#
# See also `awaitne`.
if not(fut.finished()):
var finished = false
# Ensure that callbacks currently scheduled on the future run before returning
Expand All @@ -641,6 +644,20 @@ proc pollFor[F: Future | InternalRaisesFuture](fut: F): F {.raises: [].} =

fut

proc gracefulShutdown*(): Result[void, string] {.raises: [].} =
## Continues polling the dispatcher until shutdown completion, then
## performs final cleanup of all dispatcher resources.
##
## This routine shall be called only after `pollFor` has completed. Upon
## invocation, all streams are assumed to have been closed.

setShutdownInProgress()
let disp = getThreadDispatcher()
while disp.networkEventsCount > 0:
poll()

? disp.closeDispatcher()

proc waitFor*[T: not void](fut: Future[T]): lent T {.raises: [CatchableError].} =
## Blocks the current thread of execution until `fut` has finished, returning
## its value.
Expand Down
Loading