2018-02-12 12:26:16 -08:00
|
|
|
# vim: set et nosta sw=4 ts=4 ft=nim :
|
|
|
|
|
#
|
|
|
|
|
# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
|
|
|
|
|
# All rights reserved.
|
|
|
|
|
# Redistribution and use in source and binary forms, with or without
|
|
|
|
|
# modification, are permitted provided that the following conditions are met:
|
|
|
|
|
#
|
|
|
|
|
# * Redistributions of source code must retain the above copyright
|
|
|
|
|
# notice, this list of conditions and the following disclaimer.
|
|
|
|
|
#
|
|
|
|
|
# * Redistributions in binary form must reproduce the above copyright
|
|
|
|
|
# notice, this list of conditions and the following disclaimer in the
|
|
|
|
|
# documentation and/or other materials provided with the distribution.
|
|
|
|
|
#
|
|
|
|
|
# * Neither the name of Mahlon E. Smith nor the names of his
|
|
|
|
|
# contributors may be used to endorse or promote products derived
|
|
|
|
|
# from this software without specific prior written permission.
|
|
|
|
|
#
|
|
|
|
|
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
|
|
|
|
|
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
|
|
|
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
|
|
|
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
|
|
|
|
|
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
|
|
|
|
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
|
|
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
|
|
|
|
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
|
|
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
|
|
|
|
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import
|
|
|
|
|
db_postgres,
|
|
|
|
|
json,
|
2018-02-14 15:41:11 -08:00
|
|
|
math,
|
2018-02-12 12:26:16 -08:00
|
|
|
nativesockets,
|
|
|
|
|
net,
|
2018-02-14 15:41:11 -08:00
|
|
|
os,
|
2018-02-12 12:26:16 -08:00
|
|
|
parseopt2,
|
|
|
|
|
strutils,
|
|
|
|
|
tables,
|
2018-02-14 15:41:11 -08:00
|
|
|
terminal,
|
|
|
|
|
times,
|
2018-02-12 12:26:16 -08:00
|
|
|
threadpool
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const
    # Release identifier, printed by the -v / --version option.
    VERSION = "v0.1.0"

    # Help text, printed by the -h / --help option.  The synopsis line
    # lists every flag that is described in the option table beneath it.
    USAGE = """
./netdata_tsrelay [-q][-c][-d][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0

-q: Quiet mode. No output at all. Ignored if -d is supplied.
-c: Suppress ANSI color output.
-d: Debug: Show incoming and parsed data.
-v: Display version number.
-h: Help. You're lookin' at it.

The default connection string is:
"host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
"""

    # Parameterized insert for a single pivoted sample row.  The three
    # placeholders are bound, in order, to the sample's epoch seconds,
    # the reporting host, and the metrics document.
    INSERT_SQL = """
INSERT INTO netdata
( time, host, metrics )
VALUES
( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
"""
|
|
|
|
|
|
|
|
|
|
|
2018-02-14 15:41:11 -08:00
|
|
|
type
    # Runtime settings for the relay.
    Config = object of RootObj
        # The postgresql connection parameters.
        # (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
        dbopts: string

        # The port to listen on for incoming connections.
        listen_port: int

        # The IP address to listen on for incoming connections.
        # Defaults to inaddr_any.
        listen_addr: string

        # Be informative.
        verbose: bool

        # Spew out raw data.
        debug: bool

        # Pretty things up a little; probably want to disable this
        # if debugging.
        use_color: bool
|
|
|
|
|
|
2018-02-12 12:26:16 -08:00
|
|
|
|
2018-02-14 15:41:11 -08:00
|
|
|
# Process-wide configuration, seeded here with the built-in defaults.
#
# FIXME: Rather than pass this all over the
# place, consider channels and createThread instead of spawn.
#
var conf = Config(
    dbopts:      "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
    listen_port: 14866,
    listen_addr: "0.0.0.0",
    verbose:     true,
    debug:       false,
    use_color:   true
)
|
|
|
|
|
|
|
|
|
|
|
2018-02-14 15:41:11 -08:00
|
|
|
proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
    ## Wrap +msg+ in the ANSI escape sequence for the +fg+ color and a
    ## trailing reset, returning it as a string (the 'terminal' module
    ## only writes to stdout directly).  The color code is raised by
    ## 60 when +bright+ is set.  When color output is disabled in the
    ## global config, +msg+ is returned untouched.
    if conf.use_color:
        let code = if bright: ord( fg ) + 60 else: ord( fg )
        result = "\e[" & $code & 'm' & msg & "\e[0m"
    else:
        result = msg
