netdata_tsrelay.nim
changeset 0 72c9c6f0b713
child 1 160338bb2822
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/netdata_tsrelay.nim	Mon Feb 12 12:26:16 2018 -0800
@@ -0,0 +1,191 @@
+# vim: set et nosta sw=4 ts=4 ft=nim : 
+#
+# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
+# All rights reserved.
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+#     * Redistributions of source code must retain the above copyright
+#       notice, this list of conditions and the following disclaimer.
+#
+#     * Redistributions in binary form must reproduce the above copyright
+#       notice, this list of conditions and the following disclaimer in the
+#       documentation and/or other materials provided with the distribution.
+#
+#     * Neither the name of Mahlon E. Smith nor the names of his
+#       contributors may be used to endorse or promote products derived
+#       from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
+# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+import
+    db_postgres,
+    json,
+    nativesockets,
+    net,
+    parseopt2,
+    strutils,
+    tables,
+    threadpool
+
+
+const
+    VERSION = "v0.1.0"
+    USAGE = """
+./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866
+
+The default connection string is:
+  "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
+    """
+    INSERT_SQL = """
+    INSERT INTO netdata
+        ( time, host, metrics )
+    VALUES
+        ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
+    """
+
+
+type Config = object of RootObj
+    dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
+    listen_port: int     # The port to listen for incoming connections
+
+# Global config object
+#
+var conf = Config(
+    dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
+    listen_port: 14866
+)
+
+
+proc fetch_data( client: Socket ): string =
+    ## Netdata JSON backend doesn't send a length, so we read line by
+    ## line and wait for stream timeout to determine a "sample".
+    try:
+        result = client.recv_line( timeout=500 ) & "\n"
+        while result != "":
+            result = result & client.recv_line( timeout=500 ) & "\n"
+    except TimeoutError:
+        discard
+
+
+proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] =
+    ## Given a raw +data+ string, parse JSON and return a table of
+    ## JSON samples ready for writing, keyed by timestamp. Netdata can
+    ## buffer multiple samples in one batch.
+    if data == "": return
+
+    # Hash of sample timeperiods to pivoted json data
+    result = init_table[ BiggestInt, JsonNode ]()
+
+    for sample in split_lines( data ):
+        if defined( testing ): echo sample
+        if sample.len == 0: continue
+
+        var parsed: JsonNode
+        try:
+            parsed = sample.parse_json
+        except JsonParsingError:
+            if defined( testing ): echo "Unable to parse sample line: " & sample
+
+        # Create or use existing Json object for modded data.
+        #
+        var pivot: JsonNode
+        let key = parsed["timestamp"].get_num
+
+        if result.has_key( key ):
+            pivot = result[ key ]
+        else:
+            pivot = newJObject()
+            result[ key ] = pivot
+
+        var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
+        pivot[ "hostname" ] = parsed[ "hostname" ]
+        pivot[ name ] = parsed[ "value" ]
+
+    if defined( testing ): echo $result.len & " samples"
+
+    return result
+
+
+proc process( client: Socket, db: DBConn ): void =
+    ## Do the work for a connected client within a thread.
+    var raw_data = client.fetch_data
+
+    try:
+        if defined( testing ):
+            echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0]
+        client.close
+    except OSError:
+        return
+
+    var samples = parse_data( raw_data )
+    for timestamp, sample in samples:
+        var host = sample[ "hostname" ].get_str
+        sample.delete( "hostname" )
+        db.exec sql( INSERT_SQL ), timestamp, host, sample
+
+proc serverloop: void =
+    ## Open a database connection, bind to the listening socket,
+    ## and start serving incoming netdata streams.
+    let db = open( "", "", "", conf.dbopts )
+    echo "Successfully connected to the backend database."
+    #SELECT TIMESTAMP WITH TIME ZONE 'epoch' + 982384720 * INTERVAL '1 second';'
+    #db.close
+    
+    var server = newSocket()
+    echo "Listening for incoming connections on port ", conf.listen_port, "..."
+    server.set_sock_opt( OptReuseAddr, true )
+    server.bind_addr( Port(conf.listen_port) )
+    server.listen()
+
+    while true:
+        var
+            client  = newSocket()
+            address = ""
+
+        server.acceptAddr( client, address )
+        echo "New connection: " & address
+        spawn client.process( db )
+
+
+proc parse_cmdline: void =
+    ## Populate the config object with the user's preferences.
+    for kind, key, val in getopt():
+        case kind
+
+        of cmdArgument:
+            discard
+
+        of cmdLongOption, cmdShortOption:
+            case key
+                of "help", "h":
+                    echo USAGE
+                    quit( 0 )
+            
+                of "version", "v":
+                    echo "netdata_tsrelay ", VERSION
+                    quit( 0 )
+               
+                of "dbopts": conf.dbopts = val
+                of "listen-port", "p": conf.listen_port = val.parse_int
+
+                else: discard
+
+        of cmdEnd: assert( false ) # shouldn't reach here ever
+
+
+when isMainModule:
+    parse_cmdline()
+    if defined( testing ): echo conf
+    serverloop()
+