Бях на същия проблем, не бях сигурен дали сте намерили решение или не, но успях да постигна нещо подобно, като направих следното. Първо, добавих тригер към моята таблица
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
Това ще зададе тригер в таблицата всеки път, когато ред бъде актуализиран, изтрит или вмъкнат. След това ще извика функцията за задействане, която съм настроил и която изглеждаше нещо подобно:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
Това ще ми позволи да „слушам“ всяка от тези актуализации от моя проект за пролетно зареждане и ще изпрати целия ред като полезен товар. След това в моя проект за пролетно зареждане конфигурирах връзка към моята база данни.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
С това го свързвам автоматично (инжектиране на зависимост) в конструктора в моя сервизен клас и го прехвърлям към r2dbc PostgressqlConnection клас така:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
Сега искаме да „слушаме“ нашата таблица и да получаваме известия, когато извършим актуализация на нашата таблица. За да направим това, ние настройваме метод за инициализация, който се изпълнява след инжектиране на зависимост с помощта на анотацията @PostContruct
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
Забележете, че слушаме каквото и име да поставим в метода pg_notify. Също така искаме да настроим метод за затваряне на връзката, когато зърното е на път да бъде изхвърлено, така:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
Сега просто създавам метод, който връща Flux на всичко, което в момента е в моята таблица, и също го обединявам с моите известия, както казах преди известията да влязат като json, така че трябваше да го десериализирам и реших да използвам ObjectMapper. И така, ще изглежда по следния начин:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
Надявам се това да помогне. Наздраве!