Spring RSocket API - Stack Overflow


I have two microservice clusters that can reach each other only over the public network and that communicate over HTTP. A small number of business services now need to call services in the other cluster. I would like to build a proxy service so that, from the point of view of the business services, a cross-cluster call differs as little as possible from an ordinary service call.

The two clusters are similar to a data collection service submitting data to a cloud service. I think I can use RSocket to connect the two networks and proxy the HTTP requests. The process is as follows:

  • Registration at startup: ProxyA registers with ProxyB.
  • Service communication: A --> ProxyA (HTTP -> RSocket) --> ProxyB (RSocket -> HTTP) --> B

During implementation, I could not find a suitable API for carrying the HTTP data (headers and body) over RSocket.

ProxyA looks like this (Java):

    @PostMapping("/proxy")
    public Mono<DataBuffer> proxy(ServerHttpRequest request, ServerHttpResponse response) {
        HttpHeaders headers = request.getHeaders();
        Flux<DataBuffer> bodyFlux = request.getBody();

        Map<String, String> rsocketProxyHttpHeaders = new HashMap<>();
        rsocketProxyHttpHeaders.put("hello", "world");
        ProxyHttpHeader proxyHttpHeader = new ProxyHttpHeader();
        proxyHttpHeader.setServiceName(headers.getFirst("x-proxy-target-service"));
        proxyHttpHeader.setMethod(headers.getFirst("x-proxy-target-method"));
        proxyHttpHeader.setUrl(headers.getFirst("x-proxy-target-url"));
        proxyHttpHeader.setQueryLine(headers.getFirst("x-proxy-target-queryLine"));
        proxyHttpHeader.setDiscoveryType(headers.getFirst("x-proxy-target-discoveryType"));
        proxyHttpHeader.setBizHeaders(rsocketProxyHttpHeaders);

        // Which API can send the HTTP body together with rsocketProxyHttpHeaders?
        // Which API can turn the RSocket response back into an HTTP response?
        // Fall back to an empty buffer when the incoming request has no body
        Mono<DataBuffer> emptyMono = Mono.just(this.strategies.dataBufferFactory().wrap(new byte[0]));
        return DataBufferUtils.join(bodyFlux).switchIfEmpty(emptyMono).flatMap(dataBuffer -> {
            return rSocketRequester.route("callServer").metadata(metadataSpec -> {
                        metadataSpec.metadata(proxyHttpHeader, MimeTypeUtils.APPLICATION_JSON);
                        // Note the next line: retrieveMetadataMono is a method I added to RSocketRequester
                    }).data(dataBuffer).retrieveMetadataMono(DataBuffer.class, PROXY_HTTP_HEADERS, ProxyHttpHeader.class)
                    .flatMap((dataAndMetadata) -> {
                        DataBuffer data = dataAndMetadata.getT1();
                        ProxyHttpHeader proxyHeader = dataAndMetadata.getT2();
                        Map<String, String> bizHeaders = proxyHeader.getBizHeaders();
                        // TODO filter headers
                        for(Map.Entry<String, String> bizHeader : bizHeaders.entrySet()) {
                            response.getHeaders().add(bizHeader.getKey(), bizHeader.getValue());
                        }
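                        // Guard against a missing Content-Type: parseMediaType rejects null or empty input
                        String contentType = bizHeaders.get("Content-Type");
                        if (contentType != null) {
                            response.getHeaders().setContentType(MediaType.parseMediaType(contentType));
                        }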
                        return Mono.just(data);
                    });
        }).timeout(Duration.ofSeconds(3));
    }
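The `TODO filter headers` step above could be sketched roughly as follows. This is only an illustration in plain Java: the class name `ProxyHeaderFilter` and the method `filter` are hypothetical, and dropping `Content-Length` in addition to the standard hop-by-hop headers is an assumption (the proxy re-frames the body, so the original length may no longer apply).

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Spring or RSocket.
final class ProxyHeaderFilter {

    // Hop-by-hop headers describe a single connection and are not meant to be forwarded by a proxy.
    // Content-Length is added here as an assumption (see the text above).
    private static final Set<String> SKIPPED = Set.of(
            "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
            "te", "trailer", "transfer-encoding", "upgrade", "content-length");

    private ProxyHeaderFilter() {
    }

    /** Returns a copy of the headers without the skipped names (compared case-insensitively). */
    static Map<String, String> filter(Map<String, String> bizHeaders) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : bizHeaders.entrySet()) {
            String name = entry.getKey().toLowerCase(Locale.ROOT);
            if (!SKIPPED.contains(name)) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

In the handler above, the loop would then iterate over `ProxyHeaderFilter.filter(bizHeaders).entrySet()` instead of `bizHeaders.entrySet()`.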

Methods I added to RSocketRequester.java:

        <T> Mono<Tuple2<T, Map<String, Object>>> retrieveMetadataMono(ParameterizedTypeReference<T> dataTypeRef);

        <T, V> Mono<Tuple2<T, V>> retrieveMetadataMono(Class<T> dataType, String metadataKey, Class<V> metadataType);

        <T, V> Mono<Tuple2<T, V>> retrieveMetadataMono(ParameterizedTypeReference<T> dataTypeRef, String metadataKey, ParameterizedTypeReference<V> metadataTypeRef);

ProxyB (Java):

    @MessageMapping("callServer")
    public Mono<DataBuffer> callServer(@Header(AppConstant.PROXY_HTTP_HEADERS) ProxyHttpHeader proxyHttpHeader,
                                   @Header(RSocketPayloadReturnValueHandler.RESPONSE_HEADER) AtomicReference<ProxyHttpHeader> responseRef,
                                   @Header(RSocketPayloadReturnValueHandler.RESPONSE_HEADER_HEADER) AtomicReference<List<Tuple2<MimeType, Object>>> responseHeadersRef ,
                                   @Headers Map<String, Object> headers1,
                                   @Payload(required = false) Mono<DataBuffer> body,
                                       RSocketRequester rSocketRequester) {
        log.info("server response");
        Map<String, String> proxyHeaders = proxyHttpHeader.getBizHeaders();

        String url = proxyHttpHeader.getUrl();
        if(null != proxyHttpHeader.getQueryLine()) {
            url = url + proxyHttpHeader.getQueryLine();
        }
        // TODO on error
        // The two cases differ only in the load-balancer filter, which is needed
        // when the target is resolved by service name
        WebClient.Builder builder = WebClient.builder().baseUrl(url);
        if ("BY_NAME".equals(proxyHttpHeader.getDiscoveryType())) {
            builder = builder.filter(lbFunction);
        }
        return builder.build()
                .method(HttpMethod.valueOf(proxyHttpHeader.getMethod())).contentType(MediaType.APPLICATION_JSON)
                .headers((headers) -> headers.setAll(proxyHeaders))
                .body(body, DataBuffer.class)
                .exchangeToMono(resp -> {
                    // Copy the HTTP response headers into the metadata object returned to ProxyA
                    ProxyHttpHeader proxyHttpHeaderResp = new ProxyHttpHeader();
                    proxyHttpHeaderResp.setMethod("POST");
                    Map<String, String> bizReturnMap = new HashMap<>(resp.headers().asHttpHeaders().toSingleValueMap());
                    bizReturnMap.put("helloserver", "world");
                    proxyHttpHeaderResp.setBizHeaders(bizReturnMap);
                    // Register the metadata so that RSocketPayloadReturnValueHandler can encode it
                    List<Tuple2<MimeType, Object>> returnHeaders = responseHeadersRef.get();
                    returnHeaders.add(Tuples.of(MediaType.APPLICATION_JSON, proxyHttpHeaderResp));
                    return resp.bodyToMono(DataBuffer.class);
                });

    }
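One detail in the handler above is that the target URL is built with `url + queryLine`. The source does not say whether `x-proxy-target-queryLine` includes the leading `?`, so the following is only a defensive sketch under that uncertainty. The class `TargetUrls` and the method `join` are hypothetical names, not part of any library.

```java
// Hypothetical helper, not part of Spring or RSocket.
final class TargetUrls {

    private TargetUrls() {
    }

    /** Joins a base URL and an optional query line, inserting "?" or "&" as needed. */
    static String join(String url, String queryLine) {
        if (queryLine == null || queryLine.isEmpty()) {
            return url;
        }
        // Accept the query line with or without its leading '?'
        String query = queryLine.startsWith("?") ? queryLine.substring(1) : queryLine;
        if (query.isEmpty()) {
            return url;
        }
        // Append to an existing query string if the URL already has one
        String separator = url.contains("?") ? "&" : "?";
        return url + separator + query;
    }
}
```

With such a helper, the `if (null != proxyHttpHeader.getQueryLine())` block could become `String url = TargetUrls.join(proxyHttpHeader.getUrl(), proxyHttpHeader.getQueryLine());`.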

  • MessagingRSocket.java: add the HTTP response headers to the RSocket metadata

        AtomicReference<List<Tuple2<MimeType, Object>>> responseHeadersRef = new AtomicReference<>();
        responseHeadersRef.set(new ArrayList<>());
        headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER_HEADER, responseHeadersRef);
  • RSocketPayloadReturnValueHandler.java: write the headers into the response
        
        AtomicReference<Flux<Payload>> responseRef = getResponseReference(message);
        Assert.notNull(responseRef, "Missing '" + RESPONSE_HEADER + "'");

        MessageHeaders messageHeaders = message.getHeaders();
        Object headerValue = messageHeaders.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER);

        Assert.isInstanceOf(RSocketRequester.class, headerValue, "Expected header value of type RSocketRequester");
        RSocketRequester requester = (RSocketRequester) headerValue;

        // MimeType metadataMimeType, RSocketStrategies strategies
        AtomicReference<List<Tuple2<MimeType, Object>>> val = (AtomicReference<List<Tuple2<MimeType, Object>>>)messageHeaders.get(RESPONSE_HEADER_HEADER);
        List<Tuple2<MimeType, Object>> returnHeaders = val.get();

        MetadataEncoder encoder = new MetadataEncoder(requester.metadataMimeType(),requester.strategies());
        for(Tuple2<MimeType, Object> header : returnHeaders) {
            encoder.metadata(header.getT2(), header.getT1());
        }

        Mono<DataBuffer> headerDataBuffer =  encoder.encode();

        responseRef.set(headerDataBuffer.flatMapMany(metadata -> {
            return encodedContent.map(data -> PayloadUtils.createPayload(data, metadata));
        }));
  • MetadataEncoder.java: declared public so that it can be used in RSocketPayloadReturnValueHandler.
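The changes listed above all serve one goal: returning the response headers alongside the response body. Purely as a point of comparison, and not as a Spring or RSocket API, the same information could be carried inside the data payload itself using a length-prefixed envelope, which would leave the framework classes unmodified. The sketch below uses plain Java. `ProxyEnvelope` is a hypothetical class, the header is assumed to be already serialized (for example, ProxyHttpHeader as JSON), and the body is assumed to be fully buffered.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical envelope, not part of Spring or RSocket.
final class ProxyEnvelope {

    final String headerJson;
    final byte[] body;

    ProxyEnvelope(String headerJson, byte[] body) {
        this.headerJson = headerJson;
        this.body = body;
    }

    /** Layout: 4-byte big-endian header length, UTF-8 header bytes, then the body bytes. */
    byte[] encode() {
        byte[] header = headerJson.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + header.length + body.length);
        buffer.putInt(header.length).put(header).put(body);
        return buffer.array();
    }

    /** Reverses encode(); rejects frames whose declared header length does not fit. */
    static ProxyEnvelope decode(byte[] frame) {
        ByteBuffer buffer = ByteBuffer.wrap(frame);
        if (buffer.remaining() < Integer.BYTES) {
            throw new IllegalArgumentException("Frame too short: " + frame.length);
        }
        int headerLength = buffer.getInt();
        if (headerLength < 0 || headerLength > buffer.remaining()) {
            throw new IllegalArgumentException("Invalid header length: " + headerLength);
        }
        byte[] header = new byte[headerLength];
        buffer.get(header);
        // Whatever follows the header is the body
        byte[] body = new byte[buffer.remaining()];
        buffer.get(body);
        return new ProxyEnvelope(new String(header, StandardCharsets.UTF_8), body);
    }
}
```

Under these assumptions, ProxyB would return the encoded envelope as its data payload and ProxyA would decode it after a regular `retrieveMono` call. The trade-off is that the body cannot be streamed and both sides must agree on the envelope layout.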

Is there a better API for this, one that does not require modifying the Spring classes?
