Today I Learned

Ecto’s Repo.stream/1

When dealing with bigger amounts of data that you have to somehow transform (for example map over) in Elixir, you might encounter a problem of running out of memory or losing the performance that Phoenix and Elixir can provide.

What they can provide though, are Streams. Elixir Streams allow you to build a pipeline of operations, and are really beneficial when working with larger amounts of data - what they do, is they chunk the set of data you have, and do the transforms (like maps, filters, etc.) on demand, in those chunks, which allows to save a lot of memory, by not loading the whole set at once, rather “streaming” the data one after another. Simply speaking - Streams are basically “lazy” enumerables.

This concept would work perfectly when working with large sets of entities from the database, wouldn’t it? That’s what also Ecto’s developers thought, and thanks to them - we have the Ecto’s Repo.stream/1.

Example

Let’s take a look at a real example of how a simplified code for preparing and rendering a sitemap, with the help of Phoenix, would look without, and then with the Ecto’s Stream:

def fetch(conn, _params) do
  render(conn, "sitemap.xml",
    sitemap_items: get_sitemap_paths(),
    host: "https://appunite.com"
  )
end

def get_sitemap_paths do
  sitemap_profiles() ++ sitemap_posts()
end

defp sitemap_profiles do
  from(u in User, where: is_nil(u.deleted_at))
  |> Repo.all()
  |> Enum.map(fn profile ->
    %SitemapUrl{
      url: "/profiles/#{profile.slug}",
      lastmod: profile.updated_at
    }
  end)
end

defp sitemap_posts do
  from(p in Posts,
    where: p.private == false and is_nil(p.deleted_at)
  )
  |> Repo.all()
  |> Enum.map(fn post ->
    %SitemapUrl{
      url: "/posts/#{post.slug}",
      lastmod: post.updated_at
    }
  end)
end

and then with the Repo.Stream/1

def fetch(conn, _params) do
  {:ok, conn} =
    Repo.transaction(fn ->
      render(conn, "sitemap.xml",
        sitemap_items: get_sitemap_paths(),
        host: "https://appunite.com"
      )
    end)

  conn
end

def get_sitemap_paths do
  Stream.concat(sitemap_profiles(), sitemap_posts())
end

defp sitemap_profiles do
  from(u in User, where: is_nil(u.deleted_at))
  |> Repo.stream()
  |> Stream.map(fn profile ->
    %SitemapUrl{
      url: "/profiles/#{profile.slug}",
      lastmod: profile.updated_at
    }
  end)
end

defp sitemap_posts do
  from(p in Post,
    where: p.private == false and is_nil(p.deleted_at)
  )
  |> Repo.stream()
  |> Stream.map(fn post ->
    %SitemapUrl{
      url: "/posts/#{post.slug}",
      lastmod: post.updated_at
    }
  end)
end

Differences

  • We replaced Repo.all/1 with (surprisingly) Repo.stream/1 . It’ll still allow us to work with all the records from the query, but with one difference - it returns a Stream, instead of an enumerable list.
  • We had to wrap the whole part where we work with the Stream inside a database transaction. It’s a required step, as we have to be sure that the data doesn’t change during the lifespan of the Stream (you have to remember, that only a chunk of the entities from the query is inside our app’s memory at once).
  • In order to work with the Stream, we can’t use the typical Enum methods - instead we use the Stream module, but it’s not a big deal - it has all the methods you would need, like map, filter, etc. Also take a look at how we’ve composed two streams - instead of the ++ operator, the Stream.concat/2 function had to be used.

Read more

Special thanks to Kacper Latuszewski ;)