0
|
1 |
# vim: set et nosta sw=4 ts=4 ft=nim :
|
|
2 |
#
|
|
3 |
# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu>
|
|
4 |
# All rights reserved.
|
|
5 |
# Redistribution and use in source and binary forms, with or without
|
|
6 |
# modification, are permitted provided that the following conditions are met:
|
|
7 |
#
|
|
8 |
# * Redistributions of source code must retain the above copyright
|
|
9 |
# notice, this list of conditions and the following disclaimer.
|
|
10 |
#
|
|
11 |
# * Redistributions in binary form must reproduce the above copyright
|
|
12 |
# notice, this list of conditions and the following disclaimer in the
|
|
13 |
# documentation and/or other materials provided with the distribution.
|
|
14 |
#
|
|
15 |
# * Neither the name of Mahlon E. Smith nor the names of his
|
|
16 |
# contributors may be used to endorse or promote products derived
|
|
17 |
# from this software without specific prior written permission.
|
|
18 |
#
|
|
19 |
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
|
|
20 |
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
21 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
22 |
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
|
|
23 |
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
|
24 |
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
25 |
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
|
26 |
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
27 |
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
|
28 |
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
29 |
|
|
30 |
|
|
31 |
import
|
|
32 |
db_postgres,
|
|
33 |
json,
|
|
34 |
nativesockets,
|
|
35 |
net,
|
|
36 |
parseopt2,
|
|
37 |
strutils,
|
|
38 |
tables,
|
|
39 |
threadpool
|
|
40 |
|
|
41 |
|
|
42 |
# Release identifier, printed by --version.
const VERSION = "v0.1.0"

# Help text, printed by --help.
const USAGE = """
./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866

The default connection string is:
"host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
"""

# Parameterized insert statement.  The three placeholders are bound, in
# order, to: seconds since the epoch, the reporting host name, and the
# JSON document holding that host's metrics for the sample.
const INSERT_SQL = """
INSERT INTO netdata
( time, host, metrics )
VALUES
( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
"""
|
|
56 |
|
|
57 |
|
|
58 |
type
    Config = object of RootObj
        ## Runtime settings for the relay.
        dbopts: string    ## PostgreSQL connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
        listen_port: int  ## TCP port on which incoming connections are accepted.

# Process-wide settings, pre-populated with the defaults.  Individual
# fields are overwritten by command line parsing.
#
var conf = Config(
    listen_port: 14866,
    dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
)
|
|
68 |
|
|
69 |
|
|
70 |
proc fetch_data( client: Socket ): string =
    ## Netdata JSON backend doesn't send a length, so we read line by
    ## line and wait for stream timeout to determine a "sample".
    ##
    ## Returns every line received from +client+, each terminated with
    ## a newline.  Reading stops when no line arrives within 500ms, or
    ## when the peer disconnects.  The result is the empty string if
    ## nothing was received at all.
    result = ""
    try:
        while true:
            let line = client.recv_line( timeout=500 )

            # recv_line hands back "" only when the peer has gone away
            # (a blank line on the wire arrives as "\r\n"), so an empty
            # read means there is nothing further to wait for.  Without
            # this check a disconnected client makes the loop spin
            # forever, since no timeout is ever raised.
            if line == "": break

            result.add( line )
            result.add( "\n" )
    except TimeoutError:
        # The stream went quiet: whatever was collected is the sample.
        discard
|
|
79 |
|
|
80 |
|
|
81 |
proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] =
    ## Given a raw +data+ string, parse JSON and return a table of
    ## JSON samples ready for writing, keyed by timestamp. Netdata can
    ## buffer multiple samples in one batch.
    ##
    ## +data+ is expected to hold one JSON object per line.  Every
    ## usable line contributes a "<chart_id>.<id>" => value pair to the
    ## object stored under that line's timestamp, alongside its
    ## "hostname".  Blank lines, lines that are not valid JSON, and
    ## lines that are not objects carrying all of the required fields
    ## are skipped.  An empty table is returned when nothing is usable.

    # Hash of sample timeperiods to pivoted json data
    result = init_table[ BiggestInt, JsonNode ]()
    if data == "": return

    for sample in split_lines( data ):
        when defined( testing ): echo sample
        if sample.len == 0: continue

        var parsed: JsonNode
        try:
            parsed = sample.parse_json
        except JsonParsingError:
            when defined( testing ): echo "Unable to parse sample line: " & sample
            # Nothing usable on this line; falling through would
            # dereference a nil node below.
            continue

        # Ignore well-formed JSON that isn't a complete metric record,
        # rather than letting a failed key lookup escape to the caller.
        if parsed.kind != JObject: continue
        if not ( parsed.has_key( "timestamp" ) and
                 parsed.has_key( "hostname" ) and
                 parsed.has_key( "chart_id" ) and
                 parsed.has_key( "id" ) and
                 parsed.has_key( "value" ) ):
            when defined( testing ): echo "Incomplete sample line: " & sample
            continue

        # get_num was superseded by getBiggestInt in newer json modules;
        # use whichever one this compiler's stdlib provides.
        let key =
            when declared( getBiggestInt ): parsed[ "timestamp" ].getBiggestInt
            else: parsed[ "timestamp" ].get_num

        # Create or use existing Json object for modded data.
        #
        let pivot = result.mgetOrPut( key, newJObject() )

        let name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
        pivot[ "hostname" ] = parsed[ "hostname" ]
        pivot[ name ] = parsed[ "value" ]

    when defined( testing ): echo $result.len & " samples"
