ITの窓辺から

三流IT技術者の日常

DjangoでLDAP認証 その6 とりあえず完成形

前回はパーミッションによる認可処理のはしりを実装してみました。

realizeznsg.hatenablog.com

今回は当初の要件だった以下の内容を全て実現できるようにしてみます。

  • Active Directory上に格納されたユーザアカウントを使用してDjango上のアプリケーションを使用する際にLDAP認証によるユーザ認証を行う。ユーザはアカウントが存在するだけでなく特定のセキュリティグループに属している必要がある。
  • ユーザアカウントに付与されているセキュリティグループによってアプリケーションの操作権限を変えられるようにする。

相変わらずの注意書きですが、この記事はWebアプリケーションとDjangoの初心者が調べたことをメモするものであり、正しい解説記事を目的としていないのでご注意ください。

環境は以下のとおりです。
OS : macOS(intel CPU) Big Sur
Python : バージョン3.8.5 (venv環境)
Django : バージョン3.2.5
Webサーバ : Djangoに組み込みのもの
ldap3 : バージョン2.9.1
LDAPサーバ : Active Directory (Windows Server2016)

Djangoアプリケーションへの認証制限

LDAPサーバ上のアカウントは特定のセキュリティグループに所属していないとIDとパスワードがあっていたとしてもDjangoアプリケーションでの認証には通らない、というものを実装します。認証を行ったユーザ名かつ特定のセキュリティグループのメンバである場合にそのレコードを返すようなLDAPクエリを発行し、そのレコードを格納した変数(リスト)が空でなければ処理を継続するようにauthenticateメソッドを変更します。

class LdapAuthBackend(ModelBackend):
    def authenticate(self, request, username=None, password=None, **kwargs):
        ldap_server = Server('192.168.1.156') print(type(ldap_server))
            try:
                ldap_auth_by_bind = Connection(ldap_server, user=username, password=password, auto_bind=True)

                #セキュリティグループ状況の取得
                ldap_query = '(&(sAMAccountName=' + username + ')(memberof=CN=django_app1_group,OU=dep1,OU=useraccount,DC=rdmac,DC=test))'
                ldap_auth_by_bind.search('dc=rdmac,dc=test', ldap_query)                         
                #セキュリティグループの所属チェック
                if not ldap_auth_by_bind.entries:
                    return
               
               #電話番号の取得
                ldap_query = '(&(objectclass=person)(samAccountName=' + username + '))'
                ldap_auth_by_bind.search('dc=rdmac,dc=test', ldap_query, attributes=['telephoneNumber'])
                ldap_query_result = ldap_auth_by_bind.entries[0]
                telephone_number =ldap_query_result.telephoneNumber.values[0]                 
               #ユーザーモデルへの反映
               user, insert_user = UserModel.objects.update_or_create(username=username, defaults={'telephone_number': telephone_number})
               if insert_user == True:
                   hash_password = ''.join(random.choice(string.ascii_letters + string.digits + '&#!?') for _ in range(64))
                   user.set_password(hash_password)
               ldap_auth_by_bind.unbind()
               return user
          except:
            return

ldap3モジュールで上記のLDAPクエリを投げると、Djangoで認証したユーザ名がActive Directoryのセキュリティグループ「django_app1_group」に所属している場合、ユーザ名が返ってきます。所属していない場合、クエリを満たすオブジェクトがLDAPサーバにないということで返り値は空になります。その後のif文で空かどうかチェックしています。これにより、認証情報が正しかったとしてもセキュリティグループに所属していないとDjangoアプリケーションは使用できません。

本筋とは別の話ですが、実際にはもっとエラー処理を書かないと謎の認証エラーが頻発する可能性があります。例えば電話番号を取得するところでは、必ずtelephoneNumberから値が取れることを前提にしていますが、LDAPサーバ側に値が入っていない場合や、何らかの問題で正常に取得できなかった場合はauthenticateメソッドの処理は止まります。

パーミッションの自動付与

以下のような実装にします。

  • スーパユーザadmin以外のどのユーザでもパーミッション「Custom Permission 1」は必ず割り当てる。
  • Active Directoryのセキュリティグループ「django_perm2_group」に所属しているアカウントはさらにパーミッション「Custom Permission 2」を割り当てる。
  • 一度「Custom Permission 2」が割り当てられてもActive Directoryのセキュリティグループ「django_perm2_group」に所属していない場合は認証時にパーミッション「Custom Permission 2」を削除する。

先程のauthenticateメソッドを以下のように変更します。

