Work in any mature application for some time, and sooner or later you will find yourself having to iterate through all rows of a dataset, for example when you have to migrate data. Unfortunately, doing this correctly in Elixir, using just the Ecto library, is currently not easy.
For this reason, we have just published an open-source library called ecto_cursor_based_stream that allows you to efficiently stream all rows of any Ecto schema.
Click here for the full documentation. To find out why the library is needed and how it works, keep reading this article.
If you use Repo.all to iterate through the rows, the process will load all of them into memory at once. Such an approach is out of the question if you want to treat the project seriously: assuming the dataset keeps growing over time, it is at best a matter of months before your process starts to crash while trying to load thousands of rows at once.
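For illustration, such a naive pass over a whole table might look roughly like this (a minimal sketch; Post and MyRepo are placeholder names, and IO.inspect/1 stands in for the real per-row work):

# Loads every Post row into the process memory at once before doing any work
# (Post and MyRepo are placeholder names):
Post
|> MyRepo.all()
|> Enum.each(&IO.inspect/1)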
If you use Repo.stream to iterate through the rows, you may be fine for a while. However, if your application grows enough and you use a database such as PostgreSQL under the hood, then once your dataset reaches millions of rows, such streaming becomes troublesome for your database. That's because the whole streaming operation is wrapped in a single database transaction, and the longer and larger that transaction is, the more memory it consumes on your database.
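For comparison, Repo.stream keeps memory usage low on the application side, but with the PostgreSQL adapter it has to run inside one long-lived transaction, roughly like this (a minimal sketch with the same placeholder names):

# The whole stream must be consumed inside a single transaction that stays open
# for the entire run (the timeout is raised because the default would likely
# cut a long migration short; IO.inspect/1 stands in for the real per-row work):
MyRepo.transaction(
  fn ->
    Post
    |> MyRepo.stream()
    |> Stream.each(&IO.inspect/1)
    |> Stream.run()
  end,
  timeout: :infinity
)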
Imagine you are iterating through two million rows in one database transaction, and also inserting or updating some other rows during that transaction. Until the whole transaction is committed, the database has to keep all of it in temporary storage.
This caused problems for us several times. For example, it caused a long slowdown on one of our database replicas, making it lag behind the primary database node. As a result, our application served invalid (stale) data to end users. We had to kill the transaction running the streaming operation and find another way to iterate through all the rows.
(We weren’t the only ones to encounter such problems. For example, GitLab did too.)
In response, our engineers started implementing the streaming of rows by themselves. For example, they fetched the rows in batches in separate transactions, using offset-based pagination (e.g. LIMIT 50 OFFSET 200).
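Such a hand-rolled, offset-based batching loop might look roughly like this (a minimal sketch; Post, MyRepo and migrate_row/1 are placeholder names):

defmodule OffsetBasedMigration do
  import Ecto.Query

  # Fetches rows in separate, short queries using LIMIT/OFFSET and recurses
  # until a batch comes back empty (placeholder names throughout):
  def run(offset \\ 0, batch_size \\ 50) do
    rows =
      Post
      |> order_by(asc: :id)
      |> limit(^batch_size)
      |> offset(^offset)
      |> MyRepo.all()

    Enum.each(rows, &migrate_row/1)

    if rows != [], do: run(offset + batch_size, batch_size)
  end

  # Placeholder for the actual per-row migration logic:
  defp migrate_row(_row), do: :ok
end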
Unfortunately, this caused pagination code to be scattered across several places in our codebase.
Sometimes the pagination code was hand-crafted by the engineer, causing it to be wrong: for example, an off-by-one boundary condition such as i + 1 > n could make the script omit the last row or page.

So, as a solution, we decided to implement it once and for all, in a good, performant way.
Cursor-based pagination is a very popular and often the best way of iterating through large datasets. Google it and you’ll find dozens of articles describing how it works and why it is better than offset-based pagination.
A very short explanation: imagine you can cheaply get n rows, sorted by some column (called the "cursor column"), starting with the rows for which the cursor column value is larger than a given cursor. Then you can easily paginate through all of the rows, fetching e.g. 100 rows at a time, each time asking for rows whose cursor column value is larger than the largest value on the previous page.
In most databases, such as PostgreSQL, if you have a primary key column such as id, the above assumption already holds. Getting n rows sorted by their ID, with ID larger than a given cursor, is a nearly instant operation.
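In Ecto terms, fetching one such page could look roughly like this (a minimal sketch; Post, MyRepo and the concrete numbers are assumptions):

import Ecto.Query

# The cursor is the id of the last row from the previous page:
last_seen_id = 200
page_size = 100

# Thanks to the index on id (e.g. the primary key), this query stays nearly
# instant no matter how deep into the dataset you are:
next_page =
  Post
  |> where([p], p.id > ^last_seen_id)
  |> order_by(asc: :id)
  |> limit(^page_size)
  |> MyRepo.all()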
Four simple reasons:
Does cursor-based pagination have any drawbacks? I know one:
Rows need to be sorted by (a) certain column(s), and the values of this/these column(s) must be unique throughout the whole dataset. This may sometimes be problematic. For example, let's assume you need to iterate through news articles sorted in ascending order by their publish date. If you were to do it using cursor-based pagination, having the rows sorted just by a published_at column would not be sufficient: if the script encountered two or more rows with the same cursor value, some of them could end up being skipped.
The workaround is to add more cursor columns (e.g. sort by [:published_at, :id]), so that the sort order is always deterministic, or to go with a different sorting order.
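With such a composite cursor, the condition for the next page has to compare both columns, roughly like this (a minimal sketch; NewsArticle and MyRepo are placeholder names and the cursor values are made up):

import Ecto.Query

# Cursor taken from the last row of the previous page:
{last_published_at, last_id} = {~U[2022-01-01 00:00:00Z], 123}

# Rows strictly "after" the cursor in the [:published_at, :id] order,
# so ties on published_at are broken deterministically by id:
next_page =
  NewsArticle
  |> where(
    [a],
    a.published_at > ^last_published_at or
      (a.published_at == ^last_published_at and a.id > ^last_id)
  )
  |> order_by(asc: :published_at, asc: :id)
  |> limit(100)
  |> MyRepo.all()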
ecto_cursor_based_stream
As the final solution, we'd like to recommend using our newly published ecto_cursor_based_stream library.
It gives you a Repo.cursor_based_stream(opts \\ %{}) function that mimics the Repo.stream() interface, but uses cursor-based pagination under the hood. Example usage:
Post
|> MyRepo.cursor_based_stream()
|> Stream.each(...)
|> Stream.run()
It accepts optional parameters such as:

:cursor_field - name of a unique field, by which all rows will be sorted,
:after_cursor - cursor value from which the streaming should be started,
:max_rows - "page size" to define how large the queried data chunks under the hood will be.
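For example, a call combining these options could look roughly like this (a sketch; Post, MyRepo and the values are placeholders, and the exact shape of the opts argument is an assumption rather than the definitive API):

# Stream rows with id greater than 1_000, sorted by id, fetched 500 at a time
# (values are arbitrary; IO.inspect/1 stands in for the real per-row work):
Post
|> MyRepo.cursor_based_stream(cursor_field: :id, after_cursor: 1_000, max_rows: 500)
|> Stream.each(&IO.inspect/1)
|> Stream.run()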
To use the library, you simply:

1. Add ecto_cursor_based_stream to the list of dependencies in your mix.exs file like this:

def deps do
  [
    {:ecto_cursor_based_stream, "~> 1.0.1"}
  ]
end
2. Add use EctoCursorBasedStream to the module that uses Ecto.Repo:

defmodule MyRepo do
  use Ecto.Repo
  use EctoCursorBasedStream
end
For more info, see the documentation and the source code.
Give it a try! If you like it, we'd love your support: star us on GitHub and share the news about the library with your teams and projects 🙏
And if you don’t like it - let us know why 🙂
Did you find the article interesting and helpful? Take a look at our Elixir page to find out more!