This commit is contained in:
Seth Call 2014-05-19 08:46:03 -05:00
parent 05666f7927
commit e4da30f39e
33 changed files with 567 additions and 151 deletions

View File

@ -0,0 +1,32 @@
ActiveAdmin.register JamRuby::LatencyTester, :as => 'LatencyTester' do
config.filters = true
config.per_page = 50
config.clear_action_items!
config.sort_order = "client_id"
menu :parent => 'Operations'
controller do
def scoped_collection
@latency_testers ||= end_of_association_chain
.order('client_id')
end
end
index :as => :block do |latency_tester|
div :for => latency_tester do
h3 "#{latency_tester.client_id}"
columns do
column do
panel 'Details' do
attributes_table_for(latency_tester) do
row :connection do |latency_tester| latency_tester.connection ? "last updated at: #{latency_tester.connection.updated_at}" : "no connection" end
end
end
end
end
end
end
end

View File

@ -28,6 +28,17 @@ FactoryGirl.define do
end
end
end
factory :connection, :class => JamRuby::Connection do
sequence(:client_id) { |n| "Client#{n}" }
ip_address "1.1.1.1"
as_musician true
addr 0
locidispid 0
client_type 'client'
association :user, factory: :user
end
factory :artifact_update, :class => JamRuby::ArtifactUpdate do
sequence(:version) { |n| "0.1.#{n}" }
uri { "http://somewhere/jkclient.msi" }
@ -173,4 +184,18 @@ FactoryGirl.define do
end
end
factory :latency_tester, :class => JamRuby::LatencyTester do
ignore do
connection nil
make_connection true
end
sequence(:client_id) { |n| "LatencyTesterClientId-#{n}" }
after(:create) do |latency_tester, evaluator|
latency_tester.connection = evaluator.connection if evaluator.connection
latency_tester.connection = FactoryGirl.create(:connection, client_type: Connection::TYPE_LATENCY_TESTER, client_id: latency_tester.client_id) if evaluator.make_connection
latency_tester.save
end
end
end

View File

@ -0,0 +1,36 @@
require 'spec_helper'
describe 'Feeds' do
subject { page }
before(:each) do
end
describe "latency_tester with connection" do
let!(:latency_tester) {FactoryGirl.create(:latency_tester)}
before(:each) do
visit admin_latency_testers_path
end
it "shows connection info" do
should have_selector('td', text: "last updated at: #{latency_tester.connection.updated_at}")
end
end
describe "latency_tester with no connection" do
let!(:latency_tester) {FactoryGirl.create(:latency_tester, client_id: 'abc', make_connection: false)}
before(:each) do
visit admin_latency_testers_path
end
it "shows no connection" do
should have_selector('td', text: "no connection")
end
end
end

View File