|
|
|
|
|
|
|
|
|
|
|
2018-02-12 12:26:16 -08:00
|
|
|
proc fetch_data( client: Socket ): string =
    ## Netdata JSON backend doesn't send a length, so we read line by
    ## line and wait for stream timeout to determine a "sample".
    ##
    ## Returns every line received, each terminated with a newline, or
    ## an empty string if nothing arrived.  Reading stops at the first
    ## 500ms read timeout, or as soon as the peer disconnects.
    result = ""
    try:
        while true:
            let line = client.recv_line( timeout=500 )

            # recv_line hands back an empty string (rather than timing
            # out) once the peer has gone away.  Nothing more will ever
            # arrive, so stop instead of spinning on the dead socket.
            if line == "": break

            result.add( line )
            result.add( "\n" )
    except TimeoutError:
        # The stream went quiet: whatever was gathered is the sample.
        discard
|
|
|
|
|
|
|
|
|
|
|
2018-02-14 15:41:11 -08:00
|
|
|
proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] =
    ## Given a raw +data+ string, parse JSON and return a table of
    ## JSON samples ready for writing, keyed by timestamp.  Netdata can
    ## buffer multiple samples in one batch.
    ##
    ## Each non-empty line of +data+ is parsed on its own.  Lines that
    ## are not valid JSON, are not JSON objects, or lack one of the
    ## fields read below are skipped (and reported when +conf.debug+ is
    ## set) rather than aborting the whole batch.  The returned table is
    ## always initialized, and is empty when nothing usable was found.
    const required = [ "timestamp", "hostname", "chart_id", "id", "value" ]

    # Hash of sample timeperiods to pivoted json data
    result = init_table[ BiggestInt, JsonNode ]()
    if data == "": return

    for sample in split_lines( data ):
        if conf.debug: echo sample.hl( fgBlack, bright=true )
        if sample.len == 0: continue

        var parsed: JsonNode
        try:
            parsed = sample.parse_json
        except JsonParsingError:
            if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
            # Nothing was parsed, so there is nothing to pivot.
            continue

        # Only complete sample objects can be pivoted.
        var usable = parsed.kind == JObject
        if usable:
            for field in required:
                if not parsed.has_key( field ):
                    usable = false
                    break
        if not usable:
            if conf.debug: echo hl( "Skipping incomplete sample line: " & sample.hl(fgRed, bright=true), fgRed )
            continue

        # Create or use existing Json object for modded data.
        #
        let key = parsed[ "timestamp" ].get_num
        if not result.has_key( key ):
            result[ key ] = newJObject()
        let pivot = result[ key ]

        # Flatten the sample into "<chart_id>.<id>" => value, alongside
        # the hostname that reported it.
        let name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
        pivot[ "hostname" ] = parsed[ "hostname" ]
        pivot[ name ] = parsed[ "value" ]
|
|
|
|
|
|
|
|
|
|
|
2018-02-14 15:41:11 -08:00
|
|
|
proc process( client: Socket, db: DBConn, conf: Config ): int =
    ## Do the work for a connected client within a thread.
    ## Returns the number of samples parsed.
    ##
    ## Reads everything the client sends, closes the socket, pivots the
    ## data and writes one row per timestamp inside a single
    ## transaction.  Returns 0 if the socket could not be closed.  If a
    ## statement fails, the transaction is rolled back and the DbError
    ## is re-raised to the caller.
    let raw_data = client.fetch_data

    # Done with the socket, netdata will automatically
    # reconnect.  Save local resources/file descriptors
    # by closing after the send is considered complete.
    #
    try:
        client.close
    except OSError:
        return

    # Pivot data and save to SQL.
    #
    let samples = parse_data( raw_data, conf )
    if samples.len == 0: return 0

    # NOTE(review): +db+ appears to be one connection shared by every
    # worker -- confirm that concurrent transactions on it are safe.
    db.exec( sql( "BEGIN" ) )
    try:
        for timestamp, sample in samples:
            let host = sample[ "hostname" ].get_str
            # The hostname gets its own column, so drop it from the
            # metrics document before storing.
            sample.delete( "hostname" )
            db.exec( sql( INSERT_SQL ), timestamp, host, sample )
        db.exec( sql( "COMMIT" ) )
    except DbError:
        # Don't leave a failed transaction open on the connection, or
        # later statements on it will keep failing.  try_exec is used
        # so a failed rollback can't mask the original error.
        discard db.try_exec( sql( "ROLLBACK" ) )
        raise

    return samples.len
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} =
    ## Thread entry point: dispatches processing for a connected
    ## client and, when running verbosely, reports how many samples
    ## were parsed from +address+ and the CPU time it took.
    let
        started = cpu_time()
        count   = client.process( db, conf )

    if not conf.verbose: return

    let elapsed = round( cpu_time() - started, 3 )
    echo(
        hl( $count, fgWhite, bright=true ) &
        " sample(s) parsed from " &
        address.hl( fgYellow, bright=true ) &
        " in " & hl( $elapsed, fgWhite, bright=true ) & " seconds."
    )
    # Disabled: memory usage report.
    # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
