# HG changeset patch # User Mahlon E. Smith # Date 1518651671 28800 # Node ID a1276c3d39ebf3f553401bacfecfcf321274f13f # Parent f3d83bdd78772e20504d5315649513c0c0ae89db Multiple changes. - Colorize output by default, add option to disable. - Time parsing per incoming client. - Add a "quiet" mode. - Allow binding to a specific IP address. - Allow debug mode to be set without recompiling. - Alter thread wrapper for reporting and config passing. - Fix file descriptor leak with client connections. - Wait for current threads to finish before exiting. diff -r f3d83bdd7877 -r a1276c3d39eb Makefile --- a/Makefile Mon Feb 12 13:25:26 2018 -0800 +++ b/Makefile Wed Feb 14 15:41:11 2018 -0800 @@ -4,17 +4,17 @@ default: development debug: ${FILES} - nim --assertions:on --nimcache:.cache c ${FILES} + nim --assertions:on --threads:on --nimcache:.cache c ${FILES} development: ${FILES} # can use gdb with this... - nim --debugInfo --threads:on --linedir:on --define:testing --nimcache:.cache c ${FILES} + nim --debugInfo --threads:on --linedir:on -d:testing -d:nimTypeNames --nimcache:.cache c ${FILES} debugger: ${FILES} nim --debugger:on --threads:on --nimcache:.cache c ${FILES} release: ${FILES} - nim -d:release --opt:speed --threads:on --nimcache:.cache c ${FILES} + nim -d:release --opt:speed --parallelBuild:0 --threads:on --nimcache:.cache c ${FILES} docs: nim doc ${FILES} diff -r f3d83bdd7877 -r a1276c3d39eb README.md --- a/README.md Mon Feb 12 13:25:26 2018 -0800 +++ b/README.md Wed Feb 14 15:41:11 2018 -0800 @@ -68,6 +68,6 @@ prefix = n update every = 10 buffer on failures = 6 - send charts matching = !cpu.cpu* !ipv6* nfs.rpc net.* net_drops.* net_packets.* !system.interrupts* system.* disk.* disk_space.* disk_ops.* mem.* users.* + send charts matching = !cpu.cpu* !ipv6* !users* nfs.rpc net.* net_drops.* net_packets.* !system.interrupts* system.* disk.* disk_space.* disk_ops.* mem.* ``` diff -r f3d83bdd7877 -r a1276c3d39eb netdata_tsrelay.nim --- a/netdata_tsrelay.nim Mon Feb 12 13:25:26 2018 -0800 +++ b/netdata_tsrelay.nim Wed Feb 14 15:41:11 2018 -0800 @@ -31,18 +31,28 @@ import db_postgres, json, + math, nativesockets, net, + os, parseopt2, strutils, tables, + terminal, + times, threadpool const VERSION = "v0.1.0" USAGE = """ -./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866 +./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0 + + -q: Quiet mode. No output at all. Ignored if -d is supplied. + -c: Suppress ANSI color output. + -d: Debug: Show incoming and parsed data. + -v: Display version number. + -h: Help. You're lookin' at it. The default connection string is: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay" @@ -55,18 +65,41 @@ """ -type Config = object of RootObj - dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html) - listen_port: int # The port to listen for incoming connections +type + Config = object of RootObj + dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html) + listen_port: int # The port to listen for incoming connections + listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any. + verbose: bool # Be informative + debug: bool # Spew out raw data + use_color: bool # Pretty things up a little, probably want to disable this if debugging -# Global config object + +# The global config object +# +# FIXME: Rather than pass this all over the +# place, consider channels and createThread instead of spawn. # var conf = Config( dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay", - listen_port: 14866 + listen_port: 14866, + listen_addr: "0.0.0.0", + verbose: true, + debug: false, + use_color: true ) +proc hl( msg: string, fg: ForegroundColor, bright=false ): string = + ## Quick wrapper for color formatting a string, since the 'terminal' + ## module only deals with stdout directly. + if not conf.use_color: return msg + + var color: BiggestInt = ord( fg ) + if bright: inc( color, 60 ) + result = "\e[" & $color & 'm' & msg & "\e[0m" + + proc fetch_data( client: Socket ): string = ## Netdata JSON backend doesn't send a length, so we read line by ## line and wait for stream timeout to determine a "sample". @@ -78,7 +111,7 @@ discard -proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] = +proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] = ## Given a raw +data+ string, parse JSON and return a table of ## JSON samples ready for writing, keyed by timestamp. Netdata can ## buffer multiple samples in one batch. @@ -88,14 +121,15 @@ result = init_table[ BiggestInt, JsonNode ]() for sample in split_lines( data ): - if defined( testing ): echo sample + if conf.debug: echo sample.hl( fgBlack, bright=true ) if sample.len == 0: continue var parsed: JsonNode try: parsed = sample.parse_json except JsonParsingError: - if defined( testing ): echo "Unable to parse sample line: " & sample + discard + if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed ) # Create or use existing Json object for modded data. # @@ -112,56 +146,96 @@ pivot[ "hostname" ] = parsed[ "hostname" ] pivot[ name ] = parsed[ "value" ] - if defined( testing ): echo $result.len & " samples" - return result -proc process( client: Socket, db: DBConn ): void = +proc process( client: Socket, db: DBConn, conf: Config ): int = ## Do the work for a connected client within a thread. + ## Returns the number of samples parsed. var raw_data = client.fetch_data + # Done with the socket, netdata will automatically + # reconnect. Save local resources/file descriptors + # by closing after the send is considered complete. + # try: - if defined( testing ): - echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0] client.close except OSError: return - var samples = parse_data( raw_data ) - if samples.len == 0: return + # Pivot data and save to SQL. + # + var samples = parse_data( raw_data, conf ) + if samples.len != 0: + db.exec sql( "BEGIN" ) + for timestamp, sample in samples: + var host = sample[ "hostname" ].get_str + sample.delete( "hostname" ) + db.exec sql( INSERT_SQL ), timestamp, host, sample + db.exec sql( "COMMIT" ) + + return samples.len + - db.exec sql( "BEGIN" ) - for timestamp, sample in samples: - var host = sample[ "hostname" ].get_str - sample.delete( "hostname" ) - db.exec sql( INSERT_SQL ), timestamp, host, sample - db.exec sql( "COMMIT" ) +proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} = + ## A thread that performs that dispatches processing and returns + ## results. + let t0 = cpu_time() + var samples = client.process( db, conf ) + + if conf.verbose: + echo( + hl( $samples, fgWhite, bright=true ), + " sample(s) parsed from ", + address.hl( fgYellow, bright=true ), + " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds." + # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used." + ) + when defined( testing ): dumpNumberOfInstances() + proc serverloop: void = ## Open a database connection, bind to the listening socket, ## and start serving incoming netdata streams. let db = open( "", "", "", conf.dbopts ) - echo "Successfully connected to the backend database." - - var server = newSocket() - echo "Listening for incoming connections on port ", conf.listen_port, "..." + if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) ) + + var + server = newSocket() + client = newSocket() + server.set_sock_opt( OptReuseAddr, true ) - server.bind_addr( Port(conf.listen_port) ) + server.bind_addr( Port(conf.listen_port), conf.listen_addr ) server.listen() + if conf.verbose: + echo( + "Listening for incoming connections on ".hl( fgGreen, bright=true ), + hl( (if conf.listen_addr == "0.0.0.0": "*" else: conf.listen_addr) , fgBlue, bright=true ), + ":", + hl( $conf.listen_port, fgBlue, bright=true ), + ) + echo "" + while true: - var - client = newSocket() - address = "" + var address = "" + server.acceptAddr( client, address ) # blocking call + spawn runthread( client, address, db, conf ) + - server.acceptAddr( client, address ) - echo "New connection: " & address - spawn client.process( db ) +proc atexit() {.noconv.} = + ## Exit cleanly after waiting on any running threads. + echo "Exiting..." + sync() + quit( 0 ) proc parse_cmdline: void = ## Populate the config object with the user's preferences. + + # always set debug mode if development build. + conf.debug = defined( testing ) + for kind, key, val in getopt(): case kind @@ -170,15 +244,25 @@ of cmdLongOption, cmdShortOption: case key + of "debug", "d": + conf.debug = true + + of "no-color", "c": + conf.use_color = false + of "help", "h": echo USAGE quit( 0 ) + + of "quiet", "q": + conf.verbose = false of "version", "v": - echo "netdata_tsrelay ", VERSION + echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true ) quit( 0 ) of "dbopts": conf.dbopts = val + of "listen-addr", "a": conf.listen_addr = val of "listen-port", "p": conf.listen_port = val.parse_int else: discard @@ -187,7 +271,11 @@ when isMainModule: + system.addQuitProc( resetAttributes ) + system.addQuitProc( atexit ) + parse_cmdline() - if defined( testing ): echo conf + + if conf.debug: echo hl( $conf, fgYellow ) serverloop()