author | Mahlon E. Smith <mahlon@martini.nu> |
Sat, 12 Dec 2020 21:46:39 -0800 | |
changeset 22 | 4250addd706f |
parent 21 | a2fe9ec4cdf2 |
child 23 | 89ba1d73b232 |
child 24 | af18a10710be |
permissions | -rw-r--r-- |
8 | 1 |
# vim: set et nosta sw=4 ts=4 : |
0 | 2 |
# |
19 | 3 |
# Copyright (c) 2018-2020, Mahlon E. Smith <mahlon@martini.nu> |
0 | 4 |
# All rights reserved. |
5 |
# Redistribution and use in source and binary forms, with or without |
|
6 |
# modification, are permitted provided that the following conditions are met: |
|
7 |
# |
|
8 |
# * Redistributions of source code must retain the above copyright |
|
9 |
# notice, this list of conditions and the following disclaimer. |
|
10 |
# |
|
11 |
# * Redistributions in binary form must reproduce the above copyright |
|
12 |
# notice, this list of conditions and the following disclaimer in the |
|
13 |
# documentation and/or other materials provided with the distribution. |
|
14 |
# |
|
15 |
# * Neither the name of Mahlon E. Smith nor the names of his |
|
16 |
# contributors may be used to endorse or promote products derived |
|
17 |
# from this software without specific prior written permission. |
|
18 |
# |
|
19 |
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY |
|
20 |
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
21 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
|
22 |
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY |
|
23 |
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
|
24 |
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
|
25 |
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
|
26 |
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
27 |
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
|
28 |
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
29 |
||
30 |
||
31 |
import |
|
32 |
db_postgres, |
|
33 |
json, |
|
5 | 34 |
math, |
0 | 35 |
nativesockets, |
36 |
net, |
|
16 | 37 |
parseopt, |
8 | 38 |
posix, |
0 | 39 |
strutils, |
19 | 40 |
strformat, |
0 | 41 |
tables, |
5 | 42 |
terminal, |
8 | 43 |
times |
0 | 44 |
|
45 |
||
46 |
const
    VERSION = "v0.3.0"

    # Help text printed by -h/--help.  The synopsis lists every
    # supported short flag (including -P, which was previously missing).
    USAGE = """
./netdata_tsrelay [-adDhopPqtTv]

  -a --listen-addr:
    The local IP address to listen on for netdata streams.
    Defaults to all addresses (0.0.0.0).

  -d --debug:
    Debug: Show incoming and parsed data.

  -D --dropconn:
    Drop the persistent socket to netdata between samples to conserve
    local resources. This may be helpful with a large number of clients.
    Defaults to false. Cannot be combined with -P.

  -h --help:
    Help. You're lookin' at it.

  -o --dbopts:
    The PostgreSQL connection string parameters.
    The default connection string is:
    "host=localhost dbname=netdata application_name=netdata-tsrelay"

  -p --listen-port:
    Change the listening port from the default (14866).

  -P --persistent:
    Don't disconnect from the database between samples. This may be
    more efficient with a small number of clients, when not using a
    pooler, or with a very high sample size/rate. Defaults to false.
    Cannot be combined with -D.

  -q --quiet:
    Quiet mode. No output at all. Ignored if -d is supplied.

  -T --dbtable:
    Change the destination table name from the default (netdata).

  -t --timeout:
    Alter the maximum time (in ms) an open socket waits for data
    before processing the sample. Default: 500ms.

  -v --version:
    Display version number.

"""

    # Insert statement template.  $1 is replaced with the destination
    # table name (via strutils `%`); the three `?` placeholders are bound
    # at execution time to the epoch seconds, hostname, and metrics JSON.
    INSERT_SQL = """
    INSERT INTO $1
    ( time, host, metrics )
    VALUES
    ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
    """
|
98 |
||
99 |
||
5 | 100 |
type
    Config = object of RootObj
        ## Runtime settings.  Defaults and command line overrides are
        ## applied by parse_cmdline().
        dbopts: string       # The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
        dbtable: string      # The name of the table to write to.
        dropconn: bool       # Close the TCP connection between samples.
        persistent: bool     # Don't close the database handle between samples.
        listen_port: int     # The port to listen on for incoming connections.
        listen_addr: string  # The IP address to listen on for incoming connections. "0.0.0.0" (inaddr_any) means all addresses.
        verbose: bool        # Be informative
        debug: bool          # Spew out raw data
        insertsql: string    # The SQL insert string after interpolating the table name.
        timeout: int         # How long to block, waiting on connection data (milliseconds, per the -t help text).