|
|
118 |
|
|
119 |
|
|
120 |
proc process( client: Socket, db: DBConn ): void =
    ## Per-connection worker: drain +client+, hang up, then store each
    ## parsed sample through +db+.
    let payload = client.fetch_data

    # Hang up before touching the database.  If the socket calls fail
    # with an OSError the payload is abandoned.
    try:
        when defined( testing ):
            let peer = get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0]
            echo "Closed connection for " & peer
        client.close
    except OSError:
        return

    for timestamp, sample in parse_data( payload ):
        # The host name is stored in its own column, so lift it out of
        # the metrics document before that document is written.
        let hostname = sample[ "hostname" ].get_str
        sample.delete( "hostname" )
        db.exec( sql( INSERT_SQL ), timestamp, hostname, sample )
|
|
136 |
|
|
137 |
proc serverloop: void =
    ## Open a database connection, bind to the listening socket,
    ## and start serving incoming netdata streams.
    ##
    ## Never returns: every accepted connection is handed to
    ## +process+ on the thread pool, along with the database handle.
    let db = open( "", "", "", conf.dbopts )
    echo "Successfully connected to the backend database."

    var server = newSocket()
    echo "Listening for incoming connections on port ", conf.listen_port, "..."
    server.set_sock_opt( OptReuseAddr, true )
    server.bind_addr( Port(conf.listen_port) )
    server.listen()

    while true:
        var
            # Only an empty Socket object is needed here, as acceptAddr
            # fills in the descriptor of the accepted connection.
            # newSocket() would open a fresh OS socket on every pass
            # whose descriptor is then overwritten and never closed.
            client = new Socket
            address = ""

        server.acceptAddr( client, address )  # blocks until a peer connects
        echo "New connection: " & address
        spawn client.process( db )
|
|
157 |
|
|
158 |
|
|
159 |
proc parse_cmdline: void =
    ## Populate the config object with the user's preferences.
    ##
    ## Recognized options:
    ##   --help, -h          print usage and exit
    ##   --version, -v       print the version and exit
    ##   --dbopts=STR        PostgreSQL connection string
    ##   --listen-port=N, -p port to accept connections on (0-65535)
    ##
    ## Positional arguments and unknown options are ignored.  An
    ## unparseable or out of range port ends the program with an error
    ## message and a failure exit status.
    for kind, key, val in getopt():
        case kind

        of cmdArgument:
            discard

        of cmdLongOption, cmdShortOption:
            case key
            of "help", "h":
                echo USAGE
                quit( 0 )

            of "version", "v":
                echo "netdata_tsrelay ", VERSION
                quit( 0 )

            of "dbopts": conf.dbopts = val

            of "listen-port", "p":
                # Reject bad values here with a readable message,
                # instead of an exception traceback now or a range
                # failure later when the port is bound.
                var port = -1
                try:
                    port = val.parse_int
                except ValueError:
                    discard
                if port < 0 or port > 65535:
                    quit( "Invalid listen port: \"" & val & "\" (expected 0-65535)" )
                conf.listen_port = port

            else: discard

        of cmdEnd: assert( false ) # shouldn't reach here ever
|
|
183 |
|
|
184 |
|
|
185 |
when isMainModule:
    # Entry point: apply command line overrides, then serve forever.
    parse_cmdline()

    # Builds compiled with -d:testing dump the effective configuration.
    when defined( testing ):
        echo conf

    serverloop()
|
|
189 |
|