author | Mahlon E. Smith <mahlon@laika.com> |
Thu, 15 Feb 2018 10:29:37 -0800 | |
changeset 7 | c0bcf3bea772 |
parent 6 | 1f366fc61592 |
child 8 | 1ef3f2d6d10e |
permissions | -rw-r--r-- |
0 | 1 |
# vim: set et nosta sw=4 ts=4 ft=nim : |
2 |
# |
|
3 |
# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu> |
|
4 |
# All rights reserved. |
|
5 |
# Redistribution and use in source and binary forms, with or without |
|
6 |
# modification, are permitted provided that the following conditions are met: |
|
7 |
# |
|
8 |
# * Redistributions of source code must retain the above copyright |
|
9 |
# notice, this list of conditions and the following disclaimer. |
|
10 |
# |
|
11 |
# * Redistributions in binary form must reproduce the above copyright |
|
12 |
# notice, this list of conditions and the following disclaimer in the |
|
13 |
# documentation and/or other materials provided with the distribution. |
|
14 |
# |
|
15 |
# * Neither the name of Mahlon E. Smith nor the names of his |
|
16 |
# contributors may be used to endorse or promote products derived |
|
17 |
# from this software without specific prior written permission. |
|
18 |
# |
|
19 |
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY |
|
20 |
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
21 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
|
22 |
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY |
|
23 |
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
|
24 |
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
|
25 |
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
|
26 |
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
27 |
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
|
28 |
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
29 |
||
30 |
||
31 |
import |
|
32 |
db_postgres, |
|
33 |
json, |
|
5 | 34 |
math, |
0 | 35 |
nativesockets, |
36 |
net, |
|
5 | 37 |
os, |
0 | 38 |
parseopt2, |
39 |
strutils, |
|
40 |
tables, |
|
5 | 41 |
terminal, |
42 |
times, |
|
0 | 43 |
threadpool |
44 |
||
45 |
||
46 |
const
    # Release identifier, printed by -v / --version.
    VERSION = "v0.1.0"

    # Help text printed by -h / --help.  The synopsis lists every
    # switch that is described below it.
    USAGE = """
./netdata_tsrelay [-q][-c][-d][-v][-h] --dbopts="[PostgreSQL connection string]" --listen-port=14866 --listen-addr=0.0.0.0

  -q: Quiet mode. No output at all. Ignored if -d is supplied.
  -c: Suppress ANSI color output.
  -d: Debug: Show incoming and parsed data.
  -v: Display version number.
  -h: Help. You're lookin' at it.

The default connection string is:
  "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
    """

    # Insert statement for one pivoted sample.  Placeholders, in order:
    # epoch seconds, hostname, metrics JSON.
    INSERT_SQL = """
        INSERT INTO netdata
            ( time, host, metrics )
        VALUES
            ( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
    """
|
66 |
||
67 |
||
5 | 68 |
type
    # Runtime settings for the relay.
    Config = object of RootObj
        dbopts: string      # PostgreSQL connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
        listen_port: int    # Port to listen on for incoming connections.
        listen_addr: string # IP address to listen on for incoming connections. Defaults to inaddr_any.
        verbose: bool       # Emit informational output.
        debug: bool         # Also dump the raw incoming data.
        use_color: bool     # Colorize output; probably want this off when debugging.
|
0 | 76 |
|
5 | 77 |
|
78 |
# The global config object, pre-loaded with defaults that
# parse_cmdline() may override.
#
# FIXME: Rather than pass this all over the
#        place, consider channels and createThread instead of spawn.
#
var conf = Config(
    dbopts:      "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay",
    listen_port: 14866,
    listen_addr: "0.0.0.0",
    verbose:     true,
    debug:       false,
    use_color:   true
)
91 |
||
92 |
||
5 | 93 |
proc hl( msg: string, fg: ForegroundColor, bright=false ): string =
    ## Wrap +msg+ in the ANSI escape sequence for the +fg+ color and
    ## return it as a string; the 'terminal' module can only write
    ## colors straight to stdout.  When +bright+ is set the color code
    ## is shifted by 60.  Returns +msg+ untouched if color output is
    ## disabled in the global config.
    if conf.use_color:
        let code = if bright: ord( fg ) + 60 else: ord( fg )
        result = "\e[" & $code & "m" & msg & "\e[0m"
    else:
        result = msg
