Multiple changes.

 - There's still a delay somewhere with threading in the socket read()
   that impacts simultaneous client connections.  After a lot of
   experimentation with Channel message passing, rip it all out in
   favor of a simple fork()ing server.

 - Remove the color option; instead, just check whether stdout is a
   tty, which makes it automatic.

 - Better error handling for malformed packets/samples.
This commit is contained in:
Mahlon E. Smith 2018-02-18 18:16:37 -08:00
parent 687f0411be
commit d9c179f32a

View file

@ -1,4 +1,5 @@
# vim: set et nosta sw=4 ts=4 ft=nim : # vim: set et nosta sw=4 ts=4 :
# im: set et nosta sw=4 ts=4 ft=nim :
# #
# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu> # Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
# All rights reserved. # All rights reserved.
@ -36,11 +37,11 @@ import
net, net,
os, os,
parseopt2, parseopt2,
posix,
strutils, strutils,
tables, tables,
terminal, terminal,
times, times
threadpool
const const
@ -49,7 +50,6 @@ const
./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0 ./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
-q: Quiet mode. No output at all. Ignored if -d is supplied. -q: Quiet mode. No output at all. Ignored if -d is supplied.
-c: Suppress ANSI color output.
-d: Debug: Show incoming and parsed data. -d: Debug: Show incoming and parsed data.
-v: Display version number. -v: Display version number.
-h: Help. You're lookin' at it. -h: Help. You're lookin' at it.
@ -72,28 +72,14 @@ type
listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any. listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any.
verbose: bool # Be informative verbose: bool # Be informative
debug: bool # Spew out raw data debug: bool # Spew out raw data
use_color: bool # Pretty things up a little, probably want to disable this if debugging
# The global config object
#
# FIXME: Rather than pass this all over the
# place, consider channels and createThread instead of spawn.
#
var conf = Config(
dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
listen_port: 14866,
listen_addr: "0.0.0.0",
verbose: true,
debug: false,
use_color: true
)
# Global configuration
var conf: Config
proc hl( msg: string, fg: ForegroundColor, bright=false ): string = proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
## Quick wrapper for color formatting a string, since the 'terminal' ## Quick wrapper for color formatting a string, since the 'terminal'
## module only deals with stdout directly. ## module only deals with stdout directly.
if not conf.use_color: return msg if not isatty(stdout): return msg
var color: BiggestInt = ord( fg ) var color: BiggestInt = ord( fg )
if bright: inc( color, 60 ) if bright: inc( color, 60 )
@ -103,55 +89,91 @@ proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
proc fetch_data( client: Socket ): string = proc fetch_data( client: Socket ): string =
## Netdata JSON backend doesn't send a length, so we read line by ## Netdata JSON backend doesn't send a length, so we read line by
## line and wait for stream timeout to determine a "sample". ## line and wait for stream timeout to determine a "sample".
var buf: string = nil
try: try:
result = client.recv_line( timeout=500 ) & "\n" result = client.recv_line( timeout=500 )
while result != "": if result != "" and not result.is_nil: result = result & "\n"
result = result & client.recv_line( timeout=500 ) & "\n" while buf != "":
buf = client.recv_line( timeout=500 )
if buf != "" and not buf.is_nil: result = result & buf & "\n"
except TimeoutError: except TimeoutError:
discard discard
proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] = proc parse_data( data: string ): seq[ JsonNode ] =
## Given a raw +data+ string, parse JSON and return a table of ## Given a raw +data+ string, parse JSON and return a sequence
## JSON samples ready for writing, keyed by timestamp. Netdata can ## of JSON samples. Netdata can buffer multiple samples in one batch.
## buffer multiple samples in one batch. result = @[]
if data == "": return if data == "" or data.is_nil: return
# Hash of sample timeperiods to pivoted json data # Hash of sample timeperiods to pivoted json data
result = init_table[ BiggestInt, JsonNode ]() var pivoted_data = init_table[ BiggestInt, JsonNode ]()
for sample in split_lines( data ): for sample in split_lines( data ):
if conf.debug: echo sample.hl( fgBlack, bright=true ) if sample == "" or sample.is_nil: continue
if sample.len == 0: continue #if conf.debug: echo sample.hl( fgBlack, bright=true )
var parsed: JsonNode var parsed: JsonNode
try: try:
parsed = sample.parse_json parsed = sample.parse_json
except JsonParsingError: except JsonParsingError:
discard
if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed ) if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
continue
if parsed.kind != JObject: return
# Create or use existing Json object for modded data. # Create or use existing Json object for modded data.
# #
var pivot: JsonNode var pivot: JsonNode
try:
let key = parsed["timestamp"].get_num let key = parsed["timestamp"].get_num
if result.has_key( key ): if pivoted_data.has_key( key ):
pivot = result[ key ] pivot = pivoted_data[ key ]
else: else:
pivot = newJObject() pivot = newJObject()
result[ key ] = pivot pivoted_data[ key ] = pivot
var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
pivot[ "hostname" ] = parsed[ "hostname" ] pivot[ "hostname" ] = parsed[ "hostname" ]
pivot[ "timestamp" ] = parsed[ "timestamp" ]
pivot[ name ] = parsed[ "value" ] pivot[ name ] = parsed[ "value" ]
except:
continue
return result for timestamp, sample in pivoted_data:
result.add( sample )
proc process( client: Socket, db: DBConn, conf: Config ): int = proc write_to_database( samples: seq[ JsonNode ] ): void =
## Given a sequence of json samples, write them to database.
if samples.len == 0: return
let db = open( "", "", "", conf.dbopts )
try:
db.exec sql( "BEGIN" )
for sample in samples:
var
timestamp = sample[ "timestamp" ].get_num
host = sample[ "hostname" ].get_str
sample.delete( "timestamp" )
sample.delete( "hostname" )
db.exec sql( INSERT_SQL ), timestamp, host, sample
db.exec sql( "COMMIT" )
except:
let
e = getCurrentException()
msg = getCurrentExceptionMsg()
echo "Got exception ", repr(e), " while writing to DB: ", msg
discard
db.close
proc process( client: Socket, address: string ): void =
## Do the work for a connected client within a thread. ## Do the work for a connected client within a thread.
## Returns the number of samples parsed. ## Returns the formatted json data keyed on sample time.
let t0 = cpu_time()
var raw_data = client.fetch_data var raw_data = client.fetch_data
# Done with the socket, netdata will automatically # Done with the socket, netdata will automatically
@ -163,29 +185,15 @@ proc process( client: Socket, db: DBConn, conf: Config ): int =
except OSError: except OSError:
return return
# Pivot data and save to SQL. # Pivot the parsed data to a single JSON blob per sample time.
# var samples = parse_data( raw_data )
var samples = parse_data( raw_data, conf ) write_to_database( samples )
if samples.len != 0:
db.exec sql( "BEGIN" )
for timestamp, sample in samples:
var host = sample[ "hostname" ].get_str
sample.delete( "hostname" )
db.exec sql( INSERT_SQL ), timestamp, host, sample
db.exec sql( "COMMIT" )
return samples.len
proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} =
## A thread that performs that dispatches processing and returns
## results.
let t0 = cpu_time()
var samples = client.process( db, conf )
if conf.verbose: if conf.verbose:
echo( echo(
hl( $samples, fgWhite, bright=true ), hl( $(epochTime().to_int), fgMagenta, bright=true ),
" ",
hl( $(samples.len), fgWhite, bright=true ),
" sample(s) parsed from ", " sample(s) parsed from ",
address.hl( fgYellow, bright=true ), address.hl( fgYellow, bright=true ),
" in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds." " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
@ -193,16 +201,16 @@ proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): voi
) )
proc serverloop: void = proc serverloop( conf: Config ): void =
## Open a database connection, bind to the listening socket, ## Open a database connection, bind to the listening socket,
## and start serving incoming netdata streams. ## and start serving incoming netdata streams.
let db = open( "", "", "", conf.dbopts ) let db = open( "", "", "", conf.dbopts )
if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) ) if conf.verbose: echo( "Successfully tested connection to the backend database.".hl( fgGreen ) )
db.close
var
conn_count = 0
server = newSocket()
# Setup listening socket.
#
var server = newSocket()
server.set_sock_opt( OptReuseAddr, true ) server.set_sock_opt( OptReuseAddr, true )
server.bind_addr( Port(conf.listen_port), conf.listen_addr ) server.bind_addr( Port(conf.listen_port), conf.listen_addr )
server.listen() server.listen()
@ -216,29 +224,42 @@ proc serverloop: void =
) )
echo "" echo ""
while true: # Wait for incoming connections, fork for each client.
var client = newSocket()
var address = ""
# Force a garbage collection pass.
# #
conn_count = conn_count + 1 while true:
if conn_count == 25: var
when defined( testing ): echo "Forcing GC pass." client = new Socket
GC_full_collect() address = ""
conn_count = 1 status: cint = 0
server.acceptAddr( client, address ) # block
if fork() == 0:
server.close
client.process( address )
quit( 0 )
client.close client.close
server.acceptAddr( client, address ) # blocking call
spawn runthread( client, address, db, conf ) discard waitpid( P_ALL, status, WNOHANG ) # reap all previous children
when defined( testing ): dumpNumberOfInstances() when defined( testing ): dumpNumberOfInstances()
proc parse_cmdline: void = proc parse_cmdline: Config =
## Populate the config object with the user's preferences. ## Populate the config object with the user's preferences.
# Config object defaults.
#
result = Config(
dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
listen_port: 14866,
listen_addr: "0.0.0.0",
verbose: true,
debug: false
)
# always set debug mode if development build. # always set debug mode if development build.
conf.debug = defined( testing ) result.debug = defined( testing )
for kind, key, val in getopt(): for kind, key, val in getopt():
case kind case kind
@ -249,25 +270,22 @@ proc parse_cmdline: void =
of cmdLongOption, cmdShortOption: of cmdLongOption, cmdShortOption:
case key case key
of "debug", "d": of "debug", "d":
conf.debug = true result.debug = true
of "no-color", "c":
conf.use_color = false
of "help", "h": of "help", "h":
echo USAGE echo USAGE
quit( 0 ) quit( 0 )
of "quiet", "q": of "quiet", "q":
conf.verbose = false result.verbose = false
of "version", "v": of "version", "v":
echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true ) echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
quit( 0 ) quit( 0 )
of "dbopts": conf.dbopts = val of "dbopts": result.dbopts = val
of "listen-addr", "a": conf.listen_addr = val of "listen-addr", "a": result.listen_addr = val
of "listen-port", "p": conf.listen_port = val.parse_int of "listen-port", "p": result.listen_port = val.parse_int
else: discard else: discard
@ -277,8 +295,8 @@ proc parse_cmdline: void =
when isMainModule: when isMainModule:
system.addQuitProc( resetAttributes ) system.addQuitProc( resetAttributes )
parse_cmdline() conf = parse_cmdline()
if conf.debug: echo hl( $conf, fgYellow ) if conf.debug: echo hl( $conf, fgYellow )
serverloop() serverloop( conf )