@ -22,13 +22,11 @@ module JamRuby
Jampb::ClientMessage.parse(payload)
end
# create a login message using user/pass
def login_with_user_pass(username, password, options = {})
# create a login message using client_id (used by latency_tester)
def login_with_client_id(client_id)
login = Jampb::Login.new(
:username => username,
:password => password,
:client_id => options[:client_id],
:client_type => options[:client_type]
:client_id => client_id,
:client_type => Connection::TYPE_LATENCY_TESTER
)
Jampb::ClientMessage.new(
@ -38,6 +36,22 @@ module JamRuby
)
end
# create a login message using user/pass
def login_with_user_pass(username, password, options = {})
login = Jampb::Login.new(
:username => username,
:password => password,
:client_id => options[:client_id],
:client_type => options[:client_type]
)
Jampb::ClientMessage.new(
:type => ClientMessage::Type::LOGIN,
:route_to => SERVER_TARGET,
:login => login
)
end
# create a login message using token (a cookie or similar)
def login_with_token(token, options = {})
login = Jampb::Login.new(

View File

@ -196,6 +196,7 @@ module JamRuby
def user_or_latency_tester_present
if user.nil? && client_type != TYPE_LATENCY_TESTER
puts client_type
errors.add(:connection, ValidationMessages::USER_OR_LATENCY_TESTER_PRESENT)
end
end

View File

@ -17,6 +17,9 @@ module JamRuby
# this implies a coding error
MISSING_CLIENT_STATE = 'MISSING_CLIENT_STATE'
# the underlying database connection is gone when the heartbeat comes in
MISSING_CONNECTION = 'MISSING_CONNECTION'
# websocket gateway did not recognize message. indicates out-of-date websocket-gateway
UNKNOWN_MESSAGE_TYPE = 'UNKNOWN_MESSAGE_TYPE'

View File

@ -24,6 +24,7 @@ module JamRuby
end
else
latency_tester = LatencyTester.new
latency_tester.client_id = client_id
unless latency_tester.save
return latency_tester
end

View File

@ -467,6 +467,17 @@ FactoryGirl.define do
end
factory :latency_tester, :class => JamRuby::LatencyTester do
association :connection
ignore do
connection nil
make_connection true
end
sequence(:client_id) { |n| "LatencyTesterClientId-#{n}" }
after(:create) do |latency_tester, evaluator|
latency_tester.connection = evaluator.connection if evaluator.connection
latency_tester.connection = FactoryGirl.create(:connection, client_type: Connection::TYPE_LATENCY_TESTER, client_id: latency_tester.client_id) if evaluator.make_connection
latency_tester.save
end
end
end

View File

@ -21,10 +21,10 @@ describe LatencyTester do
end
it "existing latency tester, no connection" do
latency_tester = FactoryGirl.create(:latency_tester, connection: nil)
latency_tester = FactoryGirl.create(:latency_tester, client_id: params[:client_id], make_connection: false)
latency_tester.connection.should be_nil
latency_tester.client_id = params[:client_id ]
latency_tester.client_id = params[:client_id]
latency_tester.save!
found = LatencyTester.connect(params)

View File

@ -71,7 +71,7 @@ gem 'resque_mailer'
#gem 'typescript-src', path: '../../typescript-src-ruby'
#gem 'typescript-node', path: '../../typescript-node-ruby'
#gem 'typescript-rails', path: '../../typescript-rails'
gem 'netaddr'
gem 'quiet_assets', :group => :development
gem 'bugsnag'
gem 'multi_json', '1.9.0'

View File

@ -19,6 +19,14 @@
'log':null, 'debug':null, 'info':null, 'warn':null, 'error':null, 'assert':null, 'trace':null, 'exception':null
}
var backend_methods = {
"log" : 4,
"debug" : 4,
"info" : 3,
"warn" : 2,
"error" : 1
}
var logCache = [];
if ('undefined' === typeof(context.console)) {
@ -33,15 +41,28 @@
context.console.debug = function() { console.log(arguments); }
}
console.proxy_logs_to_backend = false;
// http://tobyho.com/2012/07/27/taking-over-console-log/
function takeOverConsole(){
var console = window.console
if (!console) return
if (!console) return;
var i = null;
function intercept(method){
var original = console[method]
console[method] = function(){
logCache.push([method].concat(arguments));
var logAsString = [];
for(i in arguments) {
var arg = arguments[i];
try {
logAsString.push(JSON.stringify(arg));
}
catch(e) {
logAsString.push("unable to parse node: " + e.toString());
}
}
logCache.push([method].concat(logAsString));
if(logCache.length > 50) {
// keep the cache size 50 or lower
logCache.pop();
@ -55,9 +76,15 @@
var message = Array.prototype.slice.apply(arguments).join(' ')
original(message)
}
if(console.proxy_logs_to_backend && context.jamClient) {
var backendLevel = backend_methods[method];
if(backendLevel) {
context.jamClient.log(backendLevel, logAsString.join(', '));
}
}
}
}
var methods = ['log', 'warn', 'error']
var methods = ['log', 'warn', 'error', 'debug', 'info']
for (var i = 0; i < methods.length; i++)
intercept(methods[i])
}

View File

