DefaultAsync<RemoteInvokeMessage>(sql, new { URL = UpUrl });
return result;
}
}
}
3、重写返回结果
由于 RPC 调用返回的是 Json 封装的信息,需要将其转换成对应的 HttpContent。
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
namespace Czar.Gateway.Rpc
{
/// <summary>
/// <see cref="HttpContent"/> implementation that carries the textual result of an RPC call.
/// The text is encoded as UTF-8 once, at construction time, so that the bytes written to
/// the stream and the reported content length always agree.
/// </summary>
public class RpcHttpContent : HttpContent
{
    // UTF-8 encoded payload; never null (a null result is stored as an empty payload).
    private readonly byte[] payload;

    /// <summary>
    /// Creates content from an already serialized string.
    /// </summary>
    /// <param name="result">Text to send; <c>null</c> is treated as an empty body.</param>
    public RpcHttpContent(string result)
    {
        // Encoding.GetBytes does not emit a byte-order mark, so the body is plain UTF-8.
        payload = System.Text.Encoding.UTF8.GetBytes(result ?? string.Empty);
    }

    /// <summary>
    /// Creates content by serializing <paramref name="result"/> to JSON with Newtonsoft.Json.
    /// </summary>
    /// <param name="result">Object to serialize as the body.</param>
    public RpcHttpContent(object result)
        : this(Newtonsoft.Json.JsonConvert.SerializeObject(result))
    {
    }

    /// <summary>
    /// Writes the UTF-8 payload to <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">Target stream; it is written to but not closed here.</param>
    /// <param name="context">Transport information; not used.</param>
    protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
    {
        // Write the bytes directly instead of wrapping the stream in a StreamWriter that
        // would have to be left undisposed to avoid closing a stream this class does not own.
        await stream.WriteAsync(payload, 0, payload.Length);
    }

    /// <summary>
    /// Reports the body length in bytes.
    /// </summary>
    /// <param name="length">Receives the number of UTF-8 bytes in the payload.</param>
    /// <returns>Always <c>true</c>, because the payload is fully buffered.</returns>
    protected override bool TryComputeLength(out long length)
    {
        // Must be the byte count, not the character count: multi-byte characters
        // (e.g. Chinese text) occupy more than one byte each in UTF-8.
        length = payload.Length;
        return true;
    }
}
}
4、rpc中间件逻辑处理
有了前面的准备信息,现在基本可以完成逻辑代码的开发了,详细的中间件代码如下。
using Czar.Gateway.Errors;
using Czar.Rpc.Clients;
using Ocelot.Logging;
using Ocelot.Middleware;
using Ocelot.Responses;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
namespace Czar.Gateway.Rpc.Middleware
{
public class CzarRpcMiddleware : OcelotMiddleware
{
private readonly OcelotRequestDelegate _next;
private readonly IRpcClientFactory _clientFactory;
private readonly ICzarRpcProcessor _czarRpcProcessor;
/// <summary>
/// Builds the RPC middleware and keeps references to its collaborators.
/// </summary>
/// <param name="next">Request delegate stored in <c>_next</c>.</param>
/// <param name="clientFactory">RPC client factory stored in <c>_clientFactory</c>.</param>
/// <param name="loggerFactory">Factory used to create the logger handed to the base class.</param>
/// <param name="czarRpcProcessor">RPC processor stored in <c>_czarRpcProcessor</c>.</param>
public CzarRpcMiddleware(
    OcelotRequestDelegate next,
    IRpcClientFactory clientFactory,
    IOcelotLoggerFactory loggerFactory,
    ICzarRpcProcessor czarRpcProcessor)
    : base(loggerFactory.CreateLogger<CzarRpcMiddleware>())
{
    // The three assignments are independent of one another.
    this._czarRpcProcessor = czarRpcProcessor;
    this._clientFactory = clientFactory;
    this._next = next;
}
public async Task Invoke(DownstreamContext context)
{
var httpStatusCode = HttpStatusCode.OK;
var _param = new List<object>();
//1、提取路由参数
var tmpInfo = context.TemplatePlaceholderNameAndValues;
if (tmpInfo != null && tmpInfo.Count > 0)
{
foreach (var tmp in tmpInfo)
{
_param.Add(tmp.Value);
}
}
//2、提取query参数
foreach (var _q in context.HttpContext.Request.Query)
{
_param.Add(_q.Value.ToString());
}
//3、从body里提取内容
if (context.HttpContext.Request.Method.ToUpper() != "GET")
{
context.DownstreamRequest.Scheme = "http";
var requert = context.DownstreamRequest.ToHttpRequestMessage();
if (requert.Content!=null)
{
var json = "{}";
json = await requert.Content.ReadAsStringAsync();
_param.Add(json);
}
}
//从缓存里提取
var req = await _czarRpcProcessor.GetRemoteMethodAsync(context.DownstreamReRoute.UpstreamPathTemplate.OriginalValue);
if (req != null)
{
req.Parameters = _param.ToArray();
var result = await _clientFactory.SendAsync(req, GetEndPoint(context.DownstreamRequest.Host, context.DownstreamRequest.Port));
OkResponse<RpcHttpContent> httpResponse;
if (result.CzarCode == Czar.Rpc.Utilitys.RpcStatusCode.Success)
{
httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result.CzarResult?.ToString()));
}
else
{
httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result));
}
context.HttpContext.Response.ContentType = "application/json";
context.DownstreamResponse = new DownstreamResponse(httpResponse.Data, httpStatusCode, httpResponse.Data.Headers, &