--- a/netdata_tsrelay.nim Mon Nov 19 12:05:51 2018 -0800
+++ b/netdata_tsrelay.nim Sat Jul 25 15:05:26 2020 -0700
@@ -1,6 +1,6 @@
# vim: set et nosta sw=4 ts=4 :
#
-# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
+# Copyright (c) 2018-2020, Mahlon E. Smith <mahlon@martini.nu>
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
@@ -34,29 +34,60 @@
math,
nativesockets,
net,
- os,
parseopt,
posix,
strutils,
+ strformat,
tables,
terminal,
times
const
- VERSION = "v0.2.0"
+ VERSION = "v0.3.0"
USAGE = """
-./netdata_tsrelay [-d][-h][-q][-t][-T][-v] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0
+./netdata_tsrelay [-adDhopqtTv]
+
+ -a --listen-addr:
+ The outbound IP address to listen for netdata streams.
+
+ -d --debug:
+ Debug: Show incoming and parsed data.
+
+ -D --dropconn:
+ Drop the persistent socket to netdata between samples to conserve
+ local resources. This may be helpful with a large number of clients.
+ Defaults to false.
+
+ -h --help:
+ Help. You're lookin' at it.
- -q: Quiet mode. No output at all. Ignored if -d is supplied.
- -d: Debug: Show incoming and parsed data.
- -v: Display version number.
- -T: Change the destination table name from the default 'netdata'.
- -t: Alter the maximum time (in ms) an open socket waits for data. Default: 500ms.
- -h: Help. You're lookin' at it.
+ -o --dbopts:
+ The PostgreSQL connection string parameters.
+ The default connection string is:
+ "host=localhost dbname=netdata application_name=netdata-tsrelay"
+
+ -p --listen-port:
+ Change the listening port from the default (14866).
+
+ -P --persistent:
+ Don't disconnect from the database between samples. This may be
+ more efficient with a small number of clients, when not using a
+ pooler, or with a very high sample size/rate. Defaults to false.
-The default connection string is:
- "host=localhost dbname=netdata application_name=netdata-tsrelay"
+ -q --quiet:
+ Quiet mode. No output at all. Ignored if -d is supplied.
+
+ -T --dbtable:
+ Change the destination table name from the default (netdata).
+
+ -t --timeout:
+ Alter the maximum time (in ms) an open socket waits for data
+ before processing the sample. Default: 500ms.
+
+ -v --verbose:
+ Display version number.
+
"""
INSERT_SQL = """
INSERT INTO $1
@@ -70,6 +101,8 @@
Config = object of RootObj
dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
dbtable: string # The name of the table to write to.
+ dropconn: bool # Close the TCP connection between samples.
+ persistent: bool # Don't close the database handle between samples.
listen_port: int # The port to listen for incoming connections.
listen_addr: string # The IP address listen for incoming connections. Defaults to inaddr_any.
verbose: bool # Be informative
@@ -77,6 +110,14 @@
insertsql: string # The SQL insert string after interpolating the table name.
timeout: int # How long to block, waiting on connection data.
+
+type
+ NetdataClient = ref object
+ sock: Socket # The raw socket fd
+ address: string # The remote IP address
+ db: DbConn # An optionally persistent database handle
+
+
# Global configuration
var conf: Config
@@ -91,16 +132,25 @@
result = "\e[" & $color & 'm' & msg & "\e[0m"
-proc fetch_data( client: Socket ): string =
- ## Netdata JSON backend doesn't send a length, so we read line by
- ## line and wait for stream timeout to determine a "sample".
+proc fetch_data( client: NetdataClient ): string =
+ ## Netdata JSON backend doesn't send a length nor a separator
+ ## between samples, so we read line by line and wait for stream
+ ## timeout to determine what constitutes a sample.
var buf = ""
- try:
- while true:
- client.readline( buf, timeout=conf.timeout )
- if buf != "": result = result & buf & "\n"
- except TimeoutError:
- return
+ while true:
+ try:
+ client.sock.readline( buf, timeout=conf.timeout )
+ if buf == "":
+ if conf.debug: echo "Client {client.address} closed socket.".fmt.hl( fgRed, bright=true )
+ quit( 1 )
+
+ result = result & buf & "\n"
+
+ except OSError:
+ quit( 1 )
+ except TimeoutError:
+ if result == "": continue
+ return
proc parse_data( data: string ): seq[ JsonNode ] =
@@ -147,22 +197,23 @@
result.add( sample )
-proc write_to_database( samples: seq[ JsonNode ] ): void =
+proc write_to_database( client: NetdataClient, samples: seq[ JsonNode ] ): void =
## Given a sequence of json samples, write them to database.
if samples.len == 0: return
- let db = open( "", "", "", conf.dbopts )
+ if client.db.isNil:
+ client.db = open( "", "", "", conf.dbopts )
try:
- db.exec sql( "BEGIN" )
+ client.db.exec sql( "BEGIN" )
for sample in samples:
var
timestamp = sample[ "timestamp" ].get_int
host = sample[ "hostname" ].get_str.to_lowerascii
sample.delete( "timestamp" )
sample.delete( "hostname" )
- db.exec sql( conf.insertsql ), timestamp, host, sample
- db.exec sql( "COMMIT" )
+ client.db.exec sql( conf.insertsql ), timestamp, host, sample
+ client.db.exec sql( "COMMIT" )
except:
let
e = getCurrentException()
@@ -170,10 +221,12 @@
echo "Got exception ", repr(e), " while writing to DB: ", msg
discard
- db.close
+ if not conf.persistent:
+ client.db.close
+ client.db = nil
-proc process( client: Socket, address: string ): void =
+proc process( client: NetdataClient ): void =
## Do the work for a connected client within child process.
let t0 = cpu_time()
var raw_data = client.fetch_data
@@ -182,24 +235,25 @@
# reconnect. Save local resources/file descriptors
# by closing after the send is considered complete.
#
- try:
- client.close
- except OSError:
- return
+ if conf.dropconn:
+ try:
+ client.sock.close
+ except OSError:
+ return
# Pivot the parsed data to a single JSON blob per sample time.
var samples = parse_data( raw_data )
- write_to_database( samples )
+ client.write_to_database( samples )
if conf.verbose:
+ let cputime = cpu_time() - t0
echo(
hl( $(epochTime().to_int), fgMagenta, bright=true ),
" ",
hl( $(samples.len), fgWhite, bright=true ),
" sample(s) parsed from ",
- address.hl( fgYellow, bright=true ),
- " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
- # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
+ client.address.hl( fgYellow, bright=true ),
+ " in ", hl( "{cputime:<2.3f}".fmt, fgWhite, bright=true), " seconds."
)
@@ -235,19 +289,23 @@
# Wait for incoming connections, fork for each client.
#
while true:
- var
- client = new Socket
- address = ""
+ let client = NetdataClient.new
+ client.sock = Socket.new
# Block, waiting for new connections.
- server.acceptAddr( client, address )
+ server.acceptAddr( client.sock, client.address )
if fork() == 0:
server.close
- client.process( address )
- quit( 0 )
+ if conf.dropconn:
+ # "one shot" mode.
+ client.process
+ quit( 0 )
+ else:
+ # Keep the connection to netdata open.
+ while true: client.process
- client.close
+ client.sock.close
when defined( testing ): dumpNumberOfInstances()
@@ -259,11 +317,13 @@
result = Config(
dbopts: "host=localhost dbname=netdata application_name=netdata-tsrelay",
dbtable: "netdata",
+ dropconn: false,
listen_port: 14866,
listen_addr: "0.0.0.0",
verbose: true,
debug: false,
timeout: 500,
+ persistent: false,
insertsql: INSERT_SQL % [ "netdata" ]
)
@@ -281,6 +341,12 @@
of "debug", "d":
result.debug = true
+ of "dropconn", "D":
+ if result.persistent:
+ echo "Dropping TCP sockets are incompatible with persistent database connections."
+ quit( 1 )
+ result.dropconn = true
+
of "help", "h":
echo USAGE
quit( 0 )
@@ -296,11 +362,17 @@
of "dbtable", "T":
result.insertsql = INSERT_SQL % [ val ]
- of "dbopts": result.dbopts = val
+ of "dbopts", "o": result.dbopts = val
of "listen-addr", "a": result.listen_addr = val
of "listen-port", "p": result.listen_port = val.parse_int
+ of "persistent", "P":
+ if result.dropconn:
+ echo "Persistent database connections are incompatible with dropping TCP sockets."
+ quit( 1 )
+ result.persistent = true
+
else: discard
of cmdEnd: assert( false ) # shouldn't reach here ever