@ -126,6 +126,18 @@
return client_container(msg.LOGIN, route_to.SERVER, login);
};
// create a login message using only the client_id. only valid for latency_tester
factory.login_with_client_id = function(client_id, client_type) {
if(client_type != 'latency_tester') {
throw "client_type must be latency_tester in login_with_client_id";
}
var login = {
client_id : client_id,
client_type : client_type
};
return client_container(msg.LOGIN, route_to.SERVER, login);
};
// create a music session login message
factory.login_music_session = function(music_session) {
var login_music_session = { music_session : music_session };

View File

@ -19,6 +19,8 @@
// uniquely identify the websocket connection
var channelId = null;
var clientType = null;
var mode = null;
var rest = context.JK.Rest();
// heartbeat
var heartbeatInterval = null;
@ -159,8 +161,8 @@
if(lastHeartbeatSentTime) {
var drift = new Date().getTime() - lastHeartbeatSentTime.getTime() - heartbeatMS;
if(drift > 500) {
logger.error("significant drift between heartbeats: " + drift + 'ms beyond target interval')
if (drift > 500) {
logger.warn("significant drift between heartbeats: " + drift + 'ms beyond target interval')
}
}
lastHeartbeatSentTime = now;
@ -169,6 +171,10 @@
}
}
function isClientMode() {
return mode == "client";
}
function loggedIn(header, payload) {
if (!connectTimeout) {
@ -180,21 +186,21 @@
app.clientId = payload.client_id;
// tell the backend that we have logged in
context.jamClient.OnLoggedIn(payload.user_id, payload.token);
$.cookie('client_id', payload.client_id);
if(isClientMode()) {
// tell the backend that we have logged in
context.jamClient.OnLoggedIn(payload.user_id, payload.token);
$.cookie('client_id', payload.client_id);
}
heartbeatMS = payload.heartbeat_interval * 1000;
connection_expire_time = payload.connection_expire_time * 1000;
logger.debug("jamkazam.js.loggedIn(): clientId=" + app.clientId + ", heartbeat=" + payload.heartbeat_interval + "s, expire_time=" + payload.connection_expire_time + 's');
logger.info("jamkazam.js.loggedIn(): clientId=" + app.clientId + ", heartbeat=" + payload.heartbeat_interval + "s, expire_time=" + payload.connection_expire_time + 's');
heartbeatInterval = context.setInterval(_heartbeat, heartbeatMS);
heartbeatAckCheckInterval = context.setInterval(_heartbeatAckCheck, 1000);
lastHeartbeatAckTime = new Date(new Date().getTime() + heartbeatMS); // add a little forgiveness to server for initial heartbeat
connectDeferred.resolve();
activeElementEvent('afterConnect', payload);
activeElementEvent('afterConnect', payload);
}
function heartbeatAck(header, payload) {
@ -265,7 +271,7 @@
.always(function() {
if ($currentDisplay.is('.no-websocket-connection')) {
// this path is the 'not in session path'; so there is nothing else to do
$currentDisplay.hide();
$currentDisplay.removeClass('active');
// TODO: tell certain elements that we've reconnected
}
@ -299,7 +305,7 @@
$inSituContent.find('.reconnect-countdown').html(formatDelaySecs(reconnectDelaySecs()));
$messageContents.empty();
$messageContents.append($inSituContent);
$inSituBannerHolder.show();
$inSituBannerHolder.addClass('active');
content = $inSituBannerHolder;
}
@ -447,6 +453,10 @@
if(!clientType) {
clientType = context.JK.clientType();
}
if(!mode) {
mode = context.jamClient.getOperatingMode();
}
connectDeferred = new $.Deferred();
channelId = context.JK.generateUUID(); // create a new channel ID for every websocket connection
@ -472,7 +482,7 @@
};
server.close = function (in_error) {
logger.log("closing websocket");
logger.info("closing websocket");
clientClosedConnection = true;
server.socket.close();
@ -488,9 +498,20 @@
server.send(loginMessage);
};
server.latencyTesterLogin = function() {
var loginMessage = msg_factory.login_with_client_id(context.jamClient.clientID, 'latency_tester');
server.send(loginMessage);
}
server.onOpen = function () {
logger.log("server.onOpen");
server.rememberLogin();
logger.debug("server.onOpen");
if(isClientMode()) {
server.rememberLogin();
}
else {
server.latencyTesterLogin();
}
};
server.onMessage = function (e) {
@ -500,7 +521,7 @@
callbacks = server.dispatchTable[message.type];
if (message.type != context.JK.MessageType.HEARTBEAT_ACK && message.type != context.JK.MessageType.PEER_MESSAGE) {
logger.log("server.onMessage:" + messageType + " payload:" + JSON.stringify(payload));
logger.info("server.onMessage:" + messageType + " payload:" + JSON.stringify(payload));
}
if (callbacks !== undefined) {
@ -515,13 +536,13 @@
}
}
else {
logger.log("Unexpected message type %s.", message.type);
logger.info("Unexpected message type %s.", message.type);
}
};
// onClose is called if either client or server closes connection
server.onClose = function () {
logger.log("Socket to server closed.");
logger.info("Socket to server closed.");
if (connectDeferred.state() === "pending") {
connectDeferred.reject();
@ -535,12 +556,12 @@
var jsMessage = JSON.stringify(message);
if (message.type != context.JK.MessageType.HEARTBEAT && message.type != context.JK.MessageType.PEER_MESSAGE) {
logger.log("server.send(" + jsMessage + ")");
logger.info("server.send(" + jsMessage + ")");
}
if (server !== undefined && server.socket !== undefined && server.socket.send !== undefined) {
server.socket.send(jsMessage);
} else {
logger.log("Dropped message because server connection is closed.");
logger.warn("Dropped message because server connection is closed.");
}
};
@ -548,7 +569,7 @@
var loginMessage;
if (!server.signedIn) {
logger.log("Not signed in!");
logger.warn("Not signed in!");
// TODO: surface the error
return;
}
@ -601,7 +622,7 @@
server.connected = true;
if (context.jamClient !== undefined) {
logger.debug("... (handling LOGIN_ACK) Updating backend client, connected to true and clientID to " +
logger.info("... (handling LOGIN_ACK) Updating backend client, connected to true and clientID to " +
payload.client_id);
context.jamClient.connected = true;
context.jamClient.clientID = server.clientID;

View File

@ -685,6 +685,19 @@
}
function log(level, message) {
console.log("beep : " + message)
}
function getOperatingMode() {
if (location.pathname == '/latency_tester') {
return 'server';
}
else {
return 'client';
}
}
// passed an array of recording objects from the server
function GetLocalRecordingState(recordings) {
var result = { recordings:[]};
@ -884,6 +897,10 @@
// fake calls; not a part of the actual jam client
this.RegisterP2PMessageCallbacks = RegisterP2PMessageCallbacks;
this.SetFakeRecordingImpl = SetFakeRecordingImpl;
this.log = log;
this.getOperatingMode = getOperatingMode;
this.clientID = "devtester";
};
})(window,jQuery);

View File

@ -3,6 +3,10 @@
text-align:center;
width:100%;
position:absolute;
&.active {
display:block;
}
}
.server-connection {

View File

@ -38,4 +38,29 @@ class ClientsController < ApplicationController
redirect_to client_url
end
end
<<<<<<< HEAD
=======
private
def gon_properties
# use gon to pass variables into javascript
gon.websocket_gateway_uri = Rails.application.config.websocket_gateway_uri
gon.websocket_gateway_trusted_uri = Rails.application.config.websocket_gateway_trusted_uri
gon.check_for_client_updates = Rails.application.config.check_for_client_updates
gon.fp_apikey = Rails.application.config.filepicker_rails.api_key
gon.fp_upload_dir = Rails.application.config.filepicker_upload_dir
gon.allow_force_native_client = Rails.application.config.allow_force_native_client
gon.ftue_io_wait_time = Rails.application.config.ftue_io_wait_time
# is this the native client or browser?
@nativeClient = is_native_client?
# let javascript have access to the server's opinion if this is a native client
gon.isNativeClient = @nativeClient
gon.use_cached_session_scores = Rails.application.config.use_cached_session_scores
gon.allow_both_find_algos = Rails.application.config.allow_both_find_algos
end
>>>>>>> * wip
end

View File

@ -29,6 +29,15 @@ module ClientHelper
gon.websocket_gateway_uri = Rails.application.config.websocket_gateway_uri
end
if Rails.env == "development"
# if in development mode, we assume you are running websocket-gateway
# on the same host as you hit your server.
gon.websocket_gateway_trusted_uri = "ws://" + request.host + ":6768/websocket";
else
# but in any other mode, just use config
gon.websocket_gateway_trusted_uri = Rails.application.config.websocket_gateway_trusted_uri
end
gon.check_for_client_updates = Rails.application.config.check_for_client_updates
gon.fp_apikey = Rails.application.config.filepicker_rails.api_key
gon.fp_upload_dir = Rails.application.config.filepicker_upload_dir

View File

@ -258,6 +258,11 @@
JK.initJamClient();
// latency_tester does not want to be here
if(window.jamClient.getOperatingMode() == "server") {
window.location.href = "/latency_tester";
}
// Let's get things rolling...
if (JK.currentUserId) {

View File

@ -1,19 +1,19 @@
= render :partial => "banner"
= render :partial => "clients/banners/disconnected"
= render :partial => "jamServer"
:javascript
$(function() {
JK = JK || {};
JK.logger.proxy_logs_to_backend = true;
JK.root_url = "#{root_url}"
<% if Rails.env == "development" %>
// if in development mode, we assume you are running websocket-gateway
// on the same host as you hit your server.
JK.websocket_gateway_uri = "ws://" + location.hostname + ":6767/websocket";
<% else %>
// but in any other mode, just trust the config coming through gon
JK.websocket_gateway_uri = gon.websocket_gateway_uri
<% end %>
// if in development mode, we assume you are running websocket-gateway
// on the same host as you hit your server.
JK.websocket_gateway_uri = #{Rails.env == "development" ? '"ws://" + location.hostname + ":6768/websocket"' : 'gon.websocket_gateway_trusted_uri'};
if (console) { console.log("websocket_gateway_uri:" + JK.websocket_gateway_uri); }
@ -59,52 +59,15 @@
window.jamClient = new JK.FakeJamClient(JK.app, p2pMessageFactory);
window.jamClient.SetFakeRecordingImpl(new JK.FakeJamClientRecordings(JK.app, jamClient, p2pMessageFactory));
}
else if(false) { // set to true to time long running bridge calls
var originalJamClient = window.jamClient;
var interceptedJamClient = {};
$.each(Object.keys(originalJamClient), function(i, key) {
if(key.indexOf('(') > -1) {
// this is a method. time it
var jsKey = key.substring(0, key.indexOf('('))
console.log("replacing " + jsKey)
interceptedJamClient[jsKey] = function() {
var original = originalJamClient[key]
var start = new Date();
if(key == "FTUEGetDevices()") {
var returnVal = eval('originalJamClient.FTUEGetDevices(' + arguments[0] + ')');
}
else {
var returnVal = original.apply(originalJamClient, arguments);
}
var time = new Date().getTime() - start.getTime();
if(time >= 0) { // if 0, you'll see ALL bridge calls. If you set it to a higher value, you'll only see calls that are beyond that threshold
console.error(time + "ms jamClient." + jsKey + ' returns=', returnVal);
}
return returnVal;
}
}
else {
// we need to intercept properties... but how?
}
//JK.app.initialize();
JK.JamServer.connect() // singleton here defined in JamServer.js
.done(function() {
_initAfterConnect(true);
})
.fail(function() {
_initAfterConnect(false);
});
window.jamClient = interceptedJamClient;
}
// Let's get things rolling...
//if (JK.currentUserId) {
// JK.app.initialize();
JK.JamServer.connect() // singleton here defined in JamServer.js
.done(function() {
_initAfterConnect(true);
})
.fail(function() {
_initAfterConnect(false);
});
}
})

View File

@ -109,11 +109,12 @@ if defined?(Bundler)
config.websocket_gateway_connect_time_expire_client = 60 # 60 matches production
config.websocket_gateway_connect_time_stale_browser = 40 # 40 matches production
config.websocket_gateway_connect_time_expire_browser = 60 # 60 matches production
config.websocket_gateway_cidr = ['0.0.0.0/0']
config.websocket_gateway_internal_debug = false
config.websocket_gateway_port = 6767 + ENV['JAM_INSTANCE'].to_i
# Runs the websocket gateway within the web app
config.websocket_gateway_uri = "ws://localhost:#{config.websocket_gateway_port}/websocket"
config.websocket_gateway_trusted_uri = "ws://localhost:#{config.websocket_gateway_port + 1}/websocket"
config.external_hostname = ENV['EXTERNAL_HOSTNAME'] || 'localhost'
config.external_port = ENV['EXTERNAL_PORT'] || 3000

View File

@ -15,7 +15,8 @@ unless $rails_rake_task
:connect_time_expire_browser=> APP_CONFIG.websocket_gateway_connect_time_expire_browser,
:rabbitmq_host => APP_CONFIG.rabbitmq_host,
:rabbitmq_port => APP_CONFIG.rabbitmq_port,
:calling_thread => current)
:calling_thread => current,
:cidr => APP_CONFIG.websocket_gateway_cidr)
end
Thread.stop
end

