netdata_tsrelay.nim
author Mahlon E. Smith <mahlon@laika.com>
Mon, 12 Feb 2018 12:49:12 -0800
changeset 1 160338bb2822
parent 0 72c9c6f0b713
child 4 f3d83bdd7877
permissions -rw-r--r--
Add README.

# vim: set et nosta sw=4 ts=4 ft=nim : 
#
# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
#     * Neither the name of Mahlon E. Smith nor the names of his
#       contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


import
    db_postgres,
    json,
    nativesockets,
    net,
    parseopt2,
    strutils,
    tables,
    threadpool


# Release identifier, printed in response to --version.
const VERSION = "v0.1.0"

# Help text, printed in response to --help.
const USAGE = """
./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866

The default connection string is:
  "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
    """

# Statement used to store one pivoted sample.  The three placeholders
# are, in order: the sample time as seconds since the epoch, the
# originating host name, and the metrics document.
const INSERT_SQL = """
    INSERT INTO netdata
        ( time, host, metrics )
    VALUES
        ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
    """


type
    Config = object of RootObj
        # The postgresql connection parameters.
        # (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
        dbopts: string

        # The port to listen for incoming connections
        listen_port: int


# Global config object, pre-populated with the defaults.  Values given
# on the command line overwrite these fields in parse_cmdline.
#
var conf = Config(
    listen_port: 14866,
    dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
)


proc fetch_data( client: Socket ): string =
    ## Read everything the +client+ sends and return it as a single
    ## string, one received line per "\n" terminated row.
    ##
    ## Netdata JSON backend doesn't send a length, so we read line by
    ## line and treat either a 500ms receive timeout or the peer closing
    ## the connection as the end of a "sample".  Returns an empty string
    ## if nothing was received.
    result = ""
    try:
        while true:
            let line = client.recv_line( timeout=500 )

            # recv_line hands back "" once the peer has disconnected (a
            # blank line arrives as "\r\L" instead), and keeps doing so on
            # every later call.  Stop here rather than appending newlines
            # forever.
            if line == "": break

            result.add( line )
            result.add( "\n" )
    except TimeoutError:
        # Nothing more arrived within the timeout: keep what we have.
        discard


proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] =
    ## Given a raw +data+ string holding one JSON object per line, parse
    ## it and return a table of JSON samples ready for writing, keyed by
    ## timestamp. Netdata can buffer multiple samples in one batch, so
    ## every metric sharing a timestamp is pivoted into a single object:
    ##
    ##   { "hostname": <hostname>, "<chart_id>.<id>": <value>, ... }
    ##
    ## Lines that are empty, can't be parsed, aren't JSON objects, lack an
    ## integer "timestamp", or are missing any of "chart_id", "id",
    ## "hostname" or "value" are skipped.  An empty +data+ yields an
    ## empty table.

    # Hash of sample timeperiods to pivoted json data
    result = init_table[ BiggestInt, JsonNode ]()
    if data == "": return

    for sample in split_lines( data ):
        when defined( testing ): echo sample
        if sample.len == 0: continue

        var parsed: JsonNode
        try:
            parsed = sample.parse_json
        except JsonParsingError:
            when defined( testing ): echo "Unable to parse sample line: " & sample
            # There is nothing to pivot for this line; carrying on would
            # dereference an unset node.
            continue

        if parsed.kind != JObject: continue

        # The {} accessor yields nil for absent keys instead of raising.
        let stamp = parsed{"timestamp"}
        if stamp.is_nil or stamp.kind != JInt: continue

        let
            chart    = parsed{"chart_id"}
            metric   = parsed{"id"}
            hostname = parsed{"hostname"}
            value    = parsed{"value"}
        if chart.is_nil or metric.is_nil or hostname.is_nil or value.is_nil:
            continue

        # Create or use existing Json object for modded data.
        #
        let key = stamp.num
        var pivot: JsonNode
        if result.has_key( key ):
            pivot = result[ key ]
        else:
            pivot = newJObject()
            result[ key ] = pivot

        pivot[ "hostname" ] = hostname
        pivot[ chart.get_str & "." & metric.get_str ] = value

    when defined( testing ): echo $result.len & " samples"


