netdata_tsrelay.nim
changeset 0 72c9c6f0b713
child 1 160338bb2822
equal deleted inserted replaced
-1:000000000000 0:72c9c6f0b713
       
     1 # vim: set et nosta sw=4 ts=4 ft=nim : 
       
     2 #
       
     3 # Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
       
     4 # All rights reserved.
       
     5 # Redistribution and use in source and binary forms, with or without
       
     6 # modification, are permitted provided that the following conditions are met:
       
     7 #
       
     8 #     * Redistributions of source code must retain the above copyright
       
     9 #       notice, this list of conditions and the following disclaimer.
       
    10 #
       
    11 #     * Redistributions in binary form must reproduce the above copyright
       
    12 #       notice, this list of conditions and the following disclaimer in the
       
    13 #       documentation and/or other materials provided with the distribution.
       
    14 #
       
    15 #     * Neither the name of Mahlon E. Smith nor the names of his
       
    16 #       contributors may be used to endorse or promote products derived
       
    17 #       from this software without specific prior written permission.
       
    18 #
       
    19 # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
       
    20 # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
       
    21 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
       
    22 # DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
       
    23 # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
       
    24 # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
       
    25 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
       
    26 # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
       
    27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
       
    28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
       
    29 
       
    30 
       
    31 import
       
    32     db_postgres,
       
    33     json,
       
    34     nativesockets,
       
    35     net,
       
    36     parseopt2,
       
    37     strutils,
       
    38     tables,
       
    39     threadpool
       
    40 
       
    41 
       
    42 const
       
    43     VERSION = "v0.1.0"
       
    44     USAGE = """
       
    45 ./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866
       
    46 
       
    47 The default connection string is:
       
    48   "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
       
    49     """
       
    50     INSERT_SQL = """
       
    51     INSERT INTO netdata
       
    52         ( time, host, metrics )
       
    53     VALUES
       
    54         ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
       
    55     """
       
    56 
       
    57 
       
    58 type Config = object of RootObj
       
    59     dbopts:      string  # The postgresql connection parameters.  (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
       
    60     listen_port: int     # The port to listen for incoming connections
       
    61 
       
    62 # Global config object
       
    63 #
       
    64 var conf = Config(
       
    65     dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
       
    66     listen_port: 14866
       
    67 )
       
    68 
       
    69 
       
    70 proc fetch_data( client: Socket ): string =
       
    71     ## Netdata JSON backend doesn't send a length, so we read line by
       
    72     ## line and wait for stream timeout to determine a "sample".
       
    73     try:
       
    74         result = client.recv_line( timeout=500 ) & "\n"
       
    75         while result != "":
       
    76             result = result & client.recv_line( timeout=500 ) & "\n"
       
    77     except TimeoutError:
       
    78         discard
       
    79 
       
    80 
       
    81 proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] =
       
    82     ## Given a raw +data+ string, parse JSON and return a table of
       
    83     ## JSON samples ready for writing, keyed by timestamp. Netdata can
       
    84     ## buffer multiple samples in one batch.
       
    85     if data == "": return
       
    86 
       
    87     # Hash of sample timeperiods to pivoted json data
       
    88     result = init_table[ BiggestInt, JsonNode ]()
       
    89 
       
    90     for sample in split_lines( data ):
       
    91         if defined( testing ): echo sample
       
    92         if sample.len == 0: continue
       
    93 
       
    94         var parsed: JsonNode
       
    95         try:
       
    96             parsed = sample.parse_json
       
    97         except JsonParsingError:
       
    98             if defined( testing ): echo "Unable to parse sample line: " & sample
       
    99 
       
   100         # Create or use existing Json object for modded data.
       
   101         #
       
   102         var pivot: JsonNode
       
   103         let key = parsed["timestamp"].get_num
       
   104 
       
   105         if result.has_key( key ):
       
   106             pivot = result[ key ]
       
   107         else:
       
   108             pivot = newJObject()
       
   109             result[ key ] = pivot
       
   110 
       
   111         var name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
       
   112         pivot[ "hostname" ] = parsed[ "hostname" ]
       
   113         pivot[ name ] = parsed[ "value" ]
       
   114 
       
   115     if defined( testing ): echo $result.len & " samples"
       
   116 
       
   117     return result
       
   118 
       
   119 
       
   120 proc process( client: Socket, db: DBConn ): void =
       
   121     ## Do the work for a connected client within a thread.
       
   122     var raw_data = client.fetch_data
       
   123 
       
   124     try:
       
   125         if defined( testing ):
       
   126             echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0]
       
   127         client.close
       
   128     except OSError:
       
   129         return
       
   130 
       
   131     var samples = parse_data( raw_data )
       
   132     for timestamp, sample in samples:
       
   133         var host = sample[ "hostname" ].get_str
       
   134         sample.delete( "hostname" )
       
   135         db.exec sql( INSERT_SQL ), timestamp, host, sample
       
   136 
       
   137 proc serverloop: void =
       
   138     ## Open a database connection, bind to the listening socket,
       
   139     ## and start serving incoming netdata streams.
       
   140     let db = open( "", "", "", conf.dbopts )
       
   141     echo "Successfully connected to the backend database."
       
   142     #SELECT TIMESTAMP WITH TIME ZONE 'epoch' + 982384720 * INTERVAL '1 second';'
       
   143     #db.close
       
   144     
       
   145     var server = newSocket()
       
   146     echo "Listening for incoming connections on port ", conf.listen_port, "..."
       
   147     server.set_sock_opt( OptReuseAddr, true )
       
   148     server.bind_addr( Port(conf.listen_port) )
       
   149     server.listen()
       
   150 
       
   151     while true:
       
   152         var
       
   153             client  = newSocket()
       
   154             address = ""
       
   155 
       
   156         server.acceptAddr( client, address )
       
   157         echo "New connection: " & address
       
   158         spawn client.process( db )
       
   159 
       
   160 
       
   161 proc parse_cmdline: void =
       
   162     ## Populate the config object with the user's preferences.
       
   163     for kind, key, val in getopt():
       
   164         case kind
       
   165 
       
   166         of cmdArgument:
       
   167             discard
       
   168 
       
   169         of cmdLongOption, cmdShortOption:
       
   170             case key
       
   171                 of "help", "h":
       
   172                     echo USAGE
       
   173                     quit( 0 )
       
   174             
       
   175                 of "version", "v":
       
   176                     echo "netdata_tsrelay ", VERSION
       
   177                     quit( 0 )
       
   178                
       
   179                 of "dbopts": conf.dbopts = val
       
   180                 of "listen-port", "p": conf.listen_port = val.parse_int
       
   181 
       
   182                 else: discard
       
   183 
       
   184         of cmdEnd: assert( false ) # shouldn't reach here ever
       
   185 
       
   186 
       
   187 when isMainModule:
       
   188     parse_cmdline()
       
   189     if defined( testing ): echo conf
       
   190     serverloop()
       
   191