server端
package com.quantfn.portfolio.insterceptor
import org.springframework.context.ApplicationContext
import org.springframework.data.redis.core.RedisTemplate
import io.grpc.ForwardingServerCall
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
import io.netty.util.internal.StringUtil
public class UserAuthServerInsterceptor implements ServerInterceptor{
RedisTemplate<String, String> redisTemplate
@SuppressWarnings("unchecked")
public UserAuthServerInsterceptor(ApplicationContext context) {
redisTemplate = context.getBean("redisTemplate", RedisTemplate.class)
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
//获取客户端参数
Metadata.Key<String> token = Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER)
Metadata.Key<String> user_Id = Metadata.Key.of("userId", Metadata.ASCII_STRING_MARSHALLER)
String tokenStr = headers.get(token)
if (StringUtil.isNullOrEmpty(tokenStr)){
System.err.println("未收到客户端token,关闭此连接")
call.close(Status.DATA_LOSS,headers)
}
//获得token去中查询redis 查询
String userId = redisTemplate.opsForValue().get(tokenStr)
if(StringUtil.isNullOrEmpty(userId)){
System.err.println("客户端token错误,关闭此连接")
call.close(Status.DATA_LOSS,headers)
}
//服务端写回参数
ServerCall<ReqT, RespT> serverCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata headers) {
headers.put(user_Id,userId)
super.sendHeaders(headers)
}
}
return next.startCall(serverCall,headers)
}
}
client端
package com.quantfn.portfolio.insterceptor;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.util.logging.Logger;
/**
 * A client interceptor that attaches an authentication token to the request
 * headers of every outgoing call and logs the headers received from the server.
 */
public class UserAuthClientInterceptor implements ClientInterceptor {
    private static final Logger logger = Logger.getLogger(UserAuthClientInterceptor.class.getName());

    /** Token sent when the interceptor is created with the no-argument constructor. */
    private static final String DEFAULT_TOKEN = "A2D05E5ED2414B1F8C6AEB19F40EF77C";

    /** Metadata key under which the token is sent. */
    Metadata.Key<String> token = Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER);

    /** Token value placed into the request headers of each call. */
    private final String tokenValue;

    /** Creates an interceptor that sends the built-in default token. */
    public UserAuthClientInterceptor() {
        this(DEFAULT_TOKEN);
    }

    /**
     * Creates an interceptor that sends the given token.
     *
     * @param tokenValue token to attach to every call; must not be null or empty
     * @throws IllegalArgumentException if {@code tokenValue} is null or empty
     */
    public UserAuthClientInterceptor(String tokenValue) {
        if (tokenValue == null || tokenValue.isEmpty()) {
            throw new IllegalArgumentException("tokenValue must not be null or empty");
        }
        this.tokenValue = tokenValue;
    }

    /**
     * Wraps the call so that the token header is added when the call starts and
     * the server's response headers are logged when they arrive.
     *
     * @param method      the method being called
     * @param callOptions options for the call
     * @param next        the channel used to create the underlying call
     * @return the wrapping call
     */
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                headers.put(token, tokenValue);
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onHeaders(Metadata headers) {
                        // If you do not need to receive headers from the server, you can use
                        // io.grpc.stub.MetadataUtils#attachHeaders directly to send the header.
                        logger.info("header received from server:" + headers);
                        super.onHeaders(headers);
                    }
                }, headers);
            }
        };
    }
}
最后,需要把服务端拦截器通过 addService 注册到 server 的启动代码中,
并把客户端拦截器通过 ClientInterceptors.intercept 包装到 client 的 channel 上。
这里只贴出注册部分的代码。
server端
// Register the auth service with the server builder: ServerInterceptors.intercept wraps the
// service definition returned by bindService so that UserAuthServerInsterceptor sees its calls.
.addService(ServerInterceptors.intercept(IUserAuthServiceGrpc.bindService(new UserAuthServiceImpl(context)), new UserAuthServerInsterceptor(context)))
client端
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build()
Channel channel1 = ClientInterceptors.intercept(channel, new UserAuthClientInterceptor())
blockingStub = IUserAuthServiceGrpc.newBlockingStub(channel1)