|
101 |
||
102 |
||
0 | 103 |
proc fetch_data( client: Socket ): string =
    ## Read one "sample" batch from +client+ and return it as a string
    ## of newline-terminated lines.
    ##
    ## Netdata JSON backend doesn't send a length, so we read line by
    ## line and treat a 500ms read timeout as the end of the batch.
    ## Reading also stops as soon as the peer disconnects.  Returns an
    ## empty string if nothing was received.
    result = ""
    try:
        while true:
            let line = client.recv_line( timeout=500 )

            # recv_line hands back "" once the peer has disconnected.
            # Without this check the loop would never end, appending
            # a bare newline on every pass.
            if line == "": break

            result.add( line )
            result.add( "\n" )
    except TimeoutError:
        # Stream went quiet: whatever was collected so far is the batch.
        discard
|
112 |
||
113 |
||
5 | 114 |
proc parse_data( data: string, conf: Config ): Table[ BiggestInt, JsonNode ] =
    ## Given a raw +data+ string, parse each line as a JSON sample and
    ## return a table of pivoted JSON objects ready for writing, keyed
    ## by sample timestamp.  Netdata can buffer multiple samples in one
    ## batch, so lines sharing a timestamp are merged into one object
    ## holding "hostname" plus one "chart_id.id" => value pair per line.
    ##
    ## Blank lines, lines that are not valid JSON, and samples missing
    ## any of the fields used here are skipped.  When +conf.debug+ is
    ## set each raw line and each rejected line is echoed.

    # Hash of sample timeperiods to pivoted json data.  Initialised up
    # front so that even the empty-input return is a usable table.
    result = init_table[ BiggestInt, JsonNode ]()
    if data == "": return

    for sample in split_lines( data ):
        if conf.debug: echo sample.hl( fgBlack, bright=true )
        if sample.len == 0: continue

        var parsed: JsonNode
        try:
            parsed = sample.parse_json
        except JsonParsingError:
            if conf.debug: echo hl( "Unable to parse sample line: " & sample.hl(fgRed, bright=true), fgRed )
            # Nothing usable on this line.  Falling through would index
            # into the unset node and take the whole thread down.
            continue

        # Only objects carrying every field referenced below can be pivoted.
        var usable = parsed.kind == JObject
        if usable:
            for field in [ "timestamp", "hostname", "chart_id", "id", "value" ]:
                if not parsed.has_key( field ):
                    usable = false
                    break
        if not usable:
            if conf.debug: echo hl( "Skipping incomplete sample line: " & sample.hl(fgRed, bright=true), fgRed )
            continue

        # Create or use existing Json object for modded data.
        #
        let key = parsed[ "timestamp" ].get_num
        if not result.has_key( key ):
            result[ key ] = newJObject()
        let pivot = result[ key ]

        let name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
        pivot[ "hostname" ] = parsed[ "hostname" ]
        pivot[ name ] = parsed[ "value" ]
|
150 |
||
151 |
||
5 | 152 |
proc process( client: Socket, db: DBConn, conf: Config ): int =
    ## Do the work for a connected client within a thread: read the
    ## pending batch from +client+, close the socket, then pivot the
    ## samples and insert them via +db+.
    ## Returns the number of samples parsed.
    ##
    ## All inserts for a batch run inside a single transaction.  If any
    ## statement fails, the transaction is rolled back and the DbError
    ## is re-raised to the caller.
    ##
    ## NOTE(review): +db+ appears to be one connection shared by every
    ## spawned thread, so BEGIN/COMMIT pairs from concurrent clients
    ## could interleave -- confirm against the caller.
    let raw_data = client.fetch_data

    # Done with the socket, netdata will automatically
    # reconnect.  Save local resources/file descriptors
    # by closing after the send is considered complete.
    #
    try:
        client.close
    except OSError:
        # The batch has already been read in full; a failed close is
        # no reason to throw those samples away.
        discard

    # Pivot data and save to SQL.
    #
    let samples = parse_data( raw_data, conf )
    if samples.len != 0:
        db.exec sql( "BEGIN" )
        try:
            for timestamp, sample in samples:
                let host = sample[ "hostname" ].get_str
                # hostname gets its own column, so strip it from the
                # metrics document before storing.
                sample.delete( "hostname" )
                db.exec sql( INSERT_SQL ), timestamp, host, sample
            db.exec sql( "COMMIT" )
        except DbError:
            # Don't leave the connection stuck inside an aborted
            # transaction.  try_exec is used so that a failing
            # ROLLBACK can't mask the original error.
            discard db.try_exec( sql( "ROLLBACK" ) )
            raise

    return samples.len
