diff -r c0bcf3bea772 -r 1ef3f2d6d10e netdata_tsrelay.nim --- a/netdata_tsrelay.nim Thu Feb 15 10:29:37 2018 -0800 +++ b/netdata_tsrelay.nim Sun Feb 18 18:16:37 2018 -0800 @@ -1,4 +1,5 @@ -# vim: set et nosta sw=4 ts=4 ft=nim : +# vim: set et nosta sw=4 ts=4 : +# im: set et nosta sw=4 ts=4 ft=nim : # # Copyright (c) 2018, Mahlon E. Smith # All rights reserved. @@ -36,11 +37,11 @@ net, os, parseopt2, + posix, strutils, tables, terminal, - times, - threadpool + times const @@ -49,7 +50,6 @@ ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0 -q: Quiet mode. No output at all. Ignored if -d is supplied. - -c: Suppress ANSI color output. -d: Debug: Show incoming and parsed data. -v: Display version number. -h: Help. You're lookin' at it. @@ -72,28 +72,14 @@ listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any. verbose: bool # Be informative debug: bool # Spew out raw data - use_color: bool # Pretty things up a little, probably want to disable this if debugging - -# The global config object -# -# FIXME: Rather than pass this all over the -# place, consider channels and createThread instead of spawn. -# -var conf = Config( - dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay", - listen_port: 14866, - listen_addr: "0.0.0.0", - verbose: true, - debug: false, - use_color: true -) - +# Global configuration +var conf: Config proc hl( msg: string, fg: ForegroundColor, bright=false ): string = ## Quick wrapper for color formatting a string, since the 'terminal' ## module only deals with stdout directly. - if not conf.use_color: return msg + if not isatty(stdout): return msg var color: BiggestInt = ord( fg ) if bright: inc( color, 60 ) @@ -103,55 +89,91 @@ proc fetch_data( client: Socket ): string = ## Netdata JSON backend doesn't send a length, so we read line by ## line and wait for stream timeout to determine a "sample". + var buf: string = nil try: - result = client.recv_line( timeout=500 ) & "\n" - while result != "": - result = result & client.recv_line( timeout=500 ) & "\n" + result = client.recv_line( timeout=500 ) + if result != "" and not result.is_nil: result = result & "\n" + while buf != "": + buf = client.recv_line( timeout=500 ) + if buf != "" and not buf.is_nil: result = result & buf & "\n" except TimeoutError: discard -proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] = - ## Given a raw +data+ string, parse JSON and return a table of - ## JSON samples ready for writing, keyed by timestamp. Netdata can - ## buffer multiple samples in one batch. - if data == "": return +proc parse_data( data: string ): seq[ JsonNode ] = + ## Given a raw +data+ string, parse JSON and return a sequence + ## of JSON samples. Netdata can buffer multiple samples in one batch. + result = @[] + if data == "" or data.is_nil: return # Hash of sample timeperiods to pivoted json data - result = init_table[ BiggestInt, JsonNode ]() + var pivoted_data = init_table[ BiggestInt, JsonNode ]() for sample in split_lines( data ): - if conf.debug: echo sample.hl( fgBlack, bright=true ) - if sample.len == 0: continue + if sample == "" or sample.is_nil: continue + #if conf.debug: echo sample.hl( fgBlack, bright=true ) var parsed: JsonNode try: parsed = sample.parse_json except JsonParsingError: - discard if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed ) + continue + if parsed.kind != JObject: return # Create or use existing Json object for modded data. # var pivot: JsonNode - let key = parsed["timestamp"].get_num + try: + let key = parsed["timestamp"].get_num + + if pivoted_data.has_key( key ): + pivot = pivoted_data[ key ] + else: + pivot = newJObject() + pivoted_data[ key ] = pivot - if result.has_key( key ): - pivot = result[ key ] - else: - pivot = newJObject() - result[ key ] = pivot + var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str + pivot[ "hostname" ] = parsed[ "hostname" ] + pivot[ "timestamp" ] = parsed[ "timestamp" ] + pivot[ name ] = parsed[ "value" ] + except: + continue - var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str - pivot[ "hostname" ] = parsed[ "hostname" ] - pivot[ name ] = parsed[ "value" ] - - return result + for timestamp, sample in pivoted_data: + result.add( sample ) -proc process( client: Socket, db: DBConn, conf: Config ): int = +proc write_to_database( samples: seq[ JsonNode ] ): void = + ## Given a sequence of json samples, write them to database. + if samples.len == 0: return + + let db = open( "", "", "", conf.dbopts ) + + try: + db.exec sql( "BEGIN" ) + for sample in samples: + var + timestamp = sample[ "timestamp" ].get_num + host = sample[ "hostname" ].get_str + sample.delete( "timestamp" ) + sample.delete( "hostname" ) + db.exec sql( INSERT_SQL ), timestamp, host, sample + db.exec sql( "COMMIT" ) + except: + let + e = getCurrentException() + msg = getCurrentExceptionMsg() + echo "Got exception ", repr(e), " while writing to DB: ", msg + discard + + db.close + + +proc process( client: Socket, address: string ): void = ## Do the work for a connected client within a thread. - ## Returns the number of samples parsed. + ## Returns the formatted json data keyed on sample time. + let t0 = cpu_time() var raw_data = client.fetch_data # Done with the socket, netdata will automatically @@ -163,29 +185,15 @@ except OSError: return - # Pivot data and save to SQL. - # - var samples = parse_data( raw_data, conf ) - if samples.len != 0: - db.exec sql( "BEGIN" ) - for timestamp, sample in samples: - var host = sample[ "hostname" ].get_str - sample.delete( "hostname" ) - db.exec sql( INSERT_SQL ), timestamp, host, sample - db.exec sql( "COMMIT" ) - - return samples.len - - -proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} = - ## A thread that performs that dispatches processing and returns - ## results. - let t0 = cpu_time() - var samples = client.process( db, conf ) + # Pivot the parsed data to a single JSON blob per sample time. + var samples = parse_data( raw_data ) + write_to_database( samples ) if conf.verbose: echo( - hl( $samples, fgWhite, bright=true ), + hl( $(epochTime().to_int), fgMagenta, bright=true ), + " ", + hl( $(samples.len), fgWhite, bright=true ), " sample(s) parsed from ", address.hl( fgYellow, bright=true ), " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds." @@ -193,16 +201,16 @@ ) -proc serverloop: void = +proc serverloop( conf: Config ): void = ## Open a database connection, bind to the listening socket, ## and start serving incoming netdata streams. let db = open( "", "", "", conf.dbopts ) - if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) ) + if conf.verbose: echo( "Successfully tested connection to the backend database.".hl( fgGreen ) ) + db.close - var - conn_count = 0 - server = newSocket() - + # Setup listening socket. + # + var server = newSocket() server.set_sock_opt( OptReuseAddr, true ) server.bind_addr( Port(conf.listen_port), conf.listen_addr ) server.listen() @@ -216,29 +224,42 @@ ) echo "" + # Wait for incoming connections, fork for each client. + # while true: - var client = newSocket() - var address = "" + var + client = new Socket + address = "" + status: cint = 0 - # Force a garbage collection pass. - # - conn_count = conn_count + 1 - if conn_count == 25: - when defined( testing ): echo "Forcing GC pass." - GC_full_collect() - conn_count = 1 + server.acceptAddr( client, address ) # block + + if fork() == 0: + server.close + client.process( address ) + quit( 0 ) client.close - server.acceptAddr( client, address ) # blocking call - spawn runthread( client, address, db, conf ) + + discard waitpid( P_ALL, status, WNOHANG ) # reap all previous children when defined( testing ): dumpNumberOfInstances() -proc parse_cmdline: void = +proc parse_cmdline: Config = ## Populate the config object with the user's preferences. + # Config object defaults. + # + result = Config( + dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay", + listen_port: 14866, + listen_addr: "0.0.0.0", + verbose: true, + debug: false + ) + # always set debug mode if development build. - conf.debug = defined( testing ) + result.debug = defined( testing ) for kind, key, val in getopt(): case kind @@ -249,25 +270,22 @@ of cmdLongOption, cmdShortOption: case key of "debug", "d": - conf.debug = true - - of "no-color", "c": - conf.use_color = false + result.debug = true of "help", "h": echo USAGE quit( 0 ) of "quiet", "q": - conf.verbose = false + result.verbose = false of "version", "v": echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true ) quit( 0 ) - of "dbopts": conf.dbopts = val - of "listen-addr", "a": conf.listen_addr = val - of "listen-port", "p": conf.listen_port = val.parse_int + of "dbopts": result.dbopts = val + of "listen-addr", "a": result.listen_addr = val + of "listen-port", "p": result.listen_port = val.parse_int else: discard @@ -277,8 +295,8 @@ when isMainModule: system.addQuitProc( resetAttributes ) - parse_cmdline() + conf = parse_cmdline() if conf.debug: echo hl( $conf, fgYellow ) - serverloop() + serverloop( conf )