proc process( client: Socket, db: DBConn ): void =
    ## Do the work for a connected client within a thread: read the
    ## client's batch of samples, close the socket, then insert one row
    ## per timestamp through +db+.
    ##
    ## Socket errors while reading or closing no longer throw away data
    ## that was already received, and a failed insert is reported and
    ## skipped instead of escaping the thread.
    ##
    ## NOTE(review): +db+ is the single handle opened in serverloop and is
    ## passed to every spawned worker -- confirm it is safe to use from
    ## several threads at once.
    var raw_data = ""
    try:
        raw_data = client.fetch_data
    except OSError:
        echo "Error while reading from client: " & get_current_exception_msg()

    # The peer address is purely informational and can't be looked up
    # once the peer is gone, so a failure here must not skip the close.
    when defined( testing ):
        try:
            echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0]
        except OSError:
            discard

    try:
        client.close
    except OSError:
        # Best effort: the data has been read, so carry on and store it.
        discard

    let samples = parse_data( raw_data )
    for timestamp, sample in samples:
        let host = sample[ "hostname" ].get_str

        # The host is stored in its own column, so drop it from the
        # metrics document before writing.
        sample.delete( "hostname" )

        try:
            db.exec sql( INSERT_SQL ), timestamp, host, sample
        except DbError:
            echo "Unable to store sample from " & host & ": " & get_current_exception_msg()

proc serverloop: void =
    ## Open a database connection, bind to the listening socket,
    ## and start serving incoming netdata streams.  Each accepted
    ## connection is handed to the threadpool via spawn; the accept
    ## loop itself never returns.
    let db = open( "", "", "", conf.dbopts )
    echo "Successfully connected to the backend database."

    let server = newSocket()
    server.set_sock_opt( OptReuseAddr, true )
    server.bind_addr( Port(conf.listen_port) )
    server.listen()

    # Only announce the port once binding and listening have succeeded.
    echo "Listening for incoming connections on port ", conf.listen_port, "..."

    while true:
        # acceptAddr stores the accepted descriptor in +client+, so all it
        # needs is an allocated Socket object.  Creating one with
        # newSocket() would open a fresh OS socket per connection whose
        # descriptor is overwritten and never closed.
        var
            client: Socket
            address = ""
        new( client )

        server.acceptAddr( client, address )
        echo "New connection: " & address
        spawn client.process( db )


proc parse_cmdline: void =
    ## Populate the config object with the user's preferences.
    ##
    ## --help/-h and --version/-v print their text and exit with status 0.
    ## A --listen-port/-p value that isn't a number between 0 and 65535
    ## prints an error plus the usage text and exits with status 1.
    ## Positional arguments and unknown options are ignored.
    for kind, key, val in getopt():
        case kind

        of cmdArgument:
            discard

        of cmdLongOption, cmdShortOption:
            case key
                of "help", "h":
                    echo USAGE
                    quit( 0 )

                of "version", "v":
                    echo "netdata_tsrelay ", VERSION
                    quit( 0 )

                of "dbopts": conf.dbopts = val

                of "listen-port", "p":
                    # Reject bad values here with a readable message,
                    # rather than failing with a traceback now or when
                    # the value is later converted to a Port.
                    var port = -1
                    try:
                        port = val.parse_int
                    except ValueError:
                        port = -1

                    if port < 0 or port > 65535:
                        echo "Invalid listen port: \"" & val & "\""
                        echo USAGE
                        quit( 1 )

                    conf.listen_port = port

                else: discard

        of cmdEnd: assert( false ) # shouldn't reach here ever


when isMainModule:
    # Entry point: apply command line overrides to the global config,
    # then hand control to the accept loop.
    parse_cmdline()

    # Dump the effective configuration in testing builds only.
    when defined( testing ):
        echo conf

    serverloop()