--- a/netdata_tsrelay.nim Mon Feb 12 13:25:26 2018 -0800
+++ b/netdata_tsrelay.nim Wed Feb 14 15:41:11 2018 -0800
@@ -31,18 +31,28 @@
import
db_postgres,
json,
+ math,
nativesockets,
net,
+ os,
parseopt2,
strutils,
tables,
+ terminal,
+ times,
threadpool
const
VERSION = "v0.1.0"
USAGE = """
-./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866
+./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
+
+ -q: Quiet mode. No output at all. Ignored if -d is supplied.
+ -c: Suppress ANSI color output.
+ -d: Debug: Show incoming and parsed data.
+ -v: Display version number.
+ -h: Help. You're lookin' at it.
The default connection string is:
"host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
@@ -55,18 +65,41 @@
"""
-type Config = object of RootObj
- dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
- listen_port: int # The port to listen for incoming connections
+type
+ Config = object of RootObj
+ dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
+ listen_port: int # The port to listen for incoming connections
+ listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any.
+ verbose: bool # Be informative
+ debug: bool # Spew out raw data
+ use_color: bool # Pretty things up a little, probably want to disable this if debugging
-# Global config object
+
+# The global config object
+#
+# FIXME: Rather than pass this all over the
+# place, consider channels and createThread instead of spawn.
#
var conf = Config(
dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
- listen_port: 14866
+ listen_port: 14866,
+ listen_addr: "0.0.0.0",
+ verbose: true,
+ debug: false,
+ use_color: true
)
+proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
+ ## Quick wrapper for color formatting a string, since the 'terminal'
+ ## module only deals with stdout directly.
+ if not conf.use_color: return msg
+
+ var color: BiggestInt = ord( fg )
+ if bright: inc( color, 60 )
+ result = "\e[" & $color & 'm' & msg & "\e[0m"
+
+
proc fetch_data( client: Socket ): string =
## Netdata JSON backend doesn't send a length, so we read line by
## line and wait for stream timeout to determine a "sample".
@@ -78,7 +111,7 @@
discard
-proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] =
+proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] =
## Given a raw +data+ string, parse JSON and return a table of
## JSON samples ready for writing, keyed by timestamp. Netdata can
## buffer multiple samples in one batch.
@@ -88,14 +121,15 @@
result = init_table[ BiggestInt, JsonNode ]()
for sample in split_lines( data ):
- if defined( testing ): echo sample
+ if conf.debug: echo sample.hl( fgBlack, bright=true )
if sample.len == 0: continue
var parsed: JsonNode
try:
parsed = sample.parse_json
except JsonParsingError:
- if defined( testing ): echo "Unable to parse sample line: " & sample
+ discard
+ if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
# Create or use existing Json object for modded data.
#
@@ -112,56 +146,96 @@
pivot[ "hostname" ] = parsed[ "hostname" ]
pivot[ name ] = parsed[ "value" ]
- if defined( testing ): echo $result.len & " samples"
-
return result
-proc process( client: Socket, db: DBConn ): void =
+proc process( client: Socket, db: DBConn, conf: Config ): int =
## Do the work for a connected client within a thread.
+ ## Returns the number of samples parsed.
var raw_data = client.fetch_data
+ # Done with the socket, netdata will automatically
+ # reconnect. Save local resources/file descriptors
+ # by closing after the send is considered complete.
+ #
try:
- if defined( testing ):
- echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0]
client.close
except OSError:
return
- var samples = parse_data( raw_data )
- if samples.len == 0: return
+ # Pivot data and save to SQL.
+ #
+ var samples = parse_data( raw_data, conf )
+ if samples.len != 0:
+ db.exec sql( "BEGIN" )
+ for timestamp, sample in samples:
+ var host = sample[ "hostname" ].get_str
+ sample.delete( "hostname" )
+ db.exec sql( INSERT_SQL ), timestamp, host, sample
+ db.exec sql( "COMMIT" )
+
+ return samples.len
+
- db.exec sql( "BEGIN" )
- for timestamp, sample in samples:
- var host = sample[ "hostname" ].get_str
- sample.delete( "hostname" )
- db.exec sql( INSERT_SQL ), timestamp, host, sample
- db.exec sql( "COMMIT" )
+proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} =
+ ## A thread that performs that dispatches processing and returns
+ ## results.
+ let t0 = cpu_time()
+ var samples = client.process( db, conf )
+
+ if conf.verbose:
+ echo(
+ hl( $samples, fgWhite, bright=true ),
+ " sample(s) parsed from ",
+ address.hl( fgYellow, bright=true ),
+ " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
+ # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
+ )
+ when defined( testing ): dumpNumberOfInstances()
+
proc serverloop: void =
## Open a database connection, bind to the listening socket,
## and start serving incoming netdata streams.
let db = open( "", "", "", conf.dbopts )
- echo "Successfully connected to the backend database."
-
- var server = newSocket()
- echo "Listening for incoming connections on port ", conf.listen_port, "..."
+ if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) )
+
+ var
+ server = newSocket()
+ client = newSocket()
+
server.set_sock_opt( OptReuseAddr, true )
- server.bind_addr( Port(conf.listen_port) )
+ server.bind_addr( Port(conf.listen_port), conf.listen_addr )
server.listen()
+ if conf.verbose:
+ echo(
+ "Listening for incoming connections on ".hl( fgGreen, bright=true ),
+ hl( (if conf.listen_addr == "0.0.0.0": "*" else: conf.listen_addr) , fgBlue, bright=true ),
+ ":",
+ hl( $conf.listen_port, fgBlue, bright=true ),
+ )
+ echo ""
+
while true:
- var
- client = newSocket()
- address = ""
+ var address = ""
+ server.acceptAddr( client, address ) # blocking call
+ spawn runthread( client, address, db, conf )
+
- server.acceptAddr( client, address )
- echo "New connection: " & address
- spawn client.process( db )
+proc atexit() {.noconv.} =
+ ## Exit cleanly after waiting on any running threads.
+ echo "Exiting..."
+ sync()
+ quit( 0 )
proc parse_cmdline: void =
## Populate the config object with the user's preferences.
+
+ # always set debug mode if development build.
+ conf.debug = defined( testing )
+
for kind, key, val in getopt():
case kind
@@ -170,15 +244,25 @@
of cmdLongOption, cmdShortOption:
case key
+ of "debug", "d":
+ conf.debug = true
+
+ of "no-color", "c":
+ conf.use_color = false
+
of "help", "h":
echo USAGE
quit( 0 )
+
+ of "quiet", "q":
+ conf.verbose = false
of "version", "v":
- echo "netdata_tsrelay ", VERSION
+ echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
quit( 0 )
of "dbopts": conf.dbopts = val
+ of "listen-addr", "a": conf.listen_addr = val
of "listen-port", "p": conf.listen_port = val.parse_int
else: discard
@@ -187,7 +271,11 @@
when isMainModule:
+ system.addQuitProc( resetAttributes )
+ system.addQuitProc( atexit )
+
parse_cmdline()
- if defined( testing ): echo conf
+
+ if conf.debug: echo hl( $conf, fgYellow )
serverloop()