author | Mahlon E. Smith <mahlon@laika.com> |
Mon, 12 Feb 2018 13:25:26 -0800 | |
changeset 4 | f3d83bdd7877 |
parent 1 | 160338bb2822 |
child 5 | a1276c3d39eb |
permissions | -rw-r--r-- |
0 | 1 |
# vim: set et nosta sw=4 ts=4 ft=nim : |
2 |
# |
|
3 |
# Copyright (c) 2018, Mahlon E. Smith <mahlon@martini.nu> |
|
4 |
# All rights reserved. |
|
5 |
# Redistribution and use in source and binary forms, with or without |
|
6 |
# modification, are permitted provided that the following conditions are met: |
|
7 |
# |
|
8 |
# * Redistributions of source code must retain the above copyright |
|
9 |
# notice, this list of conditions and the following disclaimer. |
|
10 |
# |
|
11 |
# * Redistributions in binary form must reproduce the above copyright |
|
12 |
# notice, this list of conditions and the following disclaimer in the |
|
13 |
# documentation and/or other materials provided with the distribution. |
|
14 |
# |
|
15 |
# * Neither the name of Mahlon E. Smith nor the names of his |
|
16 |
# contributors may be used to endorse or promote products derived |
|
17 |
# from this software without specific prior written permission. |
|
18 |
# |
|
19 |
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY |
|
20 |
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
21 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
|
22 |
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY |
|
23 |
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
|
24 |
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
|
25 |
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
|
26 |
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
27 |
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
|
28 |
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
29 |
||
30 |
||
31 |
import |
|
32 |
db_postgres, |
|
33 |
json, |
|
34 |
nativesockets, |
|
35 |
net, |
|
36 |
parseopt2, |
|
37 |
strutils, |
|
38 |
tables, |
|
39 |
threadpool |
|
40 |
||
41 |
||
42 |
const
    # Release identifier, echoed for the "version" / "v" options.
    VERSION = "v0.1.0"
    # Help text, echoed for the "help" / "h" options.
    USAGE = """
./netdata_tsrelay --dbopts="[PostgreSQL connection string]" --listen-port=14866

The default connection string is:
"host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
"""
    # Statement used to store one pivoted sample.  Placeholders, in
    # order: the timestamp (added to the epoch as a number of seconds),
    # the host, and the metrics document.
    INSERT_SQL = """
INSERT INTO netdata
( time, host, metrics )
VALUES
( 'epoch'::timestamptz + ? * '1 second'::interval, ?, ? )
"""
|
56 |
||
57 |
||
58 |
# Settings that can be overridden from the command line.
#
type
    Config = object of RootObj
        dbopts: string    ## The postgresql connection parameters. (See https://www.postgresql.org/docs/current/static/libpq-connect.html)
        listen_port: int  ## The port to listen on for incoming connections.

# Process-wide configuration, pre-populated with the defaults.
#
var conf = Config(
    listen_port: 14866,
    dbopts: "host=localhost port=5432 dbname=netdata user=netdata application_name=netdata-tsrelay"
)
|
68 |
||
69 |
||
70 |
proc fetch_data( client: Socket ): string =
    ## Read newline-delimited data from +client+ and return everything
    ## received, with each line terminated by "\n".
    ##
    ## Netdata's JSON backend doesn't send a length, so we read line by
    ## line and treat a 500ms stream timeout as the end of a "sample"
    ## batch.  Reading also stops as soon as the peer disconnects.
    ## Returns an empty string if nothing was received.
    result = ""
    try:
        while true:
            let line = client.recv_line( timeout=500 )

            # recv_line hands back an empty string once the peer has
            # closed the connection.  Nothing more will ever arrive, and
            # retrying would return immediately every time -- spinning
            # forever without ever reaching the timeout.
            if line.len == 0: break

            result.add( line )
            result.add( "\n" )
    except TimeoutError:
        # The stream went quiet: whatever was collected is the batch.
        discard
|
79 |
||
80 |
||
81 |
proc parse_data( data: string ): Table[ BiggestInt, JsonNode ] =
    ## Given a raw +data+ string of newline-separated JSON documents,
    ## return a table of JSON samples ready for writing, keyed by
    ## timestamp.  Netdata can buffer multiple samples in one batch, so
    ## every metric sharing a timestamp is merged into a single object
    ## holding "hostname" plus one "chart_id.id" entry per metric value.
    ##
    ## Blank lines, lines that are not valid JSON, and documents that are
    ## missing a required field or carry a non-integer timestamp are
    ## skipped.  An empty +data+ yields an empty table.

    # Hash of sample timeperiods to pivoted json data
    result = init_table[ BiggestInt, JsonNode ]()
    if data == "": return

    for sample in split_lines( data ):
        if defined( testing ): echo sample
        if sample.len == 0: continue

        var parsed: JsonNode
        try:
            parsed = sample.parse_json
        except JsonParsingError:
            if defined( testing ): echo "Unable to parse sample line: " & sample
            # Nothing was parsed, so there is no node to read from below.
            continue

        # Only complete metric objects can be pivoted; indexing a
        # missing key would raise and take the whole batch down with it.
        if parsed.kind != JObject: continue
        var complete = true
        for field in [ "timestamp", "hostname", "chart_id", "id", "value" ]:
            if not parsed.has_key( field ):
                complete = false
                break
        if not complete: continue

        let stamp = parsed[ "timestamp" ]
        if stamp.kind != JInt: continue
        let key = stamp.num

        # Create or use existing Json object for modded data.
        #
        var pivot: JsonNode
        if result.has_key( key ):
            pivot = result[ key ]
        else:
            pivot = newJObject()
            result[ key ] = pivot

        let name = parsed[ "chart_id" ].get_str & "." & parsed[ "id" ].get_str
        pivot[ "hostname" ] = parsed[ "hostname" ]
        pivot[ name ] = parsed[ "value" ]

    if defined( testing ): echo $result.len & " samples"
