提交 cc627e53 authored 作者: Shane Bryldt's avatar Shane Bryldt

FS-10167: Shifted the routing layer to occur slightly lower in the processing…

FS-10167: Shifted the routing layer to occur slightly lower in the processing stack, which allows routing of messages without creating local pending requests for callback and TTL tracking
上级 7350382a
...@@ -643,6 +643,7 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, bla ...@@ -643,6 +643,7 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, bla
ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
{ {
blade_handle_t *bh = NULL;
blade_jsonrpc_request_t *bjsonrpcreq = NULL; blade_jsonrpc_request_t *bjsonrpcreq = NULL;
blade_jsonrpc_response_t *bjsonrpcres = NULL; blade_jsonrpc_response_t *bjsonrpcres = NULL;
const char *jsonrpc = NULL; const char *jsonrpc = NULL;
...@@ -655,6 +656,8 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ...@@ -655,6 +656,8 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
ks_log(KS_LOG_DEBUG, "Session (%s) processing\n", bs->id); ks_log(KS_LOG_DEBUG, "Session (%s) processing\n", bs->id);
bh = blade_session_handle_get(bs);
ks_assert(bh);
jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc"); jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc");
if (!jsonrpc || strcmp(jsonrpc, "2.0")) { if (!jsonrpc || strcmp(jsonrpc, "2.0")) {
...@@ -679,9 +682,48 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ...@@ -679,9 +682,48 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
// 2) Receiving a request (server: method callee or provider) // 2) Receiving a request (server: method callee or provider)
blade_jsonrpc_t *bjsonrpc = NULL; blade_jsonrpc_t *bjsonrpc = NULL;
blade_jsonrpc_request_callback_t callback = NULL; blade_jsonrpc_request_callback_t callback = NULL;
cJSON *params = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) receiving request (%s) for %s\n", bs->id, id, method); ks_log(KS_LOG_DEBUG, "Session (%s) receiving request (%s) for %s\n", bs->id, id, method);
params = cJSON_GetObjectItem(json, "params");
if (params) {
const char *params_requester_nodeid = cJSON_GetObjectCstr(params, "requester-nodeid");
const char *params_responder_nodeid = cJSON_GetObjectCstr(params, "responder-nodeid");
if (params_requester_nodeid && params_responder_nodeid && !blade_handle_local_nodeid_compare(bh, params_responder_nodeid)) {
// not meant for local processing, continue with standard unicast routing for requests
blade_session_t *bs_router = blade_handle_route_lookup(bh, params_responder_nodeid);
if (!bs_router) {
bs_router = blade_handle_sessions_upstream(bh);
if (!bs_router) {
cJSON *res = NULL;
cJSON *res_error = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) but upstream session unavailable\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid);
blade_jsonrpc_error_raw_create(&res, &res_error, id, -32603, "Upstream session unavailable");
// needed in case this error must propagate further than the session which sent it
cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid);
cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
blade_session_send(bs, res, NULL);
return KS_STATUS_DISCONNECTED;
}
}
if (bs_router == bs) {
// @todo avoid circular by sending back an error instead, really should not happen but check for posterity in case a node is misbehaving for some reason
}
ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) routing (%s)\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid, blade_session_id_get(bs_router));
blade_session_send(bs_router, json, NULL);
blade_session_read_unlock(bs_router);
return KS_STATUS_SUCCESS;
}
}
// reach here if the request was not captured for routing, this SHOULD always mean the message is to be processed by local handlers
bjsonrpc = blade_handle_jsonrpc_lookup(bs->handle, method); bjsonrpc = blade_handle_jsonrpc_lookup(bs->handle, method);
if (!bjsonrpc) { if (!bjsonrpc) {
...@@ -702,9 +744,42 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ...@@ -702,9 +744,42 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
// @note This is scenario 4 // @note This is scenario 4
// 4) Receiving a response or error (client: method caller or consumer) // 4) Receiving a response or error (client: method caller or consumer)
blade_jsonrpc_response_callback_t callback = NULL; blade_jsonrpc_response_callback_t callback = NULL;
cJSON *error = NULL;
cJSON *result = NULL;
cJSON *object = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) receiving response (%s)\n", bs->id, id); ks_log(KS_LOG_DEBUG, "Session (%s) receiving response (%s)\n", bs->id, id);
error = cJSON_GetObjectItem(json, "error");
result = cJSON_GetObjectItem(json, "result");
object = error ? error : result;
if (object) {
const char *object_requester_nodeid = cJSON_GetObjectCstr(object, "requester-nodeid");
const char *object_responder_nodeid = cJSON_GetObjectCstr(object, "responder-nodeid");
if (object_requester_nodeid && object_responder_nodeid && !blade_handle_local_nodeid_compare(bh, object_requester_nodeid)) {
// not meant for local processing, continue with standard unicast routing for responses
blade_session_t *bs_router = blade_handle_route_lookup(bh, object_requester_nodeid);
if (!bs_router) {
bs_router = blade_handle_sessions_upstream(bh);
if (!bs_router) {
ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) but upstream session unavailable\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid);
return KS_STATUS_DISCONNECTED;
}
}
if (bs_router == bs) {
// @todo avoid circular, really should not happen but check for posterity in case a node is misbehaving for some reason
}
ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) routing (%s)\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid, blade_session_id_get(bs_router));
blade_session_send(bs_router, json, NULL);
blade_session_read_unlock(bs_router);
return KS_STATUS_SUCCESS;
}
}
bjsonrpcreq = blade_handle_requests_lookup(bs->handle, id); bjsonrpcreq = blade_handle_requests_lookup(bs->handle, id);
if (!bjsonrpcreq) { if (!bjsonrpcreq) {
// @todo hangup session entirely? // @todo hangup session entirely?
......
...@@ -564,9 +564,9 @@ KS_DECLARE(ks_status_t) blade_handle_route_remove(blade_handle_t *bh, const char ...@@ -564,9 +564,9 @@ KS_DECLARE(ks_status_t) blade_handle_route_remove(blade_handle_t *bh, const char
ks_hash_remove(bh->routes, (void *)nodeid); ks_hash_remove(bh->routes, (void *)nodeid);
// @todo when a route is removed, upstream needs to be notified, for whatever reason the node is no longer available through // @todo when a route is removed, upstream needs to be notified, for whatever reason the node is no longer available through
// this node so the routes leading here need to be cleared by passing a "blade.route" upstream to remove the routes, this // this node so the routes leading here need to be cleared by passing a "blade.register" upstream to remove the routes, this
// should actually happen only for local sessions, and blade.route should be always passed upstream AND processed locally, so // should actually happen only for local sessions, and blade.register should be always passed upstream AND processed locally, so
// we don't want to duplicate blade.route calls already being passed up if this route is not a local session // we don't want to duplicate blade.register calls already being passed up if this route is not a local session
// @note everything below here is for master-only cleanup when a node is no longer routable // @note everything below here is for master-only cleanup when a node is no longer routable
...@@ -1172,38 +1172,6 @@ ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq, ...@@ -1172,38 +1172,6 @@ ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq,
goto done; goto done;
} }
// errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates
// errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing
if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) {
// not meant for local processing, continue with routing which on a publish request, it always goes upstream to the master node
blade_session_t *bsu = blade_handle_sessions_upstream(bh);
if (!bsu) {
cJSON *res_error = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable");
// needed in case this error must propagate further than the session which sent it
cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
blade_session_send(bs, res, NULL);
goto done;
}
// @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified
// by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this
// and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being
// routed would not enter into these handlers and would not leave a footprint passing through routers
ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu));
blade_session_send(bsu, req, blade_protocol_publish_response_handler);
blade_session_read_unlock(bsu);
goto done;
}
// this local node must be responder-nodeid for the request, so process the request
ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid); ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
bp_key = ks_psprintf(bh->pool, "%s@%s", req_params_protocol, req_params_realm); bp_key = ks_psprintf(bh->pool, "%s@%s", req_params_protocol, req_params_realm);
...@@ -1265,8 +1233,8 @@ ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres ...@@ -1265,8 +1233,8 @@ ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres
cJSON *res_error = NULL; cJSON *res_error = NULL;
cJSON *res_result = NULL; cJSON *res_result = NULL;
cJSON *res_object = NULL; cJSON *res_object = NULL;
const char *requester_nodeid = NULL; //const char *requester_nodeid = NULL;
const char *responder_nodeid = NULL; //const char *responder_nodeid = NULL;
ks_assert(bres); ks_assert(bres);
...@@ -1288,24 +1256,9 @@ ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres ...@@ -1288,24 +1256,9 @@ ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres
} }
res_object = res_error ? res_error : res_result; res_object = res_error ? res_error : res_result;
requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid"); //requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid"); //responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) {
blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid);
if (!bsd) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid);
goto done;
}
ks_log(KS_LOG_DEBUG, "Session (%s) publish response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd));
blade_session_send(bsd, res, NULL);
blade_session_read_unlock(bsd);
goto done;
}
// this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response
ks_log(KS_LOG_DEBUG, "Session (%s) publish response processing\n", blade_session_id_get(bs)); ks_log(KS_LOG_DEBUG, "Session (%s) publish response processing\n", blade_session_id_get(bs));
if (res_error) { if (res_error) {
...@@ -1446,38 +1399,6 @@ ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, v ...@@ -1446,38 +1399,6 @@ ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, v
goto done; goto done;
} }
// errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates
// errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing
if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) {
// not meant for local processing, continue with routing which on a locate request, it always goes upstream to the master node
blade_session_t *bsu = blade_handle_sessions_upstream(bh);
if (!bsu) {
cJSON *res_error = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable");
// needed in case this error must propagate further than the session which sent it
cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
blade_session_send(bs, res, NULL);
goto done;
}
// @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified
// by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this
// and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being
// routed would not enter into these handlers and would not leave a footprint passing through routers
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu));
blade_session_send(bsu, req, blade_protocol_locate_response_handler);
blade_session_read_unlock(bsu);
goto done;
}
// this local node must be responder-nodeid for the request, so process the request
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid); ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
res_result_providers = cJSON_CreateObject(); res_result_providers = cJSON_CreateObject();
...@@ -1560,21 +1481,6 @@ ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres) ...@@ -1560,21 +1481,6 @@ ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres)
requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid"); requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid"); responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) {
blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid);
if (!bsd) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid);
goto done;
}
ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd));
blade_session_send(bsd, res, NULL);
blade_session_read_unlock(bsd);
goto done;
}
// this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response
ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs)); ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs));
if (res_error) { if (res_error) {
......
...@@ -843,7 +843,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_ ...@@ -843,7 +843,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
blade_session_route_add(bs, nodeid); blade_session_route_add(bs, nodeid);
// This is the main routing entry to make an identity routable through a session when a message is received for a given identity in this table, these allow efficiently determine which session // This is the main routing entry to make an identity routable through a session when a message is received for a given identity in this table, these allow efficiently determine which session
// a message should pass through when it does not match the local node identities from blade_handle_identity_register(), and must be matched with a call to blade_session_route_add() for cleanup, // a message should pass through when it does not match the local node identities from blade_handle_identity_register(), and must be matched with a call to blade_session_route_add() for cleanup,
// additionally when a "blade.route" is received the identity it carries affects these routes along with the sessionid of the downstream session it came through, and "blade.register" would also // additionally when a "blade.register" is received the identity it carries affects these routes along with the sessionid of the downstream session it came through, and "blade.register" would also
// result in the new identities being added as routes however new entire wildcard subrealm registration would require a special process for matching any identities from those subrealms // result in the new identities being added as routes however new entire wildcard subrealm registration would require a special process for matching any identities from those subrealms
blade_handle_route_add(bh, nodeid, nodeid); blade_handle_route_add(bh, nodeid, nodeid);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论