View File

@ -468,4 +468,6 @@ FactoryGirl.define do
message Faker::Lorem.characters(10)
end
end
end

View File

@ -81,7 +81,8 @@ Thread.new do
:connect_time_expire_browser => 6,
:rabbitmq_host => 'localhost',
:rabbitmq_port => 5672,
:calling_thread => current)
:calling_thread => current,
:cidr => ['0.0.0.0/0'])
rescue Exception => e
puts "websocket-gateway failed: #{e}"
end

View File

@ -47,6 +47,7 @@ gem 'geokit'
gem 'geokit-rails', '2.0.1'
gem 'mime-types', '1.25.1'
gem 'rest-client'
gem 'netaddr'
group :development do
gem 'pry'

View File

@ -52,4 +52,5 @@ Server.new.run(:port => config["port"],
:connect_time_stale_browser => config["connect_time_stale_browser"],
:connect_time_expire_browser => config["connect_time_expire_browser"],
:rabbitmq_host => config['rabbitmq_host'],
:rabbitmq_port => config['rabbitmq_port'])
:rabbitmq_port => config['rabbitmq_port'],
:cidr => config['cidr'])

View File

@ -3,6 +3,7 @@ Defaults: &defaults
connect_time_expire_client: 60
connect_time_stale_browser: 40
connect_time_expire_browser: 60
cidr: [0.0.0.0/0]
development:
port: 6767