|
118 |
||
119 |
||
120 |
proc process( client: Socket, db: DBConn ): void =
    ## Do the work for a connected client within a thread: read one
    ## batch from +client+, close it, then write every parsed sample to
    ## +db+ inside a single transaction.
    ##
    ## Returns without writing anything if closing the socket raises an
    ## OSError, or if the batch contains no usable samples.  Errors
    ## raised while writing propagate to the caller after the open
    ## transaction has been rolled back.
    ##
    ## NOTE(review): serverloop passes the same +db+ handle to every
    ## spawned task -- confirm that concurrent use of a single
    ## connection (and interleaved BEGIN/COMMIT pairs) is safe.
    let raw_data = client.fetch_data

    try:
        try:
            if defined( testing ):
                echo "Closed connection for " & get_peer_addr( client.get_fd, get_sock_domain(client.get_fd) )[0]
        finally:
            # Release the descriptor even when the peer lookup fails.
            client.close
    except OSError:
        return

    let samples = parse_data( raw_data )
    if samples.len == 0: return

    var committed = false
    db.exec sql( "BEGIN" )
    try:
        for timestamp, sample in samples:
            # The host is stored in its own column, so lift it out of
            # the metrics document before serializing it.
            let host = sample[ "hostname" ].get_str
            sample.delete( "hostname" )
            db.exec sql( INSERT_SQL ), timestamp, host, sample
        db.exec sql( "COMMIT" )
        committed = true
    finally:
        # Don't leave a failed transaction open on the connection; the
        # original error (if any) continues to propagate.
        if not committed: discard db.try_exec( sql( "ROLLBACK" ) )
0 | 140 |
|
141 |
proc serverloop: void =
    ## Open a database connection using conf.dbopts, bind to the
    ## listening socket on conf.listen_port, and start serving incoming
    ## netdata streams.  Each accepted client is handed to process() via
    ## spawn.  Loops forever; errors from connecting, binding or
    ## accepting are not handled here.
    let db = open( "", "", "", conf.dbopts )
    echo "Successfully connected to the backend database."

    var server = newSocket()
    echo "Listening for incoming connections on port ", conf.listen_port, "..."
    server.set_sock_opt( OptReuseAddr, true )
    server.bind_addr( Port(conf.listen_port) )
    server.listen()

    while true:
        var
            client: Socket
            address = ""

        # acceptAddr stores the accepted descriptor into +client+, so
        # only the wrapper object is needed.  Calling newSocket() here
        # would open an extra OS socket per connection whose descriptor
        # is overwritten and never closed.
        new( client )
        server.acceptAddr( client, address )
        echo "New connection: " & address
        spawn client.process( db )
|
161 |
||
162 |
||
163 |
proc parse_cmdline: void =
    ## Populate the config object with the user's preferences.
    ##
    ## Recognized options: help/h and version/v (print and exit 0),
    ## dbopts, and listen-port/p.  Positional arguments and unknown
    ## options are ignored.  An invalid listen port terminates the
    ## program with an error message and exit status 1.
    for kind, key, val in getopt():
        case kind

        of cmdArgument:
            discard

        of cmdLongOption, cmdShortOption:
            case key
            of "help", "h":
                echo USAGE
                quit( 0 )

            of "version", "v":
                echo "netdata_tsrelay ", VERSION
                quit( 0 )

            of "dbopts": conf.dbopts = val

            of "listen-port", "p":
                # Reject bad values here with a readable message rather
                # than failing with an unhandled exception, either now
                # or later when the value is converted to a Port.
                var port = -1
                try: port = val.parse_int
                except ValueError: discard
                if port < 0 or port > 65535:
                    quit( "Invalid listen port '" & val & "': expected a number between 0 and 65535.", 1 )
                conf.listen_port = port

            else: discard

        of cmdEnd: assert( false ) # shouldn't reach here ever
|
187 |
||
188 |
||
189 |
when isMainModule:
    # Apply command line overrides first, then serve until killed.
    parse_cmdline()
    when defined( testing ):
        # Testing builds dump the effective configuration on startup.
        echo( conf )
    serverloop()
|
193 |