# HG changeset patch # User Mahlon E. Smith # Date 1518467176 28800 # Node ID 72c9c6f0b713b5134fd1dd2a186dfd47cf0e109f Initial commit. diff -r 000000000000 -r 72c9c6f0b713 Makefile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Makefile Mon Feb 12 12:26:16 2018 -0800 @@ -0,0 +1,25 @@ + +FILES = netdata_tsrelay.nim + +default: development + +debug: ${FILES} + nim --assertions:on --nimcache:.cache c ${FILES} + +development: ${FILES} + # can use gdb with this... + nim --debugInfo --threads:on --linedir:on --define:testing --nimcache:.cache c ${FILES} + +debugger: ${FILES} + nim --debugger:on --threads:on --nimcache:.cache c ${FILES} + +release: ${FILES} + nim -d:release --opt:speed --threads:on --nimcache:.cache c ${FILES} + +docs: + nim doc ${FILES} + #nim buildIndex ${FILES} + +clean: + cat .hgignore | xargs rm -rf + diff -r 000000000000 -r 72c9c6f0b713 netdata_tsrelay.nim --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/netdata_tsrelay.nim Mon Feb 12 12:26:16 2018 -0800 @@ -0,0 +1,191 @@ +# vim: set et nosta sw=4 ts=4 ft=nim : +# +# Copyright (c) 2018, Mahlon E. Smith +# All rights reserved. +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of Mahlon E. Smith nor the names of his +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY +# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import + db_postgres, + json, + nativesockets, + net, + parseopt2, + strutils, + tables, + threadpool + + +const + VERSION = "v0.1.0" + USAGE = """ +./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866 + +The default connection string is: + "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay" + """ + INSERT_SQL = """ + INSERT INTO netdata + ( time, host, metrics ) + VALUES + ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? ) + """ + + +type Config = object of RootObj + dbopts: string # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html) + listen_port: int # The port to listen for incoming connections + +# Global config object +# +var conf = Config( + dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay", + listen_port: 14866 +) + + +proc fetch_data( client: Socket ): string = + ## Netdata JSON backend doesn't send a length, so we read line by + ## line and wait for stream timeout to determine a "sample". + try: + result = client.recv_line( timeout=500 ) & "\n" + while result != "": + result = result & client.recv_line( timeout=500 ) & "\n" + except TimeoutError: + discard + + +proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] = + ## Given a raw +data+ string, parse JSON and return a table of + ## JSON samples ready for writing, keyed by timestamp. Netdata can + ## buffer multiple samples in one batch. + if data == "": return + + # Hash of sample timeperiods to pivoted json data + result = init_table[ BiggestInt, JsonNode ]() + + for sample in split_lines( data ): + if defined( testing ): echo sample + if sample.len == 0: continue + + var parsed: JsonNode + try: + parsed = sample.parse_json + except JsonParsingError: + if defined( testing ): echo "Unable to parse sample line: " & sample + + # Create or use existing Json object for modded data. + # + var pivot: JsonNode + let key = parsed["timestamp"].get_num + + if result.has_key( key ): + pivot = result[ key ] + else: + pivot = newJObject() + result[ key ] = pivot + + var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str + pivot[ "hostname" ] = parsed[ "hostname" ] + pivot[ name ] = parsed[ "value" ] + + if defined( testing ): echo $result.len & " samples" + + return result + + +proc process( client: Socket, db: DBConn ): void = + ## Do the work for a connected client within a thread. + var raw_data = client.fetch_data + + try: + if defined( testing ): + echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0] + client.close + except OSError: + return + + var samples = parse_data( raw_data ) + for timestamp, sample in samples: + var host = sample[ "hostname" ].get_str + sample.delete( "hostname" ) + db.exec sql( INSERT_SQL ), timestamp, host, sample + +proc serverloop: void = + ## Open a database connection, bind to the listening socket, + ## and start serving incoming netdata streams. + let db = open( "", "", "", conf.dbopts ) + echo "Successfully connected to the backend database." + #SELECT TIMESTAMP WITH TIME ZONE 'epoch' + 982384720 * INTERVAL '1 second';' + #db.close + + var server = newSocket() + echo "Listening for incoming connections on port ", conf.listen_port, "..." + server.set_sock_opt( OptReuseAddr, true ) + server.bind_addr( Port(conf.listen_port) ) + server.listen() + + while true: + var + client = newSocket() + address = "" + + server.acceptAddr( client, address ) + echo "New connection: " & address + spawn client.process( db ) + + +proc parse_cmdline: void = + ## Populate the config object with the user's preferences. + for kind, key, val in getopt(): + case kind + + of cmdArgument: + discard + + of cmdLongOption, cmdShortOption: + case key + of "help", "h": + echo USAGE + quit( 0 ) + + of "version", "v": + echo "netdata_tsrelay ", VERSION + quit( 0 ) + + of "dbopts": conf.dbopts = val + of "listen-port", "p": conf.listen_port = val.parse_int + + else: discard + + of cmdEnd: assert( false ) # shouldn't reach here ever + + +when isMainModule: + parse_cmdline() + if defined( testing ): echo conf + serverloop() +