View File

@ -6,6 +6,7 @@ require "jam_websockets/session_error"
require "jam_websockets/permission_error"
require "jam_websockets/client_context"
require "jam_websockets/message"
require "jam_websockets/trust_check"
require "jam_websockets/router"
require "jam_websockets/server"

View File

@ -10,7 +10,7 @@ include Jampb
module EventMachine
module WebSocket
class Connection < EventMachine::Connection
attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context # client_id is uuid we give to each client to track them as we like
attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted # client_id is uuid we give to each client to track them as we like
# http://stackoverflow.com/questions/11150147/how-to-check-if-eventmachineconnection-is-open
attr_accessor :connected
@ -35,8 +35,14 @@ module JamWebsockets
class Router
attr_accessor :user_context_lookup, :heartbeat_interval_client, :connect_time_expire_client, :connect_time_stale_client,
:heartbeat_interval_browser, :connect_time_expire_browser, :connect_time_stale_browser
attr_accessor :user_context_lookup,
:amqp_connection_manager,
:heartbeat_interval_client,
:connect_time_expire_client,
:connect_time_stale_client,
:heartbeat_interval_browser,
:connect_time_expire_browser,
:connect_time_stale_browser
def initialize()
@log = Logging.logger[self]
@ -221,9 +227,10 @@ module JamWebsockets
end
def new_client(client)
def new_client(client, is_trusted)
# default to using json instead of pb
client.encode_json = true
client.trusted = is_trusted
client.onopen { |handshake|
# a unique ID for this TCP connection, to aid in debugging
@ -473,10 +480,10 @@ module JamWebsockets
default_expire = @connect_time_expire_client
end
heartbeat_interval = user.heartbeat_interval_client || default_heartbeat
heartbeat_interval = user.try(:heartbeat_interval_client) || default_heartbeat
heartbeat_interval = heartbeat_interval.to_i
heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config
connection_expire_time = user.connection_expire_time_client || default_expire
connection_expire_time = user.try(:connection_expire_time_client) || default_expire
connection_expire_time = connection_expire_time.to_i
connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config
connection_stale_time = default_stale # no user override exists for this; not a very meaningful time right now
@ -491,6 +498,51 @@ module JamWebsockets
[heartbeat_interval, connection_stale_time, connection_expire_time]
end
def add_tracker(user, client, client_type, client_id)
# add a tracker for this user
context = ClientContext.new(user, client, client_type)
@clients[client] = context
add_user(context)
add_client(client_id, context)
context
end
def handle_latency_tester_login(client_id, client_type, client)
# respond with LOGIN_ACK to let client know it was successful
remote_ip = extract_ip(client)
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(nil, client_type)
latency_tester = LatencyTester.connect({
client_id: client_id,
ip_address: remote_ip,
connection_stale_time: connection_stale_time,
connection_expire_time: connection_expire_time})
if latency_tester.errors.any?
@log.warn "unable to log in latency_tester with errors: #{latency_tester.errors.inspect}"
raise SessionError, "invalid login: #{latency_tester.errors.inspect}"
end
client.client_id = client_id
client.user_id = latency_tester.id if latency_tester
@semaphore.synchronize do
context = add_tracker(latency_tester, client, client_type, client_id)
@log.debug "logged in context created: #{context}"
login_ack = @message_factory.login_ack(remote_ip,
client_id,
nil,
heartbeat_interval,
nil,
false,
latency_tester.id,
connection_expire_time)
send_to_client(client, login_ack)
end
end
def handle_login(login, client)
username = login.username if login.value_for_tag(1)
password = login.password if login.value_for_tag(2)
@ -500,6 +552,12 @@ module JamWebsockets
client_type = login.client_type if login.value_for_tag(6)
@log.info("*** handle_login: token=#{token}; client_id=#{client_id}, client_type=#{client_type}")
if client_type == Connection::TYPE_LATENCY_TESTER
handle_latency_tester_login(client_id, client_type, client)
return
end
reconnected = false
# you don't have to supply client_id in login--if you don't, we'll generate one
@ -579,10 +637,7 @@ module JamWebsockets
@semaphore.synchronize do
# add a tracker for this user
context = ClientContext.new(user, client, client_type)
@clients[client] = context
add_user(context)
add_client(client_id, context)
context = add_tracker(user, client, client_type, client_id)
@log.debug "logged in context created: #{context}"
@ -618,7 +673,7 @@ module JamWebsockets
Diagnostic.missing_client_state(client.user_id, client.context)
raise SessionError, 'context state is gone. please reconnect.'
else
connection = Connection.find_by_user_id_and_client_id(context.user.id, context.client.client_id)
connection = Connection.find_by_client_id(context.client.client_id)
track_changes_counter = nil
if connection.nil?
@log.warn "*** WARNING: unable to find connection when handling heartbeat. context= #{context}; killing session"
@ -638,14 +693,15 @@ module JamWebsockets
# update user's notification_seen_at field if the heartbeat indicates it saw one
# first we try to use the notification id, which should usually exist.
# if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
update_notification_seen_at(connection, context, heartbeat)
update_notification_seen_at(connection, context, heartbeat) if client.context.client_type != Connection::TYPE_LATENCY_TESTER
end
ConnectionManager.active_record_transaction do |connection_manager|
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
connection_manager.reconnect(connection, connection.music_session_id, nil, connection_stale_time, connection_expire_time)
end if connection.stale?
if connection.stale?
ConnectionManager.active_record_transaction do |connection_manager|
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
connection_manager.reconnect(connection, connection.music_session_id, nil, connection_stale_time, connection_expire_time)
end
end
end
heartbeat_ack = @message_factory.heartbeat_ack(track_changes_counter)
@ -829,7 +885,7 @@ module JamWebsockets
end
def extract_ip(client)
return Socket.unpack_sockaddr_in(client.get_peername)[1]
Socket.unpack_sockaddr_in(client.get_peername)[1]
end
private

View File

@ -1,6 +1,5 @@
require 'em-websocket'
require 'bugsnag'
module JamWebsockets
class Server
@ -16,6 +15,7 @@ module JamWebsockets
def run(options={})
host = "0.0.0.0"
port = options[:port]
trust_port = port + 1
connect_time_stale_client = options[:connect_time_stale_client].to_i
connect_time_expire_client = options[:connect_time_expire_client].to_i
connect_time_stale_browser = options[:connect_time_stale_browser].to_i
@ -23,6 +23,7 @@ module JamWebsockets
rabbitmq_host = options[:rabbitmq_host]
rabbitmq_port = options[:rabbitmq_port].to_i
calling_thread = options[:calling_thread]
trust_check = TrustCheck.new(trust_port, options[:cidr])
@log.info "starting server #{host}:#{port} staleness_time=#{connect_time_stale_client}; reconnect time = #{connect_time_expire_client}, rabbitmq=#{rabbitmq_host}:#{rabbitmq_port}"
@ -35,7 +36,7 @@ module JamWebsockets
@router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port) do
start_connection_expiration
start_connection_flagger
start_websocket_listener(host, port, options[:emwebsocket_debug])
start_websocket_listener(host, port, trust_port, trust_check, options[:emwebsocket_debug])
calling_thread.wakeup if calling_thread
end
@ -51,10 +52,21 @@ module JamWebsockets
EventMachine::stop_event_loop
end
def start_websocket_listener(listen_ip, port, emwebsocket_debug)
def start_websocket_listener(listen_ip, port, trust_port, trust_check, emwebsocket_debug)
EventMachine::WebSocket.run(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws|
@log.info "new client #{ws}"
@router.new_client(ws)
@router.new_client(ws, false)
end
EventMachine::WebSocket.run(:host => listen_ip, :port => trust_port, :debug => emwebsocket_debug) do |ws|
@log.info "new latency_tester client #{ws}"
# verify this connection came in from a valid subnet, if specified
ip = extract_ip(ws)
if trust_check.trusted?(ip, trust_port)
@router.new_client(ws, true)
else
@log.warn("untrusted client attempted to connect to #{listen_ip}:#{trust_port} from #{ip}")
ws.close
end
end
@log.debug("started websocket")
end
@ -105,6 +117,10 @@ module JamWebsockets
end
end
private
def extract_ip(client)
Socket.unpack_sockaddr_in(client.get_peername)[1]
end
end
end

View File

@ -0,0 +1,35 @@
require 'netaddr'
module JamWebsockets
class TrustCheck
attr_accessor :match, :port
def initialize(port = 0, cidr = '0.0.0.0/0')
@match = []
@port = port
if cidr.kind_of?(Array)
cidr.each do |c|
@match << NetAddr::CIDR.create(c)
end
else
@match << NetAddr::CIDR.create(cidr)
end
end
def trusted? (remote_ip, port)
trusted = false
if @port == 0 || port == @port
@match.each do |cidr|
if cidr.matches?(remote_ip)
trusted = true
break
end
end
end
trusted
end
end
end

View File

@ -98,4 +98,19 @@ FactoryGirl.define do
proficiency_level 1
priority 0
end
factory :latency_tester, :class => JamRuby::LatencyTester do
ignore do
connection nil
make_connection true
end
sequence(:client_id) { |n| "LatencyTesterClientId-#{n}" }
after(:create) do |latency_tester, evaluator|
latency_tester.connection = evaluator.connection if evaluator.connection
latency_tester.connection = FactoryGirl.create(:connection, client_type: Connection::TYPE_LATENCY_TESTER, client_id: latency_tester.client_id) if evaluator.make_connection
latency_tester.save
end
end
end

View File

@ -46,8 +46,6 @@ def login(router, user, password, client_id)
message_factory = MessageFactory.new
client = LoginClient.new
login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token, 15, nil, false, user.id, 30)
router.should_receive(:send_to_client) do |*args|
args.count.should == 2
args[0].should == client
@ -57,32 +55,59 @@ def login(router, user, password, client_id)
client.should_receive(:onclose)
client.should_receive(:onerror)
#client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00")
@router.new_client(client, false)
handshake = double("handshake")
handshake.should_receive(:query).twice.and_return({ "pb" => "true", "channel_id" => SecureRandom.uuid })
client.onopenblock.call handshake
# create a login message, and pass it into the router via onmsgblock.call
login = message_factory.login_with_user_pass(user.email, password, :client_id => client_id, :client_type => 'client')
# first log in
client.onmsgblock.call login.to_s
client
end
def heartbeat(router, client)
message_factory = MessageFactory.new
heartbeat = message_factory.heartbeat()
client.onmsgblock.call heartbeat.to_s
end
# does a login and returns client
def login_latency_tester(router, latency_tester, client_id)
client = LoginClient.new
message_factory = MessageFactory.new
router.should_receive(:send_to_client).at_least(1).times do |*args|
args.count.should == 2
args[0].should == client
args[1].is_a?(Jampb::ClientMessage).should be_true
end
router.should_receive(:extract_ip).at_least(:once).with(client).and_return("127.0.0.1")
client.should_receive(:onclose)
client.should_receive(:onerror)
#client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00")
@router.new_client(client)
@router.new_client(client, true)
handshake = double("handshake")
handshake.should_receive(:query).twice.and_return({ "pb" => "true", "channel_id" => SecureRandom.uuid })
client.onopenblock.call handshake
# create a login message, and pass it into the router via onmsgblock.call
# todo client_type browser or client? i just guessed... [scott]
login = message_factory.login_with_user_pass(user.email, password, :client_id => client_id, :client_type => 'client')
login = message_factory.login_with_client_id(client_id)
# first log in
client.onmsgblock.call login.to_s
# then join music session
return client
end
# currently commented out; we have deprecated logging in for jam sessions via websocket-gateway;
# use rest API instead (or direct db access with factory-girl)
def login_music_session(router, client, music_session)
#message_factory = MessageFactory.new
#login_music_session = message_factory.login_music_session(music_session.id)
#login_ack = message_factory.login_music_session_ack(false, nil);
#router.should_receive(:send_to_client).with(client, login_ack)
#client.onmsgblock.call login_music_session.to_s
client
end
@ -99,6 +124,7 @@ describe Router do
@router.connect_time_expire_browser = 60
@router.connect_time_stale_browser = 40
@router.heartbeat_interval_browser = @router.connect_time_stale_browser / 2
@router.amqp_connection_manager = AmqpConnectionManager.new(true, 4, host: 'localhost', port: 5672)
end
subject { @router }
@ -210,32 +236,42 @@ describe Router do
end
it "should allow login of valid user", :mq => true do
#em do
@user = FactoryGirl.create(:user,
:password => "foobar", :password_confirmation => "foobar")
client1 = login(@router, @user, "foobar", "1")
done
#end
@user = FactoryGirl.create(:user,
:password => "foobar", :password_confirmation => "foobar")
client1 = login(@router, @user, "foobar", "1")
done
end
it "should allow login of a latency_tester", :mq => true do
@latency_tester = FactoryGirl.create(:latency_tester)
client1 = login_latency_tester(@router, @latency_tester, @latency_tester.client_id)
done
end
it "should allow heartbeat of a latency_tester", :mq => true do
@latency_tester = FactoryGirl.create(:latency_tester)
client1 = login_latency_tester(@router, @latency_tester, @latency_tester.client_id)
heartbeat(@router, client1)
done
end
it "should allow music_session_join of valid user", :mq => true do
#em do
user1 = FactoryGirl.create(:user) # in the music session
user2 = FactoryGirl.create(:user) # in the music session
user3 = FactoryGirl.create(:user) # not in the music session
music_session = FactoryGirl.create(:active_music_session, :creator => user1)
user1 = FactoryGirl.create(:user) # in the music session
user2 = FactoryGirl.create(:user) # in the music session
user3 = FactoryGirl.create(:user) # not in the music session
music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "4")
music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "5")
music_session = FactoryGirl.create(:active_music_session, :creator => user1)
# make a music_session and define two members
music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "4")
music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "5")
# create client 1, log him in, and log him in to music session
client1 = login(@router, user1, "foobar", "1")
login_music_session(@router, client1, music_session)
done
#end
# make a music_session and define two members
# create client 1, log him in, and log him in to music session
client1 = login(@router, user1, "foobar", "1")
done
end
it "should allow two valid subscribers to communicate with session-directed messages", :mq => true do
@ -248,10 +284,8 @@ describe Router do
# create client 1, log him in, and log him in to music session
client1 = login(@router, user1, "foobar", "1")
login_music_session(@router, client1, music_session)
client2 = login(@router, user2, "foobar", "2")
login_music_session(@router, client2, music_session)
# make a music_session and define two members