|
178 |
||
4
f3d83bdd7877
Wrap commits in a transaction.
Mahlon E. Smith <mahlon@laika.com>
parents:
1
diff
changeset
|
179 |
|
5 | 180 |
proc runthread( client: Socket, address: string, db: DBConn, conf: Config ): void {.thread.} =
    ## Thread entry point: dispatches processing for one connected
    ## +client+ and, in verbose mode, reports how many samples came
    ## from +address+ and how long that took.
    # NOTE(review): cpu_time() may report processor time rather than
    # wall-clock time on some platforms -- confirm this is the intent.
    let started = cpu_time()
    let count   = client.process( db, conf )

    if not conf.verbose: return

    let elapsed = round( cpu_time() - started, 3 )
    echo(
        hl( $count, fgWhite, bright=true ),
        " sample(s) parsed from ",
        hl( address, fgYellow, bright=true ),
        " in ",
        hl( $elapsed, fgWhite, bright=true ),
        " seconds."
        # " ", hl($(round((get_occupied_mem()/1024/1024),1)), fgWhite, bright=true), "MB memory used."
    )
|
194 |
||
0 | 195 |
|
196 |
proc serverloop: void =
    ## Connect to the database named by the global config, bind the
    ## listening socket, then loop forever handing each accepted
    ## netdata connection to a spawned thread.
    let db = open( "", "", "", conf.dbopts )
    if conf.verbose:
        echo( hl( "Successfully connected to the backend database.", fgGreen ) )

    var cycles = 0
    var server = newSocket()

    server.set_sock_opt( OptReuseAddr, true )
    server.bind_addr( Port(conf.listen_port), conf.listen_addr )
    server.listen()

    if conf.verbose:
        # Show the wildcard address as "*".
        let shown_addr = if conf.listen_addr == "0.0.0.0": "*" else: conf.listen_addr
        echo(
            hl( "Listening for incoming connections on ", fgGreen, bright=true ),
            hl( shown_addr, fgBlue, bright=true ),
            ":",
            hl( $conf.listen_port, fgBlue, bright=true )
        )
        echo ""

    while true:
        # Each incoming connection requires its own client socket.
        var
            client  = newSocket()
            address = ""

        # Force a garbage collection pass whenever the counter hits 25.
        #
        inc cycles
        if cycles == 25:
            when defined( testing ): echo "Forcing GC pass."
            GC_full_collect()
            cycles = 1

        # Close the freshly created socket before accept repopulates it.
        client.close
        server.acceptAddr( client, address ) # blocking call
        spawn runthread( client, address, db, conf )

        when defined( testing ): dumpNumberOfInstances()
0 | 235 |
|
236 |
||
237 |
proc parse_cmdline: void =
    ## Populate the global config object with the user's preferences.
    ##
    ## --help and --version print their text and exit 0.  A listen
    ## port that is not an integer in 0..65535 prints an error and
    ## exits 1.  Unrecognised options and bare arguments are ignored.
    ## As the usage text promises, quiet mode has no effect when debug
    ## mode is active.

    # always set debug mode if development build.
    conf.debug = defined( testing )

    for kind, key, val in getopt():
        case kind

        of cmdArgument:
            discard

        of cmdLongOption, cmdShortOption:
            case key
                of "debug", "d":
                    conf.debug = true

                of "no-color", "c":
                    conf.use_color = false

                of "help", "h":
                    echo USAGE
                    quit( 0 )

                of "quiet", "q":
                    conf.verbose = false

                of "version", "v":
                    echo hl( "netdata_tsrelay " & VERSION, fgWhite, bright=true )
                    quit( 0 )

                of "dbopts": conf.dbopts = val
                of "listen-addr", "a": conf.listen_addr = val
                of "listen-port", "p":
                    # Fail with a readable message instead of an
                    # unhandled exception and stack trace.
                    try:
                        conf.listen_port = val.parse_int
                    except ValueError:
                        quit( "Invalid listen port: '" & val & "'", 1 )
                    if conf.listen_port < 0 or conf.listen_port > 65535:
                        quit( "Listen port out of range (0-65535): " & val, 1 )

                else: discard

        of cmdEnd: assert( false ) # shouldn't reach here ever

    # -q is documented as "Ignored if -d is supplied".  Applied after
    # the loop so the order of the switches doesn't matter.
    if conf.debug: conf.verbose = true
|
275 |
||
276 |
||
277 |
when isMainModule:
    # Register resetAttributes to run when the program quits.
    system.addQuitProc( resetAttributes )

    parse_cmdline()

    # Show the effective configuration when debugging.
    if conf.debug:
        echo( ($conf).hl( fgYellow ) )

    serverloop()
284 |