|
2018-02-12 13:25:26 -08:00
|
|
|
|
2018-02-12 12:26:16 -08:00
|
|
|
|
|
|
|
|
proc serverloop: void =
    ## Connect to the database named by the global config, bind and
    ## listen on the configured address/port, then loop forever
    ## accepting netdata connections and spawning a worker for each.
    let db = open( "", "", "", conf.dbopts )
    if conf.verbose:
        echo( "Successfully connected to the backend database.".hl( fgGreen ) )

    let server = newSocket()
    server.set_sock_opt( OptReuseAddr, true )
    server.bind_addr( Port(conf.listen_port), conf.listen_addr )
    server.listen()

    if conf.verbose:
        let shown_addr = if conf.listen_addr == "0.0.0.0": "*" else: conf.listen_addr
        echo(
            hl( "Listening for incoming connections on ", fgGreen, bright=true ),
            hl( shown_addr, fgBlue, bright=true ),
            ":",
            hl( $conf.listen_port, fgBlue, bright=true )
        )
        echo ""

    var conn_count = 0
    while true:
        var
            client  = newSocket()
            address = ""

        # Force a garbage collection pass whenever the counter hits 25.
        # It then restarts at 1 rather than 0.
        #
        inc conn_count
        if conn_count == 25:
            when defined( testing ): echo "Forcing GC pass."
            GC_full_collect()
            conn_count = 1

        # The placeholder socket is closed before accept fills +client+ in.
        client.close
        server.acceptAddr( client, address ) # blocking call
        spawn runthread( client, address, db, conf )
        when defined( testing ): dumpNumberOfInstances()
|
2018-02-12 12:26:16 -08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
proc parse_cmdline: void =
    ## Populate the config object with the user's preferences.
    ##
    ## Positional arguments and unrecognized options are ignored.
    ## --help and --version print their output and exit with status 0.
    ## A --listen-port value that is not an integer in 0-65535 prints
    ## an error to stderr and exits with status 1.

    # always set debug mode if development build.
    conf.debug = defined( testing )

    for kind, key, val in getopt():
        case kind

        of cmdArgument:
            discard

        of cmdLongOption, cmdShortOption:
            case key
            of "debug", "d":
                conf.debug = true

            of "no-color", "c":
                conf.use_color = false

            of "help", "h":
                echo USAGE
                quit( 0 )

            of "quiet", "q":
                conf.verbose = false

            of "version", "v":
                echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
                quit( 0 )

            of "dbopts":
                conf.dbopts = val

            of "listen-addr", "a":
                conf.listen_addr = val

            of "listen-port", "p":
                # Reject junk here with a readable message, instead of
                # an unhandled ValueError from parse_int or a failure
                # later when the value is converted to a Port.
                var port = -1
                try:
                    port = val.parse_int
                except ValueError:
                    port = -1
                if port < 0 or port > 65535:
                    stderr.write_line( "Invalid listen port '" & val & "': expected an integer between 0 and 65535." )
                    quit( 1 )
                conf.listen_port = port

            else: discard

        of cmdEnd: assert( false ) # shouldn't reach here ever
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
when isMainModule:
    # Register resetAttributes to run when the program quits.
    system.addQuitProc( resetAttributes )

    # Apply command line overrides to the global config, and show the
    # resulting settings when debugging.
    parse_cmdline()
    if conf.debug:
        echo hl( $conf, fgYellow )

    # Serve until the process exits.
    serverloop()
|
|
|
|
|
|