--- a/netdata_tsrelay.nim Thu Feb 15 10:29:37 2018 -0800
+++ b/netdata_tsrelay.nim Sun Feb 18 18:16:37 2018 -0800
@@ -1,4 +1,5 @@
-# vim: set et nosta sw=4 ts=4 ft=nim :
+# vim: set et nosta sw=4 ts=4 :
+# im: set et nosta sw=4 ts=4 ft=nim :
#
# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
# All rights reserved.
@@ -36,11 +37,11 @@
net,
os,
parseopt2,
+ posix,
strutils,
tables,
terminal,
- times,
- threadpool
+ times
const
@@ -49,7 +50,6 @@
./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
-q: Quiet mode. No output at all. Ignored if -d is supplied.
- -c: Suppress ANSI color output.
-d: Debug: Show incoming and parsed data.
-v: Display version number.
-h: Help. You're lookin' at it.
@@ -72,28 +72,14 @@
listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any.
verbose: bool # Be informative
debug: bool # Spew out raw data
- use_color: bool # Pretty things up a little, probably want to disable this if debugging
-
-# The global config object
-#
-# FIXME: Rather than pass this all over the
-# place, consider channels and createThread instead of spawn.
-#
-var conf = Config(
- dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
- listen_port: 14866,
- listen_addr: "0.0.0.0",
- verbose: true,
- debug: false,
- use_color: true
-)
-
+# Global configuration
+var conf: Config
proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
## Quick wrapper for color formatting a string, since the 'terminal'
## module only deals with stdout directly.
- if not conf.use_color: return msg
+ if not isatty(stdout): return msg
var color: BiggestInt = ord( fg )
if bright: inc( color, 60 )
@@ -103,55 +89,91 @@
proc fetch_data( client: Socket ): string =
## Netdata JSON backend doesn't send a length, so we read line by
## line and wait for stream timeout to determine a "sample".
+ var buf: string = nil
try:
- result = client.recv_line( timeout=500 ) & "\n"
- while result != "":
- result = result & client.recv_line( timeout=500 ) & "\n"
+ result = client.recv_line( timeout=500 )
+ if result != "" and not result.is_nil: result = result & "\n"
+ while buf != "":
+ buf = client.recv_line( timeout=500 )
+ if buf != "" and not buf.is_nil: result = result & buf & "\n"
except TimeoutError:
discard
-proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] =
- ## Given a raw +data+ string, parse JSON and return a table of
- ## JSON samples ready for writing, keyed by timestamp. Netdata can
- ## buffer multiple samples in one batch.
- if data == "": return
+proc parse_data( data: string ): seq[ JsonNode ] =
+ ## Given a raw +data+ string, parse JSON and return a sequence
+ ## of JSON samples. Netdata can buffer multiple samples in one batch.
+ result = @[]
+ if data == "" or data.is_nil: return
# Hash of sample timeperiods to pivoted json data
- result = init_table[ BiggestInt, JsonNode ]()
+ var pivoted_data = init_table[ BiggestInt, JsonNode ]()
for sample in split_lines( data ):
- if conf.debug: echo sample.hl( fgBlack, bright=true )
- if sample.len == 0: continue
+ if sample == "" or sample.is_nil: continue
+ #if conf.debug: echo sample.hl( fgBlack, bright=true )
var parsed: JsonNode
try:
parsed = sample.parse_json
except JsonParsingError:
- discard
if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
+ continue
+ if parsed.kind != JObject: return
# Create or use existing Json object for modded data.
#
var pivot: JsonNode
- let key = parsed["timestamp"].get_num
+ try:
+ let key = parsed["timestamp"].get_num
+
+ if pivoted_data.has_key( key ):
+ pivot = pivoted_data[ key ]
+ else:
+ pivot = newJObject()
+ pivoted_data[ key ] = pivot
- if result.has_key( key ):
- pivot = result[ key ]
- else:
- pivot = newJObject()
- result[ key ] = pivot
+ var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
+ pivot[ "hostname" ] = parsed[ "hostname" ]
+ pivot[ "timestamp" ] = parsed[ "timestamp" ]
+ pivot[ name ] = parsed[ "value" ]
+ except:
+ continue
- var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
- pivot[ "hostname" ] = parsed[ "hostname" ]
- pivot[ name ] = parsed[ "value" ]
-
- return result
+ for timestamp, sample in pivoted_data:
+ result.add( sample )
-proc process( client: Socket, db: DBConn, conf: Config ): int =
+proc write_to_database( samples: seq[ JsonNode ] ): void =
+ ## Given a sequence of json samples, write them to database.
+ if samples.len == 0: return
+
+ let db = open( "", "", "", conf.dbopts )
+
+ try:
+ db.exec sql( "BEGIN" )
+ for sample in samples:
+ var
+ timestamp = sample[ "timestamp" ].get_num
+ host = sample[ "hostname" ].get_str
+ sample.delete( "timestamp" )
+ sample.delete( "hostname" )
+ db.exec sql( INSERT_SQL ), timestamp, host, sample
+ db.exec sql( "COMMIT" )
+ except:
+ let
+ e = getCurrentException()
+ msg = getCurrentExceptionMsg()
+ echo "Got exception ", repr(e), " while writing to DB: ", msg
+ discard
+
+ db.close
+
+
+proc process( client: Socket, address: string ): void =
## Do the work for a connected client within a thread.
- ## Returns the number of samples parsed.
+ ## Returns the formatted json data keyed on sample time.
+ let t0 = cpu_time()
var raw_data = client.fetch_data
# Done with the socket, netdata will automatically
@@ -163,29 +185,15 @@
except OSError:
return
- # Pivot data and save to SQL.
- #
- var samples = parse_data( raw_data, conf )
- if samples.len != 0:
- db.exec sql( "BEGIN" )
- for timestamp, sample in samples:
- var host = sample[ "hostname" ].get_str
- sample.delete( "hostname" )
- db.exec sql( INSERT_SQL ), timestamp, host, sample
- db.exec sql( "COMMIT" )
-
- return samples.len
-
-
-proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} =
- ## A thread that performs that dispatches processing and returns
- ## results.
- let t0 = cpu_time()
- var samples = client.process( db, conf )
+ # Pivot the parsed data to a single JSON blob per sample time.
+ var samples = parse_data( raw_data )
+ write_to_database( samples )
if conf.verbose:
echo(
- hl( $samples, fgWhite, bright=true ),
+ hl( $(epochTime().to_int), fgMagenta, bright=true ),
+ " ",
+ hl( $(samples.len), fgWhite, bright=true ),
" sample(s) parsed from ",
address.hl( fgYellow, bright=true ),
" in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
@@ -193,16 +201,16 @@
)
-proc serverloop: void =
+proc serverloop( conf: Config ): void =
## Open a database connection, bind to the listening socket,
## and start serving incoming netdata streams.
let db = open( "", "", "", conf.dbopts )
- if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) )
+ if conf.verbose: echo( "Successfully tested connection to the backend database.".hl( fgGreen ) )
+ db.close
- var
- conn_count = 0
- server = newSocket()
-
+ # Setup listening socket.
+ #
+ var server = newSocket()
server.set_sock_opt( OptReuseAddr, true )
server.bind_addr( Port(conf.listen_port), conf.listen_addr )
server.listen()
@@ -216,29 +224,42 @@
)
echo ""
+ # Wait for incoming connections, fork for each client.
+ #
while true:
- var client = newSocket()
- var address = ""
+ var
+ client = new Socket
+ address = ""
+ status: cint = 0
- # Force a garbage collection pass.
- #
- conn_count = conn_count + 1
- if conn_count == 25:
- when defined( testing ): echo "Forcing GC pass."
- GC_full_collect()
- conn_count = 1
+ server.acceptAddr( client, address ) # block
+
+ if fork() == 0:
+ server.close
+ client.process( address )
+ quit( 0 )
client.close
- server.acceptAddr( client, address ) # blocking call
- spawn runthread( client, address, db, conf )
+
+ discard waitpid( P_ALL, status, WNOHANG ) # reap all previous children
when defined( testing ): dumpNumberOfInstances()
-proc parse_cmdline: void =
+proc parse_cmdline: Config =
## Populate the config object with the user's preferences.
+ # Config object defaults.
+ #
+ result = Config(
+ dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
+ listen_port: 14866,
+ listen_addr: "0.0.0.0",
+ verbose: true,
+ debug: false
+ )
+
# always set debug mode if development build.
- conf.debug = defined( testing )
+ result.debug = defined( testing )
for kind, key, val in getopt():
case kind
@@ -249,25 +270,22 @@
of cmdLongOption, cmdShortOption:
case key
of "debug", "d":
- conf.debug = true
-
- of "no-color", "c":
- conf.use_color = false
+ result.debug = true
of "help", "h":
echo USAGE
quit( 0 )
of "quiet", "q":
- conf.verbose = false
+ result.verbose = false
of "version", "v":
echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
quit( 0 )
- of "dbopts": conf.dbopts = val
- of "listen-addr", "a": conf.listen_addr = val
- of "listen-port", "p": conf.listen_port = val.parse_int
+ of "dbopts": result.dbopts = val
+ of "listen-addr", "a": result.listen_addr = val
+ of "listen-port", "p": result.listen_port = val.parse_int
else: discard
@@ -277,8 +295,8 @@
when isMainModule:
system.addQuitProc( resetAttributes )
- parse_cmdline()
+ conf = parse_cmdline()
if conf.debug: echo hl( $conf, fgYellow )
- serverloop()
+ serverloop( conf )