-- try to make get_work faster by avoiding temporary tables

-- flag set by the client indicating whether this connection is able to score.
ALTER TABLE connections ADD COLUMN udp_reachable BOOLEAN DEFAULT TRUE NOT NULL;

-- this connection can't score until this time has elapsed; pushed into the future by
-- connection_failed_score once enough scoring failures have accumulated.
ALTER TABLE connections ADD COLUMN scoring_timeout TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL;

-- counter of how many scoring failures have occurred on this connection. once
-- (scoring_failures - scoring_failures_offset) hits the threshold passed to
-- connection_failed_score, we set scoring_timeout.
ALTER TABLE connections ADD COLUMN scoring_failures INTEGER NOT NULL DEFAULT 0;

-- counter of how many times a connection has been put in scoring timeout.
ALTER TABLE connections ADD COLUMN scoring_timeout_occurrences INTEGER NOT NULL DEFAULT 0;

-- relative offset subtracted from scoring_failures when checking the threshold, so the
-- failure window can restart without a write when the scoring_timeout period has elapsed.
ALTER TABLE connections ADD COLUMN scoring_failures_offset INTEGER NOT NULL DEFAULT 0;

--DROP FUNCTION IF EXISTS get_work (mylocidispid BIGINT, myaddr BIGINT);

-- get_work: choose up to return_rows other connections for the caller to score against.
--   my_client_id - caller's client_id; never returned
--   mylocidispid - caller's locidispid; mylocidispid / 1000000 is looked up as a geoiplocations.locid
--   myaddr       - caller's addr; connections with the same addr are never returned
--   return_rows  - maximum number of rows returned
--   stale_score  - a location that already has a most_recent_scores row against the caller's
--                  location younger than this is skipped
-- Returns at most one client_id per candidate locidispid, in random order.
CREATE FUNCTION get_work (my_client_id VARCHAR(64), mylocidispid BIGINT, myaddr BIGINT, return_rows INT, stale_score INTERVAL)
RETURNS TABLE (client_id VARCHAR(64))
VOLATILE
AS $$
BEGIN
    RETURN QUERY
    WITH scorable_locations AS (
        -- distinct locations with at least one eligible connection that have no fresh score
        -- against the caller's location and whose geoiplocation overlaps the st_buffer
        -- around the caller's (4023360 — presumably meters, ~2500 miles; TODO confirm geog type)
        SELECT DISTINCT connections.locidispid
        FROM connections
        WHERE connections.client_type = 'client'
          AND connections.client_id != my_client_id
          AND connections.addr != myaddr
          AND connections.udp_reachable
          AND NOW() > connections.scoring_timeout
          AND connections.music_session_id IS NULL
          -- NOT EXISTS rather than NOT IN: one NULL blocidispid would make NOT IN reject every row
          AND NOT EXISTS (
              SELECT 1
              FROM most_recent_scores AS mrs
              WHERE mrs.alocidispid = mylocidispid
                AND mrs.blocidispid = connections.locidispid
                AND (current_timestamp - mrs.score_dt) < stale_score
          )
          AND connections.locidispid / 1000000 IN (
              SELECT locid
              FROM geoiplocations
              WHERE geog && st_buffer((SELECT geog FROM geoiplocations WHERE locid = mylocidispid / 1000000), 4023360)
          )
    )
    SELECT tmp.client_id
    FROM (
        SELECT
            connections.client_id,
            random() AS r,
            -- NOTE(review): no ORDER BY in the window, so which connection represents a
            -- location is planner-dependent, not random; only the location order is random.
            row_number() OVER (PARTITION BY connections.locidispid) AS rownum
        FROM connections
        INNER JOIN scorable_locations
            ON connections.locidispid = scorable_locations.locidispid
        -- same eligibility filters as the CTE, applied to the individual connections
        WHERE connections.client_type = 'client'
          AND connections.client_id != my_client_id
          AND connections.addr != myaddr
          AND connections.udp_reachable
          AND NOW() > connections.scoring_timeout
          AND connections.music_session_id IS NULL
    ) AS tmp
    WHERE tmp.rownum = 1
    ORDER BY tmp.r
    LIMIT return_rows;
    RETURN;