type
    NetdataClient = ref object
        ## State for one accepted netdata connection (served in a forked child).
        sock: Socket         # The accepted client socket
        address: string      # The remote IP address
        db: DbConn           # An optionally persistent database handle; nil while not connected


# Global configuration, assigned from parse_cmdline() at startup and read
# by the per-client procs (fetch_data, parse_data, write_to_database, ...).
var conf: Config
|
0 | 123 |
|
9
aa9d537f7067
Properly reap child processes.
Mahlon E. Smith <mahlon@laika.com>
parents:
8
diff
changeset
|
124 |
|
5 | 125 |
proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
    ## Return +msg+ wrapped in the ANSI escape sequence for foreground
    ## color +fg+, using the high-intensity variant when +bright+ is set.
    ## The 'terminal' module only writes colors to stdout directly, so the
    ## sequence is assembled by hand here.  When stdout is not a terminal,
    ## +msg+ is returned untouched.
    if isatty( stdout ):
        # Bright colors live 60 codes above their normal counterparts.
        let code = ord( fg ) + ( if bright: 60 else: 0 )
        result = "\e[" & $code & "m" & msg & "\e[0m"
    else:
        result = msg
|
133 |
||
134 |
||
19 | 135 |
proc fetch_data( client: NetdataClient ): string =
    ## Read one sample batch from +client+'s socket and return it, one
    ## received line per "\n"-terminated line.
    ##
    ## Netdata JSON backend doesn't send a length nor a separator
    ## between samples, so we read line by line and wait for stream
    ## timeout to determine what constitutes a sample.
    ##
    ## Blocks (retrying after every timeout) until at least one line has
    ## arrived; after that, the first read timeout of +conf.timeout+ ms
    ## ends the batch.  Exits the process with status 1 when the peer
    ## closes the socket or a socket error occurs.
    var line = ""
    while true:
        try:
            client.sock.readline( line, timeout=conf.timeout )
        except TimeoutError:
            # Nothing received yet: keep waiting.  Otherwise the pause
            # marks the end of the current batch.
            if result.len == 0: continue
            return
        except OSError as err:
            if conf.debug: echo "Socket error from {client.address}: {err.msg}".fmt.hl( fgRed, bright=true )
            quit( 1 )

        # readLine() produces "" only when the peer has disconnected;
        # a blank line arrives as "\r\n" instead.
        if line.len == 0:
            if conf.debug: echo "Client {client.address} closed socket.".fmt.hl( fgRed, bright=true )
            quit( 1 )

        # Append in place rather than re-concatenating the whole batch.
        result.add( line )
        result.add( '\n' )
|
0 | 154 |
|
155 |
||
8 | 156 |
proc parse_data( data: string ): seq[ JsonNode ] =
    ## Given a raw +data+ string, parse JSON and return a sequence
    ## of JSON samples. Netdata can buffer multiple samples in one batch.
    ##
    ## Every input line is one metric.  Lines that share a "timestamp"
    ## are pivoted into a single object holding "hostname", "timestamp",
    ## "labels" (when the line carries them) and one
    ## "<chart_id>.<id>": value entry per metric.  Blank lines, invalid
    ## JSON, non-object JSON, lines missing a required key, and lines
    ## whose timestamp is not an integer are skipped individually; they
    ## never abort the rest of the batch.  Samples are returned in the
    ## order their timestamps first appeared.
    result = @[]
    if data == "": return

    # Sample time period -> pivoted json data, kept in arrival order.
    var pivoted_data = init_ordered_table[ BiggestInt, JsonNode ]()

    for line in split_lines( data ):
        if line == "": continue
        if conf.debug: echo line.hl( fgBlack, bright=true )

        var parsed: JsonNode
        try:
            parsed = line.parse_json
        except JsonParsingError:
            if conf.debug: echo hl( "Unable to parse sample line: " & line.hl(fgRed, bright=true), fgRed )
            continue

        # Skip this line only -- earlier samples in the batch are kept.
        if parsed.kind != JObject: continue

        # Read every required field before touching the table, so a
        # malformed line can never leave an empty pivot behind.
        var
            stamp, hostname, value: JsonNode
            name: string
        try:
            stamp = parsed[ "timestamp" ]
            hostname = parsed[ "hostname" ]
            value = parsed[ "value" ]
            name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
        except KeyError:
            stamp = nil

        if stamp.isNil or stamp.kind != JInt:
            if conf.debug: echo hl( "Skipping incomplete sample line: " & line.hl(fgRed, bright=true), fgRed )
            continue

        # Create or use existing Json object for modded data.
        let key = stamp.get_biggest_int
        if not pivoted_data.has_key( key ):
            pivoted_data[ key ] = newJObject()
        let pivot = pivoted_data[ key ]

        pivot[ "hostname" ] = hostname
        pivot[ "timestamp" ] = stamp
        pivot[ name ] = value
        if parsed.has_key( "labels" ):
            pivot[ "labels" ] = parsed[ "labels" ]

    for sample in pivoted_data.values:
        result.add( sample )
