--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/netdata_tsrelay.nim Mon Feb 12 12:26:16 2018 -0800
@@ -0,0 +1,191 @@
+# vim: set et nosta sw=4 ts=4 ft=nim :
+#
+# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
+# All rights reserved.
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of Mahlon E. Smith nor the names of his
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
+# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+import
+ db_postgres,
+ json,
+ nativesockets,
+ net,
+ parseopt2,
+ strutils,
+ tables,
+ threadpool
+
+
+const
+ VERSION = "v0.1.0"
+ USAGE = """
+./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866
+
+The default connection string is:
+ "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
+ """
+ INSERT_SQL = """
+ INSERT INTO netdata
+ ( time, host, metrics )
+ VALUES
+ ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
+ """
+
+
+type Config = object of RootObj
+ dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
+ listen_port: int # The port to listen for incoming connections
+
+# Global config object
+#
+var conf = Config(
+ dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
+ listen_port: 14866
+)
+
+
+proc fetch_data( client: Socket ): string =
+ ## Netdata JSON backend doesn't send a length, so we read line by
+ ## line and wait for stream timeout to determine a "sample".
+ try:
+ result = client.recv_line( timeout=500 ) & "\n"
+ while result != "":
+ result = result & client.recv_line( timeout=500 ) & "\n"
+ except TimeoutError:
+ discard
+
+
+proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] =
+ ## Given a raw +data+ string, parse JSON and return a table of
+ ## JSON samples ready for writing, keyed by timestamp. Netdata can
+ ## buffer multiple samples in one batch.
+ if data == "": return
+
+ # Hash of sample timeperiods to pivoted json data
+ result = init_table[ BiggestInt, JsonNode ]()
+
+ for sample in split_lines( data ):
+ if defined( testing ): echo sample
+ if sample.len == 0: continue
+
+ var parsed: JsonNode
+ try:
+ parsed = sample.parse_json
+ except JsonParsingError:
+ if defined( testing ): echo "Unable to parse sample line: " & sample
+
+ # Create or use existing Json object for modded data.
+ #
+ var pivot: JsonNode
+ let key = parsed["timestamp"].get_num
+
+ if result.has_key( key ):
+ pivot = result[ key ]
+ else:
+ pivot = newJObject()
+ result[ key ] = pivot
+
+ var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
+ pivot[ "hostname" ] = parsed[ "hostname" ]
+ pivot[ name ] = parsed[ "value" ]
+
+ if defined( testing ): echo $result.len & " samples"
+
+ return result
+
+
+proc process( client: Socket, db: DBConn ): void =
+ ## Do the work for a connected client within a thread.
+ var raw_data = client.fetch_data
+
+ try:
+ if defined( testing ):
+ echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0]
+ client.close
+ except OSError:
+ return
+
+ var samples = parse_data( raw_data )
+ for timestamp, sample in samples:
+ var host = sample[ "hostname" ].get_str
+ sample.delete( "hostname" )
+ db.exec sql( INSERT_SQL ), timestamp, host, sample
+
+proc serverloop: void =
+ ## Open a database connection, bind to the listening socket,
+ ## and start serving incoming netdata streams.
+ let db = open( "", "", "", conf.dbopts )
+ echo "Successfully connected to the backend database."
+ #SELECT TIMESTAMP WITH TIME ZONE 'epoch' + 982384720 * INTERVAL '1 second';'
+ #db.close
+
+ var server = newSocket()
+ echo "Listening for incoming connections on port ", conf.listen_port, "..."
+ server.set_sock_opt( OptReuseAddr, true )
+ server.bind_addr( Port(conf.listen_port) )
+ server.listen()
+
+ while true:
+ var
+ client = newSocket()
+ address = ""
+
+ server.acceptAddr( client, address )
+ echo "New connection: " & address
+ spawn client.process( db )
+
+
+proc parse_cmdline: void =
+ ## Populate the config object with the user's preferences.
+ for kind, key, val in getopt():
+ case kind
+
+ of cmdArgument:
+ discard
+
+ of cmdLongOption, cmdShortOption:
+ case key
+ of "help", "h":
+ echo USAGE
+ quit( 0 )
+
+ of "version", "v":
+ echo "netdata_tsrelay ", VERSION
+ quit( 0 )
+
+ of "dbopts": conf.dbopts = val
+ of "listen-port", "p": conf.listen_port = val.parse_int
+
+ else: discard
+
+ of cmdEnd: assert( false ) # shouldn't reach here ever
+
+
+when isMainModule:
+ parse_cmdline()
+ if defined( testing ): echo conf
+ serverloop()
+