END;
$$ LANGUAGE plpgsql;
-- DROP FUNCTION get_work (my_client_id VARCHAR(64), mylocidispid BIGINT, myaddr BIGINT, return_rows INT, stale_score INTERVAL)

-- when a connection is involved with a failed score, increment its scoring_failures column, and possibly put it in the 'doghouse'.
-- Both connections (b first, then a) get the same per-row update. All SET expressions see the
-- row's pre-update values. A failure "trips" when scoring_failures = scoring_failures_offset + failed_score_threshold - 1:
--   tripped           -> scoring_timeout = now + timeout_time, scoring_timeout_occurrences + 1,
--                        scoring_failures_offset = new scoring_failures (window restarts at 0)
--   not in timeout    -> offset unchanged, so this failure counts toward the threshold
--   still in timeout  -> offset + 1, so this failure does not count toward the threshold
-- scoring_failures is always incremented.
-- Returns whether a_client_id's connection is in timeout after the update (NULL if no such row).
-- DROP FUNCTION connection_failed_score(a_client_id VARCHAR(64), b_client_id VARCHAR(64), timeout_time INTERVAL, failed_score_threshold INT);
CREATE FUNCTION connection_failed_score(a_client_id VARCHAR(64), b_client_id VARCHAR(64), timeout_time INTERVAL, failed_score_threshold INT)
RETURNS BOOLEAN
VOLATILE
AS $$
DECLARE
    ret BOOLEAN;
BEGIN
    UPDATE connections
    SET scoring_timeout_occurrences = CASE
            WHEN scoring_failures = scoring_failures_offset + failed_score_threshold - 1 THEN scoring_timeout_occurrences + 1
            ELSE scoring_timeout_occurrences
        END,
        scoring_timeout = CASE
            WHEN scoring_failures = scoring_failures_offset + failed_score_threshold - 1 THEN CURRENT_TIMESTAMP + timeout_time
            ELSE scoring_timeout
        END,
        scoring_failures_offset = CASE
            WHEN scoring_failures = scoring_failures_offset + failed_score_threshold - 1 THEN scoring_failures + 1
            WHEN scoring_timeout < CURRENT_TIMESTAMP THEN scoring_failures_offset
            ELSE scoring_failures_offset + 1
        END,
        scoring_failures = scoring_failures + 1
    WHERE connections.client_id = b_client_id;

    UPDATE connections
    SET scoring_timeout_occurrences = CASE
            WHEN scoring_failures = scoring_failures_offset + failed_score_threshold - 1 THEN scoring_timeout_occurrences + 1
            ELSE scoring_timeout_occurrences
        END,
        scoring_timeout = CASE
            WHEN scoring_failures = scoring_failures_offset + failed_score_threshold - 1 THEN CURRENT_TIMESTAMP + timeout_time
            ELSE scoring_timeout
        END,
        scoring_failures_offset = CASE
            WHEN scoring_failures = scoring_failures_offset + failed_score_threshold - 1 THEN scoring_failures + 1
            WHEN scoring_timeout < CURRENT_TIMESTAMP THEN scoring_failures_offset
            ELSE scoring_failures_offset + 1
        END,
        scoring_failures = scoring_failures + 1
    WHERE connections.client_id = a_client_id
    RETURNING scoring_timeout > NOW() AS in_timeout INTO ret;

    RETURN ret;
END;
$$ LANGUAGE plpgsql;