class LdapAuthBackend(ModelBackend):
    def authenticate(self, request, username=None, password=None, **kwargs):
        ldap_server = Server('192.168.1.156') print(type(ldap_server))
            try:
                ldap_auth_by_bind = Connection(ldap_server, user=username, password=password, auto_bind=True)

                #セキュリティグループ状況の取得
                ldap_query = '(&(sAMAccountName=' + username + ')(memberof=CN=django_app1_group,OU=dep1,OU=useraccount,DC=rdmac,DC=test))'
                ldap_auth_by_bind.search('dc=rdmac,dc=test', ldap_query)                         
                #セキュリティグループの所属チェック
                if not ldap_auth_by_bind.entries:
                    return
               
               #電話番号の取得
                ldap_query = '(&(objectclass=person)(samAccountName=' + username + '))'
                ldap_auth_by_bind.search('dc=rdmac,dc=test', ldap_query, attributes=['telephoneNumber'])
                ldap_query_result = ldap_auth_by_bind.entries[0]
                telephone_number =ldap_query_result.telephoneNumber.values[0]                 
               #ユーザーモデルへの反映
               user, insert_user = UserModel.objects.update_or_create(username=username, defaults={'telephone_number': telephone_number})
               if insert_user == True:
                   hash_password = ''.join(random.choice(string.ascii_letters + string.digits + '&#!?') for _ in range(64))
                   user.set_password(hash_password)

              #パーミッションの付与処理
              if username != 'admin':    #adminには処理しない
                  permission_list = [Permission.objects.get(name='Custom Permission 1')]
                  ldap_query = '(&(sAMAccountName=' + username + ')(memberof=CN=django_perm2_group,OU=dep1,OU=useraccount,DC=rdmac,DC=test))'
                  ldap_auth_by_bind.search('dc=rdmac,dc=test', ldap_query)

                  if ldap_auth_by_bind.entries:
                      add_permission = Permission.objects.get(name='Custom Permission 2')
                      permission_list.append(add_permission)
                  else:
                      remove_permission = Permission.objects.get(name='Custom Permission 2')
                      user.user_permissions.remove(remove_permission)
            
                  for permission in permission_list:
                      user.user_permissions.add(permission)

               ldap_auth_by_bind.unbind()
               return user
          except:
            return

ワンポイント調整

先程のコードは認証時に動作するもので、パーミッションの付与状況はDjangoで認証を行った時の状態が次回の認証時まで維持されます。認証後、Active Directoryでセキュリティグループからアカウントを外してもそのセッション中のDjangoアプリケーションの操作には影響しないわけです。そこでget_userメソッドを変更して、ページ遷移時にセキュリティグループの状態を確認するように処理を変えてみました。

def get_user(self, user_id):
    try:
        user = UserModel._default_manager.get(pk=user_id)
        ldap_server = Server('192.168.1.156')
        username = user.username print(username)
        ldap_bind = Connection(ldap_server, user='ldapbinduser', password='%TGB6yhn', auto_bind=True)
        if username != 'admin':
            permission_list = [Permission.objects.get(name='Custom Permission 1')] ldap_query = '(&(sAMAccountName=' + username + ')(memberof=CN=django_perm2_group,OU=dep1,OU=useraccount,DC=rdmac,DC=test))'
            ldap_bind.search('dc=rdmac,dc=test', ldap_query)
            if ldap_bind.entries:
                add_permission = Permission.objects.get(name='Custom Permission 2')
                permission_list.append(add_permission)
            else:
                remove_permission = Permission.objects.get(name='Custom Permission 2')
                user.user_permissions.remove(remove_permission)
           for permission in permission_list:
               user.user_permissions.add(permission)
        ldap_bind.unbind()
    except UserModel.DoesNotExist:
        return None
    return user if self.user_can_authenticate(user) else None

LDAPバインドのあたりの書き方が超雑ですがそれはそれで。今度はLDAPバインド用アカウント「ldapbinduser」がActive Directory上にあることが前提です。その後にやっていることは先程のauthenticateメソッドと同様です。こうすることで、ページ遷移ごとにセキュリティグループのチェックが行われ、パーミッションが最新状態になります。

しかし、1ページ移動するごとにLDAP接続が行われるのは諸々の負荷を考えるとあまり実行すべきではないと思われます。実際には例えば定時で自動的にパーミッションを最新にするようなバッチ処理を行ったり、get_userメソッド内でセキュリティグループのチェックを行う頻度を減らすような判定をつけたりする方が良いように思います。

所感

記事6個に及ぶDjangoLDAP認証の調査がようやく一区切り付きました。はじめは本当に全く理解できませんでしたが、何とかここまで来られました。インターネット上の記事を調べたりもしましたが、意外とActive DirectoryDjangoの組み合わせは記事が少ないんですよね(Windows統合認証環境の記事はほぼありませんでした)。よくありそうな構成なのになぜだろうと考えてみましたが、Djangoと社内向けアプリケーションに使用するというモチベーションが低いためではないかと予想しています。私の所属会社と同じような情シス体制の会社は多分たくさんありますし、社内でこういうのを内製する機会はまだ少ないと思われます。外注して作る場合でも、わざわざ日本でPythonを選定しているベンダさんも少ないでしょう・・・。良い悪いはともかくとしてJVM系言語によるものがとりあえず選ばれるのではないでしょうか。

このauthenticateとget_userメソッドに対するテストコードを書いてみようと思います。get_userメソッドによるセキュリティグループの都度チェックのテストは行いません。アカウントの設定変更を行うにはドメイン上で強めの権限を与える必要があり、セキュリティ上のデメリットが大きいと判断しています。

と書いてみましたが実験はしてみよう・・・。