|
0 | 201 |
|
202 |
||
19 | 203 |
proc write_to_database( client: NetdataClient, samples: seq[ JsonNode ] ): void =
    ## Given a sequence of json samples, write them to database.
    ##
    ## All samples go in one transaction using +conf.insertsql+.  The
    ## "timestamp" and "hostname" keys are removed from each sample (this
    ## mutates the caller's JsonNodes) and bound to their own columns,
    ## with the hostname lowercased; the rest of the object is stored as
    ## the metrics value.
    ##
    ## The client's database handle is opened on demand and closed again
    ## afterwards unless +conf.persistent+ is set.  A failed write is
    ## reported on stdout, rolled back, and the batch dropped; it is not
    ## raised.  A failure to open the connection is not caught here.
    if samples.len == 0: return

    if client.db.isNil:
        client.db = open( "", "", "", conf.dbopts )

    try:
        client.db.exec sql( "BEGIN" )
        for sample in samples:
            let
                timestamp = sample[ "timestamp" ].get_int
                host = sample[ "hostname" ].get_str.to_lowerascii
            sample.delete( "timestamp" )
            sample.delete( "hostname" )
            client.db.exec sql( conf.insertsql ), timestamp, host, sample
        client.db.exec sql( "COMMIT" )

    except CatchableError as err:
        echo "Got exception ", err.name, " while writing to DB: ", err.msg

        # Never leave the handle inside a failed transaction: on a
        # persistent connection every later batch would be rejected.
        # If even the rollback fails, the connection is unusable, so
        # drop it and let the next batch reconnect.
        try:
            client.db.exec sql( "ROLLBACK" )
        except CatchableError:
            client.db.close
            client.db = nil

    if not conf.persistent and not client.db.isNil:
        client.db.close
        client.db = nil
|
8 | 230 |
|
231 |
||
19 | 232 |
proc process( client: NetdataClient ): void =
    ## Do the work for a connected client within child process: read one
    ## sample batch, pivot it, write it to the database, and (when
    ## verbose) report how many samples were handled and the CPU time
    ## spent.
    let
        t0 = cpu_time()
        raw_data = client.fetch_data

    # Done with the socket, netdata will automatically
    # reconnect. Save local resources/file descriptors
    # by closing after the send is considered complete.
    #
    # The batch has already been read at this point, so a failure to
    # close is only reported (in debug mode); it no longer discards it.
    if conf.dropconn:
        try:
            client.sock.close
        except OSError as err:
            if conf.debug: echo "Unable to close socket for {client.address}: {err.msg}".fmt.hl( fgRed, bright=true )

    # Pivot the parsed data to a single JSON blob per sample time.
    let samples = parse_data( raw_data )
    client.write_to_database( samples )

    if conf.verbose:
        let cputime = cpu_time() - t0
        echo(
            hl( $(epochTime().to_int), fgMagenta, bright=true ),
            " ",
            hl( $(samples.len), fgWhite, bright=true ),
            " sample(s) parsed from ",
            client.address.hl( fgYellow, bright=true ),
            " in ", hl( "{cputime:<2.3f}".fmt, fgWhite, bright=true), " seconds."
        )