-- when a connection is involved with a successful score, reset its scoring_failures and
-- scoring_failures_offset to 0 -- unless it is still in timeout, in which case both are left as-is.
-- Applied to b_client_id, then a_client_id.
-- Returns whether a_client_id's connection is in timeout (NULL if no such row).
-- DROP FUNCTION connection_good_score(a_client_id VARCHAR(64), b_client_id VARCHAR(64));
CREATE FUNCTION connection_good_score(a_client_id VARCHAR(64), b_client_id VARCHAR(64))
RETURNS BOOLEAN
VOLATILE
AS $$
DECLARE
    ret BOOLEAN;
BEGIN
    UPDATE connections
    SET scoring_failures = CASE WHEN scoring_timeout < CURRENT_TIMESTAMP THEN 0 ELSE scoring_failures END,
        scoring_failures_offset = CASE WHEN scoring_timeout < CURRENT_TIMESTAMP THEN 0 ELSE scoring_failures_offset END
    WHERE connections.client_id = b_client_id;

    UPDATE connections
    SET scoring_failures = CASE WHEN scoring_timeout < CURRENT_TIMESTAMP THEN 0 ELSE scoring_failures END,
        scoring_failures_offset = CASE WHEN scoring_timeout < CURRENT_TIMESTAMP THEN 0 ELSE scoring_failures_offset END
    WHERE connections.client_id = a_client_id
    RETURNING scoring_timeout > NOW() AS in_timeout INTO ret;

    RETURN ret;
END;
$$ LANGUAGE plpgsql;

-- scorable_locations: every connection that get_work could hand to the given client
-- (same eligibility filters as get_work), one row per connection rather than per location.
DROP FUNCTION IF EXISTS scorable_locations(my_client_id VARCHAR(64), mylocidispid BIGINT, myaddr BIGINT, stale_score INTERVAL);
CREATE FUNCTION scorable_locations(my_client_id VARCHAR(64), mylocidispid BIGINT, myaddr BIGINT, stale_score INTERVAL)
RETURNS TABLE (locidispid BIGINT, addr BIGINT, client_id VARCHAR(64))
VOLATILE
AS $$
BEGIN
    RETURN QUERY
    SELECT c.locidispid, c.addr, c.client_id
    FROM connections AS c
    WHERE c.client_type = 'client'
      AND c.udp_reachable
      AND NOW() > c.scoring_timeout
      AND c.music_session_id IS NULL
      AND c.addr != myaddr
      AND c.client_id != my_client_id
      -- NOT EXISTS rather than NOT IN: one NULL blocidispid would make NOT IN reject every row
      AND NOT EXISTS (
          SELECT 1
          FROM most_recent_scores AS mrs
          WHERE mrs.alocidispid = mylocidispid
            AND mrs.blocidispid = c.locidispid
            AND (current_timestamp - mrs.score_dt) < stale_score
      )
      AND c.locidispid / 1000000 IN (
          SELECT locid
          FROM geoiplocations
          WHERE geog && st_buffer((SELECT geog FROM geoiplocations WHERE locid = mylocidispid / 1000000), 4023360)
      );
    RETURN;
END;
$$ LANGUAGE plpgsql;

