I have two microservice cluster environments across the public network. The clusters communicate with the http protocol. Currently, a small number of businesses need to communicate across the microservice environment. I hope to develop a service that allows business services to perceive the difference in service calls as little as possible.
The two clusters are similar to a data collection service submitting data to a cloud service. I think I can use RSocket to connect the network and proxy http requests. The process is as follows: Start registration: ProxyA registers to ProxyB Service communication: A --> ProxyA (Http -> RSocket) --> ProxyB (RSocket -> Http) --> B During implementation, no suitable API was found to process Http data.
ProxyA like this: .java
@PostMapping("/proxy")
public Mono<DataBuffer> proxy(ServerHttpRequest request, ServerHttpResponse response) {
HttpHeaders headers = request.getHeaders();
Flux<DataBuffer> bodyFlux = request.getBody();
Map<String, String> rsocketProxyHttpHeaders = new HashMap<>();
rsocketProxyHttpHeaders.put("hello", "world");
ProxyHttpHeader proxyHttpHeader = new ProxyHttpHeader();
proxyHttpHeader.setServiceName(headers.getFirst("x-proxy-target-service"));
proxyHttpHeader.setMethod(headers.getFirst("x-proxy-target-method"));
proxyHttpHeader.setUrl(headers.getFirst("x-proxy-target-url"));
proxyHttpHeader.setQueryLine(headers.getFirst("x-proxy-target-queryLine"));
proxyHttpHeader.setDiscoveryType(headers.getFirst("x-proxy-target-discoveryType"));
proxyHttpHeader.setBizHeaders(rsocketProxyHttpHeaders);
// which api can send http body and rsocketProxyHttpHeaders ?
// which api can parse rsocket response to http response ?
Mono<DataBuffer> emptMono = Mono.just(this.strategies.dataBufferFactory().wrap(new byte[0]));
return DataBufferUtils.join(bodyFlux).switchIfEmpty(Mono.defer(() -> {
return emptMono;
})).flatMap(dataBuffer -> {
return rSocketRequester.route("callServer").metadata(metadataSpec -> {
metadataSpec.metadata(proxyHttpHeader, MimeTypeUtils.APPLICATION_JSON);
// Note this line
}).data(dataBuffer).retrieveMetadataMono(DataBuffer.class, PROXY_HTTP_HEADERS, ProxyHttpHeader.class)
.flatMap((dataAndMetadata) -> {
DataBuffer data = dataAndMetadata.getT1();
ProxyHttpHeader proxyHeader = dataAndMetadata.getT2();
Map<String, String> bizHeaders = proxyHeader.getBizHeaders();
// TODO filter headers
for(Map.Entry<String, String> bizHeader : bizHeaders.entrySet()) {
response.getHeaders().add(bizHeader.getKey(), bizHeader.getValue());
}
response.getHeaders().setContentType(MediaType.parseMediaType(bizHeaders.get("Content-Type")));
return Mono.just(data);
});
}).timeout(Duration.ofSeconds(3));
}
RSocketRequester.java add method
<T> Mono<Tuple2<T, Map<String, Object>>> retrieveMetadataMono(ParameterizedTypeReference<T> dataTypeRef);
<T, V> Mono<Tuple2<T[Readme.md](Readme.md), V>> retrieveMetadataMono(Class<T> dataType, String metadataKey, Class<V> metadataType);
<T, V> Mono<Tuple2<T, V>> retrieveMetadataMono(ParameterizedTypeReference<T> dataTypeRef, String metadataKey, ParameterizedTypeReference<V> metadataTypeRef);
ProxyB .java
@MessageMapping("callServer")
public Mono<DataBuffer> callServer(@Header(AppConstant.PROXY_HTTP_HEADERS) ProxyHttpHeader proxyHttpHeader,
@Header(RSocketPayloadReturnValueHandler.RESPONSE_HEADER) AtomicReference<ProxyHttpHeader> responseRef,
@Header(RSocketPayloadReturnValueHandler.RESPONSE_HEADER_HEADER) AtomicReference<List<Tuple2<MimeType, Object>>> responseHeadersRef ,
@Headers Map<String, Object> headers1,
@Payload(required = false) Mono<DataBuffer> body,
RSocketRequester rSocketRequester) {
log.info("server response");
Map<String, String> proxyHeaders = proxyHttpHeader.getBizHeaders();
String url = proxyHttpHeader.getUrl();
if(null != proxyHttpHeader.getQueryLine()) {
url = url + proxyHttpHeader.getQueryLine();
}
// TODO on error
if("BY_NAME".equals(proxyHttpHeader.getDiscoveryType())) {
return WebClient.builder().filter(lbFunction).baseUrl(url).build()
.method(HttpMethod.valueOf(proxyHttpHeader.getMethod())).contentType(MediaType.APPLICATION_JSON)
.headers((headers) -> headers.setAll(proxyHeaders))
.body(body, DataBuffer.class)
.exchangeToMono(resp -> {
ProxyHttpHeader proxyHttpHeaderResp = new ProxyHttpHeader();
proxyHttpHeaderResp.setMethod("POST");
Map<String, String> bizReturnMap = new HashMap<>(resp.headers().asHttpHeaders().toSingleValueMap());
bizReturnMap.put("helloserver", "world");
proxyHttpHeaderResp.setBizHeaders(bizReturnMap);
List<Tuple2<MimeType, Object>> returnHeaders = responseHeadersRef.get();
returnHeaders.add(Tuples.of(MediaType.APPLICATION_JSON, proxyHttpHeaderResp));
return resp.bodyToMono(DataBuffer.class);
});
} else {
return WebClient.builder().baseUrl(url).build()
.method(HttpMethod.valueOf(proxyHttpHeader.getMethod())).contentType(MediaType.APPLICATION_JSON)
.headers((headers) -> headers.setAll(proxyHeaders))
.body(body, DataBuffer.class)
.exchangeToMono(resp -> {
ProxyHttpHeader proxyHttpHeaderResp = new ProxyHttpHeader();
proxyHttpHeaderResp.setMethod("POST");
Map<String, String> bizReturnMap = new HashMap<>(resp.headers().asHttpHeaders().toSingleValueMap());
bizReturnMap.put("helloserver", "world");
proxyHttpHeaderResp.setBizHeaders(bizReturnMap);
List<Tuple2<MimeType, Object>> returnHeaders = responseHeadersRef.get();
returnHeaders.add(Tuples.of(MediaType.APPLICATION_JSON, proxyHttpHeaderResp));
return resp.bodyToMono(DataBuffer.class);
});
}
}
MessagingRSocket.java add http header to rsocket metadata
AtomicReference<List<Tuple2<MimeType, Object>>> responseHeadersRef = new AtomicReference<>();
responseHeadersRef.set(new ArrayList<>());
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER_HEADER, responseHeadersRef);
AtomicReference<Flux<Payload>> responseRef = getResponseReference(message);
Assert.notNull(responseRef, "Missing '" + RESPONSE_HEADER + "'");
MessageHeaders messageHeaders = message.getHeaders();
Object headerValue = messageHeaders.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER);
Assert.isInstanceOf(RSocketRequester.class, headerValue, "Expected header value of type RSocketRequester");
RSocketRequester requester = (RSocketRequester) headerValue;
// MimeType metadataMimeType, RSocketStrategies strategies
AtomicReference<List<Tuple2<MimeType, Object>>> val = (AtomicReference<List<Tuple2<MimeType, Object>>>)messageHeaders.get(RESPONSE_HEADER_HEADER);
List<Tuple2<MimeType, Object>> returnHeaders = val.get();
MetadataEncoder encoder = new MetadataEncoder(requester.metadataMimeType(),requester.strategies());
for(Tuple2<MimeType, Object> header : returnHeaders) {
encoder.metadata(header.getT2(), header.getT1());
}
Mono<DataBuffer> headerDataBuffer = encoder.encode();
responseRef.set(headerDataBuffer.flatMapMany(metadata -> {
return encodedContent.map(data -> PayloadUtils.createPayload(data, metadata));
}));
Is there any other better API to use?
Is there any other better API to use?