Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 11 additions & 40 deletions library/libsds.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
when defined(linux):
{.passl: "-Wl,-soname,libsds.so".}

import std/[typetraits, tables, atomics, locks], chronos, chronicles
import std/[typetraits, tables, atomics], chronos, chronicles
import
./sds_thread/sds_thread,
./alloc,
Expand Down Expand Up @@ -57,29 +57,6 @@ template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)

var
ctxPool: seq[ptr SdsContext]
ctxPoolLock: Lock

proc acquireCtx(callback: SdsCallBack, userData: pointer): ptr SdsContext =
ctxPoolLock.acquire()
defer: ctxPoolLock.release()
if ctxPool.len > 0:
result = ctxPool.pop()
else:
result = sds_thread.createSdsThread().valueOr:
let msg = "Error in createSdsThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil

proc releaseCtx(ctx: ptr SdsContext) =
ctxPoolLock.acquire()
defer: ctxPoolLock.release()
ctx.userData = nil
ctx.eventCallback = nil
ctx.eventUserData = nil
ctxPool.add(ctx)

proc handleRequest(
ctx: ptr SdsContext,
requestType: RequestType,
Expand Down Expand Up @@ -140,7 +117,6 @@ proc initializeLibrary() {.exported.} =
## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime.
## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix
libsdsNimMain()
ctxPoolLock.initLock() # ensure the lock is initialized once (fix Windows crash)
when declared(setupForeignThreadGc):
setupForeignThreadGc()
when declared(nimGC_setStackBottom):
Expand All @@ -164,9 +140,10 @@ proc SdsNewReliabilityManager(
echo "error: missing callback in NewReliabilityManager"
return nil

## Create or reuse the SDS thread that will keep waiting for req from the main thread.
var ctx = acquireCtx(callback, userData)
if ctx.isNil():
## Create the SDS thread that will keep waiting for req from the main thread.
var ctx = sds_thread.createSdsThread().valueOr:
let msg = "Error in createSdsThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil

ctx.userData = userData
Expand Down Expand Up @@ -206,20 +183,14 @@ proc SdsCleanupReliabilityManager(
initializeLibrary()
checkLibsdsParams(ctx, callback, userData)

let resetRes = handleRequest(
ctx,
RequestType.LIFECYCLE,
SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER),
callback,
userData,
)

if resetRes == RET_ERR:
sds_thread.destroySdsThread(ctx).isOkOr:
let msg = "libsds error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

releaseCtx(ctx)
## always need to invoke the callback although we don't retrieve value to the caller
callback(RET_OK, nil, 0, userData)

# handleRequest already invoked the callback; nothing else to signal here.
return RET_OK

proc SdsResetReliabilityManager(
Expand Down Expand Up @@ -352,4 +323,4 @@ proc SdsStartPeriodicTasks(
)

### End of exported procs
################################################################################
################################################################################
27 changes: 23 additions & 4 deletions library/sds_thread/sds_thread.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single,
import
../ffi_types,
./inter_thread_communication/sds_thread_request,
../../src/[reliability_utils]
../alloc,
../../src/[reliability_utils],
./shutdown

type SdsContext* = object
thread: Thread[(ptr SdsContext)]
Expand All @@ -21,6 +23,7 @@ type SdsContext* = object
eventCallback*: pointer
eventUserdata*: pointer
running: Atomic[bool] # To control when the thread is running
threadErrorMsg: cstring # to store any error message from the thread

proc runSds(ctx: ptr SdsContext) {.async.} =
## This is the worker body. This runs the SDS instance
Expand Down Expand Up @@ -52,6 +55,18 @@ proc run(ctx: ptr SdsContext) {.thread.} =
## Launch sds worker
waitFor runSds(ctx)

ctx.reqSignal.close().isOkOr:
ctx.threadErrorMsg = alloc("error closing reqSignal: " & $error)
return

ctx.reqReceivedSignal.close().isOkOr:
ctx.threadErrorMsg = alloc("error closing reqReceivedSignal: " & $error)
return

shutdown().isOkOr:
ctx.threadErrorMsg = alloc("error calling shutdown: " & $error)
return

proc createSdsThread*(): Result[ptr SdsContext, string] =
## This proc is called from the main thread and it creates
## the SDS working thread.
Expand Down Expand Up @@ -83,9 +98,13 @@ proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] =
return err("failed to signal reqSignal on time in destroySdsThread")

joinThread(ctx.thread)

if ctx.threadErrorMsg.isNil() == false and ctx.threadErrorMsg.len > 0:
let errorMsg = $ctx.threadErrorMsg
dealloc(ctx.threadErrorMsg)
return err("SDS thread error: " & errorMsg)

ctx.lock.deinitLock()
?ctx.reqSignal.close()
?ctx.reqReceivedSignal.close()
freeShared(ctx)

return ok()
Expand Down Expand Up @@ -129,4 +148,4 @@ proc sendRequestToSdsThread*(

## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the
## process proc.
ok()
ok()
51 changes: 51 additions & 0 deletions library/sds_thread/shutdown.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

import chronos, chronos/selectors2

## Notice that this module extends current nim-chronos functionality to provide
## proper shutdown of the thread's dispatcher.
##
## This is necessary because nim-chronos does not provide a way to close
## the selector associated with a thread's dispatcher, which may lead to
## resource leaks.
##
## Therefore, this ideally should be contributed back to nim-chronos.

when defined(windows):
proc safeCloseHandle(h: HANDLE): Result[void, string] =
let res = closeHandle(h)
if res == 0: # WINBOOL FALSE
return err("Failed to close handle error code: " & osErrorMsg(osLastError()))
return ok()

proc closeDispatcher*(loop: PDispatcher): Result[void, string] =
? safeCloseHandle(loop.ioPort)
for i in loop.handles.items:
closeHandle(i)
loop.handles.clear()
return ok()

elif defined(macosx) or defined(freebsd) or defined(netbsd) or
defined(openbsd) or defined(dragonfly) or defined(macos) or
defined(linux) or defined(android) or defined(solaris):

proc closeDispatcher*(loop: PDispatcher): Result[void, string] =
## Close selector associated with current thread's dispatcher.
try:
loop.getIoHandler().close()
except IOSelectorsException as e:
return err("Exception in closeDispatcher: " & e.msg)
return ok()

proc shutdown*(): Result[void, string] {.raises: [].} =
## Performs final cleanup of all dispatcher resources.
## Notice that this should be called only when sure that no new async tasks will be scheduled.
##
## This routine shall be called only after `pollFor` has completed. Upon
## invocation, all streams are assumed to have been closed.
##
## Then, it assumes the thread's dispatcher has explicitly been stopped, destroyed and will never
## be used again.

let disp = getThreadDispatcher()
? closeDispatcher(disp)
return ok()