--DROP FUNCTION IF EXISTS get_work_summary (stale_score INTERVAL);
-- get_work_summary: one row per 'client' connection (joined to its user) with work_count =
-- number of scorable_locations rows at a different locidispid, or 0 when the connection itself
-- is in a session, in timeout, or not udp_reachable. Ordered by work_count descending.
CREATE FUNCTION get_work_summary (stale_score INTERVAL)
RETURNS TABLE (work_count BIGINT, client_id VARCHAR(64), email VARCHAR, first_name VARCHAR, last_name VARCHAR, user_id VARCHAR(64), udp_reachable BOOLEAN, in_timeout BOOLEAN, in_session BOOLEAN, scoring_failures INT, scoring_failures_offset INT, scoring_timeout_occurrences INT)
VOLATILE
AS $$
BEGIN
    RETURN QUERY
    SELECT
        SUM(CASE
                WHEN tmp.test_client_id IS NULL
                  OR tmp.in_session
                  OR tmp.in_timeout
                  OR tmp.udp_reachable = FALSE
                THEN 0
                ELSE 1
            END) AS work_count,
        tmp.client_id AS client_id,
        users.email,
        users.first_name,
        users.last_name,
        users.id AS user_id,
        tmp.udp_reachable,
        tmp.in_timeout,
        tmp.in_session,
        tmp.scoring_failures,
        tmp.scoring_failures_offset,
        tmp.scoring_timeout_occurrences
    FROM (
        -- one row per (client connection, candidate it could score against); a connection
        -- with no candidates yields a single row with NULL test_client_id
        SELECT
            sl.client_id AS test_client_id,
            connections.client_id AS client_id,
            connections.user_id AS user_id,
            connections.udp_reachable,
            connections.scoring_timeout > NOW() AS in_timeout,
            connections.music_session_id IS NOT NULL AS in_session,
            connections.scoring_failures,
            connections.scoring_failures_offset,
            connections.scoring_timeout_occurrences,
            sl.client_id IS NULL AS same_client
        FROM connections
        LEFT JOIN LATERAL scorable_locations(connections.client_id, connections.locidispid, connections.addr, stale_score) AS sl
            ON connections.locidispid != sl.locidispid
        -- filtering here (not after the join) avoids calling scorable_locations for non-client rows
        WHERE connections.client_type = 'client'
    ) AS tmp
    INNER JOIN users ON tmp.user_id = users.id
    GROUP BY
        tmp.client_id,
        users.email,
        users.first_name,
        users.last_name,
        users.id,
        tmp.same_client,
        tmp.udp_reachable,
        tmp.in_timeout,
        tmp.in_session,
        tmp.scoring_failures,
        tmp.scoring_failures_offset,
        tmp.scoring_timeout_occurrences
    ORDER BY work_count DESC;
    RETURN;
END;
$$ LANGUAGE plpgsql;

--DROP FUNCTION IF EXISTS get_work_summary_no_agg(stale_score INTERVAL);
-- useful for debugging get_work_summary: the un-aggregated (connection, candidate) pairs,
-- ordered by client_id.
-- NOTE(review): client_type = 'client' is in the LEFT JOIN's ON clause, so non-client
-- connections still appear (with NULL test_client_id), unlike get_work_summary -- confirm intended.
CREATE FUNCTION get_work_summary_no_agg (stale_score INTERVAL)
RETURNS TABLE (client_id VARCHAR(64), test_client_id VARCHAR(64), email VARCHAR, first_name VARCHAR, last_name VARCHAR, user_id VARCHAR(64), udp_reachable BOOLEAN, in_timeout BOOLEAN, scoring_failures INT, scoring_failures_offset INT, scoring_timeout_occurrences INT)
VOLATILE
AS $$
BEGIN
    RETURN QUERY
    SELECT
        tmp.client_id AS client_id,
        tmp.test_client_id,
        users.email,
        users.first_name,
        users.last_name,
        users.id AS user_id,
        tmp.udp_reachable,
        tmp.in_timeout,
        tmp.scoring_failures,
        tmp.scoring_failures_offset,
        tmp.scoring_timeout_occurrences
    FROM (
        SELECT
            sl.client_id AS test_client_id,
            connections.client_id AS client_id,
            connections.user_id AS user_id,
            connections.udp_reachable,
            connections.scoring_timeout > NOW() AS in_timeout,
            connections.scoring_failures,
            connections.scoring_failures_offset,
            connections.scoring_timeout_occurrences
        FROM connections
        LEFT JOIN LATERAL scorable_locations(connections.client_id, connections.locidispid, connections.addr, stale_score) AS sl
            ON connections.locidispid != sl.locidispid
           AND connections.client_type = 'client'
    ) AS tmp
    INNER JOIN users ON tmp.user_id = users.id
    ORDER BY tmp.client_id;
    RETURN;
END;
$$ LANGUAGE plpgsql;