View File

@ -0,0 +1,45 @@
require 'spec_helper'
describe TrustCheck do
it "defaults to all" do
trust_check = TrustCheck.new
trust_check.trusted?('127.0.0.1', 6767).should be_true
trust_check.trusted?('127.0.0.1', 6768).should be_true
trust_check.trusted?('192.0.0.1', 6768).should be_true
trust_check.trusted?('10.0.0.1', 6768).should be_true
trust_check.trusted?('11.0.0.1', 6768).should be_true
end
it "verifies port when specified" do
trust_check = TrustCheck.new(6768)
trust_check.trusted?('127.0.0.1', 6767).should be_false
trust_check.trusted?('127.0.0.1', 6768).should be_true
trust_check.trusted?('192.0.0.1', 6768).should be_true
trust_check.trusted?('10.0.0.1', 6768).should be_true
trust_check.trusted?('11.0.0.1', 6768).should be_true
end
it "verifies cidr when specified" do
trust_check = TrustCheck.new(6768, '127.0.0.1/8')
trust_check.trusted?('127.0.0.1', 6767).should be_false
trust_check.trusted?('127.0.0.1', 6768).should be_true
trust_check.trusted?('192.0.0.1', 6768).should be_false
trust_check.trusted?('10.0.0.1', 6768).should be_false
trust_check.trusted?('11.0.0.1', 6768).should be_false
end
it "verifies mutltiple cidrs" do
trust_check = TrustCheck.new(6768, ['192.168.1.2', '192.168.1.3'])
trust_check.trusted?('192.168.1.1', 6767).should be_false
trust_check.trusted?('192.168.1.2', 6767).should be_false
trust_check.trusted?('192.168.1.1', 6768).should be_false
trust_check.trusted?('192.168.1.2', 6768).should be_true
trust_check.trusted?('192.168.1.3', 6768).should be_true
trust_check.trusted?('192.168.1.4', 6768).should be_false
end
end