How to properly ingest files from an SFTP server with Spring Integration SFTP when using multiple replicas?

At the moment, I keep getting this error:

org.springframework.messaging.MessagingException
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:433)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:54)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.UncheckedIOException: IOException when retrieving /to_erp/orders/49230084_COMPLETED_ORDER.json
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:249)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:228)
    at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:47)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:220)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:450)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
    ... 13 more
Caused by: SFTP error (SSH_FX_NO_SUCH_FILE): The file does not exist.
    at org.apache.sshd.sftp.client.impl.AbstractSftpClient.throwStatusException(AbstractSftpClient.java:277)
    at org.apache.sshd.sftp.client.impl.AbstractSftpClient.checkHandleResponse(AbstractSftpClient.java:299)
    at org.apache.sshd.sftp.client.impl.AbstractSftpClient.checkHandle(AbstractSftpClient.java:290)
    at org.apache.sshd.sftp.client.impl.AbstractSftpClient.open(AbstractSftpClient.java:589)
    at org.apache.sshd.sftp.client.impl.SftpInputStreamAsync.<init>(SftpInputStreamAsync.java:75)
    at org.apache.sshd.sftp.client.impl.AbstractSftpClient.read(AbstractSftpClient.java:1196)
    at org.apache.sshd.sftp.client.SftpClient.read(SftpClient.java:909)
    at org.apache.sshd.sftp.client.SftpClient.read(SftpClient.java:905)
    at org.springframework.integration.sftp.session.SftpSession.readRaw(SftpSession.java:125)
    at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:296)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:239)
    ... 19 more

This happens even though I use the SftpPersistentAcceptOnceFileListFilter. Here is how I configure my subscribers:

@Profile("!test")
@Component
public class CreatedOrderSubscriber {

    private static final String REMOTE_DIRECTORY = "/to_wms/orders/";
    private static final String FILE_PREFIX = "created_order_stream_";
    private static final int MAX_FETCH_SIZE = 50;

    private final ConcurrentMetadataStore metadataStore;
    private final OilOrderCreatedService oilOrderCreatedService;
    private final SessionFactory<DirEntry> sftpSessionFactory;
    private final AdviceFactory adviceChainFactory;

    public CreatedOrderSubscriber(
            ConcurrentMetadataStore metadataStore,
            AdviceFactory adviceChainFactory,
            OilOrderCreatedService oilOrderCreatedService,
            @Qualifier("wmsSftpSessionFactory") SessionFactory<DirEntry> sftpSessionFactory) {
        this.metadataStore = metadataStore;
        this.oilOrderCreatedService = oilOrderCreatedService;
        this.sftpSessionFactory = sftpSessionFactory;
        this.adviceChainFactory = adviceChainFactory;
    }

    @Bean
    @ServiceActivator(inputChannel = "createdOrderV8Data", adviceChain = "afterCreatedOrder")
    public MessageHandler handleCreatedOrder() {
        var callback = createMessageHandlerCallback();
        return MessageHandlerFactory.createHandler(callback, OrderUpdateHeaderDto.class);
    }

    @Bean
    @InboundChannelAdapter(channel = "createdOrderV8Stream", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "10"))
    public MessageSource<InputStream> createdOrderFtpMessageSource(SftpRemoteFileTemplate createdOrderV8Template) {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(createdOrderV8Template);
        messageSource.setRemoteDirectory(REMOTE_DIRECTORY);
        messageSource.setFilter(fileListFilter());
        messageSource.setMaxFetchSize(MAX_FETCH_SIZE);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "createdOrderV8Stream", outputChannel = "createdOrderV8Data")
    public org.springframework.integration.transformer.Transformer createdOrderTransformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice afterCreatedOrder() {
        return adviceChainFactory.createAdviceWithRemovalUponSuccess("createdOrderV8Template");
    }

    @Bean
    public SftpRemoteFileTemplate createdOrderV8Template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory);
    }

    private Function<OrderUpdateHeaderDto, Void> createMessageHandlerCallback() {
        return dto -> {
            oilOrderCreatedService.handleMessage(dto);
            return null;
        };
    }

    private ChainFileListFilter<DirEntry> fileListFilter() {
        var sftpPersistentAcceptOnceFilter = new SftpPersistentAcceptOnceFileListFilter(metadataStore, FILE_PREFIX);
        var nameFilter = new SftpSimplePatternFileListFilter("*_NEW_ORDER.json");
        var filters = Set.of(nameFilter, sftpPersistentAcceptOnceFilter);
        return new ChainFileListFilter<>(filters);
    }

}

Files are processed and then removed. What seems to happen is that the app keeps trying to process files that have already been ingested by another app replica. I'm guessing that I misconfigured the accept-once filter, but I simply do not know how to fix it.

The metadata store is defined like this:

@Profile("!test")
@Configuration
public class MetadataStoreConfig {

    @Bean
    public ConcurrentMetadataStore metadataStore(DataSource dataSource) {
        return new JdbcMetadataStore(dataSource);
    }

}

How do I properly set up SFTP file subscribers so that they can run in multiple app replicas without interfering with each other?

I tried to use the SftpPersistentAcceptOnceFileListFilter. It persists a record in a metadata store that is supposed to act as a sort of lock, so that file processing can happen from multiple app instances without them affecting each other. So far, this has not worked (probably due to a misconfiguration somewhere).

Alternatively, I could let only one instance read from the SFTP server, but I do not like the additional complexity that solution would bring. Reading from multiple app replicas should be possible in Spring Boot.

Asked Jan 7 at 1:00 by FOR_SCIENCE; edited Jan 9 at 13:44.

1 Answer

From your question it is not clear which ConcurrentMetadataStore you use. With several instances of your application, the MetadataStore has to be shared and persistent: https://docs.spring.io/spring-integration/reference/meta-data-store.html.
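
To illustrate why the store has to be shared, here is a simplified model in plain Java (not the Spring Integration classes themselves). It assumes the persistent accept-once filter claims a file by calling putIfAbsent(prefix + fileName, lastModified) on the metadata store and accepts the file only when that call creates the entry; the real filter also re-accepts a file whose modification time changed, which this sketch leaves out. Store, AcceptOnceFilter and the file name are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model (not Spring Integration code) of an accept-once filter
// backed by a metadata store: a file is accepted only by the caller whose
// putIfAbsent(...) call creates the entry.
class SharedStoreDemo {

    // Hypothetical stand-in for a ConcurrentMetadataStore.
    static final class Store {
        private final ConcurrentMap<String, String> entries = new ConcurrentHashMap<>();

        // Returns the previous value, or null if this call created the entry.
        String putIfAbsent(String key, String value) {
            return entries.putIfAbsent(key, value);
        }
    }

    // Hypothetical stand-in for one replica's persistent accept-once filter.
    static final class AcceptOnceFilter {
        private final Store store;
        private final String prefix;

        AcceptOnceFilter(Store store, String prefix) {
            this.store = store;
            this.prefix = prefix;
        }

        boolean accept(String fileName, long lastModified) {
            return store.putIfAbsent(prefix + fileName, Long.toString(lastModified)) == null;
        }
    }

    // Counts how many of the given replicas accept the same (hypothetical) file.
    static int acceptors(AcceptOnceFilter... replicas) {
        int count = 0;
        for (AcceptOnceFilter replica : replicas) {
            if (replica.accept("order_1_NEW_ORDER.json", 1_700_000_000_000L)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        String prefix = "created_order_stream_";

        // One store per replica: every replica sees an empty store and accepts the file.
        int separate = acceptors(
                new AcceptOnceFilter(new Store(), prefix),
                new AcceptOnceFilter(new Store(), prefix));

        // One store shared by all replicas (e.g. the same database): only the first claim wins.
        Store shared = new Store();
        int sharedCount = acceptors(
                new AcceptOnceFilter(shared, prefix),
                new AcceptOnceFilter(shared, prefix));

        System.out.println("separate stores: " + separate + " replicas accept the file");
        System.out.println("shared store:    " + sharedCount + " replica accepts the file");
    }
}
```

With one store per replica both replicas accept the file (2); with a single shared store only one replica does (1). In the real setup, that means every replica must point its JdbcMetadataStore at the same database.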

You may also need to look into a ChainFileListFilter instead, and reorder your filters so that nameFilter is consulted first. That filter is stateless, so a file whose name does not match leaves no entry in the store.
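
A minimal plain-Java sketch of why the order matters, assuming a chain that hands each file to its filters in sequence and stops at the first rejection. The Predicate-based filters are hypothetical stand-ins for SftpSimplePatternFileListFilter and SftpPersistentAcceptOnceFileListFilter. Note also that the question builds the chain from Set.of(nameFilter, sftpPersistentAcceptOnceFilter); Set.of does not guarantee any iteration order, so an ordered collection such as List.of(nameFilter, sftpPersistentAcceptOnceFilter) is the safer way to make sure the name filter really comes first.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Plain-Java model of a filter chain; the predicates are hypothetical
// stand-ins for a name-pattern filter and a persistent accept-once filter.
class FilterOrderDemo {

    // Passes each name through the filters in list order and stops at the first rejection.
    static List<String> chain(List<Predicate<String>> filters, List<String> names) {
        List<String> accepted = new ArrayList<>();
        for (String name : names) {
            boolean passed = true;
            for (Predicate<String> filter : filters) {
                if (!filter.test(name)) {
                    passed = false;
                    break;
                }
            }
            if (passed) {
                accepted.add(name);
            }
        }
        return accepted;
    }

    // Runs the chain in the requested order and returns what the stateful filter recorded.
    static Set<String> storedKeys(boolean nameFilterFirst, List<String> names) {
        Set<String> store = new HashSet<>();
        // Stateless: a simplified version of the "*_NEW_ORDER.json" pattern.
        Predicate<String> nameFilter = name -> name.endsWith("_NEW_ORDER.json");
        // Stateful: Set.add returns true only the first time a name is seen.
        Predicate<String> acceptOnce = store::add;
        // List.of keeps the given order; Set.of would not guarantee any order.
        List<Predicate<String>> filters = nameFilterFirst
                ? List.of(nameFilter, acceptOnce)
                : List.of(acceptOnce, nameFilter);
        chain(filters, names);
        return store;
    }

    public static void main(String[] args) {
        List<String> names = List.of(
                "order_1_NEW_ORDER.json", "order_2_COMPLETED_ORDER.json", "order_3_NEW_ORDER.json");
        System.out.println("name filter first: " + storedKeys(true, names).size() + " keys stored");
        System.out.println("accept-once first: " + storedKeys(false, names).size() + " keys stored");
    }
}
```

With the name filter first, only the two *_NEW_ORDER.json names end up in the store (2 keys); with the accept-once filter first, all three names are recorded (3 keys), including the one that the name filter then rejects.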

UPDATE

See SftpStreamingMessageSource.setMaxFetchSize() (right, you use it already):

/**
 * Set the maximum number of objects the source should fetch if it is necessary to
 * fetch objects. Setting the
 * maxFetchSize to 0 disables remote fetching, a negative value indicates no limit.
 * @param maxFetchSize the max fetch size; a negative value means unlimited.
 */
@ManagedAttribute(description = "Maximum objects to fetch")
void setMaxFetchSize(int maxFetchSize);

See also the documentation on how this option behaves: https://docs.spring.io/spring-integration/reference/sftp/max-fetch.html

So apparently you ran into a situation where the remote files are fetched (listed) first and then emitted one by one after filtering. One instance removes a file, and another one then tries to read that same file based on its cached listing.
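
The following deliberately simplified, single-threaded model illustrates that explanation. It assumes each replica keeps a local queue filled from a directory listing of at most maxFetchSize names (negative meaning unlimited, 0 meaning no fetch, as in the Javadoc above) and emits one queued name per poll. It ignores the accept-once filter entirely, so it only shows how a cached listing can go stale. RemoteDir, Replica and the file names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;
import java.util.TreeSet;

// Deliberately simplified, single-threaded model: it ignores the accept-once
// filter and only shows how names cached from an earlier listing can go stale.
class StaleListingDemo {

    // Hypothetical shared remote directory, modelled as a sorted set of file names.
    static final class RemoteDir {
        final Set<String> files = new TreeSet<>();
    }

    // Hypothetical replica: caches names from its last listing and emits one per poll.
    static final class Replica {
        private final RemoteDir dir;
        private final int maxFetchSize;
        private final Deque<String> toBeReceived = new ArrayDeque<>();
        int missingFileErrors = 0;

        Replica(RemoteDir dir, int maxFetchSize) {
            this.dir = dir;
            this.maxFetchSize = maxFetchSize;
        }

        void poll() {
            // Only list the directory again when the local cache is empty.
            if (toBeReceived.isEmpty()) {
                for (String name : dir.files) {
                    // Negative = unlimited, 0 = no fetch, n > 0 = at most n names.
                    if (maxFetchSize >= 0 && toBeReceived.size() >= maxFetchSize) {
                        break;
                    }
                    toBeReceived.add(name);
                }
            }
            String name = toBeReceived.poll();
            if (name == null) {
                return; // nothing to emit
            }
            if (!dir.files.contains(name)) {
                missingFileErrors++; // analogue of SSH_FX_NO_SUCH_FILE
                return;
            }
            dir.files.remove(name); // processed, then removed from the server
        }
    }

    // Two replicas poll alternately until everything is drained; returns the total errors.
    static int run(int maxFetchSize, int fileCount) {
        RemoteDir dir = new RemoteDir();
        for (int i = 1; i <= fileCount; i++) {
            dir.files.add("order_" + i + "_NEW_ORDER.json");
        }
        Replica a = new Replica(dir, maxFetchSize);
        Replica b = new Replica(dir, maxFetchSize);
        for (int round = 0; round < 2 * fileCount + 2; round++) {
            a.poll();
            b.poll();
        }
        return a.missingFileErrors + b.missingFileErrors;
    }

    public static void main(String[] args) {
        System.out.println("maxFetchSize=50: " + run(50, 3) + " missing-file errors");
        System.out.println("maxFetchSize=1:  " + run(1, 3) + " missing-file errors");
    }
}
```

With two replicas polling alternately over three files, this model reports 2 missing-file errors for maxFetchSize = 50 and 0 for maxFetchSize = 1. In a real concurrent setup a fetch size of 1 only narrows the window between listing and reading; it does not close it.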

Check whether setting that option to 1 helps.

UPDATE

Here is a sample that demonstrates the feature working correctly: https://github.com/artembilan/sandbox/tree/master/so-79334692