netdata_tsrelay.nim
author Mahlon E. Smith <mahlon@laika.com>
Wed, 14 Feb 2018 17:27:29 -0800
changeset 6 1f366fc61592
parent 5 a1276c3d39eb
child 7 c0bcf3bea772
permissions -rw-r--r--
Each incoming connection requires its own client socket.

# vim: set et nosta sw=4 ts=4 ft=nim : 
#
# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
#     * Neither the name of Mahlon E. Smith nor the names of his
#       contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


import
    db_postgres,
    json,
    math,
    nativesockets,
    net,
    os,
    parseopt2,
    strutils,
    tables,
    terminal,
    times,
    threadpool


const
    VERSION = "v0.1.0"
    USAGE = """
./netdata_tsrelay [-q][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0

  -q: Quiet mode.  No output at all.  Ignored if -d is supplied.
  -c: Suppress ANSI color output.
  -d: Debug: Show incoming and parsed data.
  -v: Display version number.
  -h: Help.  You're lookin' at it.

The default connection string is:
  "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
    """
    INSERT_SQL = """
    INSERT INTO netdata
        ( time, host, metrics )
    VALUES
        ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
    """


type
    Config = object of RootObj
        dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
        listen_port: int     # The port to listen for incoming connections
        listen_addr: string  # The IP address listen for incoming connections.  Defaults to inaddr_any.
        verbose:     bool    # Be informative
        debug:       bool    # Spew out raw data
        use_color:   bool    # Pretty things up a little, probably want to disable this if debugging


# The global config object
#
# FIXME:  Rather than pass this all over the
# place, consider channels and createThread instead of spawn.
#
var conf = Config(
    dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
    listen_port: 14866,
    listen_addr: "0.0.0.0",
    verbose: true,
    debug: false,
    use_color: true
)


proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
    ## Quick wrapper for color formatting a string, since the 'terminal'
    ## module only deals with stdout directly.
    if not conf.use_color: return msg

    var color: BiggestInt = ord( fg )
    if bright: inc( color, 60 )
    result = "\e[" & $color & 'm' & msg & "\e[0m"


proc fetch_data( client: Socket ): string =
    ## Netdata JSON backend doesn't send a length, so we read line by
    ## line and wait for stream timeout to determine a "sample".
    try:
        result = client.recv_line( timeout=500 ) & "\n"
        while result != "":
            result = result & client.recv_line( timeout=500 ) & "\n"
    except TimeoutError:
        discard


proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] =
    ## Given a raw +data+ string, parse JSON and return a table of
    ## JSON samples ready for writing, keyed by timestamp. Netdata can
    ## buffer multiple samples in one batch.
    if data == "": return

    # Hash of sample timeperiods to pivoted json data
    result = init_table[ BiggestInt, JsonNode ]()

    for sample in split_lines( data ):
        if conf.debug: echo sample.hl( fgBlack, bright=true )
        if sample.len == 0: continue

        var parsed: JsonNode
        try:
            parsed = sample.parse_json
        except JsonParsingError:
            discard
            if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )

        # Create or use existing Json object for modded data.
        #
        var pivot: JsonNode
        let key = parsed["timestamp"].get_num

        if result.has_key( key ):
            pivot = result[ key ]
        else:
            pivot = newJObject()
            result[ key ] = pivot

        var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
        pivot[ "hostname" ] = parsed[ "hostname" ]
        pivot[ name ] = parsed[ "value" ]

    return result


proc process( client: Socket, db: DBConn, conf: Config ): int =
    ## Do the work for a connected client within a thread.
    ## Returns the number of samples parsed.
    var raw_data = client.fetch_data

    # Done with the socket, netdata will automatically
    # reconnect.  Save local resources/file descriptors
    # by closing after the send is considered complete.
    #
    try:
        client.close
    except OSError:
        return

    # Pivot data and save to SQL.
    #
    var samples = parse_data( raw_data, conf )
    if samples.len != 0:
        db.exec sql( "BEGIN" )
        for timestamp, sample in samples:
            var host = sample[ "hostname" ].get_str
            sample.delete( "hostname" )
            db.exec sql( INSERT_SQL ), timestamp, host, sample
        db.exec sql( "COMMIT" )

    return samples.len


proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} =
    ## A thread that performs that dispatches processing and returns
    ## results.
    let t0 = cpu_time()
    var samples = client.process( db, conf )

    if conf.verbose:
        echo(
            hl( $samples, fgWhite, bright=true ),
            " sample(s) parsed from ",
            address.hl( fgYellow, bright=true ),
            " in ", hl($( round(cpu_time() - t0, 3) ), fgWhite, bright=true), " seconds."
            # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
        )
    when defined( testing ): dumpNumberOfInstances()


proc serverloop: void =
    ## Open a database connection, bind to the listening socket,
    ## and start serving incoming netdata streams.
    let db = open( "", "", "", conf.dbopts )
    if conf.verbose: echo( "Successfully connected to the backend database.".hl( fgGreen ) )

    var server = newSocket()

    server.set_sock_opt( OptReuseAddr, true )
    server.bind_addr( Port(conf.listen_port), conf.listen_addr )
    server.listen()

    if conf.verbose:
        echo(
            "Listening for incoming connections on ".hl( fgGreen, bright=true ),
            hl( (if conf.listen_addr == "0.0.0.0": "*" else: conf.listen_addr) , fgBlue, bright=true ),
            ":",
            hl( $conf.listen_port, fgBlue, bright=true ),
        )
        echo ""

    while true:
        var client  = newSocket()
        var address = ""

        client.close
        server.acceptAddr( client, address ) # blocking call
        spawn runthread( client, address, db, conf )


proc atexit() {.noconv.} =
    ## Exit cleanly after waiting on any running threads.
    echo "Exiting..."
    sync()
    quit( 0 )


proc parse_cmdline: void =
    ## Populate the config object with the user's preferences.

    # always set debug mode if development build.
    conf.debug = defined( testing )

    for kind, key, val in getopt():
        case kind

        of cmdArgument:
            discard

        of cmdLongOption, cmdShortOption:
            case key
                of "debug", "d":
                    conf.debug = true

                of "no-color", "c":
                    conf.use_color = false

                of "help", "h":
                    echo USAGE
                    quit( 0 )

                of "quiet", "q":
                    conf.verbose = false
            
                of "version", "v":
                    echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
                    quit( 0 )
               
                of "dbopts": conf.dbopts = val
                of "listen-addr", "a": conf.listen_addr = val
                of "listen-port", "p": conf.listen_port = val.parse_int

                else: discard

        of cmdEnd: assert( false ) # shouldn't reach here ever


when isMainModule:
    system.addQuitProc( resetAttributes )
    system.addQuitProc( atexit )

    parse_cmdline()

    if conf.debug: echo hl( $conf, fgYellow )
    serverloop()