Kubernetes is an incredibly powerful container orchestration platform, but it has some limitations you should be aware of. One key challenge is its limited data filtering capability. While it excels at managing and scaling workloads, the platform doesn’t include an advanced query engine out of the box. This means you can’t easily perform complex, fine-grained searches or data manipulation directly within Kubernetes, which can be a roadblock when, for example, you need to find records via full-text search.
Finding a Solution
Don’t let this limitation discourage you! The good news is that there are many robust alternatives for advanced data filtering and full text search that integrate seamlessly with Kubernetes. These external solutions are specifically designed to handle the complex queries and data analysis that Kubernetes lacks. This blog post will explore how you can overcome this challenge, focusing on one powerful solution that will give you the flexibility and control you need to effectively filter your data.
The solution to a lack of advanced data filtering often leverages the Kubernetes API Aggregation Layer. This layer acts as a proxy, sitting in front of the core Kubernetes API server. It enables you to build and run your own custom API servers that serve custom APIs. When a client like kubectl makes a request to a registered API path, the aggregation layer intercepts it and transparently forwards it to your custom API server. This powerful design pattern allows a custom API to handle data and business logic entirely independently of Kubernetes’ core database, making it a perfect fit for tasks like full-text search or complex data analysis that are not natively supported.
In this tutorial, you’ll create a simple implementation. Don’t use this in production; it is only meant to demonstrate the simplicity of the Kubernetes API aggregation layer! This implementation uses a metadata indexer to find the items matching a given text search, which you can then use to filter the data in a second query. The main benefit of this approach is that it works seamlessly with all the supported databases.
⚠️ This two-step process can be inefficient and slow down operations,
especially in clusters with a large number of resources. However, as
the developer of a custom API server, you have full control over the
data retrieval process. Instead of relying on a pre-retrieval filter,
you can implement a more performant solution directly within the API
server's logic. There are plenty of other options, such as:
- Returning the objects directly, instead of only the task-indexer keys.
- In the case of an SQL backend and the Kubernetes API application/json
storage-media-type, searching directly in the database.
- Last but not least, using custom logic to store and fetch
these resources instead of relying on the Kubernetes API.
Prepare Server Manifests
By creating an APIService object, you tell Kubernetes to route specific API requests to your own, purpose-built API server. This server can be designed to do anything, from collecting metrics (like the Metrics Server) to providing full-text search capabilities on your data. Because the data is stored and managed by your external service, you have complete control over how it is queried, filtered, and presented. This approach offers a seamless user experience, as the custom APIs are accessed with the same tools and authentication methods as the core Kubernetes APIs, but they are powered by a solution optimized for the task.
As a first step, create an APIService manifest at config/manager/task-apiservice.yaml.
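The manifest itself is not reproduced here, so the following is only a hedged sketch of what such an APIService object typically looks like. The group, version, port, and namespace are taken from elsewhere in this post; the service name is a hypothetical placeholder you must adapt to your project:

```yaml
# Sketch only: service name is hypothetical; group/version/port match
# the values used later in the Go code and deployment commands.
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  name: v1.search.task.example.example.com
spec:
  group: search.task.example.example.com
  version: v1
  groupPriorityMinimum: 1000
  versionPriority: 15
  insecureSkipTLSVerify: true   # demo only; use caBundle in real setups
  service:
    name: my-project-apiserver-service   # hypothetical service name
    namespace: my-project-system
    port: 7443
```

Registering this object tells the aggregation layer to forward requests under /apis/search.task.example.example.com/v1 to your service.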
The patch contains a temporary directory. All data will be lost between restarts! If you want to persist the data, you have to create a PersistentVolume or use hostPath instead of emptyDir.
Add the following lines to the patches section of the config/default/kustomization.yaml file.
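The exact patch lines are not shown in this post; below is only a hypothetical sketch of a Kubebuilder-style patches entry and a matching patch that mounts an emptyDir for the index and passes the API-server flags introduced later. File name, mount path, and cert paths are all assumptions:

```yaml
# config/default/kustomization.yaml (hypothetical entry)
patches:
- path: manager_apiserver_patch.yaml
  target:
    kind: Deployment
    name: controller-manager
```

```yaml
# config/default/manager_apiserver_patch.yaml (hypothetical file)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: controller-manager
  namespace: system
spec:
  template:
    spec:
      containers:
      - name: manager
        args:
        - --apiserver-port=:7443
        - --apiserver-data-dir=/data
      - name: manager
        volumeMounts:
        - name: index-data
          mountPath: /data
      volumes:
      - name: index-data
        emptyDir: {}   # temporary storage; data is lost on restart
```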
It is time to write the aggregation service implementation itself. Create the file internal/apiserver/task_search.go with the following content.
The solution includes an indexer that makes searching task details easier, and a raw endpoint for filtering results.
```go
package apiserver

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"strings"

	bleve "github.com/blevesearch/bleve/v2"
	kaf "github.com/mhmxs/kubernetes-aggregator-framework/pkg/framework"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
	Group   = "search.task.example.example.com"
	Version = "v1"
)

var (
	tasksearchlog = logf.Log.WithName("task-search")
	Indexer       bleve.Index
)

type Index struct {
	Namespace  string `json:"namespace"`
	Name       string `json:"name"`
	Generation int64  `json:"generation"`
	Details    string `json:"details"`
}

func New(port, certFile, keyFile, dataDir string) *searchAPIServer {
	sas := searchAPIServer{
		Server: *kaf.NewServer(kaf.ServerConfig{
			Port:     port,
			CertFile: certFile,
			KeyFile:  keyFile,
			Group:    Group,
			Version:  Version,
			APIKinds: []kaf.APIKind{
				{
					ApiResource: metav1.APIResource{
						Name:  "details",
						Verbs: []string{"get"},
					},
					RawEndpoints: map[string]http.HandlerFunc{
						"": func(w http.ResponseWriter, r *http.Request) {
							q := r.URL.Query().Get("q")
							if q == "" {
								w.WriteHeader(http.StatusBadRequest)
								w.Write([]byte("missing query parameter 'q'"))
								return
							}
							query := bleve.NewQueryStringQuery(q)
							searchResult, err := Indexer.Search(bleve.NewSearchRequest(query))
							if err != nil {
								w.WriteHeader(http.StatusInternalServerError)
								w.Write([]byte(err.Error()))
								return
							}
							if len(searchResult.Hits) > 100 {
								w.WriteHeader(http.StatusBadRequest)
								w.Write([]byte("too many results, please narrow down your query"))
								return
							}
							indexes := make([]string, 0, len(searchResult.Hits))
							for _, hit := range searchResult.Hits {
								indexes = append(indexes, hit.ID)
							}
							w.WriteHeader(http.StatusOK)
							w.Write([]byte(strings.Join(indexes, ",")))
						},
					},
				},
			},
		}),
		dataDir: dataDir,
	}
	return &sas
}

type searchAPIServer struct {
	kaf.Server
	dataDir string
}

func (s *searchAPIServer) Start(ctx context.Context) (err error) {
	docMapping := bleve.NewDocumentMapping()
	docMapping.AddFieldMappingsAt("details", bleve.NewTextFieldMapping())
	docMapping.AddFieldMappingsAt("namespace", bleve.NewTextFieldMapping())
	mapping := bleve.NewIndexMapping()
	mapping.AddDocumentMapping("mapping", docMapping)
	mapping.DefaultField = "details"
	Indexer, err = bleve.New(s.dataDir, mapping)
	if err != nil {
		if errors.Is(err, bleve.ErrorIndexPathExists) {
			Indexer, err = bleve.Open(s.dataDir)
			if err != nil {
				return fmt.Errorf("failed to open index: %w", err)
			}
		} else {
			return fmt.Errorf("failed to create index: %w", err)
		}
	}
	srvErr := s.Server.Start(ctx)
	indxErr := Indexer.Close()
	if indxErr != nil {
		tasksearchlog.Error(indxErr, "Failed to close indexer")
	}
	return errors.Join(srvErr, indxErr)
}
```
Add new flags to the cmd/main.go file before the flag.Parse() call.
main.go

```go
var apiServerPort string
var apiServerDataDir string
var apiServerCertFile string
var apiServerKeyFile string
flag.StringVar(&apiServerPort, "apiserver-port", ":7443", "The port the API server serves at. Default is 7443.")
flag.StringVar(&apiServerDataDir, "apiserver-data-dir", "./", "The data directory for the API server to use.")
flag.StringVar(&apiServerCertFile, "apiserver-cert-file", "", "The TLS cert file for the API server to use.")
flag.StringVar(&apiServerKeyFile, "apiserver-key-file", "", "The TLS key file for the API server to use.")
```
Initialize the API server in the same file before the mgr.Start(ctrl.SetupSignalHandler()) call.
main.go

```go
if err := mgr.Add(apiserver.New(apiServerPort, apiServerCertFile, apiServerKeyFile, apiServerDataDir)); err != nil {
	setupLog.Error(err, "unable to add API server to manager")
	os.Exit(1)
}
```
Update your mutating webhook's Default(context.Context, runtime.Object) error function at internal/webhook/v1/task_webhook.go to index resources.
```go
lastGen := task.Generation - 10
if !task.DeletionTimestamp.IsZero() {
	lastGen = task.Generation
} else if task.Spec.Details != "" {
	indexKey := fmt.Sprintf("%s.%s.%d", task.Namespace, task.Name, task.Generation+1)
	task.Labels["example.example.com/task-indexer"] = indexKey
	if err := apiserver.Indexer.Index(indexKey, apiserver.Index{
		Namespace:  task.Namespace,
		Name:       task.Name,
		Generation: task.Generation + 1,
		Details:    task.Spec.Details,
	}); err != nil {
		return fmt.Errorf("failed to index task details: %w", err)
	}
}
if lastGen > 0 {
	cleanupQuery := bleve.NewQueryStringQuery(fmt.Sprintf("+namespace:%s +name:%s +generation:<=%d", task.Namespace, task.Name, lastGen))
	searchResult, err := apiserver.Indexer.Search(bleve.NewSearchRequest(cleanupQuery))
	if err != nil {
		return fmt.Errorf("failed to compact task details: %w", err)
	}
	batch := apiserver.Indexer.NewBatch()
	for _, hit := range searchResult.Hits {
		batch.Delete(hit.ID)
	}
	if err := apiserver.Indexer.Batch(batch); err != nil {
		return fmt.Errorf("failed to compact task details: %w", err)
	}
}
```
Update dependencies of the service.
```shell
go mod tidy
```
Re-deploy the application.
```shell
export IMG=controller:dev
make docker-build
../bin/kind load docker-image $IMG
make deploy
kubectl delete pod -n my-project-system -l control-plane=controller-manager
```
Validate the business logic by filtering tasks via the aggregation API.
```shell
kubectl get tasks --selector "example.example.com/task-indexer in ($(kubectl get --raw /apis/search.task.example.example.com/v1/details?q='%2Bdetails:task-sample-4 %2Bnamespace:default'))"
```

```
NAME            PRIORITY   DEADLINE               TASKSTATE
task-sample-4   4          2025-08-19T16:52:15Z   Pending
```
You can read more about how to query the indexer in the official docs.
While Kubernetes excels at orchestration, its native data filtering is limited. The Kubernetes API Aggregation Layer provides a powerful solution by allowing you to create custom API servers that sit in front of the core API. This gives you the ability to implement a purpose-built data search engine, such as the one in this post, which leverages a dedicated indexer to perform complex queries like full-text searches.
Ready for the next step? Learn how to implement a fully customized Aggregation API.
That’s it! Imagine your own data topology and enhance your Kubernetes experience. Enjoy lower latency, higher throughput, data isolation, virtually unlimited storage, and simplified development. HariKube supports both flat and hierarchical topologies, allowing you to organize your databases like leaves on a tree.
Thank you for reading, and feel free to share your thoughts.