When you have to transform (for example, map over) larger amounts of data in Elixir, you might run out of memory or lose much of the performance that Elixir and Phoenix can otherwise provide.
This is where Streams come in. Elixir Streams let you build a pipeline of operations, and they are really beneficial when working with larger amounts of data. Simply speaking, Streams are “lazy” enumerables: transformations such as map or filter aren't executed when you add them to the pipeline, but only when the stream is enumerated, and then each element goes through the whole pipeline before the next one is processed. No intermediate list is built for each step, and when the source itself is lazy, the whole data set never has to be loaded into memory at once; instead, the data is “streamed” one element after another.
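To make the difference between lazy and eager evaluation visible, here is a small self-contained sketch that records the order in which each step runs. `LazyDemo` is a hypothetical module made up for this illustration, and sending messages to `self()` is only a trick to log when each function is actually called.

```elixir
defmodule LazyDemo do
  # Hypothetical demo module: each step sends a message to the current process,
  # so we can inspect afterwards in which order the steps actually ran.
  def lazy do
    stream =
      1..3
      |> Stream.map(fn x ->
        send(self(), {:map, x})
        x * 2
      end)
      |> Stream.filter(fn x ->
        send(self(), {:filter, x})
        rem(x, 4) == 0
      end)

    # Defining the stream runs nothing yet, so no messages have been sent.
    before = drain()
    # Enumerating pushes each element through map and filter before the next one.
    result = Enum.to_list(stream)
    {before, result, drain()}
  end

  def eager do
    # Enum runs each step over the whole collection, building an intermediate list.
    result =
      1..3
      |> Enum.map(fn x ->
        send(self(), {:map, x})
        x * 2
      end)
      |> Enum.filter(fn x ->
        send(self(), {:filter, x})
        rem(x, 4) == 0
      end)

    {result, drain()}
  end

  # Collects all messages currently in the mailbox, in arrival order.
  defp drain(acc \\ []) do
    receive do
      msg -> drain([msg | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end
```

`LazyDemo.lazy()` returns `{[], [4], [{:map, 1}, {:filter, 2}, {:map, 2}, {:filter, 4}, {:map, 3}, {:filter, 6}]}`: nothing runs until `Enum.to_list/1`, and then map and filter alternate per element. `LazyDemo.eager()` returns the same `[4]`, but logs all three map calls before any filter call, because `Enum.map/2` first builds a complete intermediate list.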
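This concept would work perfectly with large sets of records from the database, wouldn't it? Ecto's developers thought so too, and thanks to them we have Ecto's Repo.stream/1, which fetches the query results from the database in batches (500 rows at a time by default, configurable with the :max_rows option) instead of loading them all at once.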
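Repo.stream/1 needs a running database, so here is a self-contained sketch of the underlying idea instead: a lazy source that fetches rows in batches, and only when the consumer asks for more. The `BatchedSource` module and its in-memory "rows" are hypothetical and only mimic the behavior; Ecto's real implementation delegates the batching to the database adapter.

```elixir
defmodule BatchedSource do
  # Hypothetical stand-in for a table with rows 1..total. Rows are "fetched"
  # batch_size at a time, and only when the consumer asks for more elements.
  # Each fetch sends a message to the caller purely so the demo can count them.
  def stream(total, batch_size) do
    Stream.resource(
      # start_fun: the id of the next row to fetch
      fn -> 1 end,
      # next_fun: emit one batch, or halt once every row has been fetched
      fn
        next when next > total ->
          {:halt, next}

        next ->
          last = min(next + batch_size - 1, total)
          send(self(), {:fetched, next..last})
          {Enum.to_list(next..last), last + 1}
      end,
      # after_fun: where a real source would release its cursor or connection
      fn _next -> :ok end
    )
  end
end
```

For example, `BatchedSource.stream(1_000, 100) |> Stream.map(&(&1 * 10)) |> Enum.take(3)` returns `[10, 20, 30]` after fetching only the first batch of 100 rows; the remaining nine batches are never requested.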
Let’s look at a real, though simplified, example: code that prepares and renders a sitemap with Phoenix, first without Ecto’s streams, using Repo.all/1:
def fetch(conn, _params) do
  render(conn, "sitemap.xml",
    sitemap_items: get_sitemap_paths(),
    host: "https://appunite.com"
  )
end

def get_sitemap_paths do
  sitemap_profiles() ++ sitemap_posts()
end

defp sitemap_profiles do
  from(u in User, where: is_nil(u.deleted_at))
  |> Repo.all()
  |> Enum.map(fn profile ->
    %SitemapUrl{
      url: "/profiles/#{profile.slug}",
      lastmod: profile.updated_at
    }
  end)
end

defp sitemap_posts do
  from(p in Post,
    where: p.private == false and is_nil(p.deleted_at)
  )
  |> Repo.all()
  |> Enum.map(fn post ->
    %SitemapUrl{
      url: "/posts/#{post.slug}",
      lastmod: post.updated_at
    }
  end)
end
And here is the same code using Repo.stream/1:
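def fetch(conn, _params) do
  # Repo.stream/1 can only be enumerated inside a transaction, so the
  # rendering (which consumes the stream) happens within Repo.transaction/1.
  {:ok, conn} =
    Repo.transaction(fn ->
      render(conn, "sitemap.xml",
        sitemap_items: get_sitemap_paths(),
        host: "https://appunite.com"
      )
    end)

  conn
end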

def get_sitemap_paths do
  Stream.concat(sitemap_profiles(), sitemap_posts())
end

defp sitemap_profiles do
  from(u in User, where: is_nil(u.deleted_at))
  |> Repo.stream()
  |> Stream.map(fn profile ->
    %SitemapUrl{
      url: "/profiles/#{profile.slug}",
      lastmod: profile.updated_at
    }
  end)
end

defp sitemap_posts do
  from(p in Post,
    where: p.private == false and is_nil(p.deleted_at)
  )
  |> Repo.stream()
  |> Stream.map(fn post ->
    %SitemapUrl{
      url: "/posts/#{post.slug}",
      lastmod: post.updated_at
    }
  end)
end
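The streaming version combines two lazy sources with Stream.concat/2. Here is a self-contained sketch of that composition, with hypothetical in-memory slugs standing in for Repo.stream/1, showing that the concatenation stays lazy: taking only the first two paths never evaluates the posts stream. `ConcatDemo` and its data are made up for this illustration.

```elixir
defmodule ConcatDemo do
  # Hypothetical stand-ins for sitemap_profiles/0 and sitemap_posts/0: in-memory
  # slugs instead of Repo.stream/1. Each element sends a message to the caller
  # when it is actually evaluated, so we can see which stream was touched.
  def profiles do
    Stream.map(["alice", "bob"], fn slug ->
      send(self(), {:profile, slug})
      "/profiles/#{slug}"
    end)
  end

  def posts do
    Stream.map(["hello-world"], fn slug ->
      send(self(), {:post, slug})
      "/posts/#{slug}"
    end)
  end

  # Lazy equivalent of profiles ++ posts: nothing is evaluated until enumeration.
  def all_paths, do: Stream.concat(profiles(), posts())
end
```

`Enum.take(ConcatDemo.all_paths(), 2)` returns `["/profiles/alice", "/profiles/bob"]` without sending any `{:post, _}` message, while `Enum.to_list(ConcatDemo.all_paths())` returns all three paths.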
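The main change is that we’ve replaced Repo.all/1 with (surprisingly) Repo.stream/1. It still lets us work with all the records returned by the query, with one difference: it returns a lazy stream instead of a list. That’s why we no longer pipe the results into Enum functions, which would evaluate the whole stream eagerly; instead we use the Stream module. It’s not a big deal, as it has the functions you would usually need, like map/2, filter/2, etc. Also take a look at how we’ve composed the two streams: the ++ operator works only on lists, so the Stream.concat/2 function had to be used instead. Finally, note that at least with the Postgres and MySQL adapters, a stream returned by Repo.stream/1 can only be enumerated inside a transaction, which is why the render/3 call is wrapped in Repo.transaction/1.

Special thanks to Kacper Latuszewski ;)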
Did you find the article interesting and helpful? Take a look at our Elixir page to find out more!