261 |
||
0 | 262 |
|
8 | 263 |
proc serverloop( conf: Config ): void =
    ## Open a database connection, bind to the listening socket,
    ## and start serving incoming netdata streams.
    ##
    ## The startup database connection only verifies connectivity and is
    ## closed again immediately.  Each accepted connection is served in a
    ## forked child process.  This proc never returns.
    let db = open( "", "", "", conf.dbopts )
    db.close
    if conf.verbose: echo( "Successfully tested connection to the backend database.".hl( fgGreen ) )

    # Ensure children are properly reaped.
    # Ignoring SIGCHLD lets the kernel reap exited children (no zombies).
    #
    var sa: Sigaction
    sa.sa_handler = SIG_IGN
    discard sigaction( SIGCHLD, sa )

    # Setup listening socket.
    #
    var server = newSocket()
    server.set_sock_opt( OptReuseAddr, true )
    server.bind_addr( Port(conf.listen_port), conf.listen_addr )
    server.listen()

    if conf.verbose:
        echo(
            "Listening for incoming connections on ".hl( fgGreen, bright=true ),
            hl( (if conf.listen_addr == "0.0.0.0": "*" else: conf.listen_addr) , fgBlue, bright=true ),
            ":",
            hl( $conf.listen_port, fgBlue, bright=true ),
        )
        echo ""

    # Wait for incoming connections, fork for each client.
    #
    while true:
        let client = NetdataClient.new
        client.sock = Socket.new

        # Block, waiting for new connections.
        server.acceptAddr( client.sock, client.address )

        let pid = fork()
        if pid == 0:
            # Child: the listening socket belongs to the parent.
            server.close
            if conf.dropconn:
                # "one shot" mode.
                client.process
                quit( 0 )
            else:
                # Keep the connection to netdata open.
                while true: client.process
        elif pid < 0:
            # fork() failed, so nobody will serve this connection.
            # Say so instead of dropping the client silently.
            let reason = $strerror( errno )
            echo "Unable to fork a handler for {client.address}: {reason}".fmt.hl( fgRed, bright=true )

        # Parent: the child (if any) holds its own copy of the socket.
        client.sock.close
        when defined( testing ): dumpNumberOfInstances()
0 | 313 |
|
314 |
||
8 | 315 |
proc parse_cmdline: Config =
    ## Populate the config object with the user's preferences.
    ##
    ## Starts from the built-in defaults below, then applies each command
    ## line option in order.  Options that take a value accept it attached
    ## ("-t:500", "-t=500") or as the following argument ("-t 500"), and
    ## likewise for the long names.  Exits the process for --help,
    ## --version, conflicting -D/-P options, or an invalid numeric value.

    proc int_option( key, val: string, valid: Slice[int] ): int =
        ## Parse +val+ as the integer argument to option +key+.  Exits
        ## with an error message (rather than a stack trace) when it is
        ## not a number within +valid+.
        try:
            result = val.parse_int
            if result in valid: return
        except ValueError:
            discard
        echo "Invalid value for option '{key}': '{val}'".fmt
        quit( 1 )

    # Config object defaults.
    #
    result = Config(
        dbopts: "host=localhost dbname=netdata application_name=netdata-tsrelay",
        dbtable: "netdata",
        dropconn: false,
        listen_port: 14866,
        listen_addr: "0.0.0.0",
        verbose: true,
        debug: false,
        timeout: 500,
        persistent: false,
        insertsql: INSERT_SQL % [ "netdata" ]
    )

    # always set debug mode if development build.
    result.debug = defined( testing )

    # Declaring the flag-only options lets every other option take its
    # value from the following argument, not only from ":"/"=" forms.
    for kind, key, val in getopt(
            shortNoVal = { 'd', 'D', 'h', 'P', 'q', 'v' },
            longNoVal = @[ "debug", "dropconn", "help", "persistent", "quiet", "version" ] ):
        case kind

        of cmdArgument:
            discard

        of cmdLongOption, cmdShortOption:
            case key
                of "debug", "d":
                    result.debug = true

                of "dropconn", "D":
                    if result.persistent:
                        echo "Dropping TCP sockets are incompatible with persistent database connections."
                        quit( 1 )
                    result.dropconn = true

                of "help", "h":
                    echo USAGE
                    quit( 0 )

                of "quiet", "q":
                    result.verbose = false

                of "version", "v":
                    echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
                    quit( 0 )

                # A non-positive timeout would never end a sample batch.
                of "timeout", "t": result.timeout = int_option( key, val, 1 .. high( int ) )

                of "dbtable", "T":
                    # Keep the recorded table name in sync with the SQL.
                    result.dbtable = val
                    result.insertsql = INSERT_SQL % [ val ]

                of "dbopts", "o": result.dbopts = val

                of "listen-addr", "a": result.listen_addr = val
                of "listen-port", "p": result.listen_port = int_option( key, val, 1 .. 65535 )

                of "persistent", "P":
                    if result.dropconn:
                        echo "Persistent database connections are incompatible with dropping TCP sockets."
                        quit( 1 )
                    result.persistent = true

                else: discard

        of cmdEnd: assert( false ) # shouldn't reach here ever

    # As documented, -q is ignored when debugging.
    if result.debug: result.verbose = true
|
382 |
||
383 |
||
384 |
when isMainModule:
    # Put the terminal's colors back however the process ends; this is
    # registered first because parse_cmdline() itself may quit.
    system.addQuitProc( resetAttributes )

    conf = parse_cmdline()

    # Dump the effective configuration for debugging sessions.
    if conf.debug:
        echo hl( $conf, fgYellow )

    serverloop( conf )
0 | 389 |